This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 10374d8  GEODE-8822: Clear statistics when clean queue option is used 
(#5889)
10374d8 is described below

commit 10374d806b1a247474e5c78d170ee996532f8bb6
Author: Mario Ivanac <[email protected]>
AuthorDate: Fri Jan 15 09:22:24 2021 +0100

    GEODE-8822: Clear statistics when clean queue option is used (#5889)
    
    * GEODE-8822: Clear statistics when clean queue option is used
    
    * GEODE-8822: added test
---
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 14 +++++
 .../internal/beans/GatewaySenderMBeanBridge.java   |  5 ++
 .../beans/stats/GatewaySenderOverflowMonitor.java  |  7 +++
 ...nderOverflowMBeanAttributesDistributedTest.java | 71 ++++++++++++++++++++++
 4 files changed, 97 insertions(+)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index b7e2922..797494f 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -487,6 +487,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
       prQ = (PartitionedRegion) cache.getRegion(prQName);
 
       if ((prQ != null) && (this.index == 0) && this.cleanQueues) {
+        cleanOverflowStats(cache);
         prQ.destroyRegion(null);
         prQ = null;
       }
@@ -623,6 +624,19 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     }
   }
 
+  private void cleanOverflowStats(Cache cache) {
+    ManagementService service = ManagementService.getManagementService(cache);
+    if (!this.asyncEvent) {
+      GatewaySenderMBean bean =
+          (GatewaySenderMBean) 
service.getLocalGatewaySenderMXBean(this.sender.getId());
+      if (bean != null) {
+        bean.getBridge().clearOverflowStatistics();
+      }
+    }
+
+  }
+
+
   /**
    * This will be case when the sender is started again after stop operation.
    */
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
index 0d38622..78bdaba 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
@@ -89,6 +89,11 @@ public class GatewaySenderMBeanBridge {
     }
   }
 
+  public void clearOverflowStatistics() {
+    overflowMonitor.stopListener();
+    overflowMonitor.clearCounters();
+  }
+
   public void stopMonitor() {
     monitor.stopListener();
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
index fb38cd1..4cac084 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
@@ -177,6 +177,13 @@ public class GatewaySenderOverflowMonitor extends 
MBeanStatsMonitor {
   @Override
   public void removeStatisticsFromMonitor(Statistics stats) {}
 
+  public void clearCounters() {
+    lruEvictions = 0;
+    bytesOverflowedToDisk = 0;
+    entriesOverflowedToDisk = 0;
+    bytesInUse = 0;
+  }
+
   class GatewaySenderOverflowStatisticsListener implements StatisticsListener {
     Map<String, Number> statsMap = new HashMap<>();
 
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/GatewaySenderOverflowMBeanAttributesDistributedTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/GatewaySenderOverflowMBeanAttributesDistributedTest.java
index f645749..0c70736 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/GatewaySenderOverflowMBeanAttributesDistributedTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/GatewaySenderOverflowMBeanAttributesDistributedTest.java
@@ -154,6 +154,59 @@ public class 
GatewaySenderOverflowMBeanAttributesDistributedTest extends WANTest
     vm4.invoke(() -> compareSerialOverflowStatsToMBeanAttributes(senderId));
   }
 
+  @Test
+  @Parameters({"true", "false"})
+  public void testParallelGatewaySenderOverflowMBeanAttributesClear(boolean 
createSenderFirst)
+      throws Exception {
+    // Start the locators
+    Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+    // Create the cache
+    vm4.invoke(() -> createCache(lnPort));
+
+    String senderId = "ln";
+    if (createSenderFirst) {
+      // Create a gateway sender then a region (normal xml order)
+
+      // Create a gateway sender in paused state so it creates the queue, but 
doesn't read any
+      // events from disk
+      vm4.invoke(() -> createSender(senderId, 2, true, 1, 10, false, false, 
null, false));
+      vm4.invoke(() -> pauseSender(senderId));
+
+      // Create a partitioned region attached to the gateway sender
+      vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), senderId, 
1, 100, isOffHeap()));
+    } else {
+      // Create a partitioned region then a gateway sender
+
+      // Create a partitioned region attached to the gateway sender
+      vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), senderId, 
1, 100, isOffHeap()));
+
+      // Create a gateway sender in paused state so it creates the queue, but 
doesn't read any
+      // events from disk
+      vm4.invoke(() -> createSender(senderId, 2, true, 1, 10, false, false, 
null, false));
+      vm4.invoke(() -> pauseSender(senderId));
+    }
+
+    // Do some puts to cause overflow
+    int numPuts = 10;
+    vm4.invoke(() -> doHeavyPuts(getTestMethodName(), numPuts));
+
+    // Compare overflow stats to mbean attributes
+    vm4.invoke(() -> compareParallelOverflowStatsToMBeanAttributes(senderId));
+
+    vm4.invoke(() -> stopSender(senderId));
+    vm4.invoke(() -> startSenderwithCleanQueues(senderId));
+
+    vm4.invoke(() -> checkParallelOverflowStatsAreZero(senderId));
+
+    // Check queue is clear
+    vm4.invoke(() -> checkQueueSize(senderId, 0));
+
+    // Compare overflow stats to mbean attributes
+    vm4.invoke(() -> compareParallelOverflowStatsToMBeanAttributes(senderId));
+  }
+
   private void compareParallelOverflowStatsToMBeanAttributes(String senderId) 
throws Exception {
     // Get disk region stats associated with the queue region
     PartitionedRegion region =
@@ -216,4 +269,22 @@ public class 
GatewaySenderOverflowMBeanAttributesDistributedTest extends WANTest
       assertThat(ids.getStatSampler().waitForSample((60000))).isTrue();
     }
   }
+
+  private void checkParallelOverflowStatsAreZero(String senderId) throws 
Exception {
+
+    // Get gateway sender mbean
+    ManagementService service = ManagementService.getManagementService(cache);
+    GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean(senderId);
+    assertThat(bean).isNotNull();
+
+    // Wait for the sampler to take a few samples
+    waitForSamplerToSample(5);
+
+    // Verify the bean attributes match the stat values
+    await().untilAsserted(() -> {
+      assertThat(bean.getEntriesOverflowedToDisk()).isEqualTo(0);
+      assertThat(bean.getBytesOverflowedToDisk()).isEqualTo(0);
+      assertThat(bean.getTotalQueueSizeBytesInUse()).isEqualTo(0);
+    });
+  }
 }

Reply via email to