http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
new file mode 100644
index 0000000..47dee96
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+
+@Category(DistributedTest.class)
+public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase {
+
+  private static final long serialVersionUID = 1L;
+
+  public SerialWANPropagation_PartitionedRegionDUnitTest() {
+    super();
+  }
+
+  @Test
+  public void testPartitionedSerialPropagation() throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+    //vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+        1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+
+  @Test
+  public void testBothReplicatedAndPartitionedSerialPropagation()
+      throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+    vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+        1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+
+  @Test
+  public void testSerialReplicatedAndPartitionedPropagation() throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "lnSerial",
+        2, false, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "lnSerial",
+        2, false, 100, 10, false, false, null, true ));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.startSender( "lnSerial" ));
+    vm5.invoke(() -> WANTestBase.startSender( "lnSerial" ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+    vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+        1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+
+  @Test
+  public void testSerialReplicatedAndSerialPartitionedPropagation()
+      throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "lnSerial1",
+        2, false, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "lnSerial1",
+        2, false, 100, 10, false, false, null, true ));
+
+    vm5.invoke(() -> WANTestBase.createSender( "lnSerial2",
+        2, false, 100, 10, false, false, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "lnSerial2",
+        2, false, 100, 10, false, false, null, true ));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "lnSerial1", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "lnSerial1", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "lnSerial1", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "lnSerial1", isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() ));
+
+    startSenderInVMs("lnSerial1", vm4, vm5);
+
+    startSenderInVMs("lnSerial2", vm5, vm6);
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+    vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+        1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+  
+  @Test
+  public void testPartitionedSerialPropagationToTwoWanSites() throws Exception {
+
+    Integer lnPort = createFirstLocatorWithDSId(1);
+    Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort ));
+    Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3,lnPort ));
+
+    createCacheInVMs(nyPort, vm2);
+    vm2.invoke(() -> WANTestBase.createReceiver());
+    createCacheInVMs(tkPort, vm3);
+    vm3.invoke(() -> WANTestBase.createReceiver());
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "lnSerial1",
+        2, false, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "lnSerial1",
+        2, false, 100, 10, false, false, null, true ));
+
+    vm4.invoke(() -> WANTestBase.createSender( "lnSerial2",
+        3, false, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "lnSerial2",
+        3, false, 100, 10, false, false, null, true ));
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+    startSenderInVMs("lnSerial1", vm4, vm5);
+    startSenderInVMs("lnSerial2", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+        1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+
+  @Test
+  public void testPartitionedSerialPropagationHA() throws Exception {
+    IgnoredException.addIgnoredException("Broken pipe");
+    IgnoredException.addIgnoredException("Connection reset");
+    IgnoredException.addIgnoredException("Unexpected IOException");
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+    //do initial 100 puts to create all the buckets
+    vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 ));
+    
+    IgnoredException.addIgnoredException(CancelException.class.getName());
+    IgnoredException.addIgnoredException(CacheClosedException.class.getName());
+    IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+    //start async puts
+    AsyncInvocation inv = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+    //close the cache on vm4 in between the puts
+    vm4.invoke(() -> WANTestBase.killSender());
+
+    inv.join();
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+
+  @Test
+  public void testPartitionedSerialPropagationWithParallelThreads() throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+    createReceiverInVMs(vm2, vm3);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+
+
+    vm4.invoke(() -> WANTestBase.doMultiThreadedPuts( // TODO: eats exceptions
+        getTestMethodName() + "_PR", 1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_PR", 1000 ));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java
new file mode 100644
index 0000000..f3b6765
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+
+
+@Category(DistributedTest.class)
+public class SerialWANPropagationsFeatureDUnitTest extends WANTestBase{
+
+  private static final long serialVersionUID = 1L;
+
+  public SerialWANPropagationsFeatureDUnitTest() {
+    super();
+  }
+
+  @Test
+  public void testSerialReplicatedWanWithOverflow() {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 10, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 10, 10, false, false, null, true ));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+
+    startSenderInVMs("ln", vm4, vm5);
+    vm2.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR"));
+    vm3.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR"));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+
+    vm4.invoke(() -> WANTestBase.doHeavyPuts(
+        getTestMethodName() + "_RR", 15 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 15, 240000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 15, 240000 ));
+  }
+
+  @Test
+  public void testSerialReplicatedWanWithPersistence() {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, true, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, true, null, true ));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+
+  }
+
+  @Test
+  public void testReplicatedSerialPropagationWithConflation() throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 1000, true, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 1000, true, false, null, true ));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+  }
+  
+  @Test
+  public void testReplicatedSerialPropagationWithParallelThreads()
+      throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+
+    vm4.invoke(() -> WANTestBase.doMultiThreadedPuts(
+        getTestMethodName() + "_RR", 1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+  }
+  
+  @Test
+  public void testSerialPropagationWithFilter() throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false,
+        new MyGatewayEventFilter(), true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false,
+        new MyGatewayEventFilter(), true ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap()  ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap()  ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap()  ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap()  ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName(), null, 1, 100, isOffHeap()  ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName(), null, 1, 100, isOffHeap()  ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 800 ));
+  }
+
+  @Test
+  public void testReplicatedSerialPropagationWithFilter() throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false,
+        new MyGatewayEventFilter(), true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false,
+        new MyGatewayEventFilter(), true ));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName(), null, isOffHeap()  ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName(), null, isOffHeap()  ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName(), "ln", isOffHeap()  ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName(), "ln", isOffHeap()  ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName(), "ln", isOffHeap()  ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName(), "ln", isOffHeap()  ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 800 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 800 ));
+  }
+  
+  @Test
+  public void testReplicatedSerialPropagationWithFilter_AfterAck()
+      throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm6, vm7);
+    createReceiverInVMs(vm6, vm7);
+
+    createCacheInVMs(lnPort, vm2, vm3, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false,
+            new MyGatewayEventFilter_AfterAck(), true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false,
+            new MyGatewayEventFilter_AfterAck(), true ));
+
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName(), null, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName(), null, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName(), "ln", isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName(), "ln", isOffHeap() ));
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName(), "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName(), "ln", isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+
+    vm4.invoke(() -> WANTestBase.validateQueueContents( "ln",
+        0 ));
+    vm5.invoke(() -> WANTestBase.validateQueueContents( "ln",
+        0 ));
+
+    Integer vm4Acks = (Integer)vm4.invoke(() -> WANTestBase.validateAfterAck( "ln"));
+    Integer vm5Acks = (Integer)vm5.invoke(() -> WANTestBase.validateAfterAck( "ln"));
+
+    assertEquals(2000, (vm4Acks + vm5Acks));
+
+    vm6.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+    vm7.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
new file mode 100644
index 0000000..2019cac
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
@@ -0,0 +1,588 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import static com.gemstone.gemfire.test.dunit.Wait.*;
+import static com.gemstone.gemfire.test.dunit.IgnoredException.*;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.jayway.awaitility.Awaitility;
+
+@Category(DistributedTest.class)
+public class SerialWANStatsDUnitTest extends WANTestBase {
+  
+  private static final long serialVersionUID = 1L;
+
+  private String testName;
+  
+  /**
+   * Creates the test case. The test name is captured later, in
+   * {@link #postSetUpWANTestBase()}.
+   */
+  public SerialWANStatsDUnitTest() {
+    // The no-arg superclass constructor is invoked implicitly.
+  }
+
+  /**
+   * Remembers the name of the running test method (used to build region names)
+   * and passes three exception texts to {@code addIgnoredException}.
+   */
+  @Override
+  protected final void postSetUpWANTestBase() throws Exception {
+    testName = getTestMethodName();
+    final String[] ignoredTexts = {
+        "java.net.ConnectException",
+        "java.net.SocketException",
+        "Unexpected IOException"};
+    for (String ignoredText : ignoredTexts) {
+      addIgnoredException(ignoredText);
+    }
+  }
+  
+  /**
+   * Puts 1000 entries into a replicated region attached to serial sender "ln"
+   * and then checks the receiver, queue and batch statistics on the remote
+   * member (vm2) and on both sender members (vm4, vm5).
+   */
+  @Test
+  public void testReplicatedSerialPropagation() throws Exception {
+    final String regionName = testName + "_RR";
+
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // remote site: a single member hosting the receiver
+    createCacheInVMs(nyPort, vm2);
+    vm2.invoke(() -> WANTestBase.createReceiver());
+
+    // local site: four members, the sender is defined on vm4 and vm5
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+    vm4.invoke(() -> WANTestBase.createSender(
+        "ln", 2, false, 100, 10, false, false, null, true));
+    vm5.invoke(() -> WANTestBase.createSender(
+        "ln", 2, false, 100, 10, false, false, null, true));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    vm5.invoke(() -> WANTestBase.doPuts(regionName, 1000));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(regionName, 1000));
+
+    pause(2000);
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000));
+
+    vm4.invoke(() -> WANTestBase.checkQueueStats("ln", 0, 1000, 1000, 1000));
+    vm4.invoke(() -> WANTestBase.checkBatchStats("ln", 100));
+
+    vm5.invoke(() -> WANTestBase.checkQueueStats("ln", 0, 1000, 0, 0));
+    vm5.invoke(() -> WANTestBase.checkBatchStats("ln", 0));
+  }
+  
+  /**
+   * Same scenario as the plain replicated serial propagation test, except that
+   * the "ln" sender is created with two dispatchers and
+   * {@link OrderPolicy#KEY}.
+   */
+  @Test
+  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
+    final String regionName = testName + "_RR";
+
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // remote site: a single member hosting the receiver
+    createCacheInVMs(nyPort, vm2);
+    vm2.invoke(() -> WANTestBase.createReceiver());
+
+    // local site: four members, the sender is defined on vm4 and vm5
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+    vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers(
+        "ln", 2, false, 100, 10, false, false, null, true, 2, OrderPolicy.KEY));
+    vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers(
+        "ln", 2, false, 100, 10, false, false, null, true, 2, OrderPolicy.KEY));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap()));
+
+    vm5.invoke(() -> WANTestBase.doPuts(regionName, 1000));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(regionName, 1000));
+
+    pause(2000);
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000));
+
+    vm4.invoke(() -> WANTestBase.checkQueueStats("ln", 0, 1000, 1000, 1000));
+    vm4.invoke(() -> WANTestBase.checkBatchStats("ln", 100));
+
+    vm5.invoke(() -> WANTestBase.checkQueueStats("ln", 0, 1000, 0, 0));
+    vm5.invoke(() -> WANTestBase.checkBatchStats("ln", 0));
+  }
+  
+  /**
+   * One local site sends through two serial senders ("lnSerial1" to site 2 and
+   * "lnSerial2" to site 3). After 1000 puts the receiver statistics of both
+   * remote sites and the queue/batch statistics of both senders are checked.
+   */
+  @Test
+  public void testWANStatsTwoWanSites() throws Exception {
+    final String regionName = testName + "_RR";
+    final String senderIds = "lnSerial1,lnSerial2";
+
+    // NOTE: the first locator is created by a direct call here rather than
+    // through a VM invoke, exactly as in the original test.
+    Integer lnPort = createFirstLocatorWithDSId(1);
+    Integer nyPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer tkPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort));
+
+    // two remote sites, each with one receiver member
+    createCacheInVMs(nyPort, vm2);
+    vm2.invoke(() -> WANTestBase.createReceiver());
+    createCacheInVMs(tkPort, vm3);
+    vm3.invoke(() -> WANTestBase.createReceiver());
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender(
+        "lnSerial1", 2, false, 100, 10, false, false, null, true));
+    vm5.invoke(() -> WANTestBase.createSender(
+        "lnSerial1", 2, false, 100, 10, false, false, null, true));
+
+    vm4.invoke(() -> WANTestBase.createSender(
+        "lnSerial2", 3, false, 100, 10, false, false, null, true));
+    vm5.invoke(() -> WANTestBase.createSender(
+        "lnSerial2", 3, false, 100, 10, false, false, null, true));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
+
+    startSenderInVMs("lnSerial1", vm4, vm5);
+    startSenderInVMs("lnSerial2", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, senderIds, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, senderIds, isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(regionName, senderIds, isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(regionName, senderIds, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.doPuts(regionName, 1000));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(regionName, 1000));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(regionName, 1000));
+
+    pause(2000);
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000));
+    vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000));
+
+    vm4.invoke(() -> WANTestBase.checkQueueStats("lnSerial1", 0, 1000, 1000, 1000));
+    vm4.invoke(() -> WANTestBase.checkBatchStats("lnSerial1", 100));
+    vm4.invoke(() -> WANTestBase.checkQueueStats("lnSerial2", 0, 1000, 1000, 1000));
+    vm4.invoke(() -> WANTestBase.checkBatchStats("lnSerial2", 100));
+    vm5.invoke(() -> WANTestBase.checkQueueStats("lnSerial1", 0, 1000, 0, 0));
+    vm5.invoke(() -> WANTestBase.checkBatchStats("lnSerial1", 0));
+    vm5.invoke(() -> WANTestBase.checkQueueStats("lnSerial2", 0, 1000, 0, 0));
+    vm5.invoke(() -> WANTestBase.checkBatchStats("lnSerial2", 0));
+  }
+
+  /**
+   * Starts 10000 asynchronous puts from vm5, kills the "ln" sender on vm4 (and
+   * on vm5 if vm4 reported that nothing was killed) while the puts are running,
+   * then checks the receiver HA statistics and the fail-over statistics.
+   *
+   * @throws Exception if waiting for an asynchronous invocation is interrupted
+   *         or an invoked step fails
+   */
+  @Test
+  public void testReplicatedSerialPropagationHA() throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    vm2.invoke(() -> WANTestBase.createCache( nyPort ));
+    vm2.invoke(() -> WANTestBase.createReceiver());
+
+    vm4.invoke(() -> WANTestBase.createCache(lnPort ));
+    vm5.invoke(() -> WANTestBase.createCache(lnPort ));
+    vm6.invoke(() -> WANTestBase.createCache(lnPort ));
+    vm7.invoke(() -> WANTestBase.createCache(lnPort ));
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR", null, isOffHeap()  ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR", "ln", isOffHeap()  ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR", "ln", isOffHeap()  ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR", "ln", isOffHeap()  ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR", "ln", isOffHeap()  ));
+
+    AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( testName + "_RR", 10000 ));
+    pause(2000);
+    AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender( "ln" ));
+    final Boolean isKilled;
+    try {
+      isKilled = (Boolean)inv2.getResult();
+    }
+    catch (Throwable e) {
+      // Keep the original failure as the cause; a bare fail(message) hid the
+      // reason why killing the sender went wrong.
+      throw new AssertionError("Unexpected exception while killing a sender", e);
+    }
+    if (!isKilled) {
+      // vm4 reported that it did not kill anything, so kill the sender on vm5.
+      AsyncInvocation inv3 = vm5.invokeAsync(() -> WANTestBase.killSender( "ln" ));
+      inv3.join();
+    }
+    inv1.join();
+    inv2.join();
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        testName + "_RR", 10000 ));
+
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(1000, 10000, 10000 ));
+
+    vm5.invoke(() -> WANTestBase.checkStats_Failover("ln", 10000));
+  }
+  
+  @Test
+  public void testReplicatedSerialPropagationUnprocessedEvents() throws 
Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    //remote site: caches and receivers on vm2 and vm3
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    //local site: caches on vm4..vm7
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    //sender "ln" is created on two local members (vm4, vm5), batch size 20
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 20, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 20, false, false, null, true ));
+
+    //create one RR (RR_1) on remote site
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_1", null, isOffHeap()  ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_1", null, isOffHeap()  ));
+
+    //create another RR (RR_2) on remote site
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_2", null, isOffHeap()  ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_2", null, isOffHeap()  ));
+    
+    //start the senders on local site
+    startSenderInVMs("ln", vm4, vm5);
+
+    //create one RR (RR_1) on local site, attached to sender "ln"
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_1", "ln", isOffHeap()  ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_1", "ln", isOffHeap()  ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_1", "ln", isOffHeap()  ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_1", "ln", isOffHeap()  ));
+
+    //create another RR (RR_2) on local site, attached to sender "ln"
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_2", "ln", isOffHeap()  ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_2", "ln", isOffHeap()  ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_2", "ln", isOffHeap()  ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_2", "ln", isOffHeap()  ));
+    
+    //do 1000 puts in RR_1 (blocking invoke, not a separate thread)
+    vm4.invoke(() -> WANTestBase.doPuts( testName + "_RR_1", 1000 ));
+    //then do 500 puts in RR_2, again with a blocking invoke
+    vm4.invoke(() -> WANTestBase.doPuts( testName + "_RR_2", 500 ));
+    //short 20 ms pause before the remote site is validated
+    Thread.sleep(20);
+    //vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 0 ));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        testName + "_RR_1", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        testName + "_RR_2", 500 ));
+    
+    pause(2000);
+    vm4.invoke(() -> WANTestBase.checkQueueStats("ln",
+      0, 1500, 1500, 1500));
+    vm4.invoke(() -> WANTestBase.checkBatchStats("ln",
+      75));
+    vm4.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 0));
+    
+    
+    vm5.invoke(() -> WANTestBase.checkQueueStats("ln",
+      0, 1500, 0, 0));
+    vm5.invoke(() -> WANTestBase.checkBatchStats("ln",
+      0));
+    vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 1500));
+  }
+  
+  /**
+   * Checks sender and receiver statistics when the remote region is destroyed
+   * while puts to the local region are still in progress.
+   *
+   * One region and a sender are configured on the local site; one region and a
+   * receiver are configured on the remote site.
+   *
+   * Not disabled - see ticket #52118.
+   *
+   * NOTE: The test failure is avoided by using a large number of put
+   * operations, so that WANTestBase.verifyRegionQueueNotEmpty("ln") succeeds
+   * because performing that many puts takes a significant amount of time.
+   *
+   * If this failure reappears in future, the number of put operations must be
+   * increased or a better fix must be found.
+   *
+   * @throws Exception if waiting for the asynchronous puts is interrupted or an
+   *         invoked step fails
+   */
+  @Test
+  public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception {
+    int numEntries = 20000;
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    //these are part of remote site
+    vm2.invoke(() -> WANTestBase.createCache( nyPort ));
+    vm2.invoke(() -> WANTestBase.createReceiver());
+
+    //these are part of local site
+    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+
+    //senders are created on local site
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 100, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 100, false, false, null, true ));
+
+    //create one RR (RR_1) on remote site
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_1", null, isOffHeap()  ));
+    //This is to cause a scenario where we have received at least X events
+    //and want to slow the receiver
+    vm2.invoke(() -> WANTestBase.longPauseAfterNumEvents(500, 200));
+    //start the senders on local site
+    startSenderInVMs("ln", vm4, vm5);
+
+    //create one RR (RR_1) on local site
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_1", "ln", isOffHeap()  ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_1", "ln", isOffHeap()  ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_1", "ln", isOffHeap()  ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        testName + "_RR_1", "ln", isOffHeap()  ));
+
+    //start puts in RR_1 in another thread
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( testName + "_RR_1", numEntries ));
+    //destroy RR_1 in remote site
+    vm2.invoke(() -> WANTestBase.destroyRegion( testName + "_RR_1", 500));
+
+    //An interruption now propagates to the test runner with its stack trace
+    //instead of being printed and replaced by a message-less fail().
+    inv1.join();
+
+    //assuming some events might have been dispatched before the remote region
+    //was destroyed, sender's region queue will have fewer than numEntries
+    //events but the queue will not be empty.
+    //NOTE: this much verification might be sufficient in DUnit. Hydra will
+    //take care of more in depth validations.
+    vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty("ln" ));
+
+    //verify that all is well in local site. All the events should be present
+    //in local region
+    vm4.invoke(() -> WANTestBase.validateRegionSize(
+      testName + "_RR_1", numEntries ));
+
+    //like a latch to guarantee at least one exception returned
+    vm4.invoke(() -> Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> WANTestBase.verifyQueueSize("ln", 0)));
+
+    vm4.invoke(() -> WANTestBase.checkBatchStats("ln", true, true));
+
+    vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln", numEntries));
+
+    vm2.invoke(() -> WANTestBase.checkExceptionStats(1));
+
+  }
+  
+  /**
+   * Runs 1000 puts through serial sender "ln" configured with a
+   * {@code MyGatewayEventFilter} and checks the queue, filtered-event, batch
+   * and unprocessed-event statistics on both sender members.
+   */
+  @Test
+  public void testSerialPropagationWithFilter() throws Exception {
+    final String regionName = testName;
+
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    // the filter instance is constructed inside the lambda, as before
+    vm4.invoke(() -> WANTestBase.createSender(
+        "ln", 2, false, 100, 10, false, false, new MyGatewayEventFilter(), true));
+    vm5.invoke(() -> WANTestBase.createSender(
+        "ln", 2, false, 100, 10, false, false, new MyGatewayEventFilter(), true));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 1, 100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 1, 100, isOffHeap()));
+
+    vm4.invoke(() -> WANTestBase.doPuts(regionName, 1000));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(regionName, 800));
+
+    pause(2000);
+    vm4.invoke(() -> WANTestBase.checkQueueStats("ln", 0, 1000, 900, 800));
+    vm4.invoke(() -> WANTestBase.checkEventFilteredStats("ln", 200));
+    vm4.invoke(() -> WANTestBase.checkBatchStats("ln", 80));
+    vm4.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 0));
+
+    vm5.invoke(() -> WANTestBase.checkQueueStats("ln", 0, 1000, 0, 0));
+    vm5.invoke(() -> WANTestBase.checkBatchStats("ln", 0));
+    vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 900));
+  }
+  
+  /**
+   * With sender "ln" paused, puts 1000 entries and then twice updates the
+   * first 500 of them, checking the queue size after each step. After the
+   * sender is resumed the remote regions, the queue statistics and the
+   * conflation statistics are validated.
+   */
+  @Test
+  public void testSerialPropagationConflation() throws Exception {
+    final String regionName = testName;
+
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    // sixth argument (true) differs from the other tests in this class
+    vm4.invoke(() -> WANTestBase.createSender(
+        "ln", 2, false, 100, 10, true, false, null, true));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 0, 100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 0, 100, isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 0, 100, isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 0, 100, isOffHeap()));
+
+    // start the sender and pause it straight away so that events pile up
+    vm4.invoke(() -> WANTestBase.startSender("ln"));
+    vm4.invoke(() -> WANTestBase.pauseSender("ln"));
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 1, 100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 1, 100, isOffHeap()));
+
+    final Map keyValues = new HashMap();
+    final Map updateKeyValues = new HashMap();
+    for (int key = 0; key < 1000; key++) {
+      keyValues.put(key, key);
+    }
+
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue(regionName, keyValues));
+
+    vm4.invoke(() -> WANTestBase.checkQueueSize("ln", keyValues.size()));
+    for (int key = 0; key < 500; key++) {
+      updateKeyValues.put(key, key + "_updated");
+    }
+
+    // first round of updates
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue(regionName, updateKeyValues));
+
+    vm4.invoke(() -> WANTestBase.checkQueueSize(
+        "ln", keyValues.size() + updateKeyValues.size()));
+
+    // nothing may have reached the remote site while the sender is paused
+    vm2.invoke(() -> WANTestBase.validateRegionSize(regionName, 0));
+
+    // second round of the same updates; the expected queue size is unchanged
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue(regionName, updateKeyValues));
+
+    vm4.invoke(() -> WANTestBase.checkQueueSize(
+        "ln", keyValues.size() + updateKeyValues.size()));
+
+    vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+
+    // from here on keyValues holds the expected final region contents
+    keyValues.putAll(updateKeyValues);
+    vm2.invoke(() -> WANTestBase.validateRegionSize(regionName, keyValues.size()));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(regionName, keyValues.size()));
+
+    vm2.invoke(() -> WANTestBase.validateRegionContents(regionName, keyValues));
+    vm3.invoke(() -> WANTestBase.validateRegionContents(regionName, keyValues));
+
+    pause(2000);
+    vm4.invoke(() -> WANTestBase.checkQueueStats("ln", 0, 2000, 2000, 1500));
+    vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 500));
+  }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandTestBase.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandTestBase.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandTestBase.java
new file mode 100644
index 0000000..f9b46ef
--- /dev/null
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandTestBase.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.wancommand;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.wan.*;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.management.ManagementService;
+import 
com.gemstone.gemfire.management.internal.cli.commands.CliCommandTestBase;
+import com.gemstone.gemfire.test.dunit.*;
+
+import javax.management.remote.JMXConnectorServer;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+import static com.gemstone.gemfire.test.dunit.Assert.assertEquals;
+import static com.gemstone.gemfire.test.dunit.Assert.fail;
+
+public abstract class WANCommandTestBase extends CliCommandTestBase {
+
+  static Cache cache;
+  private JMXConnectorServer jmxConnectorServer;
+  private ManagementService managementService;
+//  public String jmxHost;
+//  public int jmxPort;
+
+  static VM vm0;
+  static VM vm1;
+  static VM vm2;
+  static VM vm3;
+  static VM vm4;
+  static VM vm5;
+  static VM vm6;
+  static VM vm7;
+
+  /**
+   * Looks up VMs 0 to 7 of host 0, stores them in the static {@code vm0} ..
+   * {@code vm7} fields and then calls {@code enableManagement()}.
+   */
+  @Override
+  public final void postSetUpCliCommandTestBase() throws Exception {
+    final Host firstHost = Host.getHost(0);
+    vm0 = firstHost.getVM(0);
+    vm1 = firstHost.getVM(1);
+    vm2 = firstHost.getVM(2);
+    vm3 = firstHost.getVM(3);
+    vm4 = firstHost.getVM(4);
+    vm5 = firstHost.getVM(5);
+    vm6 = firstHost.getVM(6);
+    vm7 = firstHost.getVM(7);
+
+    enableManagement();
+  }
+
+  /**
+   * Connects this member to a distributed system that starts its own locator
+   * on a randomly chosen port, and creates the cache on it.
+   *
+   * @param dsId value for the distributed-system-id property
+   * @return the port chosen for the locator
+   */
+  public Integer createFirstLocatorWithDSId(int dsId) {
+    final int locatorPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    final String locatorAddress = "localhost[" + locatorPort + "]";
+
+    final Properties config = getDistributedSystemProperties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(dsId));
+    config.setProperty(LOCATORS, locatorAddress);
+    config.setProperty(START_LOCATOR,
+        locatorAddress + ",server=true,peer=true,hostname-for-clients=localhost");
+
+    final InternalDistributedSystem system = getSystem(config);
+    cache = CacheFactory.create(system);
+    return locatorPort;
+  }
+
+  /**
+   * Connects this member to a distributed system that starts its own locator
+   * on a randomly chosen port and names {@code localhost[remoteLocPort]} as
+   * its remote locator. No cache is created.
+   *
+   * @param dsId value for the distributed-system-id property
+   * @param remoteLocPort port of the locator of the other site
+   * @return the port chosen for the new locator
+   */
+  public Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
+    final int locatorPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    final String locatorAddress = "localhost[" + locatorPort + "]";
+
+    final Properties config = getDistributedSystemProperties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(dsId));
+    config.setProperty(LOCATORS, locatorAddress);
+    config.setProperty(START_LOCATOR,
+        locatorAddress + ",server=true,peer=true,hostname-for-clients=localhost");
+    config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
+
+    getSystem(config);
+    return locatorPort;
+  }
+
+  /**
+   * Connects to the distributed system whose locator listens on
+   * {@code localhost[locPort]} and stores the created cache in the static
+   * {@code cache} field.
+   *
+   * @param locPort port of the locator to join
+   */
+  public void createCache(Integer locPort){
+    final Properties config = getDistributedSystemProperties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, "localhost[" + locPort + "]");
+
+    final InternalDistributedSystem system = getSystem(config);
+    cache = CacheFactory.create(system);
+  }
+
+  /**
+   * Like {@code createCache(Integer)}, but additionally sets the groups
+   * property of the member before connecting.
+   *
+   * @param locPort port of the locator to join
+   * @param groups value for the groups property
+   */
+  public void createCacheWithGroups(Integer locPort, String groups){
+    final Properties config = getDistributedSystemProperties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, "localhost[" + locPort + "]");
+    config.setProperty(GROUPS, groups);
+
+    final InternalDistributedSystem system = getSystem(config);
+    cache = CacheFactory.create(system);
+  }
+
+  /**
+   * Creates a gateway sender named {@code dsName} on the current cache. A disk
+   * store of the same name, backed by a freshly created directory, is always
+   * created and assigned to the sender, whether or not persistence is enabled.
+   *
+   * @param dsName id of the sender and name of its disk store
+   * @param remoteDsId distributed system id of the remote site
+   * @param isParallel whether to mark the sender as parallel
+   * @param maxMemory value for the maximum queue memory
+   * @param batchSize value for the batch size
+   * @param isConflation whether batch conflation is enabled
+   * @param isPersistent whether persistence is enabled
+   * @param filter event filter to add, or {@code null} for none
+   * @param isManualStart whether the sender must be started manually
+   */
+  public void createSender(String dsName, int remoteDsId,
+      boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isConflation, boolean isPersistent,
+      GatewayEventFilter filter, boolean isManualStart) {
+    final File diskDir = new File(dsName +"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum());
+    diskDir.mkdir();
+    final DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+    final File[] diskDirs = new File[] {diskDir};
+
+    // Both the parallel and the serial flavour configure the factory with the
+    // same calls; only setParallel(true) is specific to the parallel one.
+    final GatewaySenderFactory senderFactory = cache.createGatewaySenderFactory();
+    if (isParallel) {
+      senderFactory.setParallel(true);
+    }
+    senderFactory.setMaximumQueueMemory(maxMemory);
+    senderFactory.setBatchSize(batchSize);
+    senderFactory.setManualStart(isManualStart);
+    if (filter != null) {
+      senderFactory.addGatewayEventFilter(filter);
+    }
+    senderFactory.setBatchConflationEnabled(isConflation);
+    if (isPersistent) {
+      senderFactory.setPersistenceEnabled(true);
+    }
+
+    final DiskStore store = diskStoreFactory.setDiskDirs(diskDirs).create(dsName);
+    senderFactory.setDiskStoreName(store.getName());
+    senderFactory.create(dsName, remoteDsId);
+  }
+
+  /**
+   * Starts the gateway sender with the given id on the current cache. The text
+   * "Could not connect" is registered as an ignored exception for the duration
+   * of the call and removed again afterwards.
+   *
+   * @param senderId id of the sender to start
+   */
+  public void startSender(String senderId){
+    final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
+    try {
+      for (GatewaySender sender : cache.getGatewaySenders()) {
+        if (sender.getId().equals(senderId)) {
+          sender.start();
+          return;
+        }
+      }
+      // An unknown id used to end in an unexplained NullPointerException.
+      fail("No gateway sender with id " + senderId + " found to start");
+    } finally {
+      exln.remove();
+    }
+  }
+
+  /**
+   * Pauses the gateway sender with the given id on the current cache. The text
+   * "Could not connect" is registered as an ignored exception for the duration
+   * of the call and removed again afterwards.
+   *
+   * @param senderId id of the sender to pause
+   */
+  public void pauseSender(String senderId){
+    final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
+    try {
+      for (GatewaySender sender : cache.getGatewaySenders()) {
+        if (sender.getId().equals(senderId)) {
+          sender.pause();
+          return;
+        }
+      }
+      // An unknown id used to end in an unexplained NullPointerException.
+      fail("No gateway sender with id " + senderId + " found to pause");
+    } finally {
+      exln.remove();
+    }
+  }
+
+  /**
+   * Creates the cache, then creates a manually started gateway receiver bound
+   * to one randomly chosen port and starts it.
+   *
+   * @param locPort port of the locator to join
+   * @return the port chosen for the receiver
+   */
+  public int createAndStartReceiver(int locPort) {
+    Properties props = getDistributedSystemProperties();
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(LOCATORS, "localhost[" + locPort
+        + "]");
+
+    InternalDistributedSystem ds = getSystem(props);
+    cache = CacheFactory.create(ds);
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    // start and end port are identical, so the receiver can only use this port
+    fact.setStartPort(port);
+    fact.setEndPort(port);
+    fact.setManualStart(true);
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+    } catch (IOException e) {
+      // Fail with the IOException attached as cause instead of printing it and
+      // failing with a message only.
+      throw new AssertionError("Test " + getName() + " failed to start GatewayReceiver", e);
+    }
+    return port;
+  }
+
+  /**
+   * Connects this VM through the given locator, creates the cache, and creates a gateway
+   * receiver with manual start enabled. The receiver is not started here.
+   *
+   * The receiver's port range spans {@code AvailablePort.AVAILABLE_PORTS_LOWER_BOUND} to
+   * {@code AvailablePort.AVAILABLE_PORTS_UPPER_BOUND}.
+   *
+   * @param locPort port of the locator on localhost
+   * @return the value of {@code getPort()} on the newly created receiver
+   */
+  public int createReceiver(int locPort) {
+    final String locatorSpec = "localhost[" + locPort + "]";
+    Properties config = getDistributedSystemProperties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, locatorSpec);
+
+    InternalDistributedSystem system = getSystem(config);
+    cache = CacheFactory.create(system);
+
+    GatewayReceiverFactory receiverFactory = cache.createGatewayReceiverFactory();
+    receiverFactory.setStartPort(AvailablePort.AVAILABLE_PORTS_LOWER_BOUND);
+    receiverFactory.setEndPort(AvailablePort.AVAILABLE_PORTS_UPPER_BOUND);
+    receiverFactory.setManualStart(true);
+    return receiverFactory.create().getPort();
+  }
+
+  /**
+   * Same setup as a plain receiver creation, but the member also sets the {@code GROUPS}
+   * property: connects through the given locator, creates the cache, and creates a gateway
+   * receiver with manual start enabled. The receiver is not started here.
+   *
+   * @param locPort port of the locator on localhost
+   * @param groups value stored under the {@code GROUPS} property
+   * @return the value of {@code getPort()} on the newly created receiver
+   */
+  public int createReceiverWithGroup(int locPort, String groups) {
+    final String locatorSpec = "localhost[" + locPort + "]";
+    Properties config = getDistributedSystemProperties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, locatorSpec);
+    config.setProperty(GROUPS, groups);
+
+    InternalDistributedSystem system = getSystem(config);
+    cache = CacheFactory.create(system);
+
+    GatewayReceiverFactory receiverFactory = cache.createGatewayReceiverFactory();
+    receiverFactory.setStartPort(AvailablePort.AVAILABLE_PORTS_LOWER_BOUND);
+    receiverFactory.setEndPort(AvailablePort.AVAILABLE_PORTS_UPPER_BOUND);
+    receiverFactory.setManualStart(true);
+    return receiverFactory.create().getPort();
+  }
+
+  /**
+   * Starts every gateway receiver returned by this VM's cache.
+   *
+   * An {@code IOException} from any {@code start()} call aborts the loop and is reported as a
+   * test failure with the exception attached as the cause.
+   */
+  public void startReceiver() {
+    try {
+      for (GatewayReceiver receiver : cache.getGatewayReceivers()) {
+        receiver.start();
+      }
+    } catch (IOException e) {
+      // Chain the IOException so the failure report carries the root cause.
+      throw new AssertionError("Test " + getName() + " failed to start GatewayReceiver", e);
+    }
+  }
+
+  /**
+   * Stops every gateway receiver returned by this VM's cache.
+   */
+  public void stopReceiver() {
+    cache.getGatewayReceivers().forEach(GatewayReceiver::stop);
+  }
+
+  /**
+   * Connects this VM through the given locator with the {@code GROUPS} property set, creates
+   * the cache, and creates and starts a gateway receiver bound to a single port.
+   *
+   * The port comes from {@code AvailablePortHelper.getRandomAvailablePortForDUnitSite()} and is
+   * used as both start and end of the receiver's port range.
+   *
+   * @param locPort port of the locator on localhost
+   * @param groups value stored under the {@code GROUPS} property
+   * @return the port chosen for the receiver
+   */
+  public int createAndStartReceiverWithGroup(int locPort, String groups) {
+    Properties props = getDistributedSystemProperties();
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+    props.setProperty(GROUPS, groups);
+
+    InternalDistributedSystem ds = getSystem(props);
+    cache = CacheFactory.create(ds);
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    fact.setStartPort(port);
+    fact.setEndPort(port);
+    fact.setManualStart(true);
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+    } catch (IOException e) {
+      // Chain the IOException so the failure report carries the root cause.
+      throw new AssertionError(
+          "Test " + getName() + " failed to start GatewayReceiver on port " + port, e);
+    }
+    return port;
+  }
+
+  /**
+   * Returns the distributed member reported by the distributed system of this VM's cache.
+   *
+   * @return the result of {@code cache.getDistributedSystem().getDistributedMember()}
+   */
+  public DistributedMember getMember(){
+    return cache.getDistributedSystem().getDistributedMember();
+  }
+
+
+  /**
+   * Returns the port of the first entry in {@code Locator.getLocators()}.
+   *
+   * @return port of the first locator in the list
+   */
+  public int getLocatorPort() {
+    List<Locator> knownLocators = Locator.getLocators();
+    Locator firstLocator = knownLocators.get(0);
+    return firstLocator.getPort();
+  }
+
+  /**
+   * Sets the system property named by
+   * {@code InternalDistributedSystem.DISABLE_MANAGEMENT_PROPERTY} to {@code "false"} in every
+   * VM, using {@code Invoke.invokeInEveryVM}.
+   */
+  public void enableManagement() {
+    Invoke.invokeInEveryVM(new SerializableRunnable("Enable Management") {
+      @Override
+      public void run() {
+        System.setProperty(InternalDistributedSystem.DISABLE_MANAGEMENT_PROPERTY, "false");
+      }
+    });
+  }
+
+  /**
+   * Asserts the running and paused flags of the gateway sender with the given id.
+   *
+   * The text "Could not connect" is registered through
+   * {@code IgnoredException.addIgnoredException} for the duration of the check and removed
+   * again in the finally block.
+   *
+   * @param senderId id of the sender to inspect; the test fails if no sender has this id
+   * @param isRunning expected value of {@code sender.isRunning()}
+   * @param isPaused expected value of {@code sender.isPaused()}
+   */
+  public void verifySenderState(String senderId, boolean isRunning, boolean isPaused) {
+    final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
+    try {
+      AbstractGatewaySender sender = null;
+      for (GatewaySender s : cache.getGatewaySenders()) {
+        if (s.getId().equals(senderId)) {
+          sender = (AbstractGatewaySender) s;
+          break;
+        }
+      }
+      // Report an unknown id as a readable test failure rather than a NullPointerException.
+      if (sender == null) {
+        fail("GatewaySender " + senderId + " not found in cache");
+      }
+
+      assertEquals(isRunning, sender.isRunning());
+      assertEquals(isPaused, sender.isPaused());
+    } finally {
+      exln.remove();
+    }
+  }
+
+  /**
+   * Asserts that the gateway sender with the given id has the expected attribute values and,
+   * when the expected filter lists are non-null, that it has the same number of filters and
+   * contains a filter of every expected class name.
+   *
+   * @param senderId id of the sender to inspect; the test fails if no sender has this id
+   * @param expectedGatewayEventFilters fully qualified class names of the expected event
+   *        filters, or null to skip the event-filter check
+   * @param expectedGatewayTransportFilters fully qualified class names of the expected
+   *        transport filters, or null to skip the transport-filter check
+   */
+  public void verifySenderAttributes(String senderId, int remoteDsID,
+      boolean isParallel, boolean manualStart, int socketBufferSize,
+      int socketReadTimeout, boolean enableBatchConflation, int batchSize,
+      int batchTimeInterval, boolean enablePersistence,
+      boolean diskSynchronous, int maxQueueMemory, int alertThreshold,
+      int dispatcherThreads, OrderPolicy orderPolicy,
+      List<String> expectedGatewayEventFilters,
+      List<String> expectedGatewayTransportFilters) {
+
+    AbstractGatewaySender sender = null;
+    for (GatewaySender s : cache.getGatewaySenders()) {
+      if (s.getId().equals(senderId)) {
+        sender = (AbstractGatewaySender) s;
+        break;
+      }
+    }
+    // Report an unknown id as a readable test failure rather than a NullPointerException.
+    if (sender == null) {
+      fail("GatewaySender " + senderId + " not found in cache");
+    }
+
+    assertEquals("remoteDistributedSystemId", remoteDsID, sender.getRemoteDSId());
+    assertEquals("isParallel", isParallel, sender.isParallel());
+    assertEquals("manualStart", manualStart, sender.isManualStart());
+    assertEquals("socketBufferSize", socketBufferSize, sender.getSocketBufferSize());
+    assertEquals("socketReadTimeout", socketReadTimeout, sender.getSocketReadTimeout());
+    assertEquals("enableBatchConflation", enableBatchConflation,
+        sender.isBatchConflationEnabled());
+    assertEquals("batchSize", batchSize, sender.getBatchSize());
+    assertEquals("batchTimeInterval", batchTimeInterval, sender.getBatchTimeInterval());
+    assertEquals("enablePersistence", enablePersistence, sender.isPersistenceEnabled());
+    assertEquals("diskSynchronous", diskSynchronous, sender.isDiskSynchronous());
+    assertEquals("maxQueueMemory", maxQueueMemory, sender.getMaximumQueueMemory());
+    assertEquals("alertThreshold", alertThreshold, sender.getAlertThreshold());
+    assertEquals("dispatcherThreads", dispatcherThreads, sender.getDispatcherThreads());
+    assertEquals("orderPolicy", orderPolicy, sender.getOrderPolicy());
+
+    // verify GatewayEventFilters
+    if (expectedGatewayEventFilters != null) {
+      assertEquals("gatewayEventFilters", expectedGatewayEventFilters.size(),
+          sender.getGatewayEventFilters().size());
+
+      List<GatewayEventFilter> actualGatewayEventFilters = sender.getGatewayEventFilters();
+      List<String> actualEventFilterClassNames =
+          new ArrayList<String>(actualGatewayEventFilters.size());
+      for (GatewayEventFilter filter : actualGatewayEventFilters) {
+        actualEventFilterClassNames.add(filter.getClass().getName());
+      }
+
+      for (String expectedGatewayEventFilter : expectedGatewayEventFilters) {
+        if (!actualEventFilterClassNames.contains(expectedGatewayEventFilter)) {
+          fail("GatewayEventFilter " + expectedGatewayEventFilter
+              + " is not added to the GatewaySender");
+        }
+      }
+    }
+
+    // verify GatewayTransportFilters
+    if (expectedGatewayTransportFilters != null) {
+      assertEquals("gatewayTransportFilters", expectedGatewayTransportFilters.size(),
+          sender.getGatewayTransportFilters().size());
+
+      List<GatewayTransportFilter> actualGatewayTransportFilters =
+          sender.getGatewayTransportFilters();
+      List<String> actualTransportFilterClassNames =
+          new ArrayList<String>(actualGatewayTransportFilters.size());
+      for (GatewayTransportFilter filter : actualGatewayTransportFilters) {
+        actualTransportFilterClassNames.add(filter.getClass().getName());
+      }
+
+      for (String expectedGatewayTransportFilter : expectedGatewayTransportFilters) {
+        if (!actualTransportFilterClassNames.contains(expectedGatewayTransportFilter)) {
+          fail("GatewayTransportFilter " + expectedGatewayTransportFilter
+              + " is not added to the GatewaySender.");
+        }
+      }
+    }
+  }
+
+  /**
+   * Asserts that every gateway receiver returned by this VM's cache reports the expected
+   * running state.
+   *
+   * @param isRunning expected value of {@code isRunning()} for each receiver
+   */
+  public void verifyReceiverState(boolean isRunning) {
+    for (GatewayReceiver current : cache.getGatewayReceivers()) {
+      assertEquals(isRunning, current.isRunning());
+    }
+  }
+
+  /**
+   * Asserts that this VM's cache has exactly one gateway receiver and that it carries the
+   * expected attribute values.
+   *
+   * When {@code expectedGatewayTransportFilters} is non-null, the receiver must have the same
+   * number of transport filters and contain a filter of every listed class name.
+   *
+   * @param expectedGatewayTransportFilters fully qualified class names of the expected
+   *        transport filters, or null to skip the filter check
+   */
+  public void verifyReceiverCreationWithAttributes(boolean isRunning,
+      int startPort, int endPort, String bindAddress, int maxTimeBetweenPings,
+      int socketBufferSize, List<String> expectedGatewayTransportFilters) {
+
+    Set<GatewayReceiver> allReceivers = cache.getGatewayReceivers();
+    assertEquals("Number of receivers is incorrect", 1, allReceivers.size());
+
+    for (GatewayReceiver current : allReceivers) {
+      assertEquals("isRunning", isRunning, current.isRunning());
+      assertEquals("startPort", startPort, current.getStartPort());
+      assertEquals("endPort", endPort, current.getEndPort());
+      assertEquals("bindAddress", bindAddress, current.getBindAddress());
+      assertEquals("maximumTimeBetweenPings", maxTimeBetweenPings,
+          current.getMaximumTimeBetweenPings());
+      assertEquals("socketBufferSize", socketBufferSize, current.getSocketBufferSize());
+
+      // Nothing more to check for this receiver when no filter expectation was supplied.
+      if (expectedGatewayTransportFilters == null) {
+        continue;
+      }
+
+      assertEquals("gatewayTransportFilters", expectedGatewayTransportFilters.size(),
+          current.getGatewayTransportFilters().size());
+
+      List<GatewayTransportFilter> installedFilters = current.getGatewayTransportFilters();
+      List<String> installedClassNames = new ArrayList<String>(installedFilters.size());
+      for (GatewayTransportFilter installed : installedFilters) {
+        installedClassNames.add(installed.getClass().getName());
+      }
+
+      for (String wantedClassName : expectedGatewayTransportFilters) {
+        boolean present = installedClassNames.contains(wantedClassName);
+        if (!present) {
+          fail("GatewayTransportFilter " + wantedClassName
+              + " is not added to the GatewayReceiver.");
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes the cache and disconnects in this JVM first, then does the same in vm0 through vm7,
+   * in that order.
+   */
+  @Override
+  public final void postTearDownCacheTestCase() throws Exception {
+    WANCommandTestBase.closeCacheAndDisconnect();
+    vm0.invoke(() -> closeCacheAndDisconnect());
+    vm1.invoke(() -> closeCacheAndDisconnect());
+    vm2.invoke(() -> closeCacheAndDisconnect());
+    vm3.invoke(() -> closeCacheAndDisconnect());
+    vm4.invoke(() -> closeCacheAndDisconnect());
+    vm5.invoke(() -> closeCacheAndDisconnect());
+    vm6.invoke(() -> closeCacheAndDisconnect());
+    vm7.invoke(() -> closeCacheAndDisconnect());
+  }
+
+  /**
+   * Closes the static cache and disconnects its distributed system. Does nothing when the
+   * cache is null or already closed.
+   */
+  public static void closeCacheAndDisconnect() {
+    if (cache == null || cache.isClosed()) {
+      return;
+    }
+    cache.close();
+    cache.getDistributedSystem().disconnect();
+  }
+}

Reply via email to