This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-4624 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 2805dfa31c0e17b02117a2d9f19104244ab691e0 Author: zhouxh <[email protected]> AuthorDate: Wed Apr 18 22:12:40 2018 -0700 GEODE-4624: add a stats to trace total event removed by PQRM --- .../asyncqueue/internal/AsyncEventQueueStats.java | 3 ++ .../internal/cache/AbstractBucketRegionQueue.java | 4 +++ .../internal/cache/wan/AbstractGatewaySender.java | 1 + .../internal/cache/wan/GatewaySenderStats.java | 37 ++++++++++++++++++++-- .../wan/parallel/ParallelQueueRemovalMessage.java | 1 + .../SerialAsyncEventQueueImplJUnitTest.java | 3 ++ .../bean/stats/AsyncEventQueueStatsJUnitTest.java | 5 +++ 7 files changed, 52 insertions(+), 2 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java index 8d68cee..c4d7baa 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java @@ -44,6 +44,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats { f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false), f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the secondary event queue.", "operations", false), + f.createIntGauge(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE, "Total number of events processed by queue removal message.", + "operations", false), f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.", "operations", false), f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED, @@ -111,6 +113,7 @@ public class AsyncEventQueueStats extends GatewaySenderStats { eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME); eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE); eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE); + eventProcessedByQueueRemovalMessageId = type.nameToId(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE); eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE); eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED); eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java index 2406b18..891df46 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java @@ -236,6 +236,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { this.gatewaySenderStats.incSecondaryQueueSize(size); } + public void incEventProcessedByQueueRemovalMessage(int size) { + this.gatewaySenderStats.incEventProcessedByQueueRemovalMessage(size); + } + public void incQueueSize() { this.gatewaySenderStats.incQueueSize(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 59547b2..1afee67 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -1099,6 +1099,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi statistics.setQueueSize(0); statistics.setSecondaryQueueSize(0); + statistics.setEventProcessedByQueueRemovalMessage(0); statistics.setTempQueueSize(0); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java index 15ff18e..a6ba185 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java @@ -48,6 +48,9 @@ public class GatewaySenderStats { protected static final String EVENT_QUEUE_SIZE = "eventQueueSize"; /** Name of the event secondary queue size statistic */ protected static final String EVENT_SECONDARY_QUEUE_SIZE = "eventSecondaryQueueSize"; + /** Total number of events processed by queue removal message statistic */ + protected static final String EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE = + "eventProcessedByQueueRemovalMessage"; /** Name of the event temporary queue size statistic */ protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize"; /** Name of the events distributed statistic */ @@ -108,6 +111,8 @@ public class GatewaySenderStats { protected static int eventQueueSizeId; /** Id of the event in secondary queue size statistic */ protected static int eventSecondaryQueueSizeId; + /** Id of the event processed by queue removal message statistic */ + protected static int eventProcessedByQueueRemovalMessageId; /** Id of the temp event queue size statistic */ protected static int eventTmpQueueSizeId; /** Id of the events distributed statistic */ @@ -174,6 +179,8 @@ public class GatewaySenderStats { f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false), f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of secondary event queue.", "operations", false), + f.createIntGauge(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE, + "Total number of events processed by queue removal message.", "operations", false), f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events.", "operations", false), f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED, @@ -245,6 +252,7 @@ public class GatewaySenderStats { eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME); eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE); eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE); + eventProcessedByQueueRemovalMessageId = type.nameToId(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE); eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE); eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED); eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD); @@ -374,6 +382,15 @@ public class GatewaySenderStats { } /** + * Returns the current value of the "eventProcessedByQueueRemovalMessage" stat. + * + * @return the current value of the "eventProcessedByQueueRemovalMessage" stat + */ + public int getEventProcessedByQueueRemovalMessage() { + return this.stats.getInt(eventProcessedByQueueRemovalMessageId); + } + + /** * Returns the current value of the "tempQueueSize" stat. * * @return the current value of the "tempQueueSize" stat. @@ -487,6 +504,15 @@ public class GatewaySenderStats { } /** + * Sets the "eventProcessedByQueueRemovalMessage" stat. + * + * @param size The total number of the events processed by queue removal message + */ + public void setEventProcessedByQueueRemovalMessage(int size) { + this.stats.setInt(eventProcessedByQueueRemovalMessageId, size); + } + + /** * Sets the "tempQueueSize" stat. * * @param size The size of the temp queue @@ -508,7 +534,6 @@ public class GatewaySenderStats { */ public void incSecondaryQueueSize() { this.stats.incInt(eventSecondaryQueueSizeId, 1); - assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0; } /** @@ -534,7 +559,15 @@ public class GatewaySenderStats { */ public void incSecondaryQueueSize(int delta) { this.stats.incInt(eventSecondaryQueueSizeId, delta); - assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0; + } + + /** + * Increments the "eventProcessedByQueueRemovalMessage" stat by given delta. + * + * @param delta an integer by which events are processed by queue removal message + */ + public void incEventProcessedByQueueRemovalMessage(int delta) { + this.stats.incInt(eventProcessedByQueueRemovalMessageId, delta); } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java index df89e36..ca3d785 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java @@ -185,6 +185,7 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage { brq.destroyKey(key); if (!brq.getBucketAdvisor().isPrimary()) { prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize(); + prQ.getParallelGatewaySender().getStatistics().incEventProcessedByQueueRemovalMessage(1); } if (isDebugEnabled) { logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, prQ.getName(), diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java index 4c5caa2..a1976c7 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java @@ -52,16 +52,19 @@ public class SerialAsyncEventQueueImplJUnitTest { queue.getStatistics().incQueueSize(5); queue.getStatistics().incSecondaryQueueSize(6); queue.getStatistics().incTempQueueSize(10); + queue.getStatistics().incEventProcessedByQueueRemovalMessage(3); assertEquals(5, queue.getStatistics().getEventQueueSize()); assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize()); assertEquals(10, queue.getStatistics().getTempEventQueueSize()); + assertEquals(3, queue.getStatistics().getEventProcessedByQueueRemovalMessage()); queue.stop(); assertEquals(0, queue.getStatistics().getEventQueueSize()); assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize()); assertEquals(0, queue.getStatistics().getTempEventQueueSize()); + assertEquals(0, queue.getStatistics().getEventProcessedByQueueRemovalMessage()); } } diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java index 7c485be..1113d51 100644 --- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java @@ -60,6 +60,7 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase { StatisticDescriptor[] sds = asyncEventQueueStats.type.getStatistics(); int notQueueEvents = 0; int notQueueToPrimary = 0; + int eventProcessedByQueueRemovalMessage = 0; for (StatisticDescriptor s : sds) { if (s.getName().equals("notQueuedEvent")) { notQueueEvents++; @@ -67,9 +68,13 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase { if (s.getName().equals("notQueuedEventAtYetRunningPrimarySender")) { notQueueToPrimary++; } + if (s.getName().equals("eventProcessedByQueueRemovalMessage")) { + eventProcessedByQueueRemovalMessage++; + } } assertEquals(1, notQueueEvents); assertEquals(1, notQueueToPrimary); + assertEquals(1, eventProcessedByQueueRemovalMessage); } } -- To stop receiving notification emails like this one, please contact [email protected].
