http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
deleted file mode 100644
index 19edd7d..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
+++ /dev/null
@@ -1,737 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.concurrent;
-
-import com.jayway.awaitility.Awaitility;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import com.gemstone.gemfire.cache.EntryExistsException;
-import com.gemstone.gemfire.cache.client.ServerOperationException;
-import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.BatchException70;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.IgnoredException;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.Wait;
-
-import java.net.SocketException;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Test the functionality of ParallelGatewaySender with multiple dispatchers.
- *
- */
-@Category(DistributedTest.class)
-public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
-  
-  public ConcurrentParallelGatewaySenderDUnitTest() {
-    super();
-  }
-  
-  /**
-   * Normal happy scenario test case.
-   * checks that all the dispatchers have successfully 
-   * dispatched something individually.
-   * 
-   * @throws Exception
-   */
-  @Test
-  public void testParallelPropagationConcurrentArtifacts() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
-    //before doing any puts, let the senders be running in order to ensure that
-    //not a single event will be lost
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    
-    try {
-      // set the test hook to find out dispatched events by each of the
-      // concurrent dispatcher
-      vm4.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE ));
-      vm5.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE ));
-      vm6.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE ));
-      vm7.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE ));
-
-      vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
-          1000 ));
-
-      // verify all buckets drained on all sender nodes.
-      vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
-      vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
-      vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
-      vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
-
-      vm2.invoke(() -> WANTestBase.validateRegionSize(
-          getTestMethodName() + "_PR", 1000 ));
-
-      int dispatched1 = (Integer)vm4.invoke(() -> WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" ));
-      int dispatched2 = (Integer)vm5.invoke(() -> WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" ));
-      int dispatched3 = (Integer)vm6.invoke(() -> WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" ));
-      int dispatched4 = (Integer)vm7.invoke(() -> WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" ));
-
-      assertEquals(1000, dispatched1 + dispatched2 + dispatched3 + dispatched4);
-    }
-    finally {
-      vm4.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE ));
-      vm5.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE ));
-      vm6.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE ));
-      vm7.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE ));
-    }
-  }
-  
-  /**
-   * Normal happy scenario test case.
-   * @throws Exception
-   */
-  @Test
-  public void testParallelPropagation() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
-    //before doing any puts, let the senders be running in order to ensure that
-    //not a single event will be lost
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
-        1000 ));
-    
-    //verify all buckets drained on all sender nodes.
-    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    
-    vm2.invoke(() -> WANTestBase.validateRegionSize(
-        getTestMethodName() + "_PR", 1000 ));
-  }
-  
-  
-  /**
-   * Normal happy scenario test case when bucket division tests boundary cases.
-   * @throws Exception
-   */
-  @Test
-  public void testParallelPropagationWithUnEqualBucketDivision() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    //before doing any puts, let the senders be running in order to ensure that
-    //not a single event will be lost
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
-        1000 ));
-    
-    //verify all buckets drained on all sender nodes.
-    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    
-    vm2.invoke(() -> WANTestBase.validateRegionSize(
-        getTestMethodName() + "_PR", 1000 ));
-  }
-  
-  
-  /**
-   * Initially the GatewayReceiver is not up but later it comes up.
-   * We verify that 
-   * @throws Exception
-   */
-  @Test
-  public void testParallelPropagation_withoutRemoteSite() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    //keep a larger batch to minimize number of exception occurrences in the log
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION ));
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    //make sure all the senders are running before doing any puts
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
-      1000 ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
-    createReceiverInVMs(vm2, vm3);
-
-    //verify all buckets drained on all sender nodes.
-    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    
-    // Just making sure that though the remote site is started later,
-    // remote site is still able to get the data. Since the receivers are
-    // started before creating partition region it is quite possible that the
-    // region may loose some of the events. This needs to be handled by the code
-    vm2.invoke(() -> WANTestBase.validateRegionSize(
-        getTestMethodName() + "_PR", 1000 ));
-  }
-  
-  /**
-   * Testing for colocated region with orderPolicy Partition
-   */
-  @Test
-  public void testParallelPropagationColocatedPartitionedRegions() {
-    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
-
-
-    vm4.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() ));
-
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    vm2.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() ));
-
-    //before doing any puts, let the senders be running in order to ensure that
-    //not a single event will be lost
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    
-    vm4.invoke(() -> WANTestBase.putcolocatedPartitionedRegion( 1000 ));
-    
-    //verify all buckets drained on all sender nodes.
-    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    
-    vm2.invoke(() -> WANTestBase.validateRegionSize(
-      customerRegionName, 1000 ));
-    vm2.invoke(() -> WANTestBase.validateRegionSize(
-      orderRegionName, 1000 ));
-    vm2.invoke(() -> WANTestBase.validateRegionSize(
-      shipmentRegionName, 1000 ));
-  }
-  
-  /**
-   * Local and remote sites are up and running.
-   * Local site cache is closed and the site is built again.
-   * Puts are done to local site.
-   * Expected: Remote site should receive all the events put after the local
-   * site was built back.
-   * 
-   * @throws Exception
-   */
-  @Test
-  public void testParallelPropagationWithLocalCacheClosedAndRebuilt() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
-    //before doing any puts, let the senders be running in order to ensure that
-    //not a single event will be lost
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
-      1000 ));
-    vm2.invoke(() -> WANTestBase.validateRegionSize(
-              getTestMethodName() + "_PR", 1000 ));
-    //-------------------Close and rebuild local site ---------------------------------
-
-    vm4.invoke(() -> WANTestBase.killSender());
-    vm5.invoke(() -> WANTestBase.killSender());
-    vm6.invoke(() -> WANTestBase.killSender());
-    vm7.invoke(() -> WANTestBase.killSender());
-    
-    Integer regionSize = 
-      (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_PR" ));
-    LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize);
-    
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
-    
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
-    
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    //------------------------------------------------------------------------------------
-    
-    IgnoredException.addIgnoredException(EntryExistsException.class.getName());
-    IgnoredException.addIgnoredException(BatchException70.class.getName());
-    IgnoredException.addIgnoredException(ServerOperationException.class.getName());
-    
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 10000 ));
-    
-    //verify all buckets drained on all sender nodes.
-    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    
-    vm2.invoke(() -> WANTestBase.validateRegionSize(
-        getTestMethodName() + "_PR", 10000 ));
-    vm3.invoke(() -> WANTestBase.validateRegionSize(
-      getTestMethodName() + "_PR", 10000 ));
-  }
-  
-  /**
-   * Colocated regions using ConcurrentParallelGatewaySender.
-   * Normal scenario 
-   * @throws Exception
-   */
-  @Test
-  public void testParallelColocatedPropagation() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
-
-    vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() ));
-
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
-    
-    //verify all buckets drained on all sender nodes.
-    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
-    vm2.invoke(() -> WANTestBase.validateRegionSize(
-        getTestMethodName(), 1000 ));
-  }
-  
-  
-  /**
-   * Colocated regions using ConcurrentParallelGatewaySender.
-   * Normal scenario 
-   * @throws Exception
-   */
-  @Test
-  public void testParallelColocatedPropagationOrderPolicyPartition() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
-
-    vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() ));
-
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
-    
-    //verify all buckets drained on all sender nodes.
-    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
-    vm2.invoke(() -> WANTestBase.validateRegionSize(
-        getTestMethodName(), 1000 ));
-  }
-  
-  @Test
-  public void testPartitionedParallelPropagationHA() throws Exception {
-    IgnoredException.addIgnoredException(SocketException.class.getName()); // for Connection reset
-    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY ));
-    
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
-    AsyncInvocation inv1 = vm7.invokeAsync(() -> WANTestBase.doPuts( 
getTestMethodName() + "_PR", 5000 ));
-    vm2.invoke(() -> Awaitility.await().atMost(30000, 
TimeUnit.MILLISECONDS).until(() ->
-      assertEquals("Failure in waiting for at least 10 events to be received 
by the receiver",
-        true, (getRegionSize(getTestMethodName() + "_PR") > 10 ))));
-    AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
-    AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.doPuts( 
getTestMethodName() + "_PR", 10000 ));
-    vm2.invoke(() -> Awaitility.await().atMost(30000, 
TimeUnit.MILLISECONDS).until(() ->
-      assertEquals("Failure in waiting for additional 2000 events to be 
received by the receiver ",
-        true,getRegionSize(getTestMethodName() + "_PR") > 7000 )));
-    AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
-    inv1.join();
-    inv2.join();
-    inv3.join();
-    inv4.join();
-
-    vm6.invoke(() -> WANTestBase.validateRegionSize(
-      getTestMethodName() + "_PR", 10000 ));
-    vm7.invoke(() -> WANTestBase.validateRegionSize(
-      getTestMethodName() + "_PR", 10000 ));
-
-    vm2.invoke(() -> WANTestBase.validateRegionSize(
-      getTestMethodName() + "_PR", 10000 ));
-    vm3.invoke(() -> WANTestBase.validateRegionSize(
-      getTestMethodName() + "_PR", 10000 ));
-    
-    //verify all buckets drained on the sender nodes that up and running.
-    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-  }
-  
-  @Test
-  public void testWANPDX_PR_MultipleVM_ConcurrentParallelSender() {
-    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2);
-    vm2.invoke(() -> WANTestBase.createReceiver());
-
-    createCacheInVMs(lnPort, vm3, vm4);
-
-    vm3.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-      true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
-    
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 0, 2, isOffHeap()));
-
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));
-
-    startSenderInVMs("ln", vm3, vm4);
-
-    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable( getTestMethodName() + 
"_PR",
-        10 ));
-
-    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
-        getTestMethodName() + "_PR", 10 ));
-  }
-  
-  @Test
-  public void testWANPDX_PR_MultipleVM_ConcurrentParallelSender_StartedLater() 
{
-    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    vm2.invoke(() -> WANTestBase.createReceiver_PDX( nyPort ));
-
-    vm3.invoke(() -> WANTestBase.createCache_PDX( lnPort ));
-    vm4.invoke(() -> WANTestBase.createCache_PDX( lnPort ));
-
-    vm3.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-      true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
-    
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 0, 2, isOffHeap()));
-
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));
-    
-    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable( getTestMethodName() + 
"_PR",
-        10 ));
-
-    startSenderInVMsAsync("ln", vm3, vm4);
-
-    vm4.invoke(() -> WANTestBase.doPutsPDXSerializable( getTestMethodName() + 
"_PR",
-      40 ));
-    
-    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
-        getTestMethodName() + "_PR", 40 ));
-  }
-
-  public static void setTestHook(String senderId, boolean hook) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
-    ConcurrentParallelGatewaySenderEventProcessor cProc = 
(ConcurrentParallelGatewaySenderEventProcessor)((AbstractGatewaySender)sender)
-        .getEventProcessor();
-    if (cProc == null) return;
-    cProc.TEST_HOOK = hook;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java
 
b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java
deleted file mode 100644
index da8a631..0000000
--- 
a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.concurrent;
-
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-@SuppressWarnings("serial")
-@Category(DistributedTest.class)
-public class ConcurrentParallelGatewaySenderOffHeapDUnitTest extends
-    ConcurrentParallelGatewaySenderDUnitTest {
-
-  public ConcurrentParallelGatewaySenderOffHeapDUnitTest() {
-    super();
-  }
-
-  @Override
-  public boolean isOffHeap() {
-    return true;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
 
b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
deleted file mode 100644
index ab1c06b..0000000
--- 
a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
+++ /dev/null
@@ -1,796 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.concurrent;
-
-import org.junit.Ignore;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.IgnoredException;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.Wait;
-import com.gemstone.gemfire.test.junit.categories.FlakyTest;
-
-@Category(DistributedTest.class)
-public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends 
WANTestBase {
-  private static final long serialVersionUID = 1L;
-  
-  public ConcurrentParallelGatewaySenderOperation_1_DUnitTest() {
-    super();
-  }
-
-  @Override
-  protected final void postSetUpWANTestBase() throws Exception {
-    IgnoredException.addIgnoredException("Broken pipe");
-    IgnoredException.addIgnoredException("Connection reset");
-    IgnoredException.addIgnoredException("Unexpected IOException");
-  }
-  
-  @Test
-  public void testParallelGatewaySenderWithoutStarting() {
-    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY ));
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
-    
-    vm4.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" ));
-    vm5.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" ));
-    vm6.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" ));
-    vm7.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" ));
-    
-    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 0 ));
-    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 0 ));
-  }
-  
-  /**
-   * Defect 44323 (ParallelGatewaySender should not be started on Accessor 
node)
-   */
-  @Test
-  public void testParallelGatewaySenderStartOnAccessorNode() {
-    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor(
-        getTestMethodName() + "_PR", "ln", 1, 100 ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor(
-        getTestMethodName() + "_PR", "ln", 1, 100 ));
-
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    
-    //start the senders
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
-
-    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    
-    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 1000 ));
-    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 1000 ));
-  }
-
-  
-  /**
-   * Normal scenario in which the sender is paused in between.
-   * @throws Exception
-   */
-  @Test
-  public void testParallelPropagationSenderPause() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
-    //make sure all the senders are running before doing any puts
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    
-    //FIRST RUN: now, the senders are started. So, start the puts
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 ));
-    
-    //now, pause all of the senders
-    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    Wait.pause(2000);
-    //SECOND RUN: keep one thread doing puts to the region
-    vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 
1000 ));
-    
-    //verify region size remains on remote vm and is restricted below a 
specified limit (i.e. number of puts in the first run)
-    vm2.invoke(() -> 
WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 100 ));
-  }
-
-  /**
-   * Normal scenario in which a paused sender is resumed.
-   * @throws Exception
-   */
-  @Test
-  public void testParallelPropagationSenderResume() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY ));
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
-    //make sure all the senders are running before doing any puts
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    
-    //now, the senders are started. So, start the puts
-    vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 
1000 ));
-    
-    //now, pause all of the senders
-    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    
-    //sleep for a second or two
-    Wait.pause(2000);
-    
-    //resume the senders
-    vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.resumeSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.resumeSender( "ln" ));
-    
-    Wait.pause(2000);
-    
-    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    
-    //find the region size on remote vm
-    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 1000 ));
- 
-  }
-  
-  /**
-   * Negative scenario in which a sender that is stopped (and not paused) is 
resumed.
-   * Expected: resume is only valid for pause. If a sender which is stopped is 
resumed,
-   * it will not be started again.
-   * 
-   * @throws Exception
-   */
-  @Test
-  public void testParallelPropagationSenderResumeNegativeScenario() throws 
Exception {
-    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(nyPort, vm4, vm5);
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5);
-
-    //wait till the senders are running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
-    //start the puts
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 ));
-
-    //let the queue drain completely
-    vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 0 ));
-    
-    //stop the senders
-    vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
-    
-    //now, try to resume a stopped sender
-    vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
-    
-    //do more puts
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
-    
-    //validate region size on remote vm to contain only the events put in 
local site 
-    //before the senders are stopped.
-    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 100 ));
-  }
-
-  /**
-   * Normal scenario in which a sender is stopped.
-   * @throws Exception
-   */
-  @Test
-  public void testParallelPropagationSenderStop() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY ));
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    //make sure all the senders are running before doing any puts
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    
-    //FIRST RUN: now, the senders are started. So, do some of the puts
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 ));
-    
-    //now, stop all of the senders
-    vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.stopSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.stopSender( "ln" ));
-    
-    //SECOND RUN: keep one thread doing puts
-    vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 
1000 ));
-    
-    //verify region size remains on remote vm and is restricted below a 
specified limit (number of puts in the first run)
-    vm2.invoke(() -> 
WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 100 ));
-  }
-
-  /**
-   * Normal scenario in which a sender is stopped and then started again.
-   */
-  @Test
-  public void testParallelPropagationSenderStartAfterStop() throws Throwable {
-    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-
-    //make sure all the senders are running before doing any puts
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    
-    //FIRST RUN: now, the senders are started. So, do some of the puts
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 ));
-    
-    //now, stop all of the senders
-    vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.stopSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.stopSender( "ln" ));
-    
-    Wait.pause(2000);
-
-    //SECOND RUN: do some of the puts after the senders are stopped
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
-    
-    //Region size on remote site should remain same and below the number of 
puts done in the FIRST RUN
-    vm2.invoke(() -> 
WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 ));
-    
-    //start the senders again
-    AsyncInvocation vm4start = vm4.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
-    AsyncInvocation vm5start = vm5.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
-    AsyncInvocation vm6start = vm6.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
-    AsyncInvocation vm7start = vm7.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
-    int START_TIMEOUT = 30000;
-    vm4start.getResult(START_TIMEOUT);
-    vm5start.getResult(START_TIMEOUT);
-    vm6start.getResult(START_TIMEOUT);
-    vm7start.getResult(START_TIMEOUT);
-
-    //Region size on remote site should remain same and below the number of 
puts done in the FIRST RUN
-    vm2.invoke(() -> 
WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 ));
-
-    //SECOND RUN: do some more puts
-    AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( 
getTestMethodName() + "_PR", 1000 ));
-    async.join();
-    
-    //verify all the buckets on all the sender nodes are drained
-    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    
-    //verify the events propagate to remote site
-    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 1000 ));
-    
-    vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
-    vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
-    vm6.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
-    vm7.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
-  }
-
-  /**
-   * Normal scenario in which a sender is stopped and then started again.
-   * Differs from the above test case in that, while the senders are being
-   * started from the stopped state, puts are simultaneously happening on the
-   * region through a separate asynchronous invocation.
-   *
-   * <p>Steps, in the order they appear in the body:
-   * <ol>
-   *   <li>vm0 runs {@code createFirstLocatorWithDSId(1)}, vm1 runs
-   *       {@code createFirstRemoteLocator(2, lnPort)}.</li>
-   *   <li>vm2 and vm3 get caches (using {@code nyPort}), the partitioned region
-   *       {@code getTestMethodName() + "_PR"} without a sender id, and receivers.</li>
-   *   <li>vm4-vm7 get caches (using {@code lnPort}), concurrent sender "ln", the
-   *       same region attached to "ln", and the senders are started.</li>
-   *   <li>FIRST RUN: {@code doPuts(..., 200)} from vm4, then "ln" is stopped on
-   *       vm4-vm7.</li>
-   *   <li>SECOND RUN: {@code doPuts(..., 1000)} from vm4 with the senders stopped,
-   *       followed by {@code validateRegionSizeRemainsSame(..., 200)} on vm2.</li>
-   *   <li>{@code doPuts(..., 5000)} is launched asynchronously on vm4 and the
-   *       senders are started again while it is in flight.</li>
-   *   <li>Once the async puts are joined, every sender VM is checked with
-   *       {@code validateParallelSenderQueueAllBucketsDrained("ln")} and vm4 with
-   *       {@code validateQueueContents("ln", 0)}.</li>
-   * </ol>
-   *
-   * <p>The test is currently disabled via {@code @Ignore("Bug47553")}.
-   *
-   * @throws Exception if any step of the scenario throws
-   */
-  @Ignore("Bug47553")
-  @Test
-  public void testParallelPropagationSenderStartAfterStop_Scenario2() throws 
Exception {
-    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    //make sure all the senders are running before doing any puts
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    
-    LogWriterUtils.getLogWriter().info("All the senders are now started");
-    
-    //FIRST RUN: now, the senders are started. So, do some of the puts
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 ));
-    
-    LogWriterUtils.getLogWriter().info("Done few puts");
-    
-    //now, stop all of the senders
-    vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.stopSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.stopSender( "ln" ));
-    
-    LogWriterUtils.getLogWriter().info("All the senders are stopped");
-    Wait.pause(2000); // fixed pause after the stop calls; presumably 2000 ms - confirm against Wait.pause
-    
-    //SECOND RUN: do some of the puts after the senders are stopped
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
-    LogWriterUtils.getLogWriter().info("Done some more puts in second run");
-    
-    //Region size on remote site should remain same and below the number of 
puts done in the FIRST RUN
-    vm2.invoke(() -> 
WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 ));
-    
-    //SECOND RUN: start async puts on region
-    AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( 
getTestMethodName() + "_PR", 5000 ));
-    LogWriterUtils.getLogWriter().info("Started high number of puts by async 
thread");
-
-    LogWriterUtils.getLogWriter().info("Starting the senders at the same 
time");
-    //when puts are happening by another thread, start the senders
-    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); // NOTE(review): no waitForSenderRunningState follows this call - confirm the helper blocks until the senders are up
-
-    LogWriterUtils.getLogWriter().info("All the senders are started");
-    
-    async.join(); // wait for the 5000 async puts launched above to finish
-        
-    Wait.pause(2000); // second fixed pause, taken before the drain validation below
-    
-    //verify all the buckets on all the sender nodes are drained
-    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    
-    //verify that the queue size ultimately becomes zero. That means all the 
events propagate to remote site.
-    vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 0 )); // only vm4 is checked here; the remote region size is not validated in this scenario
-  }
-  
-  /**
-   * Normal scenario in which a sender is stopped and then started again, with
-   * two of the four sender-side VMs hosting the region as accessors.
-   *
-   * <p>vm4 and vm5 create the region with {@code createPartitionedRegion}, while
-   * vm6 and vm7 use {@code createPartitionedRegionAsAccessor}. Concurrent sender
-   * "ln" is created and started on all four VMs. The remote regions and
-   * receivers on vm2 and vm3 are created only after the senders are started.
-   *
-   * <p>Steps after setup:
-   * <ol>
-   *   <li>FIRST RUN: {@code doPuts(..., 200)} from vm4, then "ln" is stopped on
-   *       vm4-vm7.</li>
-   *   <li>SECOND RUN: {@code doPuts(..., 1000)} from vm4 with the senders stopped,
-   *       followed by {@code validateRegionSizeRemainsSame(..., 200)} on vm2.</li>
-   *   <li>"ln" is started again on vm4-vm7 through async invocations, and the
-   *       running state is awaited on each VM.</li>
-   *   <li>{@code validateRegionSizeRemainsSame(..., 200)} is repeated on vm2, then
-   *       another {@code doPuts(..., 1000)} is run from vm4.</li>
-   *   <li>Bucket drain is validated on vm4 and vm5 only, and vm2 is checked with
-   *       {@code validateRegionSize(..., 1000)}.</li>
-   * </ol>
-   *
-   * @throws Throwable if any step of the scenario throws
-   */
-  @Test
-  public void testParallelPropagationSenderStartAfterStopOnAccessorNode() 
throws Throwable {
-    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor(
-        getTestMethodName() + "_PR", "ln", 1, 100));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor(
-        getTestMethodName() + "_PR", "ln", 1, 100));
-
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-      true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-      true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-      true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-      true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
-
-    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
-
-    createReceiverInVMs(vm2, vm3);
-
-    //NOTE(review): waitForSenderRunningState is invoked on all four VMs below, accessors (vm6, vm7) included, not only 
on non-accessor nodes
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    
-    //FIRST RUN: now, the senders are started. So, do some of the puts
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 ));
-
-    //now, stop all of the senders
-    vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.stopSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.stopSender( "ln" ));
-
-    //SECOND RUN: do some of the puts after the senders are stopped
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
-    
-    //Region size on remote site should remain same and below the number of 
puts done in the FIRST RUN
-    vm2.invoke(() -> 
WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 ));
-    
-    //start the senders again
-    AsyncInvocation vm4start = vm4.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
-    AsyncInvocation vm5start = vm5.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
-    AsyncInvocation vm6start = vm6.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
-    AsyncInvocation vm7start = vm7.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
-    vm4start.join(); // NOTE(review): the sibling test earlier in this file uses getResult(START_TIMEOUT) here; presumably join() has no timeout and does not surface remote failures - confirm
-    vm5start.join();
-    vm6start.join();
-    vm7start.join();
-
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
-
-    //Region size on remote site should remain same and below the number of 
puts done in the FIRST RUN
-    vm2.invoke(() -> 
WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 ));
-
-    //SECOND RUN: do some more puts
-    AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( 
getTestMethodName() + "_PR", 1000 ));
-    async.join(); // joined immediately, so these puts finish before the validations below
-
-    //verify all buckets drained only on non-accessor nodes.
-    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    
-    //verify the events propagate to remote site
-    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 1000 ));
-  }
-
-  
-  /**
-   * Normal scenario in which a combination of start, pause and resume
-   * operations is tested on concurrent parallel sender "ln".
-   *
-   * <p>Steps, in the order they appear in the body:
-   * <ol>
-   *   <li>Locators on vm0/vm1, caches and receivers on vm2/vm3, caches on
-   *       vm4-vm7.</li>
-   *   <li>Sender "ln" and region {@code getTestMethodName() + "_PR"} are created
-   *       on vm4-vm7; the same region (no sender id) is created on vm2/vm3.</li>
-   *   <li>{@code doPuts(..., 1000)} runs from vm4 <em>before</em> the senders are
-   *       started, then the senders are started asynchronously and the running
-   *       state is awaited on each VM.</li>
-   *   <li>{@code doPuts(..., 5000)} runs from vm4, after which "ln" is paused on
-   *       vm4-vm7 and the paused state is verified.</li>
-   *   <li>{@code doPuts(..., 1000)} is launched asynchronously while paused; "ln"
-   *       is then resumed on vm4-vm7 and the resumed state is verified.</li>
-   *   <li>After joining the async puts, bucket drain is validated on vm4-vm7 and
-   *       {@code validateRegionSize(..., 5000)} is run on vm2 and vm3.</li>
-   * </ol>
-   *
-   * @throws Exception if any step of the scenario throws
-   */
-  @Test
-  public void testStartPauseResumeParallelGatewaySender() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
-    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-    createCacheInVMs(nyPort, vm2, vm3);
-    createReceiverInVMs(vm2, vm3);
-
-    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
-    LogWriterUtils.getLogWriter().info("Created cache on local site");
-    
-    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
-    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
-    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
-    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
-        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
-    
-    LogWriterUtils.getLogWriter().info("Created senders on local site");
-    
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
-    LogWriterUtils.getLogWriter().info("Created PRs on local site");
-    
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-    LogWriterUtils.getLogWriter().info("Created PRs on remote site");
-    
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); // these puts run before any sender has been started
-    LogWriterUtils.getLogWriter().info("Done 1000 puts on local site");
-    
-    //Since puts are already done on userPR, it will have the buckets created. 
-    //During sender start, it will wait until those buckets are created for 
shadowPR as well.
-    //Start the senders in async threads, so colocation of shadowPR will be 
complete and 
-    //missing buckets will be created in 
PRHARedundancyProvider.createMissingBuckets().
-    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
-
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    
-    LogWriterUtils.getLogWriter().info("Started senders on local site");
-    
-    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 ));
-    LogWriterUtils.getLogWriter().info("Done 5000 puts on local site");
-    
-    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    LogWriterUtils.getLogWriter().info("Paused senders on local site");
-    
-    vm4.invoke(() -> WANTestBase.verifySenderPausedState( "ln" ));
-    vm5.invoke(() -> WANTestBase.verifySenderPausedState( "ln" ));
-    vm6.invoke(() -> WANTestBase.verifySenderPausedState( "ln" ));
-    vm7.invoke(() -> WANTestBase.verifySenderPausedState( "ln" ));
-    
-    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( 
getTestMethodName() + "_PR", 1000 ));
-    LogWriterUtils.getLogWriter().info("Started 1000 async puts on local 
site");
-
-    vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.resumeSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.resumeSender( "ln" ));
-    LogWriterUtils.getLogWriter().info("Resumed senders on local site");
-
-    vm4.invoke(() -> WANTestBase.verifySenderResumedState( "ln" ));
-    vm5.invoke(() -> WANTestBase.verifySenderResumedState( "ln" ));
-    vm6.invoke(() -> WANTestBase.verifySenderResumedState( "ln" ));
-    vm7.invoke(() -> WANTestBase.verifySenderResumedState( "ln" ));
-
-    try {
-      inv1.join(); // wait for the async puts that were launched while the senders were paused
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-      fail("Interrupted the async invocation."); // an interrupt while waiting is reported as a test failure
-    }
-    
-    //verify all buckets drained on all sender nodes.
-    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
-    vm2.invoke(() -> WANTestBase.validateRegionSize(
-      getTestMethodName() + "_PR", 5000 )); // 5000 matches the largest doPuts count used above
-    vm3.invoke(() -> WANTestBase.validateRegionSize(
-      getTestMethodName() + "_PR", 5000 ));
-  }
-}

Reply via email to