This is an automated email from the ASF dual-hosted git repository.

gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new be52507  GEODE-5314: Cleanup and document MBeanStatsMonitor child 
classes
be52507 is described below

commit be52507b73e46cf1b8578b23c4ea41ee40afc625
Author: Juan José Ramos <[email protected]>
AuthorDate: Fri Sep 21 18:50:50 2018 +0100

    GEODE-5314: Cleanup and document MBeanStatsMonitor child classes
    
    - Fixed minor warnings.
    - Added JUnit Tests for all modified classes.
    - There's only one thread updating the mutable values so it's been
      decided to keep volatiles instead of moving to atomics.
    - Documentation improved to better explain the thread-safety of the
      classes.
---
 .../beans/stats/AggregateRegionStatsMonitor.java   | 319 +++++++++---------
 .../internal/beans/stats/GCStatsMonitor.java       |  73 ++--
 .../beans/stats/GatewaySenderOverflowMonitor.java  | 168 ++++++----
 .../beans/stats/MemberLevelDiskMonitor.java        | 371 +++++++++++----------
 .../internal/beans/stats/VMStatsMonitor.java       | 114 ++++---
 .../stats/AggregateRegionStatsMonitorTest.java     | 244 ++++++++++++++
 .../internal/beans/stats/GCStatsMonitorTest.java   |  84 +++++
 .../stats/GatewaySenderOverflowMonitorTest.java    | 177 ++++++++++
 .../beans/stats/MemberLevelDiskMonitorTest.java    | 230 +++++++++++++
 .../internal/beans/stats/VMStatsMonitorTest.java   | 142 ++++++++
 10 files changed, 1445 insertions(+), 477 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/AggregateRegionStatsMonitor.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/AggregateRegionStatsMonitor.java
index 5a58a9f..d8de342 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/AggregateRegionStatsMonitor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/AggregateRegionStatsMonitor.java
@@ -19,76 +19,141 @@ import java.util.Map;
 
 import org.apache.geode.StatisticDescriptor;
 import org.apache.geode.Statistics;
+import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.statistics.StatisticId;
 import org.apache.geode.internal.statistics.StatisticNotFoundException;
 import org.apache.geode.internal.statistics.StatisticsListener;
 import org.apache.geode.internal.statistics.StatisticsNotification;
 import org.apache.geode.internal.statistics.ValueMonitor;
-import 
org.apache.geode.management.internal.beans.stats.MBeanStatsMonitor.DefaultHashMap;
-
 
+/**
+ * This class acts as a monitor and listen for Region statistics updates on 
behalf of MemberMBean.
+ * <p>
+ * There's only one dedicated thread that wakes up at the
+ * {@link ConfigurationProperties#STATISTIC_SAMPLE_RATE} configured, samples 
all the statistics,
+ * writes them to the {@link ConfigurationProperties#STATISTIC_ARCHIVE_FILE} 
configured (if any) and
+ * notifies listeners of changes. The mutable fields are declared as {@code 
volatile} to make sure
+ * readers of the statistics get the latest recorded value.
+ * <p>
+ * This class is conditionally thread-safe, there can be multiple concurrent 
readers accessing a
+ * instance, but concurrent writers need to be synchronized externally.
+ *
+ * @see org.apache.geode.internal.statistics.HostStatSampler
+ * @see org.apache.geode.distributed.ConfigurationProperties
+ * @see org.apache.geode.management.internal.beans.stats.MBeanStatsMonitor
+ */
 public class AggregateRegionStatsMonitor extends MBeanStatsMonitor {
-
-
-  private volatile int primaryBucketCount = 0;
-
+  private volatile long diskSpace = 0;
   private volatile int bucketCount = 0;
-
+  private volatile long lruDestroys = 0;
+  private volatile long lruEvictions = 0;
   private volatile int totalBucketSize = 0;
+  private volatile int primaryBucketCount = 0;
+  private final Map<Statistics, ValueMonitor> monitors;
+  private final Map<Statistics, MemberLevelRegionStatisticsListener> listeners;
 
-  private volatile long lruDestroys = 0;
+  public long getDiskSpace() {
+    return diskSpace;
+  }
 
-  private volatile long lruEvictions = 0;
+  public int getTotalBucketCount() {
+    return bucketCount;
+  }
 
-  private volatile long diskSpace = 0;
+  public long getLruDestroys() {
+    return lruDestroys;
+  }
 
+  public long getLruEvictions() {
+    return lruEvictions;
+  }
 
+  public int getTotalBucketSize() {
+    return totalBucketSize;
+  }
 
-  private Map<Statistics, ValueMonitor> monitors;
+  public int getTotalPrimaryBucketCount() {
+    return primaryBucketCount;
+  }
+
+  Map<Statistics, ValueMonitor> getMonitors() {
+    return monitors;
+  }
 
-  private Map<Statistics, MemberLevelRegionStatisticsListener> listeners;
+  Map<Statistics, MemberLevelRegionStatisticsListener> getListeners() {
+    return listeners;
+  }
 
   public AggregateRegionStatsMonitor(String name) {
     super(name);
-    monitors = new HashMap<Statistics, ValueMonitor>();
-    listeners = new HashMap<Statistics, MemberLevelRegionStatisticsListener>();
+    monitors = new HashMap<>();
+    listeners = new HashMap<>();
   }
 
-  @Override
-  public void addStatisticsToMonitor(Statistics stats) {
-    ValueMonitor regionMonitor = new ValueMonitor();
-    MemberLevelRegionStatisticsListener listener = new 
MemberLevelRegionStatisticsListener();
-    regionMonitor.addListener(listener);
-    regionMonitor.addStatistics(stats);
-    monitors.put(stats, regionMonitor);
-    listeners.put(stats, listener);
-  }
+  Number computeDelta(DefaultHashMap statsMap, String name, Number 
currentValue) {
+    if (name.equals(StatsKey.PRIMARY_BUCKET_COUNT)) {
+      Number prevValue = statsMap.get(StatsKey.PRIMARY_BUCKET_COUNT);
+      return currentValue.intValue() - prevValue.intValue();
+    }
 
+    if (name.equals(StatsKey.BUCKET_COUNT)) {
+      Number prevValue = statsMap.get(StatsKey.BUCKET_COUNT);
+      return currentValue.intValue() - prevValue.intValue();
+    }
 
-  public void removePartitionStatistics(Statistics stats) {
-    MemberLevelRegionStatisticsListener listener = removeListener(stats);
-    if (listener != null) {
-      listener.decreaseParStats(stats);
+    if (name.equals(StatsKey.TOTAL_BUCKET_SIZE)) {
+      Number prevValue = statsMap.get(StatsKey.TOTAL_BUCKET_SIZE);
+      return currentValue.intValue() - prevValue.intValue();
     }
-  }
 
-  public void removeLRUStatistics(Statistics stats) {
-    removeListener(stats);
-  }
+    if (name.equals(StatsKey.LRU_EVICTIONS)) {
+      Number prevValue = statsMap.get(StatsKey.LRU_EVICTIONS);
+      return currentValue.longValue() - prevValue.longValue();
+    }
 
-  public void removeDirectoryStatistics(Statistics stats) {
-    removeListener(stats);
+    if (name.equals(StatsKey.LRU_DESTROYS)) {
+      Number prevValue = statsMap.get(StatsKey.LRU_DESTROYS);
+      return currentValue.longValue() - prevValue.longValue();
+    }
+
+    if (name.equals(StatsKey.DISK_SPACE)) {
+      Number prevValue = statsMap.get(StatsKey.DISK_SPACE);
+      return currentValue.longValue() - prevValue.longValue();
+    }
+
+    return 0;
   }
 
-  @Override
-  public void stopListener() {
-    for (Statistics stat : listeners.keySet()) {
-      ValueMonitor monitor = monitors.get(stat);
-      monitor.removeListener(listeners.get(stat));
-      monitor.removeStatistics(stat);
+  void increaseStats(String name, Number value) {
+    if (name.equals(StatsKey.PRIMARY_BUCKET_COUNT)) {
+      primaryBucketCount += value.intValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.BUCKET_COUNT)) {
+      bucketCount += value.intValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.TOTAL_BUCKET_SIZE)) {
+      totalBucketSize += value.intValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.LRU_EVICTIONS)) {
+      lruEvictions += value.longValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.LRU_DESTROYS)) {
+      lruDestroys += value.longValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.DISK_SPACE)) {
+      diskSpace += value.longValue();
+      return;
     }
-    listeners.clear();
-    monitors.clear();
   }
 
   private MemberLevelRegionStatisticsListener removeListener(Statistics stats) 
{
@@ -98,38 +163,86 @@ public class AggregateRegionStatsMonitor extends 
MBeanStatsMonitor {
     }
 
     MemberLevelRegionStatisticsListener listener = listeners.remove(stats);
-    if (listener != null) {
+    if ((listener != null) && (monitor != null)) {
       monitor.removeListener(listener);
     }
+
     return listener;
   }
 
+  public void removeLRUStatistics(Statistics stats) {
+    removeListener(stats);
+  }
+
+  public void removeDirectoryStatistics(Statistics stats) {
+    removeListener(stats);
+  }
+
+  public void removePartitionStatistics(Statistics stats) {
+    MemberLevelRegionStatisticsListener listener = removeListener(stats);
+
+    if (listener != null) {
+      listener.decreaseParStats();
+    }
+  }
+
   @Override
   public Number getStatistic(String name) {
     if (name.equals(StatsKey.LRU_EVICTIONS)) {
       return getLruEvictions();
     }
+
     if (name.equals(StatsKey.LRU_DESTROYS)) {
       return getLruDestroys();
     }
+
     if (name.equals(StatsKey.PRIMARY_BUCKET_COUNT)) {
       return getTotalPrimaryBucketCount();
     }
+
     if (name.equals(StatsKey.BUCKET_COUNT)) {
       return getTotalBucketCount();
     }
+
     if (name.equals(StatsKey.TOTAL_BUCKET_SIZE)) {
       return getTotalBucketSize();
     }
+
     if (name.equals(StatsKey.DISK_SPACE)) {
       return getDiskSpace();
     }
+
     return 0;
   }
 
-  private class MemberLevelRegionStatisticsListener implements 
StatisticsListener {
-    DefaultHashMap statsMap = new DefaultHashMap();
+  @Override
+  public void addStatisticsToMonitor(Statistics stats) {
+    ValueMonitor regionMonitor = new ValueMonitor();
+    MemberLevelRegionStatisticsListener listener = new 
MemberLevelRegionStatisticsListener();
+    regionMonitor.addListener(listener);
+    regionMonitor.addStatistics(stats);
+
+    monitors.put(stats, regionMonitor);
+    listeners.put(stats, listener);
+  }
 
+  @Override
+  public void stopListener() {
+    for (Statistics stat : listeners.keySet()) {
+      ValueMonitor monitor = monitors.get(stat);
+      monitor.removeListener(listeners.get(stat));
+      monitor.removeStatistics(stat);
+    }
+
+    listeners.clear();
+    monitors.clear();
+  }
+
+  @Override
+  public void removeStatisticsFromMonitor(Statistics stats) {}
+
+  class MemberLevelRegionStatisticsListener implements StatisticsListener {
+    final DefaultHashMap statsMap = new DefaultHashMap();
     private boolean removed = false;
 
     @Override
@@ -138,26 +251,26 @@ public class AggregateRegionStatsMonitor extends 
MBeanStatsMonitor {
         if (removed) {
           return;
         }
+
         for (StatisticId statId : notification) {
           StatisticDescriptor descriptor = statId.getStatisticDescriptor();
           String name = descriptor.getName();
           Number value;
+
           try {
             value = notification.getValue(statId);
           } catch (StatisticNotFoundException e) {
             value = 0;
           }
+
           log(name, value);
           Number deltaValue = computeDelta(statsMap, name, value);
           statsMap.put(name, value);
           increaseStats(name, deltaValue);
-          // fix for bug 46604
         }
       }
-
     }
 
-
     /**
      * Only decrease those values which can both increase and decrease and not 
values which can only
      * increase like read/writes
@@ -166,121 +279,13 @@ public class AggregateRegionStatsMonitor extends 
MBeanStatsMonitor {
      * DefaultHashMap for the disk
      *
      */
-    public void decreaseParStats(Statistics stats) {
+    void decreaseParStats() {
       synchronized (statsMap) {
-        primaryBucketCount -= 
statsMap.get(StatsKey.PRIMARY_BUCKET_COUNT).intValue();
         bucketCount -= statsMap.get(StatsKey.BUCKET_COUNT).intValue();
         totalBucketSize -= statsMap.get(StatsKey.TOTAL_BUCKET_SIZE).intValue();
+        primaryBucketCount -= 
statsMap.get(StatsKey.PRIMARY_BUCKET_COUNT).intValue();
         removed = true;
       }
-
-    }
-
-
-
-  };
-
-
-
-  private Number computeDelta(DefaultHashMap statsMap, String name, Number 
currentValue) {
-    if (name.equals(StatsKey.PRIMARY_BUCKET_COUNT)) {
-      Number prevValue = 
statsMap.get(StatsKey.PRIMARY_BUCKET_COUNT).intValue();
-      Number deltaValue = currentValue.intValue() - prevValue.intValue();
-      return deltaValue;
-    }
-
-    if (name.equals(StatsKey.BUCKET_COUNT)) {
-      Number prevValue = statsMap.get(StatsKey.BUCKET_COUNT).intValue();
-      Number deltaValue = currentValue.intValue() - prevValue.intValue();
-      return deltaValue;
-    }
-
-    if (name.equals(StatsKey.TOTAL_BUCKET_SIZE)) {
-      Number prevValue = statsMap.get(StatsKey.TOTAL_BUCKET_SIZE).intValue();
-      Number deltaValue = currentValue.intValue() - prevValue.intValue();
-      return deltaValue;
-    }
-
-    if (name.equals(StatsKey.LRU_EVICTIONS)) {
-      Number prevValue = statsMap.get(StatsKey.LRU_EVICTIONS).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
     }
-
-    if (name.equals(StatsKey.LRU_DESTROYS)) {
-      Number prevValue = statsMap.get(StatsKey.LRU_DESTROYS).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
-    }
-
-    if (name.equals(StatsKey.DISK_SPACE)) {
-      Number prevValue = statsMap.get(StatsKey.DISK_SPACE).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
-    }
-    return 0;
   }
-
-  private void increaseStats(String name, Number value) {
-    if (name.equals(StatsKey.PRIMARY_BUCKET_COUNT)) {
-      primaryBucketCount += value.intValue();
-      return;
-    }
-    if (name.equals(StatsKey.BUCKET_COUNT)) {
-      bucketCount += value.intValue();
-      return;
-    }
-    if (name.equals(StatsKey.TOTAL_BUCKET_SIZE)) {
-      totalBucketSize += value.intValue();
-      return;
-    }
-
-    if (name.equals(StatsKey.LRU_EVICTIONS)) {
-      lruEvictions += value.longValue();
-      return;
-    }
-
-    if (name.equals(StatsKey.LRU_DESTROYS)) {
-      lruDestroys += value.longValue();
-      return;
-    }
-
-    if (name.equals(StatsKey.DISK_SPACE)) {
-      diskSpace += value.longValue();
-      return;
-    }
-
-  }
-
-  public int getTotalPrimaryBucketCount() {
-    return primaryBucketCount;
-  }
-
-  public int getTotalBucketCount() {
-    return bucketCount;
-  }
-
-  public int getTotalBucketSize() {
-    return totalBucketSize;
-  }
-
-  public long getLruDestroys() {
-    return lruDestroys;
-  }
-
-  public long getLruEvictions() {
-    return lruEvictions;
-  }
-
-  public long getDiskSpace() {
-    return diskSpace;
-  }
-
-  @Override
-  public void removeStatisticsFromMonitor(Statistics stats) {
-    // TODO Auto-generated method stub
-
-  }
-
-
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GCStatsMonitor.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GCStatsMonitor.java
index e0da0a8..63999ab 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GCStatsMonitor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GCStatsMonitor.java
@@ -15,72 +15,91 @@
 package org.apache.geode.management.internal.beans.stats;
 
 import org.apache.geode.StatisticDescriptor;
+import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.statistics.StatisticId;
 import org.apache.geode.internal.statistics.StatisticNotFoundException;
 import org.apache.geode.internal.statistics.StatisticsNotification;
 
+/**
+ * This class acts as a monitor and listen for GC statistics updates on behalf 
of MemberMBean.
+ * <p>
+ * There's only one dedicated thread that wakes up at the
+ * {@link ConfigurationProperties#STATISTIC_SAMPLE_RATE} configured, samples 
all the statistics,
+ * writes them to the {@link ConfigurationProperties#STATISTIC_ARCHIVE_FILE} 
configured (if any) and
+ * notifies listeners of changes. The mutable fields are declared as {@code 
volatile} to make sure
+ * readers of the statistics get the latest recorded value.
+ * <p>
+ * This class is conditionally thread-safe, there can be multiple concurrent 
readers accessing a
+ * instance, but concurrent writers need to be synchronized externally.
+ *
+ * @see org.apache.geode.internal.statistics.HostStatSampler
+ * @see org.apache.geode.distributed.ConfigurationProperties
+ * @see org.apache.geode.management.internal.beans.stats.MBeanStatsMonitor
+ */
 public class GCStatsMonitor extends MBeanStatsMonitor {
-
   private volatile long collections = 0;
-
   private volatile long collectionTime = 0;
 
+  long getCollections() {
+    return collections;
+  }
+
+  long getCollectionTime() {
+    return collectionTime;
+  }
 
   public GCStatsMonitor(String name) {
     super(name);
   }
 
+  void decreasePrevValues(DefaultHashMap statsMap) {
+    collections -= statsMap.get(StatsKey.VM_GC_STATS_COLLECTIONS).longValue();
+    collectionTime -= 
statsMap.get(StatsKey.VM_GC_STATS_COLLECTION_TIME).longValue();
+  }
+
+  void increaseStats(String name, Number value) {
+    if (name.equals(StatsKey.VM_GC_STATS_COLLECTIONS)) {
+      collections += value.longValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.VM_GC_STATS_COLLECTION_TIME)) {
+      collectionTime += value.longValue();
+      return;
+    }
+  }
+
   @Override
   public Number getStatistic(String statName) {
     if (statName.equals(StatsKey.VM_GC_STATS_COLLECTIONS)) {
       return getCollections();
     }
+
     if (statName.equals(StatsKey.VM_GC_STATS_COLLECTION_TIME)) {
       return getCollectionTime();
     }
+
     return 0;
   }
 
-
   @Override
   public void handleNotification(StatisticsNotification notification) {
     decreasePrevValues(statsMap);
+
     for (StatisticId statId : notification) {
       StatisticDescriptor descriptor = statId.getStatisticDescriptor();
       String name = descriptor.getName();
       Number value;
+
       try {
         value = notification.getValue(statId);
       } catch (StatisticNotFoundException e) {
         value = 0;
       }
+
       log(name, value);
       increaseStats(name, value);
       statsMap.put(name, value);
     }
   }
-
-  private void decreasePrevValues(DefaultHashMap statsMap) {
-    collections -= statsMap.get(StatsKey.VM_GC_STATS_COLLECTIONS).intValue();
-    collectionTime -= 
statsMap.get(StatsKey.VM_GC_STATS_COLLECTION_TIME).intValue();
-  }
-
-  private void increaseStats(String name, Number value) {
-    if (name.equals(StatsKey.VM_GC_STATS_COLLECTIONS)) {
-      collections += value.longValue();
-      return;
-    }
-    if (name.equals(StatsKey.VM_GC_STATS_COLLECTION_TIME)) {
-      collectionTime += value.longValue();
-      return;
-    }
-  }
-
-  public long getCollections() {
-    return collections;
-  }
-
-  public long getCollectionTime() {
-    return collectionTime;
-  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
index b46531b..0d71865 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
@@ -19,23 +19,56 @@ import java.util.Map;
 
 import org.apache.geode.StatisticDescriptor;
 import org.apache.geode.Statistics;
+import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.statistics.StatisticId;
 import org.apache.geode.internal.statistics.StatisticNotFoundException;
 import org.apache.geode.internal.statistics.StatisticsListener;
 import org.apache.geode.internal.statistics.StatisticsNotification;
 import org.apache.geode.internal.statistics.ValueMonitor;
 
+/**
+ * This class acts as a monitor and listen for Gateway Sender Overflow 
statistics updates on
+ * behalf of MemberMBean.
+ * <p>
+ * There's only one dedicated thread that wakes up at the
+ * {@link ConfigurationProperties#STATISTIC_SAMPLE_RATE} configured, samples 
all the statistics,
+ * writes them to the {@link ConfigurationProperties#STATISTIC_ARCHIVE_FILE} 
configured (if any) and
+ * notifies listeners of changes. The mutable fields are declared as {@code 
volatile} to make sure
+ * readers of the statistics get the latest recorded value.
+ * <p>
+ * This class is conditionally thread-safe, there can be multiple concurrent 
readers accessing a
+ * instance, but concurrent writers need to be synchronized externally.
+ *
+ * @see org.apache.geode.internal.statistics.HostStatSampler
+ * @see org.apache.geode.distributed.ConfigurationProperties
+ * @see org.apache.geode.management.internal.beans.stats.MBeanStatsMonitor
+ */
 public class GatewaySenderOverflowMonitor extends MBeanStatsMonitor {
-
+  private volatile long lruEvictions = 0;
+  private volatile long bytesOverflowedToDisk = 0;
   private volatile long entriesOverflowedToDisk = 0;
+  private final Map<Statistics, ValueMonitor> monitors;
+  private final Map<Statistics, StatisticsListener> listeners;
 
-  private volatile long bytesOverflowedToDisk = 0;
+  public long getLruEvictions() {
+    return lruEvictions;
+  }
 
-  private volatile long lruEvictions = 0;
+  public long getBytesOverflowedToDisk() {
+    return bytesOverflowedToDisk;
+  }
 
-  private Map<Statistics, ValueMonitor> monitors;
+  public long getEntriesOverflowedToDisk() {
+    return entriesOverflowedToDisk;
+  }
 
-  private Map<Statistics, StatisticsListener> listeners;
+  Map<Statistics, ValueMonitor> getMonitors() {
+    return monitors;
+  }
+
+  Map<Statistics, StatisticsListener> getListeners() {
+    return listeners;
+  }
 
   public GatewaySenderOverflowMonitor(String name) {
     super(name);
@@ -43,47 +76,86 @@ public class GatewaySenderOverflowMonitor extends 
MBeanStatsMonitor {
     listeners = new HashMap<>();
   }
 
+  Number computeDelta(DefaultHashMap statsMap, String name, Number 
currentValue) {
+    if (name.equals(StatsKey.GATEWAYSENDER_LRU_EVICTIONS)) {
+      Number prevValue = statsMap.get(StatsKey.GATEWAYSENDER_LRU_EVICTIONS);
+      return currentValue.longValue() - prevValue.longValue();
+    }
+
+    if (name.equals(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)) {
+      Number prevValue = 
statsMap.get(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK);
+      return currentValue.longValue() - prevValue.longValue();
+    }
+
+    if (name.equals(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK)) {
+      Number prevValue = 
statsMap.get(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK);
+      return currentValue.longValue() - prevValue.longValue();
+    }
+
+    return 0;
+  }
+
+  void increaseStats(String name, Number value) {
+    if (name.equals(StatsKey.GATEWAYSENDER_LRU_EVICTIONS)) {
+      lruEvictions += value.longValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)) {
+      entriesOverflowedToDisk += value.longValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK)) {
+      bytesOverflowedToDisk += value.longValue();
+      return;
+    }
+  }
+
+  @Override
+  public Number getStatistic(String name) {
+    if (name.equals(StatsKey.GATEWAYSENDER_LRU_EVICTIONS)) {
+      return getLruEvictions();
+    }
+
+    if (name.equals(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)) {
+      return getEntriesOverflowedToDisk();
+    }
+
+    if (name.equals(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK)) {
+      return getBytesOverflowedToDisk();
+    }
+
+    return 0;
+  }
+
   @Override
   public void addStatisticsToMonitor(Statistics stats) {
     ValueMonitor overflowMonitor = new ValueMonitor();
     StatisticsListener listener = new 
GatewaySenderOverflowStatisticsListener();
     overflowMonitor.addListener(listener);
     overflowMonitor.addStatistics(stats);
+
     monitors.put(stats, overflowMonitor);
     listeners.put(stats, listener);
   }
 
   @Override
-  public void removeStatisticsFromMonitor(Statistics stats) {}
-
-  @Override
   public void stopListener() {
     for (Statistics stat : listeners.keySet()) {
       ValueMonitor monitor = monitors.get(stat);
       monitor.removeListener(listeners.get(stat));
       monitor.removeStatistics(stat);
     }
+
     listeners.clear();
     monitors.clear();
   }
 
   @Override
-  public Number getStatistic(String name) {
-    if (name.equals(StatsKey.GATEWAYSENDER_LRU_EVICTIONS)) {
-      return getLruEvictions();
-    }
-    if (name.equals(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)) {
-      return getEntriesOverflowedToDisk();
-    }
-    if (name.equals(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK)) {
-      return getBytesOverflowedToDisk();
-    }
-    return 0;
-  }
-
-
-  private class GatewaySenderOverflowStatisticsListener implements 
StatisticsListener {
+  public void removeStatisticsFromMonitor(Statistics stats) {}
 
+  class GatewaySenderOverflowStatisticsListener implements StatisticsListener {
     DefaultHashMap statsMap = new DefaultHashMap();
 
     @Override
@@ -93,65 +165,19 @@ public class GatewaySenderOverflowMonitor extends 
MBeanStatsMonitor {
           StatisticDescriptor descriptor = statId.getStatisticDescriptor();
           String name = descriptor.getName();
           Number value;
+
           try {
             value = notification.getValue(statId);
           } catch (StatisticNotFoundException e) {
             value = 0;
           }
-          log(name, value);
 
+          log(name, value);
           Number deltaValue = computeDelta(statsMap, name, value);
           statsMap.put(name, value);
           increaseStats(name, deltaValue);
         }
       }
     }
-  };
-
-  private Number computeDelta(DefaultHashMap statsMap, String name, Number 
currentValue) {
-    if (name.equals(StatsKey.GATEWAYSENDER_LRU_EVICTIONS)) {
-      Number prevValue = 
statsMap.get(StatsKey.GATEWAYSENDER_LRU_EVICTIONS).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
-    }
-    if (name.equals(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)) {
-      Number prevValue =
-          
statsMap.get(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
-    }
-    if (name.equals(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK)) {
-      Number prevValue = 
statsMap.get(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
-    }
-    return 0;
-  }
-
-  private void increaseStats(String name, Number value) {
-    if (name.equals(StatsKey.GATEWAYSENDER_LRU_EVICTIONS)) {
-      lruEvictions += value.longValue();
-      return;
-    }
-    if (name.equals(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)) {
-      entriesOverflowedToDisk += value.longValue();
-      return;
-    }
-    if (name.equals(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK)) {
-      bytesOverflowedToDisk += value.longValue();
-      return;
-    }
-  }
-
-  public long getLruEvictions() {
-    return lruEvictions;
-  }
-
-  public long getEntriesOverflowedToDisk() {
-    return entriesOverflowedToDisk;
-  }
-
-  public long getBytesOverflowedToDisk() {
-    return bytesOverflowedToDisk;
   }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/MemberLevelDiskMonitor.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/MemberLevelDiskMonitor.java
index 3415606..f6a8c02 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/MemberLevelDiskMonitor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/MemberLevelDiskMonitor.java
@@ -19,110 +19,259 @@ import java.util.Map;
 
 import org.apache.geode.StatisticDescriptor;
 import org.apache.geode.Statistics;
+import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.statistics.StatisticId;
 import org.apache.geode.internal.statistics.StatisticNotFoundException;
 import org.apache.geode.internal.statistics.StatisticsListener;
 import org.apache.geode.internal.statistics.StatisticsNotification;
 import org.apache.geode.internal.statistics.ValueMonitor;
-import 
org.apache.geode.management.internal.beans.stats.MBeanStatsMonitor.DefaultHashMap;
 
+/**
+ * This class acts as a monitor and listen for Disk statistics updates on 
behalf of MemberMBean.
+ * <p>
+ * There's only one dedicated thread that wakes up at the
+ * {@link ConfigurationProperties#STATISTIC_SAMPLE_RATE} configured, samples 
all the statistics,
+ * writes them to the {@link ConfigurationProperties#STATISTIC_ARCHIVE_FILE} 
configured (if any) and
+ * notifies listeners of changes. The mutable fields are declared as {@code 
volatile} to make sure
+ * readers of the statistics get the latest recorded value.
+ * <p>
+ * This class is conditionally thread-safe, there can be multiple concurrent 
readers accessing a
+ * instance, but concurrent writers need to be synchronized externally.
+ *
+ * @see org.apache.geode.internal.statistics.HostStatSampler
+ * @see org.apache.geode.distributed.ConfigurationProperties
+ * @see org.apache.geode.management.internal.beans.stats.MBeanStatsMonitor
+ */
 public class MemberLevelDiskMonitor extends MBeanStatsMonitor {
-
-
+  private volatile long flushes = 0;
+  private volatile int queueSize = 0;
+  private volatile long flushTime = 0;
+  private volatile long flushedBytes = 0;
   private volatile long diskReadBytes = 0;
-
+  private volatile int backupsCompleted = 0;
   private volatile long diskWrittenBytes = 0;
-
   private volatile int backupsInProgress = 0;
+  private final Map<Statistics, ValueMonitor> monitors;
+  private final Map<Statistics, MemberLevelDiskStatisticsListener> listeners;
 
-  private volatile int backupsCompleted = 0;
+  public long getFlushes() {
+    return flushes;
+  }
 
-  private volatile long flushedBytes = 0;
+  public int getQueueSize() {
+    return queueSize;
+  }
 
-  private volatile long flushes = 0;
+  public long getFlushTime() {
+    return flushTime;
+  }
 
-  private volatile long flushTime = 0;
+  public long getFlushedBytes() {
+    return flushedBytes;
+  }
 
-  private volatile int queueSize = 0;
+  public long getDiskReadBytes() {
+    return diskReadBytes;
+  }
+
+  public int getBackupsCompleted() {
+    return backupsCompleted;
+  }
 
+  public long getDiskWrittenBytes() {
+    return diskWrittenBytes;
+  }
 
-  private Map<Statistics, ValueMonitor> monitors;
+  public int getBackupsInProgress() {
+    return backupsInProgress;
+  }
 
-  private Map<Statistics, MemberLevelDiskStatisticsListener> listeners;
+  Map<Statistics, ValueMonitor> getMonitors() {
+    return monitors;
+  }
+
+  Map<Statistics, MemberLevelDiskStatisticsListener> getListeners() {
+    return listeners;
+  }
 
   public MemberLevelDiskMonitor(String name) {
     super(name);
-    monitors = new HashMap<Statistics, ValueMonitor>();
-    listeners = new HashMap<Statistics, MemberLevelDiskStatisticsListener>();
+    monitors = new HashMap<>();
+    listeners = new HashMap<>();
   }
 
-  @Override
-  public void addStatisticsToMonitor(Statistics stats) {
-    ValueMonitor diskMonitor = new ValueMonitor();
-    MemberLevelDiskStatisticsListener listener = new 
MemberLevelDiskStatisticsListener();
-    diskMonitor.addListener(listener);
-    diskMonitor.addStatistics(stats);
-    monitors.put(stats, diskMonitor);
-    listeners.put(stats, listener);
-  }
+  Number computeDelta(DefaultHashMap statsMap, String name, Number 
currentValue) {
+    if (name.equals(StatsKey.DISK_READ_BYTES)) {
+      Number prevValue = statsMap.get(StatsKey.DISK_READ_BYTES);
+      return currentValue.longValue() - prevValue.longValue();
+    }
 
-  @Override
-  public void removeStatisticsFromMonitor(Statistics stats) {
-    ValueMonitor monitor = monitors.remove(stats);
-    if (monitor != null) {
-      monitor.removeStatistics(stats);
+    if (name.equals(StatsKey.DISK_RECOVERED_BYTES)) {
+      Number prevValue = statsMap.get(StatsKey.DISK_RECOVERED_BYTES);
+      return currentValue.longValue() - prevValue.longValue();
     }
-    MemberLevelDiskStatisticsListener listener = listeners.remove(stats);
-    if (listener != null) {
-      monitor.removeListener(listener);
+
+    if (name.equals(StatsKey.DISK_WRITEN_BYTES)) {
+      Number prevValue = statsMap.get(StatsKey.DISK_WRITEN_BYTES);
+      return currentValue.longValue() - prevValue.longValue();
+    }
+
+    if (name.equals(StatsKey.BACKUPS_IN_PROGRESS)) {
+      // A negative value is also OK. previous backup_in_progress = 5 
curr_backup_in_progress = 2
+      // delta = -3 delta should be added to aggregate backup in progress
+      Number prevValue = statsMap.get(StatsKey.BACKUPS_IN_PROGRESS);
+      return currentValue.intValue() - prevValue.intValue();
     }
-    listener.decreaseDiskStoreStats(stats);
+
+    if (name.equals(StatsKey.BACKUPS_COMPLETED)) {
+      Number prevValue = statsMap.get(StatsKey.BACKUPS_COMPLETED);
+      return currentValue.intValue() - prevValue.intValue();
+    }
+
+    if (name.equals(StatsKey.FLUSHED_BYTES)) {
+      Number prevValue = statsMap.get(StatsKey.FLUSHED_BYTES);
+      return currentValue.longValue() - prevValue.longValue();
+    }
+
+    if (name.equals(StatsKey.NUM_FLUSHES)) {
+      Number prevValue = statsMap.get(StatsKey.NUM_FLUSHES);
+      return currentValue.longValue() - prevValue.longValue();
+    }
+
+    if (name.equals(StatsKey.TOTAL_FLUSH_TIME)) {
+      Number prevValue = statsMap.get(StatsKey.TOTAL_FLUSH_TIME);
+      return currentValue.longValue() - prevValue.longValue();
+    }
+
+    if (name.equals(StatsKey.DISK_QUEUE_SIZE)) {
+      Number prevValue = statsMap.get(StatsKey.DISK_QUEUE_SIZE);
+      return currentValue.intValue() - prevValue.intValue();
+    }
+
+    return 0;
   }
 
-  @Override
-  public void stopListener() {
-    for (Statistics stat : listeners.keySet()) {
-      ValueMonitor monitor = monitors.get(stat);
-      monitor.removeListener(listeners.get(stat));
-      monitor.removeStatistics(stat);
+  void increaseStats(String name, Number value) {
+    if ((name.equals(StatsKey.DISK_READ_BYTES) || 
name.equals(StatsKey.DISK_RECOVERED_BYTES))) {
+      diskReadBytes = diskReadBytes + value.longValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.DISK_WRITEN_BYTES)) {
+      diskWrittenBytes = diskWrittenBytes + value.longValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.BACKUPS_IN_PROGRESS)) {
+      backupsInProgress = backupsInProgress + value.intValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.BACKUPS_COMPLETED)) {
+      backupsCompleted = backupsCompleted + value.intValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.FLUSHED_BYTES)) {
+      flushedBytes = flushedBytes + value.longValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.NUM_FLUSHES)) {
+      flushes = flushes + value.longValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.TOTAL_FLUSH_TIME)) {
+      flushTime = flushTime + value.longValue();
+      return;
+    }
+
+    if (name.equals(StatsKey.DISK_QUEUE_SIZE)) {
+      queueSize = queueSize + value.intValue();
+      return;
     }
-    listeners.clear();
-    monitors.clear();
   }
 
   @Override
   public Number getStatistic(String name) {
     if (name.equals(StatsKey.DISK_READ_BYTES)) {
-      return getDiskReads();
+      return getDiskReadBytes();
     }
+
     if (name.equals(StatsKey.DISK_WRITEN_BYTES)) {
-      return getDiskWrites();
+      return getDiskWrittenBytes();
     }
+
     if (name.equals(StatsKey.BACKUPS_IN_PROGRESS)) {
       return getBackupsInProgress();
     }
+
     if (name.equals(StatsKey.BACKUPS_COMPLETED)) {
       return getBackupsCompleted();
     }
+
     if (name.equals(StatsKey.FLUSHED_BYTES)) {
       return getFlushedBytes();
     }
+
     if (name.equals(StatsKey.NUM_FLUSHES)) {
       return getFlushes();
     }
+
     if (name.equals(StatsKey.TOTAL_FLUSH_TIME)) {
       return getFlushTime();
     }
+
     if (name.equals(StatsKey.DISK_QUEUE_SIZE)) {
       return getQueueSize();
     }
+
     return 0;
   }
 
-  private class MemberLevelDiskStatisticsListener implements 
StatisticsListener {
+  @Override
+  public void addStatisticsToMonitor(Statistics stats) {
+    ValueMonitor diskMonitor = new ValueMonitor();
+    MemberLevelDiskStatisticsListener listener = new 
MemberLevelDiskStatisticsListener();
+    diskMonitor.addListener(listener);
+    diskMonitor.addStatistics(stats);
 
-    DefaultHashMap statsMap = new DefaultHashMap();
+    monitors.put(stats, diskMonitor);
+    listeners.put(stats, listener);
+  }
+
+  @Override
+  public void stopListener() {
+    for (Statistics stat : listeners.keySet()) {
+      ValueMonitor monitor = monitors.get(stat);
+      monitor.removeListener(listeners.get(stat));
+      monitor.removeStatistics(stat);
+    }
+
+    monitors.clear();
+    listeners.clear();
+  }
+
+  @Override
+  public void removeStatisticsFromMonitor(Statistics stats) {
+    ValueMonitor monitor = monitors.remove(stats);
+    if (monitor != null) {
+      monitor.removeStatistics(stats);
+    }
+
+    MemberLevelDiskStatisticsListener listener = listeners.remove(stats);
+    if (listener != null) {
+      if (monitor != null) {
+        monitor.removeListener(listener);
+      }
 
+      listener.decreaseDiskStoreStats();
+    }
+  }
+
+  class MemberLevelDiskStatisticsListener implements StatisticsListener {
+    DefaultHashMap statsMap = new DefaultHashMap();
     private boolean removed = false;
 
     @Override
@@ -131,23 +280,23 @@ public class MemberLevelDiskMonitor extends 
MBeanStatsMonitor {
         if (removed) {
           return;
         }
+
         for (StatisticId statId : notification) {
           StatisticDescriptor descriptor = statId.getStatisticDescriptor();
           String name = descriptor.getName();
           Number value;
+
           try {
             value = notification.getValue(statId);
           } catch (StatisticNotFoundException e) {
             value = 0;
           }
-          log(name, value);
 
+          log(name, value);
           Number deltaValue = computeDelta(statsMap, name, value);
           statsMap.put(name, value);
           increaseStats(name, deltaValue);
-
         }
-
       }
     }
 
@@ -159,138 +308,12 @@ public class MemberLevelDiskMonitor extends 
MBeanStatsMonitor {
      * DefaultHashMap for the disk
      *
      */
-
-    public void decreaseDiskStoreStats(Statistics stats) {
+    void decreaseDiskStoreStats() {
       synchronized (statsMap) {
         queueSize -= statsMap.get(StatsKey.DISK_QUEUE_SIZE).intValue();
         backupsInProgress -= 
statsMap.get(StatsKey.BACKUPS_IN_PROGRESS).intValue();;
         removed = true;
-
       }
-
-    }
-
-  };
-
-  private Number computeDelta(DefaultHashMap statsMap, String name, Number 
currentValue) {
-    if (name.equals(StatsKey.DISK_READ_BYTES)) {
-      Number prevValue = statsMap.get(StatsKey.DISK_READ_BYTES).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
-    }
-    if (name.equals(StatsKey.DISK_RECOVERED_BYTES)) {
-      Number prevValue = 
statsMap.get(StatsKey.DISK_RECOVERED_BYTES).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
     }
-
-    if (name.equals(StatsKey.DISK_WRITEN_BYTES)) {
-      Number prevValue = statsMap.get(StatsKey.DISK_WRITEN_BYTES).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
-    }
-    if (name.equals(StatsKey.BACKUPS_IN_PROGRESS)) {
-      /**
-       * A negative value is also OK. previous backup_in_progress = 5 
curr_backup_in_progress = 2
-       * delta = -3 delta should be added to aggregate backup in progress
-       */
-      Number prevValue = 
statsMap.get(StatsKey.BACKUPS_IN_PROGRESS).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
-    }
-    if (name.equals(StatsKey.BACKUPS_COMPLETED)) {
-      Number prevValue = statsMap.get(StatsKey.BACKUPS_COMPLETED).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
-    }
-    if (name.equals(StatsKey.FLUSHED_BYTES)) {
-      Number prevValue = statsMap.get(StatsKey.FLUSHED_BYTES).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
-    }
-    if (name.equals(StatsKey.NUM_FLUSHES)) {
-      Number prevValue = statsMap.get(StatsKey.NUM_FLUSHES).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
-    }
-    if (name.equals(StatsKey.TOTAL_FLUSH_TIME)) {
-      Number prevValue = statsMap.get(StatsKey.TOTAL_FLUSH_TIME).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
-    }
-    if (name.equals(StatsKey.DISK_QUEUE_SIZE)) {
-      Number prevValue = statsMap.get(StatsKey.DISK_QUEUE_SIZE).longValue();
-      Number deltaValue = currentValue.longValue() - prevValue.longValue();
-      return deltaValue;
-    }
-    return 0;
-  }
-
-  private void increaseStats(String name, Number value) {
-    if ((name.equals(StatsKey.DISK_READ_BYTES) || 
name.equals(StatsKey.DISK_RECOVERED_BYTES))) {
-      diskReadBytes = diskReadBytes + value.longValue();
-      return;
-    }
-    if (name.equals(StatsKey.DISK_WRITEN_BYTES)) {
-      diskWrittenBytes = diskWrittenBytes + value.longValue();
-      return;
-    }
-    if (name.equals(StatsKey.BACKUPS_IN_PROGRESS)) {
-      backupsInProgress = backupsInProgress + value.intValue();
-      return;
-    }
-    if (name.equals(StatsKey.BACKUPS_COMPLETED)) {
-      backupsCompleted = backupsCompleted + value.intValue();
-      return;
-    }
-    if (name.equals(StatsKey.FLUSHED_BYTES)) {
-      flushedBytes = flushedBytes + value.longValue();
-      return;
-    }
-    if (name.equals(StatsKey.NUM_FLUSHES)) {
-      flushes = flushes + value.longValue();
-      return;
-    }
-    if (name.equals(StatsKey.TOTAL_FLUSH_TIME)) {
-      flushTime = flushTime + value.longValue();
-      return;
-    }
-    if (name.equals(StatsKey.DISK_QUEUE_SIZE)) {
-      queueSize = queueSize + value.intValue();
-      return;
-    }
-  }
-
-
-  public long getDiskReads() {
-    return diskReadBytes;
-  }
-
-  public long getDiskWrites() {
-    return diskWrittenBytes;
-  }
-
-  public int getBackupsInProgress() {
-    return backupsInProgress;
-  }
-
-  public int getBackupsCompleted() {
-    return backupsCompleted;
-  }
-
-  public long getFlushedBytes() {
-    return flushedBytes;
-  }
-
-  public long getFlushes() {
-    return flushes;
-  }
-
-  public long getFlushTime() {
-    return flushTime;
-  }
-
-  public int getQueueSize() {
-    return queueSize;
   }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/VMStatsMonitor.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/VMStatsMonitor.java
index a21eb81..eb43294 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/VMStatsMonitor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/VMStatsMonitor.java
@@ -15,8 +15,10 @@
 package org.apache.geode.management.internal.beans.stats;
 
 import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.StatisticDescriptor;
+import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.statistics.StatisticId;
 import org.apache.geode.internal.statistics.StatisticNotFoundException;
 import org.apache.geode.internal.statistics.StatisticsNotification;
@@ -24,64 +26,72 @@ import org.apache.geode.management.internal.MBeanJMXAdapter;
 
 /**
  * This class acts as a monitor and listen for VM stats update on behalf of 
MemberMBean.
+ * <p>
+ * There's only one dedicated thread that wakes up at the
+ * {@link ConfigurationProperties#STATISTIC_SAMPLE_RATE} configured, samples 
all the statistics,
+ * writes them to the {@link ConfigurationProperties#STATISTIC_ARCHIVE_FILE} 
configured (if any) and
+ * notifies listeners of changes. The mutable fields are declared as {@code 
volatile} to make sure
+ * readers of the statistics get the latest recorded value.
+ * <p>
+ * This class is conditionally thread-safe, there can be multiple concurrent 
readers accessing a
+ * instance, but concurrent writers need to be synchronized externally.
  *
- *
+ * @see org.apache.geode.internal.statistics.HostStatSampler
+ * @see org.apache.geode.distributed.ConfigurationProperties
+ * @see org.apache.geode.management.internal.beans.stats.MBeanStatsMonitor
  */
 public class VMStatsMonitor extends MBeanStatsMonitor {
-
-  private static final int VALUE_NOT_AVAILABLE = -1;
-
+  static final int VALUE_NOT_AVAILABLE = -1;
+  private static final String PROCESS_CPU_TIME_ATTRIBUTE = "ProcessCpuTime";
+  private long lastSystemTime = 0;
+  private long lastProcessCpuTime = 0;
   private volatile float cpuUsage = 0;
+  private final boolean processCPUTimeAvailable;
 
-  private static String processCPUTimeAttr = "ProcessCpuTime";
-
-  private long lastSystemTime = 0;
+  public float getCpuUsage() {
+    return cpuUsage;
+  }
 
-  private long lastProcessCpuTime = 0;
+  long getLastSystemTime() {
+    return lastSystemTime;
+  }
 
-  private boolean processCPUTimeAvailable;
+  long getLastProcessCpuTime() {
+    return lastProcessCpuTime;
+  }
 
   public VMStatsMonitor(String name) {
     super(name);
-    processCPUTimeAvailable = 
MBeanJMXAdapter.isAttributeAvailable(processCPUTimeAttr,
+    processCPUTimeAvailable = 
MBeanJMXAdapter.isAttributeAvailable(PROCESS_CPU_TIME_ATTRIBUTE,
         ManagementFactory.OPERATING_SYSTEM_MXBEAN_NAME);
     if (!processCPUTimeAvailable) {
       cpuUsage = VALUE_NOT_AVAILABLE;
     }
-
   }
 
-
-  @Override
-  public void handleNotification(StatisticsNotification notification) {
-
-    for (StatisticId statId : notification) {
-      StatisticDescriptor descriptor = statId.getStatisticDescriptor();
-      String name = descriptor.getName();
-      Number value;
-      try {
-        value = notification.getValue(statId);
-      } catch (StatisticNotFoundException e) {
-        value = 0;
-      }
-      log(name, value);
-      statsMap.put(name, value);
-    }
-    refreshStats();
+  long currentTimeMillis() {
+    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
   }
 
+  /**
+   *
+   * @param systemTime Current system time.
+   * @param cpuTime Last gathered cpu time.
+   * @return The time (as a percentage) that this member's process time with 
respect to Statistics
+   *         sample time interval. If process time between two sample time t1 
& t2 is p1 and p2
+   *         cpuUsage = ((p2-p1) * 100) / ((t2-t1).
+   */
+  float calculateCpuUsage(long systemTime, long cpuTime) {
+    // 10000 = (Nano conversion factor / 100 for percentage)
+    long denom = (systemTime - getLastSystemTime()) * 10000;
+    return (float) (cpuTime - getLastProcessCpuTime()) / denom;
+  }
 
   /**
    * Right now it only refreshes CPU usage in terms of percentage. This method 
can be used for any
    * other computation based on Stats in future.
-   *
-   * Returns the time (as a percentage) that this member's process time with 
respect to Statistics
-   * sample time interval. If process time between two sample time t1 & t2 is 
p1 and p2 cpuUsage =
-   * ((p2-p1) * 100) / ((t2-t1)
-   *
    */
-  private void refreshStats() {
-
+  synchronized void refreshStats() {
     if (processCPUTimeAvailable) {
       Number processCpuTime = statsMap.get(StatsKey.VM_PROCESS_CPU_TIME);
 
@@ -92,9 +102,8 @@ public class VMStatsMonitor extends MBeanStatsMonitor {
         return;
       }
 
-
       if (lastSystemTime == 0) {
-        lastSystemTime = System.currentTimeMillis();
+        lastSystemTime = currentTimeMillis();
         return;
       }
 
@@ -103,22 +112,31 @@ public class VMStatsMonitor extends MBeanStatsMonitor {
         lastProcessCpuTime = cpuTime;
         return;
       }
-      long systemTime = System.currentTimeMillis();
-
-      // 10000 = (Nano conversion factor / 100 for percentage)
-      long denom = (systemTime - lastSystemTime) * 10000;
-
-      float processCpuUsage = (float) (cpuTime - lastProcessCpuTime) / denom;
 
+      long systemTime = currentTimeMillis();
+      cpuUsage = calculateCpuUsage(systemTime, cpuTime);
       lastSystemTime = systemTime;
       lastProcessCpuTime = cpuTime;
-      cpuUsage = processCpuUsage;
     }
-
   }
 
-  public float getCpuUsage() {
-    return cpuUsage;
-  }
+  @Override
+  public void handleNotification(StatisticsNotification notification) {
+    for (StatisticId statId : notification) {
+      StatisticDescriptor descriptor = statId.getStatisticDescriptor();
+      String name = descriptor.getName();
+      Number value;
 
+      try {
+        value = notification.getValue(statId);
+      } catch (StatisticNotFoundException e) {
+        value = 0;
+      }
+
+      log(name, value);
+      statsMap.put(name, value);
+    }
+
+    refreshStats();
+  }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/AggregateRegionStatsMonitorTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/AggregateRegionStatsMonitorTest.java
new file mode 100644
index 0000000..302f243
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/AggregateRegionStatsMonitorTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.beans.stats;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.geode.Statistics;
+import org.apache.geode.internal.NanoTimer;
+import org.apache.geode.internal.io.MainWithChildrenRollingFileHandler;
+import org.apache.geode.internal.statistics.SampleCollector;
+import org.apache.geode.internal.statistics.StatArchiveHandlerConfig;
+import org.apache.geode.internal.statistics.StatisticsSampler;
+import org.apache.geode.internal.statistics.TestStatisticsManager;
+import org.apache.geode.internal.statistics.TestStatisticsSampler;
+import org.apache.geode.internal.statistics.ValueMonitor;
+import org.apache.geode.test.junit.categories.StatisticsTest;
+
+@Category(StatisticsTest.class)
+public class AggregateRegionStatsMonitorTest {
+  @Rule
+  public TestName testName = new TestName();
+
+  private AggregateRegionStatsMonitor aggregateRegionStatsMonitor;
+
+  @Before
+  public void setUp() {
+    final long startTime = System.currentTimeMillis();
+    TestStatisticsManager manager =
+        new TestStatisticsManager(1, getClass().getSimpleName(), startTime);
+    StatArchiveHandlerConfig mockStatArchiveHandlerConfig = 
mock(StatArchiveHandlerConfig.class,
+        getClass().getSimpleName() + "$" + 
StatArchiveHandlerConfig.class.getSimpleName());
+    when(mockStatArchiveHandlerConfig.getArchiveFileName()).thenReturn(new 
File(""));
+    
when(mockStatArchiveHandlerConfig.getArchiveFileSizeLimit()).thenReturn(0L);
+    
when(mockStatArchiveHandlerConfig.getArchiveDiskSpaceLimit()).thenReturn(0L);
+    when(mockStatArchiveHandlerConfig.getSystemId()).thenReturn(0L);
+    
when(mockStatArchiveHandlerConfig.getSystemStartTime()).thenReturn(startTime);
+    when(mockStatArchiveHandlerConfig.getSystemDirectoryPath()).thenReturn("");
+    when(mockStatArchiveHandlerConfig.getProductDescription())
+        .thenReturn(getClass().getSimpleName());
+
+    StatisticsSampler sampler = new TestStatisticsSampler(manager);
+    SampleCollector sampleCollector = new SampleCollector(sampler);
+    sampleCollector.initialize(mockStatArchiveHandlerConfig, 
NanoTimer.getTime(),
+        new MainWithChildrenRollingFileHandler());
+    aggregateRegionStatsMonitor =
+        spy(new AggregateRegionStatsMonitor(this.testName.getMethodName()));
+
+    assertThat(aggregateRegionStatsMonitor).isNotNull();
+    assertThat(aggregateRegionStatsMonitor.getMonitors()).isEmpty();
+    assertThat(aggregateRegionStatsMonitor.getListeners()).isEmpty();
+    assertThat(aggregateRegionStatsMonitor.getDiskSpace()).isEqualTo(0);
+    assertThat(aggregateRegionStatsMonitor.getTotalBucketCount()).isEqualTo(0);
+    assertThat(aggregateRegionStatsMonitor.getLruDestroys()).isEqualTo(0);
+    assertThat(aggregateRegionStatsMonitor.getLruEvictions()).isEqualTo(0);
+    assertThat(aggregateRegionStatsMonitor.getTotalBucketSize()).isEqualTo(0);
+    
assertThat(aggregateRegionStatsMonitor.getTotalPrimaryBucketCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void computeDeltaShouldReturnZeroForUnknownStatistics() {
+    assertThat(aggregateRegionStatsMonitor.computeDelta(new 
MBeanStatsMonitor.DefaultHashMap(),
+        "unknownStatistic", 6)).isEqualTo(0);
+  }
+
+  @Test
+  public void computeDeltaShouldOperateForHandledStatistics() {
+    MBeanStatsMonitor.DefaultHashMap statsMap = new 
MBeanStatsMonitor.DefaultHashMap();
+    statsMap.put(StatsKey.PRIMARY_BUCKET_COUNT, 5);
+    statsMap.put(StatsKey.BUCKET_COUNT, 13);
+    statsMap.put(StatsKey.TOTAL_BUCKET_SIZE, 1024);
+    statsMap.put(StatsKey.LRU_EVICTIONS, 12);
+    statsMap.put(StatsKey.LRU_DESTROYS, 5);
+    statsMap.put(StatsKey.DISK_SPACE, 2048);
+
+    assertThat(aggregateRegionStatsMonitor.computeDelta(statsMap, 
StatsKey.PRIMARY_BUCKET_COUNT, 6))
+        .isEqualTo(1);
+    assertThat(aggregateRegionStatsMonitor.computeDelta(statsMap, 
StatsKey.BUCKET_COUNT, 15))
+        .isEqualTo(2);
+    assertThat(aggregateRegionStatsMonitor.computeDelta(statsMap, 
StatsKey.TOTAL_BUCKET_SIZE, 1030))
+        .isEqualTo(6);
+    assertThat(aggregateRegionStatsMonitor.computeDelta(statsMap, 
StatsKey.LRU_EVICTIONS, 20))
+        .isEqualTo(8L);
+    assertThat(aggregateRegionStatsMonitor.computeDelta(statsMap, 
StatsKey.LRU_DESTROYS, 6))
+        .isEqualTo(1L);
+    assertThat(aggregateRegionStatsMonitor.computeDelta(statsMap, 
StatsKey.DISK_SPACE, 2050))
+        .isEqualTo(2L);
+  }
+
+  @Test
+  public void increaseStatsShouldIncrementStatisticsUsingTheSelectedValue() {
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.PRIMARY_BUCKET_COUNT, 
5);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.BUCKET_COUNT, 13);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.TOTAL_BUCKET_SIZE, 
1024);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.LRU_EVICTIONS, 12);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.LRU_DESTROYS, 5);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.DISK_SPACE, 2048);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.PRIMARY_BUCKET_COUNT))
+        .isEqualTo(5);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.BUCKET_COUNT)).isEqualTo(13);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.TOTAL_BUCKET_SIZE))
+        .isEqualTo(1024);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.LRU_EVICTIONS)).isEqualTo(12L);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.LRU_DESTROYS)).isEqualTo(5L);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.DISK_SPACE)).isEqualTo(2048L);
+
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.PRIMARY_BUCKET_COUNT, 
2);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.BUCKET_COUNT, 2);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.TOTAL_BUCKET_SIZE, 1);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.LRU_EVICTIONS, 8);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.LRU_DESTROYS, 5);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.DISK_SPACE, 2);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.PRIMARY_BUCKET_COUNT))
+        .isEqualTo(7);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.BUCKET_COUNT)).isEqualTo(15);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.TOTAL_BUCKET_SIZE))
+        .isEqualTo(1025);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.LRU_EVICTIONS)).isEqualTo(20L);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.LRU_DESTROYS)).isEqualTo(10L);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.DISK_SPACE)).isEqualTo(2050L);
+  }
+
+  @Test
+  public void removeLRUStatisticsShouldRemoveListenerAndMonitor() {
+    Statistics statistics = mock(Statistics.class);
+    ValueMonitor regionMonitor = mock(ValueMonitor.class);
+    AggregateRegionStatsMonitor.MemberLevelRegionStatisticsListener listener =
+        
mock(AggregateRegionStatsMonitor.MemberLevelRegionStatisticsListener.class);
+    aggregateRegionStatsMonitor.getListeners().put(statistics, listener);
+    aggregateRegionStatsMonitor.getMonitors().put(statistics, regionMonitor);
+
+    aggregateRegionStatsMonitor.removeLRUStatistics(statistics);
+    assertThat(aggregateRegionStatsMonitor.getListeners()).isEmpty();
+    assertThat(aggregateRegionStatsMonitor.getMonitors()).isEmpty();
+    verify(regionMonitor, times(1)).removeListener(listener);
+    verify(regionMonitor, times(1)).removeStatistics(statistics);
+  }
+
+  @Test
+  public void removeDirectoryStatisticsShouldRemoveListenerAndMonitor() {
+    Statistics statistics = mock(Statistics.class);
+    ValueMonitor regionMonitor = mock(ValueMonitor.class);
+    AggregateRegionStatsMonitor.MemberLevelRegionStatisticsListener listener =
+        
mock(AggregateRegionStatsMonitor.MemberLevelRegionStatisticsListener.class);
+    aggregateRegionStatsMonitor.getListeners().put(statistics, listener);
+    aggregateRegionStatsMonitor.getMonitors().put(statistics, regionMonitor);
+
+    aggregateRegionStatsMonitor.removeDirectoryStatistics(statistics);
+    assertThat(aggregateRegionStatsMonitor.getListeners()).isEmpty();
+    assertThat(aggregateRegionStatsMonitor.getMonitors()).isEmpty();
+    verify(regionMonitor, times(1)).removeListener(listener);
+    verify(regionMonitor, times(1)).removeStatistics(statistics);
+  }
+
+  @Test
+  public void 
removePartitionStatisticsShouldDecreaseStatsAndRemoveBothListenerAndMonitor() {
+    Statistics statistics = mock(Statistics.class);
+    ValueMonitor regionMonitor = mock(ValueMonitor.class);
+    AggregateRegionStatsMonitor.MemberLevelRegionStatisticsListener listener =
+        
mock(AggregateRegionStatsMonitor.MemberLevelRegionStatisticsListener.class);
+    aggregateRegionStatsMonitor.getListeners().put(statistics, listener);
+    aggregateRegionStatsMonitor.getMonitors().put(statistics, regionMonitor);
+
+    aggregateRegionStatsMonitor.removePartitionStatistics(statistics);
+    assertThat(aggregateRegionStatsMonitor.getListeners()).isEmpty();
+    assertThat(aggregateRegionStatsMonitor.getMonitors()).isEmpty();
+    verify(listener, times(1)).decreaseParStats();
+    verify(regionMonitor, times(1)).removeListener(listener);
+    verify(regionMonitor, times(1)).removeStatistics(statistics);
+  }
+
+  @Test
+  public void getStatisticShouldReturnZeroForUnknownStatistics() {
+    
assertThat(aggregateRegionStatsMonitor.getStatistic("unhandledStatistic")).isEqualTo(0);
+  }
+
+  @Test
+  public void getStatisticShouldReturnTheRecordedValueForHandledStatistics() {
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.PRIMARY_BUCKET_COUNT, 
5);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.BUCKET_COUNT, 13);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.TOTAL_BUCKET_SIZE, 
1024);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.LRU_EVICTIONS, 12);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.LRU_DESTROYS, 5);
+    aggregateRegionStatsMonitor.increaseStats(StatsKey.DISK_SPACE, 2048);
+
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.PRIMARY_BUCKET_COUNT))
+        .isEqualTo(5);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.BUCKET_COUNT)).isEqualTo(13);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.TOTAL_BUCKET_SIZE))
+        .isEqualTo(1024);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.LRU_EVICTIONS)).isEqualTo(12L);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.LRU_DESTROYS)).isEqualTo(5L);
+    
assertThat(aggregateRegionStatsMonitor.getStatistic(StatsKey.DISK_SPACE)).isEqualTo(2048L);
+  }
+
+  @Test
+  public void addStatisticsToMonitorShouldAddListenerAndMonitor() {
+    Statistics statistics = mock(Statistics.class);
+    aggregateRegionStatsMonitor.addStatisticsToMonitor(statistics);
+
+    assertThat(aggregateRegionStatsMonitor.getMonitors().size()).isEqualTo(1);
+    assertThat(aggregateRegionStatsMonitor.getListeners().size()).isEqualTo(1);
+  }
+
+  @Test
+  public void stopListenerShouldRemoveListenerAndMonitor() {
+    Statistics statistics = mock(Statistics.class);
+    ValueMonitor regionMonitor = mock(ValueMonitor.class);
+    AggregateRegionStatsMonitor.MemberLevelRegionStatisticsListener listener =
+        
mock(AggregateRegionStatsMonitor.MemberLevelRegionStatisticsListener.class);
+    aggregateRegionStatsMonitor.getListeners().put(statistics, listener);
+    aggregateRegionStatsMonitor.getMonitors().put(statistics, regionMonitor);
+
+    aggregateRegionStatsMonitor.stopListener();
+    verify(regionMonitor, times(1)).removeListener(listener);
+    verify(regionMonitor, times(1)).removeStatistics(statistics);
+    assertThat(aggregateRegionStatsMonitor.getMonitors()).isEmpty();
+    assertThat(aggregateRegionStatsMonitor.getListeners()).isEmpty();
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GCStatsMonitorTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GCStatsMonitorTest.java
new file mode 100644
index 0000000..a6dac69
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GCStatsMonitorTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.beans.stats;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.geode.test.junit.categories.StatisticsTest;
+
+@Category(StatisticsTest.class)
+public class GCStatsMonitorTest {
+  @Rule
+  public TestName testName = new TestName();
+
+  private GCStatsMonitor gcStatsMonitor;
+
+  @Before
+  public void setUp() {
+    gcStatsMonitor = new GCStatsMonitor(testName.getMethodName());
+
+    assertThat(gcStatsMonitor).isNotNull();
+    assertThat(gcStatsMonitor.getCollections()).isEqualTo(0);
+    assertThat(gcStatsMonitor.getCollectionTime()).isEqualTo(0);
+  }
+
+  @Test
+  public void getStatisticShouldReturnZeroForUnknownStatistics() {
+    assertThat(gcStatsMonitor.getStatistic("unknownStatistic")).isEqualTo(0);
+  }
+
+  @Test
+  public void getStatisticShouldReturnTheRecordedValueForHandledStatistics() {
+    gcStatsMonitor.increaseStats(StatsKey.VM_GC_STATS_COLLECTIONS, 10);
+    gcStatsMonitor.increaseStats(StatsKey.VM_GC_STATS_COLLECTION_TIME, 10000);
+
+    
assertThat(gcStatsMonitor.getStatistic(StatsKey.VM_GC_STATS_COLLECTIONS)).isEqualTo(10L);
+    
assertThat(gcStatsMonitor.getStatistic(StatsKey.VM_GC_STATS_COLLECTION_TIME)).isEqualTo(10000L);
+  }
+
+  @Test
+  public void increaseStatsShouldIncrementStatisticsUsingTheSelectedValue() {
+    gcStatsMonitor.increaseStats(StatsKey.VM_GC_STATS_COLLECTIONS, 10);
+    gcStatsMonitor.increaseStats(StatsKey.VM_GC_STATS_COLLECTION_TIME, 10000);
+    
assertThat(gcStatsMonitor.getStatistic(StatsKey.VM_GC_STATS_COLLECTIONS)).isEqualTo(10L);
+    
assertThat(gcStatsMonitor.getStatistic(StatsKey.VM_GC_STATS_COLLECTION_TIME)).isEqualTo(10000L);
+
+    gcStatsMonitor.increaseStats(StatsKey.VM_GC_STATS_COLLECTIONS, 15);
+    gcStatsMonitor.increaseStats(StatsKey.VM_GC_STATS_COLLECTION_TIME, 20000);
+    
assertThat(gcStatsMonitor.getStatistic(StatsKey.VM_GC_STATS_COLLECTIONS)).isEqualTo(25L);
+    
assertThat(gcStatsMonitor.getStatistic(StatsKey.VM_GC_STATS_COLLECTION_TIME)).isEqualTo(30000L);
+  }
+
+  @Test
+  public void 
decreasePrevValuesShouldDecrementStatisticsUsingTheSelectedValue() {
+    gcStatsMonitor.increaseStats(StatsKey.VM_GC_STATS_COLLECTIONS, 10);
+    gcStatsMonitor.increaseStats(StatsKey.VM_GC_STATS_COLLECTION_TIME, 10000);
+    
assertThat(gcStatsMonitor.getStatistic(StatsKey.VM_GC_STATS_COLLECTIONS)).isEqualTo(10L);
+    
assertThat(gcStatsMonitor.getStatistic(StatsKey.VM_GC_STATS_COLLECTION_TIME)).isEqualTo(10000L);
+    MBeanStatsMonitor.DefaultHashMap statsMap = new 
MBeanStatsMonitor.DefaultHashMap();
+    statsMap.put(StatsKey.VM_GC_STATS_COLLECTIONS, 5);
+    statsMap.put(StatsKey.VM_GC_STATS_COLLECTION_TIME, 5000);
+
+    gcStatsMonitor.decreasePrevValues(statsMap);
+    
assertThat(gcStatsMonitor.getStatistic(StatsKey.VM_GC_STATS_COLLECTIONS)).isEqualTo(5L);
+    
assertThat(gcStatsMonitor.getStatistic(StatsKey.VM_GC_STATS_COLLECTION_TIME)).isEqualTo(5000L);
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitorTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitorTest.java
new file mode 100644
index 0000000..fe9b116
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitorTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.beans.stats;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.geode.Statistics;
+import org.apache.geode.internal.NanoTimer;
+import org.apache.geode.internal.io.MainWithChildrenRollingFileHandler;
+import org.apache.geode.internal.statistics.SampleCollector;
+import org.apache.geode.internal.statistics.StatArchiveHandlerConfig;
+import org.apache.geode.internal.statistics.StatisticsSampler;
+import org.apache.geode.internal.statistics.TestStatisticsManager;
+import org.apache.geode.internal.statistics.TestStatisticsSampler;
+import org.apache.geode.internal.statistics.ValueMonitor;
+import org.apache.geode.test.junit.categories.StatisticsTest;
+
+@Category(StatisticsTest.class)
+public class GatewaySenderOverflowMonitorTest {
+  @Rule
+  public TestName testName = new TestName();
+
+  private GatewaySenderOverflowMonitor gatewaySenderOverflowMonitor;
+
+  @Before
+  public void setUp() {
+    final long startTime = System.currentTimeMillis();
+    TestStatisticsManager manager =
+        new TestStatisticsManager(1, getClass().getSimpleName(), startTime);
+    StatArchiveHandlerConfig mockStatArchiveHandlerConfig = 
mock(StatArchiveHandlerConfig.class,
+        getClass().getSimpleName() + "$" + 
StatArchiveHandlerConfig.class.getSimpleName());
+    when(mockStatArchiveHandlerConfig.getArchiveFileName()).thenReturn(new 
File(""));
+    
when(mockStatArchiveHandlerConfig.getArchiveFileSizeLimit()).thenReturn(0L);
+    
when(mockStatArchiveHandlerConfig.getArchiveDiskSpaceLimit()).thenReturn(0L);
+    when(mockStatArchiveHandlerConfig.getSystemId()).thenReturn(0L);
+    
when(mockStatArchiveHandlerConfig.getSystemStartTime()).thenReturn(startTime);
+    when(mockStatArchiveHandlerConfig.getSystemDirectoryPath()).thenReturn("");
+    when(mockStatArchiveHandlerConfig.getProductDescription())
+        .thenReturn(getClass().getSimpleName());
+
+    StatisticsSampler sampler = new TestStatisticsSampler(manager);
+    SampleCollector sampleCollector = new SampleCollector(sampler);
+    sampleCollector.initialize(mockStatArchiveHandlerConfig, 
NanoTimer.getTime(),
+        new MainWithChildrenRollingFileHandler());
+    gatewaySenderOverflowMonitor =
+        spy(new GatewaySenderOverflowMonitor(this.testName.getMethodName()));
+
+    assertThat(gatewaySenderOverflowMonitor).isNotNull();
+    assertThat(gatewaySenderOverflowMonitor.getMonitors()).isEmpty();
+    assertThat(gatewaySenderOverflowMonitor.getListeners()).isEmpty();
+    assertThat(gatewaySenderOverflowMonitor.getLruEvictions()).isEqualTo(0);
+    
assertThat(gatewaySenderOverflowMonitor.getBytesOverflowedToDisk()).isEqualTo(0);
+    
assertThat(gatewaySenderOverflowMonitor.getEntriesOverflowedToDisk()).isEqualTo(0);
+  }
+
+  @Test
+  public void computeDeltaShouldReturnZeroForUnknownStatistics() {
+    assertThat(gatewaySenderOverflowMonitor.computeDelta(new 
MBeanStatsMonitor.DefaultHashMap(),
+        "unknownStatistic", 6)).isEqualTo(0);
+  }
+
+  @Test
+  public void computeDeltaShouldOperateForHandledStatistics() {
+    MBeanStatsMonitor.DefaultHashMap statsMap = new 
MBeanStatsMonitor.DefaultHashMap();
+    statsMap.put(StatsKey.GATEWAYSENDER_LRU_EVICTIONS, 50);
+    statsMap.put(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK, 2048);
+    statsMap.put(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK, 100);
+
+    assertThat(gatewaySenderOverflowMonitor.computeDelta(statsMap,
+        StatsKey.GATEWAYSENDER_LRU_EVICTIONS, 60)).isEqualTo(10L);
+    assertThat(gatewaySenderOverflowMonitor.computeDelta(statsMap,
+        StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK, 2100)).isEqualTo(52L);
+    assertThat(gatewaySenderOverflowMonitor.computeDelta(statsMap,
+        StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK, 
150)).isEqualTo(50L);
+  }
+
+  @Test
+  public void increaseStatsShouldIncrementStatisticsUsingTheSelectedValue() {
+    
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_LRU_EVICTIONS,
 5L);
+    
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK,
+        1024L);
+    
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK,
+        10000L);
+    
assertThat(gatewaySenderOverflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_LRU_EVICTIONS))
+        .isEqualTo(5L);
+    assertThat(
+        
gatewaySenderOverflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK))
+            .isEqualTo(1024L);
+    assertThat(gatewaySenderOverflowMonitor
+        
.getStatistic(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)).isEqualTo(10000L);
+
+    
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_LRU_EVICTIONS,
 5L);
+    
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK,
+        1024L);
+    
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK,
+        10000L);
+    
assertThat(gatewaySenderOverflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_LRU_EVICTIONS))
+        .isEqualTo(10L);
+    assertThat(
+        
gatewaySenderOverflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK))
+            .isEqualTo(2048L);
+    assertThat(gatewaySenderOverflowMonitor
+        
.getStatistic(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)).isEqualTo(20000L);
+  }
+
+  @Test
+  public void getStatisticShouldReturnZeroForUnknownStatistics() {
+    
assertThat(gatewaySenderOverflowMonitor.getStatistic("unhandledStatistic")).isEqualTo(0);
+  }
+
+  @Test
+  public void getStatisticShouldReturnTheRecordedValueForHandledStatistics() {
+    
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_LRU_EVICTIONS,
 5);
+    
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK,
+        2048);
+    
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK,
+        10000);
+
+    
assertThat(gatewaySenderOverflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_LRU_EVICTIONS))
+        .isEqualTo(5L);
+    assertThat(
+        
gatewaySenderOverflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK))
+            .isEqualTo(2048L);
+    assertThat(gatewaySenderOverflowMonitor
+        
.getStatistic(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)).isEqualTo(10000L);
+  }
+
+  @Test
+  public void addStatisticsToMonitorShouldAddListenerAndMonitor() {
+    Statistics statistics = mock(Statistics.class);
+    gatewaySenderOverflowMonitor.addStatisticsToMonitor(statistics);
+
+    assertThat(gatewaySenderOverflowMonitor.getMonitors().size()).isEqualTo(1);
+    
assertThat(gatewaySenderOverflowMonitor.getListeners().size()).isEqualTo(1);
+  }
+
+  @Test
+  public void stopListenerShouldRemoveListenerAndMonitor() {
+    Statistics statistics = mock(Statistics.class);
+    ValueMonitor regionMonitor = mock(ValueMonitor.class);
+    GatewaySenderOverflowMonitor.GatewaySenderOverflowStatisticsListener 
listener =
+        
mock(GatewaySenderOverflowMonitor.GatewaySenderOverflowStatisticsListener.class);
+    gatewaySenderOverflowMonitor.getListeners().put(statistics, listener);
+    gatewaySenderOverflowMonitor.getMonitors().put(statistics, regionMonitor);
+
+    gatewaySenderOverflowMonitor.stopListener();
+    verify(regionMonitor, times(1)).removeListener(listener);
+    verify(regionMonitor, times(1)).removeStatistics(statistics);
+    assertThat(gatewaySenderOverflowMonitor.getMonitors()).isEmpty();
+    assertThat(gatewaySenderOverflowMonitor.getListeners()).isEmpty();
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/MemberLevelDiskMonitorTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/MemberLevelDiskMonitorTest.java
new file mode 100644
index 0000000..e319d3a
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/MemberLevelDiskMonitorTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.beans.stats;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.geode.Statistics;
+import org.apache.geode.internal.NanoTimer;
+import org.apache.geode.internal.io.MainWithChildrenRollingFileHandler;
+import org.apache.geode.internal.statistics.SampleCollector;
+import org.apache.geode.internal.statistics.StatArchiveHandlerConfig;
+import org.apache.geode.internal.statistics.StatisticsSampler;
+import org.apache.geode.internal.statistics.TestStatisticsManager;
+import org.apache.geode.internal.statistics.TestStatisticsSampler;
+import org.apache.geode.internal.statistics.ValueMonitor;
+import org.apache.geode.test.junit.categories.StatisticsTest;
+
+@Category(StatisticsTest.class)
+public class MemberLevelDiskMonitorTest {
+  @Rule
+  public TestName testName = new TestName();
+
+  private MemberLevelDiskMonitor memberLevelDiskMonitor;
+
+  @Before
+  public void setUp() {
+    final long startTime = System.currentTimeMillis();
+    TestStatisticsManager manager =
+        new TestStatisticsManager(1, getClass().getSimpleName(), startTime);
+    StatArchiveHandlerConfig mockStatArchiveHandlerConfig = 
mock(StatArchiveHandlerConfig.class,
+        getClass().getSimpleName() + "$" + 
StatArchiveHandlerConfig.class.getSimpleName());
+    when(mockStatArchiveHandlerConfig.getArchiveFileName()).thenReturn(new 
File(""));
+    
when(mockStatArchiveHandlerConfig.getArchiveFileSizeLimit()).thenReturn(0L);
+    
when(mockStatArchiveHandlerConfig.getArchiveDiskSpaceLimit()).thenReturn(0L);
+    when(mockStatArchiveHandlerConfig.getSystemId()).thenReturn(0L);
+    
when(mockStatArchiveHandlerConfig.getSystemStartTime()).thenReturn(startTime);
+    when(mockStatArchiveHandlerConfig.getSystemDirectoryPath()).thenReturn("");
+    when(mockStatArchiveHandlerConfig.getProductDescription())
+        .thenReturn(getClass().getSimpleName());
+
+    StatisticsSampler sampler = new TestStatisticsSampler(manager);
+    SampleCollector sampleCollector = new SampleCollector(sampler);
+    sampleCollector.initialize(mockStatArchiveHandlerConfig, 
NanoTimer.getTime(),
+        new MainWithChildrenRollingFileHandler());
+    memberLevelDiskMonitor = spy(new 
MemberLevelDiskMonitor(this.testName.getMethodName()));
+
+    assertThat(memberLevelDiskMonitor).isNotNull();
+    assertThat(memberLevelDiskMonitor.getMonitors()).isEmpty();
+    assertThat(memberLevelDiskMonitor.getListeners()).isEmpty();
+    assertThat(memberLevelDiskMonitor.getFlushes()).isEqualTo(0);
+    assertThat(memberLevelDiskMonitor.getQueueSize()).isEqualTo(0);
+    assertThat(memberLevelDiskMonitor.getFlushTime()).isEqualTo(0);
+    assertThat(memberLevelDiskMonitor.getFlushedBytes()).isEqualTo(0);
+    assertThat(memberLevelDiskMonitor.getDiskReadBytes()).isEqualTo(0);
+    assertThat(memberLevelDiskMonitor.getBackupsCompleted()).isEqualTo(0);
+    assertThat(memberLevelDiskMonitor.getDiskWrittenBytes()).isEqualTo(0);
+    assertThat(memberLevelDiskMonitor.getBackupsInProgress()).isEqualTo(0);
+  }
+
+  @Test
+  public void computeDeltaShouldReturnZeroForUnknownStatistics() {
+    assertThat(memberLevelDiskMonitor.computeDelta(new 
MBeanStatsMonitor.DefaultHashMap(),
+        "unknownStatistic", 6)).isEqualTo(0);
+  }
+
+  @Test
+  public void computeDeltaShouldOperateForHandledStatistics() {
+    MBeanStatsMonitor.DefaultHashMap statsMap = new 
MBeanStatsMonitor.DefaultHashMap();
+    statsMap.put(StatsKey.NUM_FLUSHES, 10);
+    statsMap.put(StatsKey.DISK_QUEUE_SIZE, 148);
+    statsMap.put(StatsKey.TOTAL_FLUSH_TIME, 10000);
+    statsMap.put(StatsKey.FLUSHED_BYTES, 2048);
+    statsMap.put(StatsKey.DISK_READ_BYTES, 1024);
+    statsMap.put(StatsKey.DISK_RECOVERED_BYTES, 512);
+    statsMap.put(StatsKey.BACKUPS_COMPLETED, 5);
+    statsMap.put(StatsKey.DISK_WRITEN_BYTES, 8192);
+    statsMap.put(StatsKey.BACKUPS_IN_PROGRESS, 2);
+
+    assertThat(memberLevelDiskMonitor.computeDelta(statsMap, 
StatsKey.NUM_FLUSHES, 16))
+        .isEqualTo(6L);
+    assertThat(memberLevelDiskMonitor.computeDelta(statsMap, 
StatsKey.DISK_QUEUE_SIZE, 150))
+        .isEqualTo(2);
+    assertThat(memberLevelDiskMonitor.computeDelta(statsMap, 
StatsKey.TOTAL_FLUSH_TIME, 10000))
+        .isEqualTo(0L);
+    assertThat(memberLevelDiskMonitor.computeDelta(statsMap, 
StatsKey.FLUSHED_BYTES, 3000))
+        .isEqualTo(952L);
+    assertThat(memberLevelDiskMonitor.computeDelta(statsMap, 
StatsKey.DISK_READ_BYTES, 2048))
+        .isEqualTo(1024L);
+    assertThat(memberLevelDiskMonitor.computeDelta(statsMap, 
StatsKey.DISK_RECOVERED_BYTES, 1024))
+        .isEqualTo(512L);
+    assertThat(memberLevelDiskMonitor.computeDelta(statsMap, 
StatsKey.BACKUPS_COMPLETED, 6))
+        .isEqualTo(1);
+    assertThat(memberLevelDiskMonitor.computeDelta(statsMap, 
StatsKey.DISK_WRITEN_BYTES, 8193))
+        .isEqualTo(1L);
+    assertThat(memberLevelDiskMonitor.computeDelta(statsMap, 
StatsKey.BACKUPS_IN_PROGRESS, 1))
+        .isEqualTo(-1);
+  }
+
+  @Test
+  public void increaseStatsShouldIncrementStatisticsUsingTheSelectedValue() {
+    memberLevelDiskMonitor.increaseStats(StatsKey.NUM_FLUSHES, 5);
+    memberLevelDiskMonitor.increaseStats(StatsKey.DISK_QUEUE_SIZE, 13);
+    memberLevelDiskMonitor.increaseStats(StatsKey.TOTAL_FLUSH_TIME, 1024);
+    memberLevelDiskMonitor.increaseStats(StatsKey.FLUSHED_BYTES, 12);
+    memberLevelDiskMonitor.increaseStats(StatsKey.DISK_READ_BYTES, 5);
+    memberLevelDiskMonitor.increaseStats(StatsKey.DISK_RECOVERED_BYTES, 2048);
+    memberLevelDiskMonitor.increaseStats(StatsKey.BACKUPS_COMPLETED, 20);
+    memberLevelDiskMonitor.increaseStats(StatsKey.DISK_WRITEN_BYTES, 51);
+    memberLevelDiskMonitor.increaseStats(StatsKey.BACKUPS_IN_PROGRESS, 60);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.NUM_FLUSHES)).isEqualTo(5L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.DISK_QUEUE_SIZE)).isEqualTo(13);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.TOTAL_FLUSH_TIME)).isEqualTo(1024L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.FLUSHED_BYTES)).isEqualTo(12L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.DISK_READ_BYTES)).isEqualTo(2053L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.BACKUPS_COMPLETED)).isEqualTo(20);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.DISK_WRITEN_BYTES)).isEqualTo(51L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.BACKUPS_IN_PROGRESS)).isEqualTo(60);
+
+    memberLevelDiskMonitor.increaseStats(StatsKey.NUM_FLUSHES, 2);
+    memberLevelDiskMonitor.increaseStats(StatsKey.DISK_QUEUE_SIZE, 2);
+    memberLevelDiskMonitor.increaseStats(StatsKey.TOTAL_FLUSH_TIME, 1);
+    memberLevelDiskMonitor.increaseStats(StatsKey.FLUSHED_BYTES, 8);
+    memberLevelDiskMonitor.increaseStats(StatsKey.DISK_READ_BYTES, 5);
+    memberLevelDiskMonitor.increaseStats(StatsKey.DISK_RECOVERED_BYTES, 2);
+    memberLevelDiskMonitor.increaseStats(StatsKey.BACKUPS_COMPLETED, 1);
+    memberLevelDiskMonitor.increaseStats(StatsKey.DISK_WRITEN_BYTES, 11);
+    memberLevelDiskMonitor.increaseStats(StatsKey.BACKUPS_IN_PROGRESS, 6);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.NUM_FLUSHES)).isEqualTo(7L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.DISK_QUEUE_SIZE)).isEqualTo(15);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.TOTAL_FLUSH_TIME)).isEqualTo(1025L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.FLUSHED_BYTES)).isEqualTo(20L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.DISK_READ_BYTES)).isEqualTo(2060L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.BACKUPS_COMPLETED)).isEqualTo(21);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.DISK_WRITEN_BYTES)).isEqualTo(62L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.BACKUPS_IN_PROGRESS)).isEqualTo(66);
+  }
+
+  @Test
+  public void addStatisticsToMonitorShouldAddListenerAndMonitor() {
+    Statistics statistics = mock(Statistics.class);
+    memberLevelDiskMonitor.addStatisticsToMonitor(statistics);
+
+    assertThat(memberLevelDiskMonitor.getMonitors().size()).isEqualTo(1);
+    assertThat(memberLevelDiskMonitor.getListeners().size()).isEqualTo(1);
+  }
+
+  @Test
+  public void stopListenerShouldRemoveListenerAndMonitor() {
+    Statistics statistics = mock(Statistics.class);
+    ValueMonitor regionMonitor = mock(ValueMonitor.class);
+    MemberLevelDiskMonitor.MemberLevelDiskStatisticsListener listener =
+        mock(MemberLevelDiskMonitor.MemberLevelDiskStatisticsListener.class);
+    memberLevelDiskMonitor.getListeners().put(statistics, listener);
+    memberLevelDiskMonitor.getMonitors().put(statistics, regionMonitor);
+
+    memberLevelDiskMonitor.stopListener();
+    verify(regionMonitor, times(1)).removeListener(listener);
+    verify(regionMonitor, times(1)).removeStatistics(statistics);
+    assertThat(memberLevelDiskMonitor.getMonitors()).isEmpty();
+    assertThat(memberLevelDiskMonitor.getListeners()).isEmpty();
+  }
+
+  @Test
+  public void getStatisticShouldReturnZeroForUnknownStatistics() {
+    
assertThat(memberLevelDiskMonitor.getStatistic("unhandledStatistic")).isEqualTo(0);
+  }
+
+  @Test
+  public void getStatisticShouldReturnTheRecordedValueForHandledStatistics() {
+    memberLevelDiskMonitor.increaseStats(StatsKey.NUM_FLUSHES, 5);
+    memberLevelDiskMonitor.increaseStats(StatsKey.DISK_QUEUE_SIZE, 13);
+    memberLevelDiskMonitor.increaseStats(StatsKey.TOTAL_FLUSH_TIME, 1024);
+    memberLevelDiskMonitor.increaseStats(StatsKey.FLUSHED_BYTES, 12);
+    memberLevelDiskMonitor.increaseStats(StatsKey.DISK_READ_BYTES, 5);
+    memberLevelDiskMonitor.increaseStats(StatsKey.DISK_RECOVERED_BYTES, 2048);
+    memberLevelDiskMonitor.increaseStats(StatsKey.BACKUPS_COMPLETED, 20);
+    memberLevelDiskMonitor.increaseStats(StatsKey.DISK_WRITEN_BYTES, 51);
+    memberLevelDiskMonitor.increaseStats(StatsKey.BACKUPS_IN_PROGRESS, 60);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.NUM_FLUSHES)).isEqualTo(5L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.DISK_QUEUE_SIZE)).isEqualTo(13);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.TOTAL_FLUSH_TIME)).isEqualTo(1024L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.FLUSHED_BYTES)).isEqualTo(12L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.DISK_READ_BYTES)).isEqualTo(2053L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.BACKUPS_COMPLETED)).isEqualTo(20);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.DISK_WRITEN_BYTES)).isEqualTo(51L);
+    
assertThat(memberLevelDiskMonitor.getStatistic(StatsKey.BACKUPS_IN_PROGRESS)).isEqualTo(60);
+  }
+
+  @Test
+  public void removeStatisticsFromMonitorShouldRemoveListenerAndMonitor() {
+    Statistics statistics = mock(Statistics.class);
+    ValueMonitor regionMonitor = mock(ValueMonitor.class);
+    MemberLevelDiskMonitor.MemberLevelDiskStatisticsListener listener =
+        mock(MemberLevelDiskMonitor.MemberLevelDiskStatisticsListener.class);
+    memberLevelDiskMonitor.getListeners().put(statistics, listener);
+    memberLevelDiskMonitor.getMonitors().put(statistics, regionMonitor);
+
+    memberLevelDiskMonitor.removeStatisticsFromMonitor(statistics);
+    assertThat(memberLevelDiskMonitor.getListeners()).isEmpty();
+    assertThat(memberLevelDiskMonitor.getMonitors()).isEmpty();
+    verify(listener, times(1)).decreaseDiskStoreStats();
+    verify(regionMonitor, times(1)).removeListener(listener);
+    verify(regionMonitor, times(1)).removeStatistics(statistics);
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/VMStatsMonitorTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/VMStatsMonitorTest.java
new file mode 100644
index 0000000..5180971
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/VMStatsMonitorTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.beans.stats;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.within;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.geode.management.internal.MBeanJMXAdapter;
+import org.apache.geode.test.junit.categories.StatisticsTest;
+
+@Category(StatisticsTest.class)
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("*.UnitTest")
+@PrepareForTest(MBeanJMXAdapter.class)
+public class VMStatsMonitorTest {
+  @Rule
+  public TestName testName = new TestName();
+
+  private VMStatsMonitor vmStatsMonitor;
+
+  @Before
+  public void setUp() {
+    PowerMockito.mockStatic(MBeanJMXAdapter.class);
+  }
+
+  @Test
+  public void 
statisticInitialValueShouldBeZeroWhenTheProcessCpuTimeJmxAttributeIsAvailable() 
{
+    when(MBeanJMXAdapter.isAttributeAvailable(anyString(), 
anyString())).thenReturn(true);
+    vmStatsMonitor = new VMStatsMonitor(testName.getMethodName());
+    assertThat(vmStatsMonitor).isNotNull();
+    assertThat(vmStatsMonitor.getCpuUsage()).isEqualTo(0);
+  }
+
+  @Test
+  public void 
statisticInitialValueShouldBeUndefinedWhenTheProcessCpuTimeJmxAttributeIsNotAvailable()
 {
+    when(MBeanJMXAdapter.isAttributeAvailable(anyString(), 
anyString())).thenReturn(false);
+    vmStatsMonitor = new VMStatsMonitor(testName.getMethodName());
+    assertThat(vmStatsMonitor).isNotNull();
+    
assertThat(vmStatsMonitor.getCpuUsage()).isEqualTo(VMStatsMonitor.VALUE_NOT_AVAILABLE);
+  }
+
+  @Test
+  public void calculateCpuUsageShouldCorrectlyCalculateTheCpuUsed() {
+    Instant now = Instant.now();
+    long halfSecondAsNanoseconds = 500000000L;
+    long quarterSecondAsNanoseconds = 250000000L;
+    long threeQuarterSecondAsNanoseconds = 750000000L;
+    vmStatsMonitor = spy(new VMStatsMonitor(testName.getMethodName()));
+    when(vmStatsMonitor.getLastSystemTime()).thenReturn(now.toEpochMilli());
+
+    // 50% used
+    when(vmStatsMonitor.getLastProcessCpuTime()).thenReturn(0L);
+    float initialCpuUsage = vmStatsMonitor
+        .calculateCpuUsage(now.plus(1, ChronoUnit.SECONDS).toEpochMilli(), 
halfSecondAsNanoseconds);
+    assertThat(initialCpuUsage).isNotEqualTo(Float.NaN);
+    assertThat(initialCpuUsage).isCloseTo(50F, within(1F));
+
+    // 25% decrease
+    when(vmStatsMonitor.getLastProcessCpuTime()).thenReturn(50L);
+    float decreasedCpuUsage = vmStatsMonitor.calculateCpuUsage(
+        now.plus(1, ChronoUnit.SECONDS).toEpochMilli(), 
quarterSecondAsNanoseconds);
+    assertThat(decreasedCpuUsage).isNotEqualTo(Float.NaN);
+    assertThat(decreasedCpuUsage).isLessThan(initialCpuUsage);
+    assertThat(decreasedCpuUsage).isCloseTo(25F, within(1F));
+
+    // 50% increase
+    when(vmStatsMonitor.getLastProcessCpuTime()).thenReturn(25L);
+    float increasedCpuUsage = vmStatsMonitor.calculateCpuUsage(
+        now.plus(1, ChronoUnit.SECONDS).toEpochMilli(), 
threeQuarterSecondAsNanoseconds);
+    assertThat(increasedCpuUsage).isNotEqualTo(Float.NaN);
+    assertThat(increasedCpuUsage).isGreaterThan(decreasedCpuUsage);
+    assertThat(increasedCpuUsage).isCloseTo(75F, within(1F));
+  }
+
+  @Test
+  public void refreshStatsShouldUpdateCpuUsage() {
+    ZonedDateTime now = ZonedDateTime.now();
+    when(MBeanJMXAdapter.isAttributeAvailable(anyString(), 
anyString())).thenReturn(true);
+    vmStatsMonitor = spy(new VMStatsMonitor(testName.getMethodName()));
+    assertThat(vmStatsMonitor).isNotNull();
+    assertThat(vmStatsMonitor.getCpuUsage()).isEqualTo(0);
+    Number processCpuTime = spy(Number.class);
+    vmStatsMonitor.statsMap.put(StatsKey.VM_PROCESS_CPU_TIME, processCpuTime);
+
+    // First Run: updates lastSystemTime
+    
when(vmStatsMonitor.currentTimeMillis()).thenReturn(now.toInstant().toEpochMilli());
+    vmStatsMonitor.refreshStats();
+    assertThat(vmStatsMonitor.getCpuUsage()).isEqualTo(0);
+    verify(processCpuTime, times(0)).longValue();
+
+    // Second Run: updates lastProcessCpuTime
+    when(processCpuTime.longValue()).thenReturn(500L);
+    vmStatsMonitor.refreshStats();
+    assertThat(vmStatsMonitor.getCpuUsage()).isEqualTo(0);
+    verify(processCpuTime, times(1)).longValue();
+
+    // Next runs will update the actual cpuUsage
+    for (int i = 2; i < 6; i++) {
+      long mockProcessCpuTime = i * 500;
+      long mockSystemTime = now.plus(i, 
ChronoUnit.SECONDS).toInstant().toEpochMilli();
+      when(processCpuTime.longValue()).thenReturn(mockProcessCpuTime);
+      when(vmStatsMonitor.currentTimeMillis()).thenReturn(mockSystemTime);
+
+      vmStatsMonitor.refreshStats();
+      verify(vmStatsMonitor, times(1)).calculateCpuUsage(mockSystemTime, 
mockProcessCpuTime);
+      assertThat(vmStatsMonitor.getCpuUsage()).isNotEqualTo(Float.NaN);
+      assertThat(vmStatsMonitor.getCpuUsage()).isLessThan(1F);
+    }
+  }
+}

Reply via email to