This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-4624
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 3cf97755a8cda26d90d563a82b196639805cdc37
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed Mar 21 23:20:27 2018 -0700

    GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the 
processing of queueRemovals
---
 .../asyncqueue/internal/AsyncEventQueueStats.java  |  5 ++
 .../internal/cache/wan/AbstractGatewaySender.java  | 15 +++++-
 .../wan/AbstractGatewaySenderEventProcessor.java   | 62 ++++++++++++++++------
 .../internal/cache/wan/GatewaySenderStats.java     | 16 ++++++
 .../ConcurrentParallelGatewaySenderQueue.java      |  9 ++++
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 18 ++++++-
 .../geode/internal/cache/wan/WANTestBase.java      | 41 +++++++++++++-
 .../ParallelGatewaySenderOperationsDUnitTest.java  | 38 ++++++++++---
 8 files changed, 179 insertions(+), 25 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index dee2c92..b8259a3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -47,6 +47,9 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
             f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
                 "Number of events received but not added to the event queue 
because the queue already contains an event with the event's key.",
                 "operations"),
+            f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added 
to queue.", "events"),
+            f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER,
+                "Number of events not added to primary queue due to sender yet 
runing.", "events"),
             f.createIntCounter(EVENTS_CONFLATED_FROM_BATCHES,
                 "Number of events conflated from batches.", "operations"),
             f.createIntCounter(EVENTS_DISTRIBUTED,
@@ -122,6 +125,8 @@ public class AsyncEventQueueStats extends 
GatewaySenderStats {
     unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE);
     conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE);
     notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
+    notQueuedEventsAtYetRunningPrimarySenderId =
+        type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER);
     eventsFilteredId = type.nameToId(EVENTS_FILTERED);
     eventsConflatedFromBatchesId = 
type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
     loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index a134e1e..76c1e24 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
     // If this gateway is not running, return
     if (!isRunning()) {
       if (isDebugEnabled) {
-        logger.debug("Returning back without putting into the gateway sender 
queue");
+        logger.debug("Returning back without putting into the gateway sender 
queue" + event);
+      }
+      if (this.eventProcessor != null) {
+        this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
       }
       return;
     }
@@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
         // The sender may have stopped, after we have checked the status in 
the beginning.
         if (!isRunning()) {
           if (isDebugEnabled) {
-            logger.debug("Returning back without putting into the gateway 
sender queue");
+            logger.debug("Returning back without putting into the gateway 
sender queue" + event);
+          }
+          if (this.eventProcessor != null) {
+            this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
           }
           return;
         }
@@ -1251,6 +1257,11 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
     return localProcessor == null ? 0 : localProcessor.eventQueueSize();
   }
 
+  public int getEventSecondaryQueueSize() {
+    AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
+    return localProcessor == null ? 0 : 
localProcessor.eventSecondaryQueueSize();
+  }
+
   public void setEnqueuedAllTempQueueEvents(boolean 
enqueuedAllTempQueueEvents) {
     this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents;
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 9309e43..9fe0c22 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -31,25 +31,12 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.CancelException;
 import org.apache.geode.GemFireException;
 import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.CacheException;
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.Operation;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.*;
 import org.apache.geode.cache.wan.GatewayEventFilter;
 import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.internal.cache.BucketRegion;
-import org.apache.geode.internal.cache.Conflatable;
-import org.apache.geode.internal.cache.DistributedRegion;
-import org.apache.geode.internal.cache.EntryEventImpl;
-import org.apache.geode.internal.cache.EnumListenerEvent;
-import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.*;
 import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
 import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderQueue;
@@ -270,6 +257,51 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
     return this.queue.size();
   }
 
+  public int eventSecondaryQueueSize() {
+    if (queue == null) {
+      return 0;
+    }
+
+    // if parallel, get both primary and secondary queues' size, then 
substract primary queue's size
+    if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
+      int size = ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true)
+          - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false);
+      return size;
+    }
+    return this.queue.size();
+  }
+
+  public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) {
+    if (queue == null) {
+      return;
+    }
+    if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
+      ConcurrentParallelGatewaySenderQueue cpgsq = 
(ConcurrentParallelGatewaySenderQueue) queue;
+      PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath());
+      if (prQ == null) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("shadow partitioned region " + 
event.getRegion().getFullPath()
+              + " is not created yet.");
+        }
+        return;
+      }
+      int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) 
event);
+      long shadowKey = event.getTailKey();
+
+      ParallelGatewaySenderQueue pgsq =
+          (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
+      boolean isPrimary = 
prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
+      if (isPrimary) {
+        pgsq.addRemovedEvent(prQ, bucketId, shadowKey);
+        
this.sender.getStatistics().incEventsNotQueuedAtYetRunningPrimarySender();
+        if (logger.isDebugEnabled()) {
+          logger.debug("register dropped event for primary queue. BucketId is 
" + bucketId
+              + ", shadowKey is " + shadowKey + ", prQ is " + 
prQ.getFullPath());
+        }
+      }
+    }
+  }
+
   /**
    * @return the sender
    */
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index c7fd370..adaf928 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -84,6 +84,8 @@ public class GatewaySenderStats {
 
   protected static final String EVENTS_FILTERED = "eventsFiltered";
   protected static final String NOT_QUEUED_EVENTS = "notQueuedEvent";
+  protected static final String 
NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER =
+      "notQueuedEventAtYetRunningPrimarySender";
 
   protected static final String LOAD_BALANCES_COMPLETED = 
"loadBalancesCompleted";
   protected static final String LOAD_BALANCES_IN_PROGRESS = 
"loadBalancesInProgress";
@@ -135,6 +137,8 @@ public class GatewaySenderStats {
   protected static int eventsFilteredId;
   /** Id of not queued events */
   protected static int notQueuedEventsId;
+  /** Id of not queued events due to the primary sender is yet running */
+  protected static int notQueuedEventsAtYetRunningPrimarySenderId;
   /** Id of events conflated in batch */
   protected static int eventsConflatedFromBatchesId;
   /** Id of load balances completed */
@@ -213,6 +217,8 @@ public class GatewaySenderStats {
             f.createIntGauge(CONFLATION_INDEXES_MAP_SIZE,
                 "Current number of entries in the conflation indexes map.", 
"events"),
             f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added 
to queue.", "events"),
+            f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER,
+                "Number of events not added to primary queue due to sender yet 
runing.", "events"),
             f.createIntCounter(EVENTS_FILTERED,
                 "Number of events filtered through GatewayEventFilter.", 
"events"),
             f.createIntCounter(LOAD_BALANCES_COMPLETED, "Number of load 
balances completed",
@@ -249,6 +255,8 @@ public class GatewaySenderStats {
     unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE);
     conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE);
     notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
+    notQueuedEventsAtYetRunningPrimarySenderId =
+        type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER);
     eventsFilteredId = type.nameToId(EVENTS_FILTERED);
     eventsConflatedFromBatchesId = 
type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
     loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
@@ -599,6 +607,14 @@ public class GatewaySenderStats {
     return this.stats.getInt(notQueuedEventsId);
   }
 
+  public void incEventsNotQueuedAtYetRunningPrimarySender() {
+    this.stats.incInt(notQueuedEventsAtYetRunningPrimarySenderId, 1);
+  }
+
+  public int getEventsNotQueuedAtYetRunningPrimarySender() {
+    return this.stats.getInt(notQueuedEventsAtYetRunningPrimarySenderId);
+  }
+
   public void incEventsFiltered() {
     this.stats.incInt(eventsFilteredId, 1);
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index 4fc940c..e556910 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -121,6 +121,11 @@ public class ConcurrentParallelGatewaySenderQueue 
implements RegionQueue {
     return this.processors[0].getQueue().size();
   }
 
+  public String displayContent() {
+    ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) 
(processors[0].getQueue());
+    return pgsq.displayContent();
+  }
+
   public int localSize() {
     return localSize(false);
   }
@@ -190,6 +195,10 @@ public class ConcurrentParallelGatewaySenderQueue 
implements RegionQueue {
     return processors[index];
   }
 
+  public RegionQueue getQueueByBucket(int bucketId) {
+    return getPGSProcessor(bucketId).getQueue();
+  }
+
   public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int bucketId) 
{
     return getPGSProcessor(bucketId).getBucketTmpQueue(bucketId);
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 3aa8534..907a265 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1112,7 +1112,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
 
   // This method may need synchronization in case it is used by
   // ConcurrentParallelGatewaySender
-  protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object 
key) {
+  public void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) 
{
     StoppableReentrantLock lock = buckToDispatchLock;
     if (lock != null) {
       lock.lock();
@@ -1401,6 +1401,22 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     return (BucketRegionQueue) prQ.getDataStore().getLocalBucketById(bucketId);
   }
 
+  public String displayContent() {
+    int size = 0;
+    StringBuffer sb = new StringBuffer();
+    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+      if (prQ != null && prQ.getDataStore() != null) {
+        Set<BucketRegion> allLocalBuckets = 
prQ.getDataStore().getAllLocalBucketRegions();
+        for (BucketRegion br : allLocalBuckets) {
+          if (br.size() > 0) {
+            sb.append("bucketId=" + br.getId() + ":" + br.keySet() + ";");
+          }
+        }
+      }
+    }
+    return sb.toString();
+  }
+
   public int localSize() {
     return localSize(false);
   }
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 226595b..426ad36 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -933,6 +933,8 @@ public class WANTestBase extends DistributedTestCase {
     }
     props.setProperty(MCAST_PORT, "0");
     props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+    String logLevel = System.getProperty(LOG_LEVEL, "info");
+    props.setProperty(LOG_LEVEL, logLevel);
     InternalDistributedSystem ds = test.getSystem(props);
     cache = CacheFactory.create(ds);
   }
@@ -1155,6 +1157,21 @@ public class WANTestBase extends DistributedTestCase {
     return stats;
   }
 
+  public static List<Integer> getSenderStatsForDroppedEvents(String senderId) {
+    AbstractGatewaySender sender = (AbstractGatewaySender) 
cache.getGatewaySender(senderId);
+    GatewaySenderStats statistics = sender.getStatistics();
+    ArrayList<Integer> stats = new ArrayList<Integer>();
+    int eventNotQueued = 
statistics.getEventsNotQueuedAtYetRunningPrimarySender();
+    if (eventNotQueued > 0) {
+      logger.info(
+          "Found " + eventNotQueued + " not queued events due to primary 
sender is yet running");
+    }
+    stats.add(eventNotQueued);
+    stats.add(statistics.getEventsNotQueued());
+    stats.add(statistics.getEventsNotQueuedConflated());
+    return stats;
+  }
+
   public static void checkQueueStats(String senderId, final int queueSize, 
final int eventsReceived,
       final int eventsQueued, final int eventsDistributed) {
     GatewaySenderStats statistics = getGatewaySenderStats(senderId);
@@ -2746,7 +2763,7 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void validateQueueSizeStat(String id, final int queueSize) {
     final AbstractGatewaySender sender = (AbstractGatewaySender) 
cache.getGatewaySender(id);
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
         .until(() -> assertEquals(queueSize, sender.getEventQueueSize()));
     assertEquals(queueSize, sender.getEventQueueSize());
   }
@@ -3053,6 +3070,17 @@ public class WANTestBase extends DistributedTestCase {
     });
   }
 
+  public static String displayQueueContent(final RegionQueue queue) {
+    if (queue instanceof ParallelGatewaySenderQueue) {
+      ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) queue;
+      return pgsq.displayContent();
+    } else if (queue instanceof ConcurrentParallelGatewaySenderQueue) {
+      ConcurrentParallelGatewaySenderQueue pgsq = 
(ConcurrentParallelGatewaySenderQueue) queue;
+      return pgsq.displayContent();
+    }
+    return null;
+  }
+
   public static Integer getQueueContentSize(final String senderId) {
     return getQueueContentSize(senderId, false);
   }
@@ -3135,6 +3163,7 @@ public class WANTestBase extends DistributedTestCase {
           ((AbstractGatewaySender) sender).getQueues().toArray(new 
RegionQueue[1])[0];
       Set<BucketRegion> buckets = ((PartitionedRegion) 
regionQueue.getRegion()).getDataStore()
           .getAllLocalPrimaryBucketRegions();
+      final AbstractGatewaySender abstractSender = (AbstractGatewaySender) 
sender;
       for (final BucketRegion bucket : buckets) {
         Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
           assertEquals("Expected bucket entries for bucket: " + bucket.getId()
@@ -3143,6 +3172,16 @@ public class WANTestBase extends DistributedTestCase {
               bucket.keySet().size());
         });
       } // for loop ends
+      assertEquals("Except events in all primary queues after drain is 0", 0,
+          abstractSender.getEventQueueSize());
+
+      Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
+        assertEquals("Expected events in all secondary queues are drained but 
actual is "
+            + abstractSender.getEventSecondaryQueueSize() + ". Queue content 
is: "
+            + displayQueueContent(regionQueue), 0, 
abstractSender.getEventSecondaryQueueSize());
+      });
+      assertEquals("Except events in all secondary queues after drain is 0", 0,
+          abstractSender.getEventSecondaryQueueSize());
     } finally {
       exp.remove();
       exp1.remove();
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index eaef4f9..f5b98b7 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -20,6 +20,8 @@ import static 
org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_S
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.ArrayList;
+
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -407,18 +409,42 @@ public class ParallelGatewaySenderOperationsDUnitTest 
extends WANTestBase {
     vm2.invoke(() -> validateRegionSizeRemainsSame(getTestMethodName() + 
"_PR", 200));
 
     // SECOND RUN: start async puts on region
-    AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + 
"_PR", 5000));
-
-    // when puts are happening by another thread, start the senders
-    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
-
-    async.join();
+    ArrayList<Integer> vm4List = null;
+    ArrayList<Integer> vm5List = null;
+    ArrayList<Integer> vm6List = null;
+    ArrayList<Integer> vm7List = null;
+    boolean foundDroppedAtYetStartedPrimarySender = false;
+    int count = 0;
+
+    do {
+      stopSenders();
+      AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() 
+ "_PR", 5000));
+
+      // when puts are happening by another thread, start the senders
+      startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+      async.join();
+      vm4List =
+          (ArrayList<Integer>) vm4.invoke(() -> 
WANTestBase.getSenderStatsForDroppedEvents("ln"));
+      vm5List =
+          (ArrayList<Integer>) vm5.invoke(() -> 
WANTestBase.getSenderStatsForDroppedEvents("ln"));
+      vm6List =
+          (ArrayList<Integer>) vm6.invoke(() -> 
WANTestBase.getSenderStatsForDroppedEvents("ln"));
+      vm7List =
+          (ArrayList<Integer>) vm7.invoke(() -> 
WANTestBase.getSenderStatsForDroppedEvents("ln"));
+      if (vm4List.get(0) + vm5List.get(0) + vm6List.get(0) + vm7List.get(0) > 
0) {
+        foundDroppedAtYetStartedPrimarySender = true;
+      }
+      count++;
+    } while (foundDroppedAtYetStartedPrimarySender == false && count < 5);
+    assertThat(foundDroppedAtYetStartedPrimarySender);
 
     // verify all the buckets on all the sender nodes are drained
     validateParallelSenderQueueAllBucketsDrained();
 
     // verify that the queue size ultimately becomes zero. That means all the 
events propagate to
     // remote site.
+
     vm4.invoke(() -> validateQueueContents("ln", 0));
   }
 

-- 
To stop receiving notification emails like this one, please contact
zho...@apache.org.

Reply via email to