This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-4647
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 059cb7088fe390323570c6461a61bceec70bb670
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed Feb 21 21:31:07 2018 -0800

    GEODE-4647: add a stats eventSecondaryQueueSizeId to track events in secondary
                gateway sender queue
---
 .../asyncqueue/internal/AsyncEventQueueStats.java  |   3 +
 .../internal/cache/AbstractBucketRegionQueue.java  |   8 ++
 .../geode/internal/cache/AbstractRegionMap.java    |  12 ++
 .../apache/geode/internal/cache/BucketAdvisor.java |   2 +
 .../geode/internal/cache/BucketRegionQueue.java    |   2 +
 .../internal/cache/wan/AbstractGatewaySender.java  |   1 +
 .../internal/cache/wan/GatewaySenderStats.java     |  61 +++++++++
 .../wan/parallel/ParallelGatewaySenderQueue.java   |   8 +-
 .../wan/parallel/ParallelQueueRemovalMessage.java  |   3 +
 .../SerialAsyncEventQueueImplJUnitTest.java        |   3 +
 .../cache/wan/AsyncEventQueueTestBase.java         |  18 ++-
 .../asyncqueue/AsyncEventListenerDUnitTest.java    |   8 +-
 .../asyncqueue/AsyncEventQueueStatsDUnitTest.java  |  45 ++++--
 .../ParallelQueueRemovalMessageJUnitTest.java      |  12 ++
 .../bean/stats/AsyncEventQueueStatsJUnitTest.java  |   2 -
 .../geode/internal/cache/wan/WANTestBase.java      |  80 +++++++++--
 .../parallel/ParallelWANConflationDUnitTest.java   |  56 ++++++--
 .../wan/parallel/ParallelWANStatsDUnitTest.java    | 151 +++++++++++++++++++++
 .../serial/SerialGatewaySenderQueueDUnitTest.java  |  13 +-
 .../wan/serial/SerialWANConflationDUnitTest.java   |  73 +++++++++-
 .../wan/serial/SerialWANPropagationDUnitTest.java  |   1 +
 21 files changed, 517 insertions(+), 45 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index 2f3029a..8d68cee 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -42,6 +42,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
             f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing 
events.",
                 "nanoseconds"),
             f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", 
"operations", false),
+            f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the 
secondary event queue.",
+                "operations", false),
             f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary 
events queue.",
                 "operations", false),
             f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -108,6 +110,7 @@ public class AsyncEventQueueStats extends 
GatewaySenderStats {
     eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
     eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
     eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
+    eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
     eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
     eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
     eventsExceedingAlertThresholdId = 
type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index 32dfb80..3607cda 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -220,6 +220,10 @@ public abstract class AbstractBucketRegionQueue extends 
BucketRegion {
     this.gatewaySenderStats.decQueueSize(size);
   }
 
+  public void decSecondaryQueueSize(int size) {
+    this.gatewaySenderStats.decSecondaryQueueSize(size);
+  }
+
   public void decQueueSize() {
     this.gatewaySenderStats.decQueueSize();
   }
@@ -228,6 +232,10 @@ public abstract class AbstractBucketRegionQueue extends 
BucketRegion {
     this.gatewaySenderStats.incQueueSize(size);
   }
 
+  public void incSecondaryQueueSize(int size) {
+    this.gatewaySenderStats.incSecondaryQueueSize(size);
+  }
+
   public void incQueueSize() {
     this.gatewaySenderStats.incQueueSize();
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 493410b..71ccf51 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -636,6 +636,10 @@ public abstract class AbstractRegionMap
             tombstones.put(tag, newRe);
           } else {
             _getOwner().updateSizeOnCreate(key, 
_getOwner().calculateRegionEntryValueSize(newRe));
+            if (_getOwner() instanceof BucketRegionQueue) {
+              BucketRegionQueue brq = (BucketRegionQueue) _getOwner();
+              brq.incSecondaryQueueSize(1);
+            }
           }
           incEntryCount(1);
           lruEntryUpdate(newRe);
@@ -661,6 +665,10 @@ public abstract class AbstractRegionMap
         } else {
           _getOwner().updateSizeOnCreate(re.getKey(),
               _getOwner().calculateRegionEntryValueSize(re));
+          if (_getOwner() instanceof BucketRegionQueue) {
+            BucketRegionQueue brq = (BucketRegionQueue) _getOwner();
+            brq.incSecondaryQueueSize(1);
+          }
         }
       }
       incEntryCount(size());
@@ -1044,6 +1052,10 @@ public abstract class AbstractRegionMap
           } finally {
             if (done && result) {
               initialImagePutEntry(newRe);
+              if (owner instanceof BucketRegionQueue) {
+                BucketRegionQueue brq = (BucketRegionQueue) owner;
+                brq.addToEventQueue(key, done, event);
+              }
             }
             if (!done) {
               removeEntry(key, newRe, false);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index bfe7472..b2c952b 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -314,6 +314,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor 
{
           if (b instanceof BucketRegionQueue) {
             BucketRegionQueue brq = (BucketRegionQueue) b;
             brq.decQueueSize(brq.size());
+            brq.incSecondaryQueueSize(brq.size());
           }
         }
       }
@@ -1191,6 +1192,7 @@ public class BucketAdvisor extends 
CacheDistributionAdvisor {
           if (br instanceof BucketRegionQueue) { // Shouldn't it be 
AbstractBucketRegionQueue
             BucketRegionQueue brq = (BucketRegionQueue) br;
             brq.incQueueSize(brq.size());
+            brq.decSecondaryQueueSize(brq.size());
           }
           if (br != null && br instanceof BucketRegion) {
             ((BucketRegion) br).afterAcquiringPrimaryState();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 4e5451e..0baa204 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -449,6 +449,8 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
     }
     if (this.getBucketAdvisor().isPrimary()) {
       incQueueSize(1);
+    } else {
+      incSecondaryQueueSize(1);
     }
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 034d810..b50f2e6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1101,6 +1101,7 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
     }
 
     statistics.setQueueSize(0);
+    statistics.setSecondaryQueueSize(0);
     statistics.setTempQueueSize(0);
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index 2b93082..15ff18e 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -46,6 +46,8 @@ public class GatewaySenderStats {
   protected static final String EVENT_QUEUE_TIME = "eventQueueTime";
   /** Name of the event queue size statistic */
   protected static final String EVENT_QUEUE_SIZE = "eventQueueSize";
+  /** Name of the event secondary queue size statistic */
+  protected static final String EVENT_SECONDARY_QUEUE_SIZE = 
"eventSecondaryQueueSize";
   /** Name of the event temporary queue size statistic */
   protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize";
   /** Name of the events distributed statistic */
@@ -104,6 +106,8 @@ public class GatewaySenderStats {
   protected static int eventQueueTimeId;
   /** Id of the event queue size statistic */
   protected static int eventQueueSizeId;
+  /** Id of the event secondary queue size statistic */
+  protected static int eventSecondaryQueueSizeId;
   /** Id of the temp event queue size statistic */
   protected static int eventTmpQueueSizeId;
   /** Id of the events distributed statistic */
@@ -168,6 +172,8 @@ public class GatewaySenderStats {
             f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing 
events.",
                 "nanoseconds"),
             f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", 
"operations", false),
+            f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of secondary 
event queue.",
+                "operations", false),
             f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary 
events.", "operations",
                 false),
             f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -238,6 +244,7 @@ public class GatewaySenderStats {
     eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
     eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
     eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
+    eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
     eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
     eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
     eventsExceedingAlertThresholdId = 
type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
@@ -358,6 +365,15 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Returns the current value of the "eventSecondaryQueueSize" stat.
+   *
+   * @return the current value of the "eventSecondaryQueueSize" stat
+   */
+  public int getEventSecondaryQueueSize() {
+    return this.stats.getInt(eventSecondaryQueueSizeId);
+  }
+
+  /**
    * Returns the current value of the "tempQueueSize" stat.
    *
    * @return the current value of the "tempQueueSize" stat.
@@ -462,6 +478,15 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Sets the "eventSecondaryQueueSize" stat.
+   *
+   * @param size The size of the secondary queue
+   */
+  public void setSecondaryQueueSize(int size) {
+    this.stats.setInt(eventSecondaryQueueSizeId, size);
+  }
+
+  /**
    * Sets the "tempQueueSize" stat.
    *
    * @param size The size of the temp queue
@@ -479,6 +504,14 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Increments the "eventSecondaryQueueSize" stat by 1.
+   */
+  public void incSecondaryQueueSize() {
+    this.stats.incInt(eventSecondaryQueueSizeId, 1);
+    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+  }
+
+  /**
    * Increments the "tempQueueSize" stat by 1.
    */
   public void incTempQueueSize() {
@@ -495,6 +528,16 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Increments the "eventSecondaryQueueSize" stat by given delta.
+   *
+   * @param delta an integer by which secondary queue size to be increased
+   */
+  public void incSecondaryQueueSize(int delta) {
+    this.stats.incInt(eventSecondaryQueueSizeId, delta);
+    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+  }
+
+  /**
    * Increments the "tempQueueSize" stat by given delta.
    *
    * @param delta an integer by which temp queue size to be increased
@@ -511,6 +554,14 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Decrements the "eventSecondaryQueueSize" stat by 1.
+   */
+  public void decSecondaryQueueSize() {
+    this.stats.incInt(eventSecondaryQueueSizeId, -1);
+    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+  }
+
+  /**
    * Decrements the "tempQueueSize" stat by 1.
    */
   public void decTempQueueSize() {
@@ -527,6 +578,16 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Decrements the "eventSecondaryQueueSize" stat by given delta.
+   *
+   * @param delta an integer by which secondary queue size to be decreased
+   */
+  public void decSecondaryQueueSize(int delta) {
+    this.stats.incInt(eventSecondaryQueueSizeId, -delta);
+    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+  }
+
+  /**
    * Decrements the "tempQueueSize" stat by given delta.
    *
    * @param delta an integer by which temp queue size to be increased
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 89880fc..27d11a6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1401,10 +1401,6 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     return (BucketRegionQueue) prQ.getDataStore().getLocalBucketById(bucketId);
   }
 
-  public int localSize() {
-    return localSize(false);
-  }
-
   public String displayContent() {
     int size = 0;
     StringBuffer sb = new StringBuffer();
@@ -1421,6 +1417,10 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     return sb.toString();
   }
 
+  public int localSize() {
+    return localSize(false);
+  }
+
   public int localSize(boolean includeSecondary) {
     int size = 0;
     for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index 39fedbf..df89e36 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -183,6 +183,9 @@ public class ParallelQueueRemovalMessage extends 
PooledDistributionMessage {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     try {
       brq.destroyKey(key);
+      if (!brq.getBucketAdvisor().isPrimary()) {
+        prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize();
+      }
       if (isDebugEnabled) {
         logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", 
key, prQ.getName(),
             brq.getId());
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
index eb8ad01..4c5caa2 100644
--- 
a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
@@ -50,14 +50,17 @@ public class SerialAsyncEventQueueImplJUnitTest {
     attrs.id = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
     SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache, 
attrs);
     queue.getStatistics().incQueueSize(5);
+    queue.getStatistics().incSecondaryQueueSize(6);
     queue.getStatistics().incTempQueueSize(10);
 
     assertEquals(5, queue.getStatistics().getEventQueueSize());
+    assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize());
     assertEquals(10, queue.getStatistics().getTempEventQueueSize());
 
     queue.stop();
 
     assertEquals(0, queue.getStatistics().getEventQueueSize());
+    assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize());
     assertEquals(0, queue.getStatistics().getTempEventQueueSize());
   }
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index 8366ca7..2074e9e 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -718,19 +718,33 @@ public class AsyncEventQueueTestBase extends 
JUnit4DistributedTestCase {
   }
 
   public static void checkAsyncEventQueueStats(String queueId, final int 
queueSize,
-      final int eventsReceived, final int eventsQueued, final int 
eventsDistributed) {
+      int secondaryQueueSize, final int eventsReceived, final int eventsQueued,
+      final int eventsDistributed) {
     Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
     AsyncEventQueue queue = null;
+    boolean isParallel = false;
     for (AsyncEventQueue q : asyncQueues) {
+      isParallel = q.isParallel();
       if (q.getId().equals(queueId)) {
         queue = q;
         break;
       }
     }
     final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) 
queue).getStatistics();
-    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+    Awaitility.await().atMost(120, TimeUnit.SECONDS)
         .until(() -> assertEquals("Expected queue entries: " + queueSize + " 
but actual entries: "
             + statistics.getEventQueueSize(), queueSize, 
statistics.getEventQueueSize()));
+    if (isParallel) {
+      Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+        assertEquals(
+            "Expected events in the secondary queue is " + secondaryQueueSize 
+ ", but actual is "
+                + statistics.getEventSecondaryQueueSize(),
+            secondaryQueueSize, statistics.getEventSecondaryQueueSize());
+      });
+    } else {
+      // for serial queue, eventSecondaryQueueSize is not used
+      assertEquals(0, statistics.getEventSecondaryQueueSize());
+    }
     assertEquals(queueSize, statistics.getEventQueueSize());
     assertEquals(eventsReceived, statistics.getEventsReceived());
     assertEquals(eventsQueued, statistics.getEventsQueued());
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index aa1db53..442ef9f 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -1521,8 +1521,8 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     LogWriterUtils.getLogWriter().info("Primary buckets on vm2: " + 
primaryBucketsvm2);
 
     // before shutdown vm2, both vm1 and vm2 should have 40 events in primary 
queue
-    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
40, 80, 80, 0));
-    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
40, 80, 80, 0));
+    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
40, 40, 80, 80, 0));
+    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
40, 40, 80, 80, 0));
 
     // ---------------------------- Kill vm2 --------------------------
     vm2.invoke(() -> AsyncEventQueueTestBase.killSender());
@@ -1549,8 +1549,8 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
         .info("After shutdown vm2, started vm3, Primary buckets on vm1: " + 
primaryBucketsvm1);
 
     // vm1.invoke(()->AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
80, 80, 80, 0));
-    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
40, 80, 80, 0));
-    vm3.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
40, 0, 0, 0));
+    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
40, 40, 80, 80, 0));
+    vm3.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
40, 40, 0, 0, 0));
 
     vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln"));
     vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln"));
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
index 935a650..849b823 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
@@ -74,9 +74,10 @@ public class AsyncEventQueueStatsDUnitTest extends 
AsyncEventQueueTestBase {
                                                                                
      // sender
     Wait.pause(2000);// give some time for system to become stable
 
-    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
0, 1000, 1000, 1000));
+    vm1.invoke(
+        () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 
1000, 1000, 1000));
     vm1.invoke(() -> 
AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln", 10));
-    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
0, 1000, 0, 0));
+    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
0, 0, 1000, 0, 0));
     vm2.invoke(() -> 
AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln", 0));
   }
 
@@ -119,19 +120,43 @@ public class AsyncEventQueueStatsDUnitTest extends 
AsyncEventQueueTestBase {
     vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue(
         getTestMethodName() + "_RR", "ln1,ln2", isOffHeap()));
 
+    vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+    vm2.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+    vm3.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+    vm4.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+    vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+    vm2.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+    vm3.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+    vm4.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+
     vm1.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + 
"_RR", 1000));
 
+    vm1.invoke(
+        () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 1000, 
0, 1000, 1000, 0));
+    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 
1000, 0, 1000, 0, 0));
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+    vm2.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+    vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+    vm4.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+    vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+    vm2.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+    vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+    vm4.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+
     vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln1", 
1000));
     vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln2", 
1000));
     Wait.pause(2000);// give some time for system to become stable
 
-    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 
0, 1000, 1000, 1000));
+    vm1.invoke(
+        () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 0, 
1000, 1000, 1000));
     vm1.invoke(() -> 
AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln1", 10));
-    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 
0, 1000, 1000, 1000));
+    vm1.invoke(
+        () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 0, 
1000, 1000, 1000));
     vm1.invoke(() -> 
AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln2", 10));
-    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 
0, 1000, 0, 0));
+    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 
0, 0, 1000, 0, 0));
     vm2.invoke(() -> 
AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln1", 0));
-    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 
0, 1000, 0, 0));
+    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 
0, 0, 1000, 0, 0));
     vm2.invoke(() -> 
AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln2", 0));
   }
 
@@ -229,11 +254,12 @@ public class AsyncEventQueueStatsDUnitTest extends 
AsyncEventQueueTestBase {
     vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
1500));
 
     Wait.pause(2000);// give some time for system to become stable
-    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
0, 1500, 1500, 1500));
+    vm1.invoke(
+        () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 
1500, 1500, 1500));
     vm1.invoke(() -> 
AsyncEventQueueTestBase.checkAsyncEventQueueUnprocessedStats("ln", 0));
 
 
-    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
0, 1500, 0, 0));
+    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
0, 0, 1500, 0, 0));
     vm2.invoke(() -> 
AsyncEventQueueTestBase.checkAsyncEventQueueUnprocessedStats("ln", 1500));
   }
 
@@ -301,7 +327,8 @@ public class AsyncEventQueueStatsDUnitTest extends 
AsyncEventQueueTestBase {
     vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
1000));
 
     Wait.pause(2000);// give some time for system to become stable
-    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 
0, 2000, 2000, 1000));
+    vm1.invoke(
+        () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 
2000, 2000, 1000));
     vm1.invoke(() -> 
AsyncEventQueueTestBase.checkAsyncEventQueueConflatedStats("ln", 500));
   }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index 5e0f704..d1ea59f 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -65,6 +65,8 @@ import 
org.apache.geode.internal.cache.eviction.EvictionController;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.internal.statistics.DummyStatisticsFactory;
 import org.apache.geode.test.fake.Fakes;
 import org.apache.geode.test.junit.categories.UnitTest;
 
@@ -81,6 +83,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
   private PartitionedRegion rootRegion;
   private BucketRegionQueue bucketRegionQueue;
   private BucketRegionQueueHelper bucketRegionQueueHelper;
+  private GatewaySenderStats stats;
 
   @Before
   public void setUpGemFire() {
@@ -116,6 +119,8 @@ public class ParallelQueueRemovalMessageJUnitTest {
     when(this.queueRegion.getParallelGatewaySender()).thenReturn(this.sender);
     when(this.sender.getQueues()).thenReturn(null);
     when(this.sender.getDispatcherThreads()).thenReturn(1);
+    stats = new GatewaySenderStats(new DummyStatisticsFactory(), "ln");
+    when(this.sender.getStatistics()).thenReturn(stats);
   }
 
   private void createRootRegion() {
@@ -183,6 +188,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Validate initial BucketRegionQueue state
     assertFalse(this.bucketRegionQueue.isInitialized());
     assertEquals(0, 
this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
+    stats.setSecondaryQueueSize(1);
 
     // Create and process a ParallelQueueRemovalMessage (causes the 
failedBatchRemovalMessageKeys to
     // add a key)
@@ -190,6 +196,8 @@ public class ParallelQueueRemovalMessageJUnitTest {
 
     // Validate BucketRegionQueue after processing ParallelQueueRemovalMessage
     assertEquals(1, 
this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
+    // failed BatchRemovalMessage will not modify stats
+    assertEquals(1, stats.getEventSecondaryQueueSize());
   }
 
   @Test
@@ -201,6 +209,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Add an event to the BucketRegionQueue and verify BucketRegionQueue state
     this.bucketRegionQueueHelper.addEvent(KEY);
     assertEquals(1, this.bucketRegionQueue.size());
+    assertEquals(1, stats.getEventSecondaryQueueSize());
 
     // Create and process a ParallelQueueRemovalMessage (causes the value of 
the entry to be set to
     // DESTROYED)
@@ -210,6 +219,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Clean up destroyed tokens and validate BucketRegionQueue
     this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
     assertEquals(0, this.bucketRegionQueue.size());
+    assertEquals(0, stats.getEventSecondaryQueueSize());
   }
 
   @Test
@@ -247,6 +257,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Add an event to the BucketRegionQueue and verify BucketRegionQueue state
     GatewaySenderEventImpl event = this.bucketRegionQueueHelper.addEvent(KEY);
     assertEquals(1, this.bucketRegionQueue.size());
+    assertEquals(1, stats.getEventSecondaryQueueSize());
 
     // Add a mock GatewaySenderEventImpl to the temp queue
     BlockingQueue<GatewaySenderEventImpl> tempQueue = 
createTempQueueAndAddEvent(processor, event);
@@ -259,6 +270,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
 
     // Validate temp queue is empty after processing 
ParallelQueueRemovalMessage
     assertEquals(0, tempQueue.size());
+    assertEquals(0, stats.getEventSecondaryQueueSize());
 
     // Clean up destroyed tokens
     this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
index c4d0b7c..bdff063 100644
--- 
a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
@@ -53,8 +53,6 @@ public class AsyncEventQueueStatsJUnitTest extends 
MBeanStatsTestCase {
     sample();
 
     assertEquals(0, getEventQueueSize());
-
-
   }
 
   private int getEventQueueSize() {
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 3799083..6b3d2c5 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -124,9 +124,11 @@ import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.distributed.internal.ServerLocation;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.AvailablePort;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.admin.remote.DistributionLocatorId;
+import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
 import org.apache.geode.internal.cache.BucketRegion;
 import org.apache.geode.internal.cache.CacheConfig;
 import org.apache.geode.internal.cache.CacheServerImpl;
@@ -142,6 +144,8 @@ import org.apache.geode.internal.cache.execute.data.Order;
 import org.apache.geode.internal.cache.execute.data.OrderId;
 import org.apache.geode.internal.cache.execute.data.Shipment;
 import org.apache.geode.internal.cache.execute.data.ShipmentId;
+import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage;
+import 
org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
 import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
 import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
 import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
@@ -932,9 +936,9 @@ public class WANTestBase extends DistributedTestCase {
       props.setProperty(JMX_MANAGER_HTTP_PORT, "0");
     }
     props.setProperty(MCAST_PORT, "0");
-    props.setProperty(LOCATORS, "localhost[" + locPort + "]");
     String logLevel = System.getProperty(LOG_LEVEL, "info");
     props.setProperty(LOG_LEVEL, logLevel);
+    props.setProperty(LOCATORS, "localhost[" + locPort + "]");
     InternalDistributedSystem ds = test.getSystem(props);
     cache = CacheFactory.create(ds);
   }
@@ -1132,12 +1136,57 @@ public class WANTestBase extends DistributedTestCase {
     return connectionInfo;
   }
 
+  public static void moveAllPrimaryBuckets(String senderId, final 
DistributedMember destination,
+      final String regionName) {
+
+    AbstractGatewaySender sender = (AbstractGatewaySender) 
cache.getGatewaySender(senderId);
+    final RegionQueue regionQueue;
+    regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
+    if (sender.isParallel()) {
+      ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue =
+          (ConcurrentParallelGatewaySenderQueue) regionQueue;
+      PartitionedRegion prQ =
+          parallelGatewaySenderQueue.getRegions().toArray(new 
PartitionedRegion[1])[0];
+
+      Set<Integer> primaryBucketIds = 
prQ.getDataStore().getAllLocalPrimaryBucketIds();
+      for (int bid : primaryBucketIds) {
+        movePrimary(destination, regionName, bid);
+      }
+
+      // double-check after moving all primary buckets
+      primaryBucketIds = prQ.getDataStore().getAllLocalPrimaryBucketIds();
+      assertTrue(primaryBucketIds.isEmpty());
+    }
+  }
+
+  public static void movePrimary(final DistributedMember destination, final 
String regionName,
+      final int bucketId) {
+    PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
+
+    BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage
+        .send((InternalDistributedMember) destination, region, bucketId, true);
+    assertNotNull(response);
+    assertTrue(response.waitForResponse());
+  }
+
+  public static int getSecondaryQueueSizeInStats(String senderId) {
+    AbstractGatewaySender sender = (AbstractGatewaySender) 
cache.getGatewaySender(senderId);
+    GatewaySenderStats statistics = sender.getStatistics();
+    return statistics.getEventSecondaryQueueSize();
+  }
+
   public static List<Integer> getSenderStats(String senderId, final int 
expectedQueueSize) {
     AbstractGatewaySender sender = (AbstractGatewaySender) 
cache.getGatewaySender(senderId);
     GatewaySenderStats statistics = sender.getStatistics();
     if (expectedQueueSize != -1) {
       final RegionQueue regionQueue;
       regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
+      if (sender.isParallel()) {
+        ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue =
+            (ConcurrentParallelGatewaySenderQueue) regionQueue;
+        PartitionedRegion pr =
+            parallelGatewaySenderQueue.getRegions().toArray(new 
PartitionedRegion[1])[0];
+      }
       Awaitility.await().atMost(120, TimeUnit.SECONDS)
           .until(() -> assertEquals("Expected queue entries: " + 
expectedQueueSize
               + " but actual entries: " + regionQueue.size(), 
expectedQueueSize,
@@ -1154,9 +1203,28 @@ public class WANTestBase extends DistributedTestCase {
     stats.add(statistics.getEventsNotQueuedConflated());
     stats.add(statistics.getEventsConflatedFromBatches());
     stats.add(statistics.getConflationIndexesMapSize());
+    stats.add(statistics.getEventSecondaryQueueSize());
     return stats;
   }
 
+  protected static int getTotalBucketQueueSize(PartitionedRegion prQ, boolean 
isPrimary) {
+    int size = 0;
+    if (prQ != null) {
+      Set<Map.Entry<Integer, BucketRegion>> allBuckets = 
prQ.getDataStore().getAllLocalBuckets();
+      List<Integer> thisProcessorBuckets = new ArrayList<Integer>();
+
+      for (Map.Entry<Integer, BucketRegion> bucketEntry : allBuckets) {
+        BucketRegion bucket = bucketEntry.getValue();
+        int bId = bucket.getId();
+        if ((isPrimary && bucket.getBucketAdvisor().isPrimary())
+            || (!isPrimary && !bucket.getBucketAdvisor().isPrimary())) {
+          size += bucket.size();
+        }
+      }
+    }
+    return size;
+  }
+
   public static List<Integer> getSenderStatsForDroppedEvents(String senderId) {
     AbstractGatewaySender sender = (AbstractGatewaySender) 
cache.getGatewaySender(senderId);
     GatewaySenderStats statistics = sender.getStatistics();
@@ -3120,9 +3188,7 @@ public class WANTestBase extends DistributedTestCase {
     }
 
     if (!sender.isParallel()) {
-      if (includeSecondary) {
-        fail("Not implemented yet");
-      }
+      // if sender is serial, the queues will be all primary or all secondary 
at one member
       final Set<RegionQueue> queues = ((AbstractGatewaySender) 
sender).getQueues();
       int size = 0;
       for (RegionQueue q : queues) {
@@ -3137,11 +3203,7 @@ public class WANTestBase extends DistributedTestCase {
       } else if (regionQueue instanceof ParallelGatewaySenderQueue) {
         return ((ParallelGatewaySenderQueue) 
regionQueue).localSize(includeSecondary);
       } else {
-        if (includeSecondary) {
-          fail("Not Implemented yet");
-        }
-        regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new 
RegionQueue[1])[0];
-        return regionQueue.getRegion().size();
+        fail("Not implemented yet");
       }
     }
     fail("Not yet implemented?");
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
index 1613501..c9b968f 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -67,6 +67,11 @@ public class ParallelWANConflationDUnitTest extends 
WANTestBase {
 
     vm4.invoke(() -> checkQueueSize("ln", (keyValues.size() + 
updateKeyValues.size())));
 
+    vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues));
+
+    // Since no conflation, all updates are in queue
+    vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + 2 * 
updateKeyValues.size()));
+
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0));
 
     resumeSenders();
@@ -92,7 +97,7 @@ public class ParallelWANConflationDUnitTest extends 
WANTestBase {
     vm6.invoke(() -> createSender("ln", 2, true, 100, 50, false, false, null, 
true));
     vm7.invoke(() -> createSender("ln", 2, true, 100, 50, false, false, null, 
true));
 
-    createSenderPRs();
+    createSenderPRs(1);
 
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
@@ -109,24 +114,35 @@ public class ParallelWANConflationDUnitTest extends 
WANTestBase {
       vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues));
     }
 
+    // sender did not turn on conflation, so queue size will be 100 (otherwise 
it will be 20)
+    vm4.invoke(() -> checkQueueSize("ln", 100));
     vm4.invoke(() -> enableConflation("ln"));
     vm5.invoke(() -> enableConflation("ln"));
     vm6.invoke(() -> enableConflation("ln"));
     vm7.invoke(() -> enableConflation("ln"));
 
-    resumeSenders();
-
     ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
100));
     ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
100));
     ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
100));
     ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
100));
+    assertTrue("Event in secondary queue should be 100",
+        (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 
100);
+
+    resumeSenders();
+
+    v4List = (ArrayList<Integer>) vm4.invoke(() -> 
WANTestBase.getSenderStats("ln", 0));
+    v5List = (ArrayList<Integer>) vm5.invoke(() -> 
WANTestBase.getSenderStats("ln", 0));
+    v6List = (ArrayList<Integer>) vm6.invoke(() -> 
WANTestBase.getSenderStats("ln", 0));
+    v7List = (ArrayList<Integer>) vm7.invoke(() -> 
WANTestBase.getSenderStats("ln", 0));
 
     assertTrue("No events conflated in batch",
         (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
+    assertEquals("Event in secondary queue should be 0 after dispatched", 0,
+        (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)));
 
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
 
@@ -161,12 +177,14 @@ public class ParallelWANConflationDUnitTest extends 
WANTestBase {
     vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + 
updateKeyValues.size())); // creates
                                                                                
        // aren't
                                                                                
        // conflated
+    validateEventSecondaryQueueSize(keyValues.size() + updateKeyValues.size(), 
redundancy);
 
     vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues));
 
-    vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + 
updateKeyValues.size())); // creates
-                                                                               
        // aren't
-                                                                               
        // conflated
+    int expectedEventNumAfterConflation = keyValues.size() + 
updateKeyValues.size();
+    vm4.invoke(() -> checkQueueSize("ln", expectedEventNumAfterConflation));
+
+    validateEventSecondaryQueueSize(expectedEventNumAfterConflation, 
redundancy);
 
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0));
 
@@ -174,6 +192,24 @@ public class ParallelWANConflationDUnitTest extends 
WANTestBase {
 
     keyValues.putAll(updateKeyValues);
     validateReceiverRegionSize(keyValues);
+
+    // after dispatch, both primary and secondary queues are empty
+    vm4.invoke(() -> checkQueueSize("ln", 0));
+    validateEventSecondaryQueueSize(0, redundancy);
+  }
+
+  private void validateEventSecondaryQueueSize(int expectedNum, int 
redundancy) {
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
expectedNum));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
expectedNum));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
expectedNum));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
expectedNum));
+    assertTrue("Event in secondary queue should be 100",
+        (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 
expectedNum
+            * redundancy);
   }
 
   @Test
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index a54a67d..fb0f905 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.wan.WANTestBase;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.VM;
@@ -53,6 +54,156 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
   }
 
   @Test
+  public void testQueueSizeInSecondaryBucketRegionQueuesWithMemberRestart() 
throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createSendersWithConflation(lnPort);
+
+    createSenderPRs(1);
+
+    startPausedSenders();
+
+    createReceiverPR(vm2, 1);
+    putKeyValues();
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
NUM_PUTS));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
NUM_PUTS));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
NUM_PUTS));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
NUM_PUTS));
+
+    assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + 
v7List.get(0)); // queue
+                                                                               
            // size
+    assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + 
v7List.get(1)); // eventsReceived
+    assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + 
v7List.get(2)); // events
+                                                                               
                // queued
+    assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
+                                                                               
     // distributed
+    assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + 
v7List.get(10)); // secondary
+                                                                               
                // queue
+                                                                               
                // size
+
+    // stop vm7 to trigger rebalance and move some primary buckets
+    System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" 
+ v5List.get(10)
+        + ":" + v6List.get(10) + ":" + v7List.get(10));
+    vm7.invoke(() -> WANTestBase.closeCache());
+    Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
+      int v4secondarySize = vm4.invoke(() -> 
WANTestBase.getSecondaryQueueSizeInStats("ln"));
+      int v5secondarySize = vm5.invoke(() -> 
WANTestBase.getSecondaryQueueSizeInStats("ln"));
+      int v6secondarySize = vm6.invoke(() -> 
WANTestBase.getSecondaryQueueSizeInStats("ln"));
+      assertEquals(NUM_PUTS, v4secondarySize + v5secondarySize + 
v6secondarySize); // secondary
+      // queue
+      // size
+    });
+    System.out.println("New secondary queue sizes:" + v4List.get(10) + ":" + 
v5List.get(10) + ":"
+        + v6List.get(10));
+
+    vm7.invoke(() -> WANTestBase.createCache(lnPort));
+    vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, 
false, null, true));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(testName, "ln", 1, 
10, isOffHeap()));
+    startSenderInVMs("ln", vm7);
+    vm7.invoke(() -> pauseSender("ln"));
+
+    v4List = (ArrayList<Integer>) vm4.invoke(() -> 
WANTestBase.getSenderStats("ln", NUM_PUTS));
+    v5List = (ArrayList<Integer>) vm5.invoke(() -> 
WANTestBase.getSenderStats("ln", NUM_PUTS));
+    v6List = (ArrayList<Integer>) vm6.invoke(() -> 
WANTestBase.getSenderStats("ln", NUM_PUTS));
+    v7List = (ArrayList<Integer>) vm7.invoke(() -> 
WANTestBase.getSenderStats("ln", NUM_PUTS));
+    assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + 
v7List.get(10)); // secondary
+    // queue
+    // size
+    System.out.println("After restart vm7, secondary queue sizes:" + 
v4List.get(10) + ":"
+        + v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10));
+
+    vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm5.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm6.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm7.invoke(() -> WANTestBase.resumeSender("ln"));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, 
NUM_PUTS));
+
+    vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
+
+    v4List = (ArrayList<Integer>) vm4.invoke(() -> 
WANTestBase.getSenderStats("ln", 0));
+    v5List = (ArrayList<Integer>) vm5.invoke(() -> 
WANTestBase.getSenderStats("ln", 0));
+    v6List = (ArrayList<Integer>) vm6.invoke(() -> 
WANTestBase.getSenderStats("ln", 0));
+    v7List = (ArrayList<Integer>) vm7.invoke(() -> 
WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
+                                                                               
            // distributed
+    assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + 
v7List.get(10)); // secondary
+                                                                               
         // queue
+                                                                               
         // size
+  }
+
+  // TODO: add a test without redundancy for primary switch
+  @Test
+  public void testQueueSizeInSecondaryWithPrimarySwitch() throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createSendersWithConflation(lnPort);
+
+    createSenderPRs(1);
+
+    startPausedSenders();
+
+    createReceiverPR(vm2, 1);
+
+    putKeyValues();
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
NUM_PUTS));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
NUM_PUTS));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
NUM_PUTS));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
NUM_PUTS));
+
+    assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + 
v7List.get(0)); // queue
+                                                                               
            // size
+    assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + 
v7List.get(1)); // eventsReceived
+    assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + 
v7List.get(2)); // events
+                                                                               
                // queued
+    assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
+                                                                               
     // distributed
+    assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + 
v7List.get(10)); // secondary
+                                                                               
                // queue
+                                                                               
                // size
+
+    vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm5.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm6.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm7.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, 
NUM_PUTS));
+
+    vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
+
+    v4List = (ArrayList<Integer>) vm4.invoke(() -> 
WANTestBase.getSenderStats("ln", 0));
+    v5List = (ArrayList<Integer>) vm5.invoke(() -> 
WANTestBase.getSenderStats("ln", 0));
+    v6List = (ArrayList<Integer>) vm6.invoke(() -> 
WANTestBase.getSenderStats("ln", 0));
+    v7List = (ArrayList<Integer>) vm7.invoke(() -> 
WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
+                                                                               
            // distributed
+    assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + 
v7List.get(10)); // secondary
+                                                                               
         // queue
+                                                                               
         // size
+  }
+
+  @Test
   public void testPartitionedRegionParallelPropagation_BeforeDispatch() throws 
Exception {
     Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
index 63c715b..ec3ce4e 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
@@ -18,6 +18,7 @@ import static 
org.apache.geode.distributed.ConfigurationProperties.*;
 import static org.junit.Assert.*;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
@@ -46,6 +47,7 @@ import 
org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
 import org.apache.geode.internal.cache.wan.WANTestBase;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.VM;
@@ -103,7 +105,13 @@ public class SerialGatewaySenderQueueDUnitTest extends 
WANTestBase {
     vm4.invoke(() -> WANTestBase.pauseSender("ln"));
 
     vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
-    Wait.pause(5000);
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
1000));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
1000));
+    // secondary queue size stats in serial queue should be 0
+    assertEquals(0, v4List.get(10) + v5List.get(10));
+
     HashMap primarySenderUpdates = (HashMap) vm4.invoke(() -> 
WANTestBase.checkQueue());
     HashMap secondarySenderUpdates = (HashMap) vm5.invoke(() -> 
WANTestBase.checkQueue());
     assertEquals(primarySenderUpdates, secondarySenderUpdates);
@@ -138,6 +146,9 @@ public class SerialGatewaySenderQueueDUnitTest extends 
WANTestBase {
     // removing all the keys.
     secondarySenderUpdates = (HashMap) vm5.invoke(() -> 
WANTestBase.checkQueue());
     assertEquals(secondarySenderUpdates.get("Destroy"), 
receiverUpdates.get("Create"));
+
+    vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
   }
 
   protected void checkPrimarySenderUpdatesOnVM5(HashMap primarySenderUpdates) {
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
index 7297179..091befd 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
@@ -44,10 +44,10 @@ public class SerialWANConflationDUnitTest extends 
WANTestBase {
 
     createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
-    vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, 
isOffHeap()));
-    vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, 
isOffHeap()));
-    vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, 
isOffHeap()));
-    vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, 
isOffHeap()));
+    vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, 
isOffHeap()));
+    vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, 
isOffHeap()));
+    vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, 
isOffHeap()));
+    vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, 
isOffHeap()));
 
     vm4.invoke(() -> createSender("ln", 2, false, 100, 50, false, false, null, 
true));
     vm5.invoke(() -> createSender("ln", 2, false, 100, 50, false, false, null, 
true));
@@ -92,6 +92,71 @@ public class SerialWANConflationDUnitTest extends 
WANTestBase {
 
     assertTrue("No events conflated in batch",
         (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
+  }
+
+  @Test
+  public void testSerialPropagationPartitionRegionConflationDuringEnqueue() 
throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    vm2.invoke(() -> createPartitionedRegion(getTestMethodName(), null, 1, 8, 
isOffHeap()));
+    vm3.invoke(() -> createPartitionedRegion(getTestMethodName(), null, 1, 8, 
isOffHeap()));
+
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, 
isOffHeap()));
+    vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, 
isOffHeap()));
+    vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, 
isOffHeap()));
+    vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, 
isOffHeap()));
+
+    vm4.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, 
true));
+    vm5.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, 
true));
+    vm6.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, 
true));
+    vm7.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, 
true));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> pauseSender("ln"));
+    vm5.invoke(() -> pauseSender("ln"));
+    vm6.invoke(() -> pauseSender("ln"));
+    vm7.invoke(() -> pauseSender("ln"));
+
+
+    final Map keyValues = new HashMap();
+
+    for (int i = 1; i <= 10; i++) {
+      for (int j = 1; j <= 10; j++) {
+        keyValues.put(j, i);
+      }
+      vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues));
+    }
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
20));
+    assertTrue("After conflation during enqueue, there should be only 20 
events",
+        v4List.get(0) == 20);
+
+    vm4.invoke(() -> resumeSender("ln"));
+    vm5.invoke(() -> resumeSender("ln"));
+    vm6.invoke(() -> resumeSender("ln"));
+    vm7.invoke(() -> resumeSender("ln"));
+
+    v4List = (ArrayList<Integer>) vm4.invoke(() -> 
WANTestBase.getSenderStats("ln", 0));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+
+    assertTrue("No events in secondary queue stats since it's serial sender",
+        (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 
0);
+    assertTrue("Total queued events should be 100",
+        (v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)) == 
100);
 
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
 
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java
index e84fd89..87c90e0 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java
@@ -163,6 +163,7 @@ public class SerialWANPropagationDUnitTest extends 
WANTestBase {
     IgnoredException.addIgnoredException(BatchException70.class.getName());
     
IgnoredException.addIgnoredException(ServerOperationException.class.getName());
     IgnoredException.addIgnoredException(IOException.class.getName());
+    
IgnoredException.addIgnoredException(java.net.SocketException.class.getName());
 
     vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 10000));
 

-- 
To stop receiving notification emails like this one, please contact
zho...@apache.org.

Reply via email to