This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-4624
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 77e90cb7d2ef3a626a55febc27deb1c2eb872b2a
Author: zhouxh <[email protected]>
AuthorDate: Wed Apr 18 22:12:40 2018 -0700

    GEODE-4624: add a stats to trace total event removed by PQRM
---
 .../asyncqueue/internal/AsyncEventQueueStats.java  |  3 ++
 .../internal/cache/AbstractBucketRegionQueue.java  |  4 +++
 .../internal/cache/wan/AbstractGatewaySender.java  |  1 +
 .../internal/cache/wan/GatewaySenderStats.java     | 37 ++++++++++++++++++++--
 .../wan/parallel/ParallelQueueRemovalMessage.java  |  1 +
 .../SerialAsyncEventQueueImplJUnitTest.java        |  3 ++
 .../bean/stats/AsyncEventQueueStatsJUnitTest.java  |  5 +++
 .../geode/internal/cache/wan/WANTestBase.java      |  1 +
 .../parallel/ParallelWANConflationDUnitTest.java   | 17 +++++++++-
 9 files changed, 69 insertions(+), 3 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index 8d68cee..e74e0d3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -44,6 +44,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
             f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", 
"operations", false),
             f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the 
secondary event queue.",
                 "operations", false),
+            f.createIntGauge(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE,
+                "Total number of events processed by queue removal message.", 
"operations", false),
             f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary 
events queue.",
                 "operations", false),
             f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -111,6 +113,7 @@ public class AsyncEventQueueStats extends 
GatewaySenderStats {
     eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
     eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
     eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
+    eventProcessedByQueueRemovalMessageId = 
type.nameToId(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE);
     eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
     eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
     eventsExceedingAlertThresholdId = 
type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index 2406b18..755d60a 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -236,6 +236,10 @@ public abstract class AbstractBucketRegionQueue extends 
BucketRegion {
     this.gatewaySenderStats.incSecondaryQueueSize(size);
   }
 
+  public void incEventProcessedByQueueRemovalMessage(int size) {
+    this.gatewaySenderStats.incEventProcessedByQueueRemovalMessage(size);
+  }
+
   public void incQueueSize() {
     this.gatewaySenderStats.incQueueSize();
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 59547b2..1afee67 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1099,6 +1099,7 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 
     statistics.setQueueSize(0);
     statistics.setSecondaryQueueSize(0);
+    statistics.setEventProcessedByQueueRemovalMessage(0);
     statistics.setTempQueueSize(0);
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index 15ff18e..a6ba185 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -48,6 +48,9 @@ public class GatewaySenderStats {
   protected static final String EVENT_QUEUE_SIZE = "eventQueueSize";
   /** Name of the event secondary queue size statistic */
   protected static final String EVENT_SECONDARY_QUEUE_SIZE = 
"eventSecondaryQueueSize";
+  /** Total number of events processed by queue removal message statistic */
+  protected static final String EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE =
+      "eventProcessedByQueueRemovalMessage";
   /** Name of the event temporary queue size statistic */
   protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize";
   /** Name of the events distributed statistic */
@@ -108,6 +111,8 @@ public class GatewaySenderStats {
   protected static int eventQueueSizeId;
   /** Id of the event in secondary queue size statistic */
   protected static int eventSecondaryQueueSizeId;
+  /** Id of the event processed by queue removal message statistic */
+  protected static int eventProcessedByQueueRemovalMessageId;
   /** Id of the temp event queue size statistic */
   protected static int eventTmpQueueSizeId;
   /** Id of the events distributed statistic */
@@ -174,6 +179,8 @@ public class GatewaySenderStats {
             f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", 
"operations", false),
             f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of secondary 
event queue.",
                 "operations", false),
+            f.createIntGauge(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE,
+                "Total number of events processed by queue removal message.", 
"operations", false),
             f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary 
events.", "operations",
                 false),
             f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -245,6 +252,7 @@ public class GatewaySenderStats {
     eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
     eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
     eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
+    eventProcessedByQueueRemovalMessageId = 
type.nameToId(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE);
     eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
     eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
     eventsExceedingAlertThresholdId = 
type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
@@ -374,6 +382,15 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Returns the current value of the "eventProcessedByQueueRemovalMessage" 
stat.
+   *
+   * @return the current value of the "eventProcessedByQueueRemovalMessage" 
stat
+   */
+  public int getEventProcessedByQueueRemovalMessage() {
+    return this.stats.getInt(eventProcessedByQueueRemovalMessageId);
+  }
+
+  /**
    * Returns the current value of the "tempQueueSize" stat.
    *
    * @return the current value of the "tempQueueSize" stat.
@@ -487,6 +504,15 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Sets the "eventProcessedByQueueRemovalMessage" stat.
+   *
+   * @param size The total number of the events processed by queue removal 
message
+   */
+  public void setEventProcessedByQueueRemovalMessage(int size) {
+    this.stats.setInt(eventProcessedByQueueRemovalMessageId, size);
+  }
+
+  /**
    * Sets the "tempQueueSize" stat.
    *
    * @param size The size of the temp queue
@@ -508,7 +534,6 @@ public class GatewaySenderStats {
    */
   public void incSecondaryQueueSize() {
     this.stats.incInt(eventSecondaryQueueSizeId, 1);
-    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
   }
 
   /**
@@ -534,7 +559,15 @@ public class GatewaySenderStats {
    */
   public void incSecondaryQueueSize(int delta) {
     this.stats.incInt(eventSecondaryQueueSizeId, delta);
-    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+  }
+
+  /**
+   * Increments the "eventProcessedByQueueRemovalMessage" stat by given delta.
+   *
+   * @param delta an integer by which events are processed by queue removal 
message
+   */
+  public void incEventProcessedByQueueRemovalMessage(int delta) {
+    this.stats.incInt(eventProcessedByQueueRemovalMessageId, delta);
   }
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index df89e36..ca3d785 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -185,6 +185,7 @@ public class ParallelQueueRemovalMessage extends 
PooledDistributionMessage {
       brq.destroyKey(key);
       if (!brq.getBucketAdvisor().isPrimary()) {
         prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize();
+        
prQ.getParallelGatewaySender().getStatistics().incEventProcessedByQueueRemovalMessage(1);
       }
       if (isDebugEnabled) {
         logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", 
key, prQ.getName(),
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
index 4c5caa2..a1976c7 100644
--- 
a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
@@ -52,16 +52,19 @@ public class SerialAsyncEventQueueImplJUnitTest {
     queue.getStatistics().incQueueSize(5);
     queue.getStatistics().incSecondaryQueueSize(6);
     queue.getStatistics().incTempQueueSize(10);
+    queue.getStatistics().incEventProcessedByQueueRemovalMessage(3);
 
     assertEquals(5, queue.getStatistics().getEventQueueSize());
     assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize());
     assertEquals(10, queue.getStatistics().getTempEventQueueSize());
+    assertEquals(3, 
queue.getStatistics().getEventProcessedByQueueRemovalMessage());
 
     queue.stop();
 
     assertEquals(0, queue.getStatistics().getEventQueueSize());
     assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize());
     assertEquals(0, queue.getStatistics().getTempEventQueueSize());
+    assertEquals(0, 
queue.getStatistics().getEventProcessedByQueueRemovalMessage());
   }
 
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
index 7c485be..2e4797e 100644
--- 
a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
@@ -60,6 +60,7 @@ public class AsyncEventQueueStatsJUnitTest extends 
MBeanStatsTestCase {
     StatisticDescriptor[] sds = asyncEventQueueStats.type.getStatistics();
     int notQueueEvents = 0;
     int notQueueToPrimary = 0;
+    int eventProcessedByQueueRemovalMessage = 0;
     for (StatisticDescriptor s : sds) {
       if (s.getName().equals("notQueuedEvent")) {
         notQueueEvents++;
@@ -67,9 +68,13 @@ public class AsyncEventQueueStatsJUnitTest extends 
MBeanStatsTestCase {
       if (s.getName().equals("notQueuedEventAtYetRunningPrimarySender")) {
         notQueueToPrimary++;
       }
+      if (s.getName().equals("eventProcessedByQueueRemovalMessage")) {
+        eventProcessedByQueueRemovalMessage++;
+      }
     }
     assertEquals(1, notQueueEvents);
     assertEquals(1, notQueueToPrimary);
+    assertEquals(1, eventProcessedByQueueRemovalMessage);
   }
 
 }
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index f989405..54d4306 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1202,6 +1202,7 @@ public class WANTestBase extends DistributedTestCase {
     stats.add(statistics.getEventsConflatedFromBatches());
     stats.add(statistics.getConflationIndexesMapSize());
     stats.add(statistics.getEventSecondaryQueueSize());
+    stats.add(statistics.getEventProcessedByQueueRemovalMessage());
     return stats;
   }
 
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
index 41cd89a..aac2997 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -205,6 +205,7 @@ public class ParallelWANConflationDUnitTest extends 
WANTestBase {
     vm4.invoke(() -> checkQueueSize("ln", 0));
     verifyEventSecondaryQueuesDrained("ln");
     validateEventSecondaryQueueSize(0, redundancy);
+    
validateEventProcessedByQueueRemovalMessage(expectedEventNumAfterConflation, 
redundancy);
   }
 
   private void validateEventSecondaryQueueSize(int expectedNum, int 
redundancy) {
@@ -216,11 +217,25 @@ public class ParallelWANConflationDUnitTest extends 
WANTestBase {
         (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
expectedNum));
     ArrayList<Integer> v7List =
         (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
expectedNum));
-    assertTrue("Event in secondary queue should be 100",
+    assertTrue("Event in secondary queue should be " + (expectedNum * 
redundancy),
         (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 
expectedNum
             * redundancy);
   }
 
+  private void validateEventProcessedByQueueRemovalMessage(int expectedNum, 
int redundancy) {
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
expectedNum));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
expectedNum));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
expectedNum));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
expectedNum));
+    assertTrue("Event processed by queue removal message should be " + 
(expectedNum * redundancy),
+        (v4List.get(11) + v5List.get(11) + v6List.get(11) + v7List.get(11)) == 
expectedNum
+            * redundancy);
+  }
+
   @Test
   public void testParallelPropagationConflationOfRandomKeys() throws Exception 
{
     initialSetUp();

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to