This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 10374d8 GEODE-8822: Clear statistics when clean queue option is used
(#5889)
10374d8 is described below
commit 10374d806b1a247474e5c78d170ee996532f8bb6
Author: Mario Ivanac <[email protected]>
AuthorDate: Fri Jan 15 09:22:24 2021 +0100
GEODE-8822: Clear statistics when clean queue option is used (#5889)
* GEODE-8822: Clear statistics when clean queue option is used
* GEODE-8822: added test
---
.../wan/parallel/ParallelGatewaySenderQueue.java | 14 +++++
.../internal/beans/GatewaySenderMBeanBridge.java | 5 ++
.../beans/stats/GatewaySenderOverflowMonitor.java | 7 +++
...nderOverflowMBeanAttributesDistributedTest.java | 71 ++++++++++++++++++++++
4 files changed, 97 insertions(+)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index b7e2922..797494f 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -487,6 +487,7 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
prQ = (PartitionedRegion) cache.getRegion(prQName);
if ((prQ != null) && (this.index == 0) && this.cleanQueues) {
+ cleanOverflowStats(cache);
prQ.destroyRegion(null);
prQ = null;
}
@@ -623,6 +624,19 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
}
}
+ private void cleanOverflowStats(Cache cache) {
+ ManagementService service = ManagementService.getManagementService(cache);
+ if (!this.asyncEvent) {
+ GatewaySenderMBean bean =
+ (GatewaySenderMBean)
service.getLocalGatewaySenderMXBean(this.sender.getId());
+ if (bean != null) {
+ bean.getBridge().clearOverflowStatistics();
+ }
+ }
+
+ }
+
+
/**
* This will be case when the sender is started again after stop operation.
*/
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
index 0d38622..78bdaba 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
@@ -89,6 +89,11 @@ public class GatewaySenderMBeanBridge {
}
}
+ public void clearOverflowStatistics() {
+ overflowMonitor.stopListener();
+ overflowMonitor.clearCounters();
+ }
+
public void stopMonitor() {
monitor.stopListener();
}
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
index fb38cd1..4cac084 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
@@ -177,6 +177,13 @@ public class GatewaySenderOverflowMonitor extends
MBeanStatsMonitor {
@Override
public void removeStatisticsFromMonitor(Statistics stats) {}
+ public void clearCounters() {
+ lruEvictions = 0;
+ bytesOverflowedToDisk = 0;
+ entriesOverflowedToDisk = 0;
+ bytesInUse = 0;
+ }
+
class GatewaySenderOverflowStatisticsListener implements StatisticsListener {
Map<String, Number> statsMap = new HashMap<>();
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/GatewaySenderOverflowMBeanAttributesDistributedTest.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/GatewaySenderOverflowMBeanAttributesDistributedTest.java
index f645749..0c70736 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/GatewaySenderOverflowMBeanAttributesDistributedTest.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/GatewaySenderOverflowMBeanAttributesDistributedTest.java
@@ -154,6 +154,59 @@ public class
GatewaySenderOverflowMBeanAttributesDistributedTest extends WANTest
vm4.invoke(() -> compareSerialOverflowStatsToMBeanAttributes(senderId));
}
+ @Test
+ @Parameters({"true", "false"})
+ public void testParallelGatewaySenderOverflowMBeanAttributesClear(boolean
createSenderFirst)
+ throws Exception {
+ // Start the locators
+ Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+ // Create the cache
+ vm4.invoke(() -> createCache(lnPort));
+
+ String senderId = "ln";
+ if (createSenderFirst) {
+ // Create a gateway sender then a region (normal xml order)
+
+ // Create a gateway sender in paused state so it creates the queue, but
doesn't read any
+ // events from disk
+ vm4.invoke(() -> createSender(senderId, 2, true, 1, 10, false, false,
null, false));
+ vm4.invoke(() -> pauseSender(senderId));
+
+ // Create a partitioned region attached to the gateway sender
+ vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), senderId,
1, 100, isOffHeap()));
+ } else {
+ // Create a partitioned region then a gateway sender
+
+ // Create a partitioned region attached to the gateway sender
+ vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), senderId,
1, 100, isOffHeap()));
+
+ // Create a gateway sender in paused state so it creates the queue, but
doesn't read any
+ // events from disk
+ vm4.invoke(() -> createSender(senderId, 2, true, 1, 10, false, false,
null, false));
+ vm4.invoke(() -> pauseSender(senderId));
+ }
+
+ // Do some puts to cause overflow
+ int numPuts = 10;
+ vm4.invoke(() -> doHeavyPuts(getTestMethodName(), numPuts));
+
+ // Compare overflow stats to mbean attributes
+ vm4.invoke(() -> compareParallelOverflowStatsToMBeanAttributes(senderId));
+
+ vm4.invoke(() -> stopSender(senderId));
+ vm4.invoke(() -> startSenderwithCleanQueues(senderId));
+
+ vm4.invoke(() -> checkParallelOverflowStatsAreZero(senderId));
+
+ // Check queue is clear
+ vm4.invoke(() -> checkQueueSize(senderId, 0));
+
+ // Compare overflow stats to mbean attributes
+ vm4.invoke(() -> compareParallelOverflowStatsToMBeanAttributes(senderId));
+ }
+
private void compareParallelOverflowStatsToMBeanAttributes(String senderId)
throws Exception {
// Get disk region stats associated with the queue region
PartitionedRegion region =
@@ -216,4 +269,22 @@ public class
GatewaySenderOverflowMBeanAttributesDistributedTest extends WANTest
assertThat(ids.getStatSampler().waitForSample((60000))).isTrue();
}
}
+
+ private void checkParallelOverflowStatsAreZero(String senderId) throws
Exception {
+
+ // Get gateway sender mbean
+ ManagementService service = ManagementService.getManagementService(cache);
+ GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean(senderId);
+ assertThat(bean).isNotNull();
+
+ // Wait for the sampler to take a few samples
+ waitForSamplerToSample(5);
+
+ // Verify the bean attributes match the stat values
+ await().untilAsserted(() -> {
+ assertThat(bean.getEntriesOverflowedToDisk()).isEqualTo(0);
+ assertThat(bean.getBytesOverflowedToDisk()).isEqualTo(0);
+ assertThat(bean.getTotalQueueSizeBytesInUse()).isEqualTo(0);
+ });
+ }
}