This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-4624 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 77e90cb7d2ef3a626a55febc27deb1c2eb872b2a Author: zhouxh <[email protected]> AuthorDate: Wed Apr 18 22:12:40 2018 -0700 GEODE-4624: add a stats to trace total event removed by PQRM --- .../asyncqueue/internal/AsyncEventQueueStats.java | 3 ++ .../internal/cache/AbstractBucketRegionQueue.java | 4 +++ .../internal/cache/wan/AbstractGatewaySender.java | 1 + .../internal/cache/wan/GatewaySenderStats.java | 37 ++++++++++++++++++++-- .../wan/parallel/ParallelQueueRemovalMessage.java | 1 + .../SerialAsyncEventQueueImplJUnitTest.java | 3 ++ .../bean/stats/AsyncEventQueueStatsJUnitTest.java | 5 +++ .../geode/internal/cache/wan/WANTestBase.java | 1 + .../parallel/ParallelWANConflationDUnitTest.java | 17 +++++++++- 9 files changed, 69 insertions(+), 3 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java index 8d68cee..e74e0d3 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java @@ -44,6 +44,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats { f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false), f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the secondary event queue.", "operations", false), + f.createIntGauge(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE, + "Total number of events processed by queue removal message.", "operations", false), f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.", "operations", false), f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED, @@ -111,6 +113,7 @@ public class AsyncEventQueueStats extends GatewaySenderStats { eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME); eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE); eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE); + eventProcessedByQueueRemovalMessageId = type.nameToId(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE); eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE); eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED); eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java index 2406b18..755d60a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java @@ -236,6 +236,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { this.gatewaySenderStats.incSecondaryQueueSize(size); } + public void incEventProcessedByQueueRemovalMessage(int size) { + this.gatewaySenderStats.incEventProcessedByQueueRemovalMessage(size); + } + public void incQueueSize() { this.gatewaySenderStats.incQueueSize(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 59547b2..1afee67 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -1099,6 +1099,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi statistics.setQueueSize(0); statistics.setSecondaryQueueSize(0); + statistics.setEventProcessedByQueueRemovalMessage(0); statistics.setTempQueueSize(0); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java index 15ff18e..a6ba185 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java @@ -48,6 +48,9 @@ public class GatewaySenderStats { protected static final String EVENT_QUEUE_SIZE = "eventQueueSize"; /** Name of the event secondary queue size statistic */ protected static final String EVENT_SECONDARY_QUEUE_SIZE = "eventSecondaryQueueSize"; + /** Total number of events processed by queue removal message statistic */ + protected static final String EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE = + "eventProcessedByQueueRemovalMessage"; /** Name of the event temporary queue size statistic */ protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize"; /** Name of the events distributed statistic */ @@ -108,6 +111,8 @@ public class GatewaySenderStats { protected static int eventQueueSizeId; /** Id of the event in secondary queue size statistic */ protected static int eventSecondaryQueueSizeId; + /** Id of the event processed by queue removal message statistic */ + protected static int eventProcessedByQueueRemovalMessageId; /** Id of the temp event queue size statistic */ protected static int eventTmpQueueSizeId; /** Id of the events distributed statistic */ @@ -174,6 +179,8 @@ public class GatewaySenderStats { f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false), f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of secondary event queue.", "operations", false), + f.createIntGauge(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE, + "Total number of events processed by queue removal message.", "operations", false), f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events.", "operations", false), f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED, @@ -245,6 +252,7 @@ public class GatewaySenderStats { eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME); eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE); eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE); + eventProcessedByQueueRemovalMessageId = type.nameToId(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE); eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE); eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED); eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD); @@ -374,6 +382,15 @@ public class GatewaySenderStats { } /** + * Returns the current value of the "eventProcessedByQueueRemovalMessage" stat. + * + * @return the current value of the "eventProcessedByQueueRemovalMessage" stat + */ + public int getEventProcessedByQueueRemovalMessage() { + return this.stats.getInt(eventProcessedByQueueRemovalMessageId); + } + + /** * Returns the current value of the "tempQueueSize" stat. * * @return the current value of the "tempQueueSize" stat. @@ -487,6 +504,15 @@ public class GatewaySenderStats { } /** + * Sets the "eventProcessedByQueueRemovalMessage" stat. + * + * @param size The total number of the events processed by queue removal message + */ + public void setEventProcessedByQueueRemovalMessage(int size) { + this.stats.setInt(eventProcessedByQueueRemovalMessageId, size); + } + + /** * Sets the "tempQueueSize" stat. * * @param size The size of the temp queue @@ -508,7 +534,6 @@ public class GatewaySenderStats { */ public void incSecondaryQueueSize() { this.stats.incInt(eventSecondaryQueueSizeId, 1); - assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0; } /** @@ -534,7 +559,15 @@ public class GatewaySenderStats { */ public void incSecondaryQueueSize(int delta) { this.stats.incInt(eventSecondaryQueueSizeId, delta); - assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0; + } + + /** + * Increments the "eventProcessedByQueueRemovalMessage" stat by given delta. + * + * @param delta an integer by which events are processed by queue removal message + */ + public void incEventProcessedByQueueRemovalMessage(int delta) { + this.stats.incInt(eventProcessedByQueueRemovalMessageId, delta); } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java index df89e36..ca3d785 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java @@ -185,6 +185,7 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage { brq.destroyKey(key); if (!brq.getBucketAdvisor().isPrimary()) { prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize(); + prQ.getParallelGatewaySender().getStatistics().incEventProcessedByQueueRemovalMessage(1); } if (isDebugEnabled) { logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, prQ.getName(), diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java index 4c5caa2..a1976c7 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java @@ -52,16 +52,19 @@ public class SerialAsyncEventQueueImplJUnitTest { queue.getStatistics().incQueueSize(5); queue.getStatistics().incSecondaryQueueSize(6); queue.getStatistics().incTempQueueSize(10); + queue.getStatistics().incEventProcessedByQueueRemovalMessage(3); assertEquals(5, queue.getStatistics().getEventQueueSize()); assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize()); assertEquals(10, queue.getStatistics().getTempEventQueueSize()); + assertEquals(3, queue.getStatistics().getEventProcessedByQueueRemovalMessage()); queue.stop(); assertEquals(0, queue.getStatistics().getEventQueueSize()); assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize()); assertEquals(0, queue.getStatistics().getTempEventQueueSize()); + assertEquals(0, queue.getStatistics().getEventProcessedByQueueRemovalMessage()); } } diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java index 7c485be..2e4797e 100644 --- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java @@ -60,6 +60,7 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase { StatisticDescriptor[] sds = asyncEventQueueStats.type.getStatistics(); int notQueueEvents = 0; int notQueueToPrimary = 0; + int eventProcessedByQueueRemovalMessage = 0; for (StatisticDescriptor s : sds) { if (s.getName().equals("notQueuedEvent")) { notQueueEvents++; @@ -67,9 +68,13 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase { if (s.getName().equals("notQueuedEventAtYetRunningPrimarySender")) { notQueueToPrimary++; } + if (s.getName().equals("eventProcessedByQueueRemovalMessage")) { + eventProcessedByQueueRemovalMessage++; + } } assertEquals(1, notQueueEvents); assertEquals(1, notQueueToPrimary); + assertEquals(1, eventProcessedByQueueRemovalMessage); } } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java index f989405..54d4306 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -1202,6 +1202,7 @@ public class WANTestBase extends DistributedTestCase { stats.add(statistics.getEventsConflatedFromBatches()); stats.add(statistics.getConflationIndexesMapSize()); stats.add(statistics.getEventSecondaryQueueSize()); + stats.add(statistics.getEventProcessedByQueueRemovalMessage()); return stats; } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java index 41cd89a..aac2997 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java @@ -205,6 +205,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { vm4.invoke(() -> checkQueueSize("ln", 0)); verifyEventSecondaryQueuesDrained("ln"); validateEventSecondaryQueueSize(0, redundancy); + validateEventProcessedByQueueRemovalMessage(expectedEventNumAfterConflation, redundancy); } private void validateEventSecondaryQueueSize(int expectedNum, int redundancy) { @@ -216,11 +217,25 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); ArrayList<Integer> v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); - assertTrue("Event in secondary queue should be 100", + assertTrue("Event in secondary queue should be " + (expectedNum * redundancy), (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == expectedNum * redundancy); } + private void validateEventProcessedByQueueRemovalMessage(int expectedNum, int redundancy) { + ArrayList<Integer> v4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + ArrayList<Integer> v5List = + (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + ArrayList<Integer> v6List = + (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + ArrayList<Integer> v7List = + (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + assertTrue("Event processed by queue removal message should be " + (expectedNum * redundancy), + (v4List.get(11) + v5List.get(11) + v6List.get(11) + v7List.get(11)) == expectedNum + * redundancy); + } + @Test public void testParallelPropagationConflationOfRandomKeys() throws Exception { initialSetUp(); -- To stop receiving notification emails like this one, please contact [email protected].
