This is an automated email from the ASF dual-hosted git repository.

lgallinat pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 583d141  GEODE-5562 Reading the values of LocalRegion.memoryThresholdReached and DistributedRegion.memoryThresholdReachedMembers must be atomic (#2320)
583d141 is described below

commit 583d1416644a8392ed1bf257b87025a9071aaa55
Author: Lynn Gallinat <[email protected]>
AuthorDate: Fri Aug 24 08:58:27 2018 -0700

    GEODE-5562 Reading the values of LocalRegion.memoryThresholdReached and DistributedRegion.memoryThresholdReachedMembers must be atomic (#2320)
    
    * GEODE-5562 Reading the values of LocalRegion.memoryThresholdReached and
               DistributedRegion.memoryThresholdReachedMembers must be atomic.
---
 .../management/MemoryThresholdsDUnitTest.java      |   2 +-
 .../MemoryThresholdsOffHeapDUnitTest.java          |  18 +-
 .../cache/query/internal/DefaultQueryService.java  |  17 +-
 .../internal/cache/CacheDistributionAdvisor.java   |   2 +-
 .../geode/internal/cache/DistributedRegion.java    |  41 +--
 .../geode/internal/cache/InternalRegion.java       |   2 +
 .../apache/geode/internal/cache/LocalRegion.java   |  63 ++--
 .../geode/internal/cache/MemoryThresholdInfo.java  |  53 ++++
 .../geode/internal/cache/PartitionedRegion.java    |  29 +-
 .../internal/cache/control/HeapMemoryMonitor.java  |  70 ++++-
 .../internal/cache/control/ResourceAdvisor.java    |   2 +-
 .../execute/DistributedRegionFunctionExecutor.java |  16 +-
 .../cache/execute/MemberFunctionExecutor.java      |  19 +-
 .../cache/execute/MultiRegionFunctionExecutor.java |  32 +-
 .../execute/PartitionedRegionFunctionExecutor.java |  17 +-
 .../partitioned/PartitionedRegionRebalanceOp.java  |   2 +-
 .../internal/cache/partitioned/RegionAdvisor.java  |   4 +-
 .../tier/sockets/command/ExecuteFunction.java      |  19 +-
 .../tier/sockets/command/ExecuteFunction65.java    |  20 +-
 .../tier/sockets/command/ExecuteFunction66.java    |  20 +-
 .../internal/cache/DistributedRegionJUnitTest.java |  32 ++
 .../internal/cache/MemoryThresholdInfoTest.java    |  40 +++
 .../cache/control/HeapMemoryMonitorTest.java       | 325 +++++++++++++++++++++
 .../sockets/command/ExecuteFunction66Test.java     |   4 +
 24 files changed, 638 insertions(+), 211 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsDUnitTest.java
index d8cfec4..79c522e 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsDUnitTest.java
@@ -303,7 +303,7 @@ public class MemoryThresholdsDUnitTest extends 
ClientServerTestCase {
 
           public boolean done() {
             DistributedRegion dr = (DistributedRegion) 
getRootRegion().getSubregion(regionName);
-            return dr.getMemoryThresholdReachedMembers().size() == 0;
+            return 
dr.getAtomicThresholdInfo().getMembersThatReachedThreshold().size() == 0;
           }
         };
         Wait.waitForCriterion(wc, 30000, 10, true);
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsOffHeapDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsOffHeapDUnitTest.java
index 330b974..907456c 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsOffHeapDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/cache/management/MemoryThresholdsOffHeapDUnitTest.java
@@ -494,7 +494,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends 
ClientServerTestCase {
 
           public boolean done() {
             DistributedRegion dr = (DistributedRegion) 
getRootRegion().getSubregion(regionName);
-            return dr.getMemoryThresholdReachedMembers().size() == 0;
+            return 
dr.getAtomicThresholdInfo().getMembersThatReachedThreshold().size() == 0;
           }
         };
         Wait.waitForCriterion(wc, 10000, 10, true);
@@ -602,7 +602,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends 
ClientServerTestCase {
               }
 
               public boolean done() {
-                return r.memoryThresholdReached.get();
+                return r.isMemoryThresholdReached();
               }
             };
             Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -618,7 +618,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends 
ClientServerTestCase {
               }
 
               public boolean done() {
-                return !r.memoryThresholdReached.get();
+                return !r.isMemoryThresholdReached();
               }
             };
             Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -670,7 +670,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends 
ClientServerTestCase {
           }
 
           public boolean done() {
-            return r.memoryThresholdReached.get();
+            return r.isMemoryThresholdReached();
           }
         };
         Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -686,7 +686,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends 
ClientServerTestCase {
           }
 
           public boolean done() {
-            return !r.memoryThresholdReached.get();
+            return !r.isMemoryThresholdReached();
           }
         };
         Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -1235,7 +1235,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends 
ClientServerTestCase {
           }
 
           public boolean done() {
-            return r.memoryThresholdReached.get();
+            return r.isMemoryThresholdReached();
           }
         };
         Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -1258,7 +1258,7 @@ public class MemoryThresholdsOffHeapDUnitTest extends 
ClientServerTestCase {
           }
 
           public boolean done() {
-            return !r.memoryThresholdReached.get();
+            return !r.isMemoryThresholdReached();
           }
         };
         Wait.waitForCriterion(wc, 30 * 1000, 10, true);
@@ -1514,12 +1514,12 @@ public class MemoryThresholdsOffHeapDUnitTest extends 
ClientServerTestCase {
             @Override
             public String description() {
               return "Expected to go critical: isCritical=" + 
ohm.getState().isCritical()
-                  + " memoryThresholdReached=" + 
r.memoryThresholdReached.get();
+                  + " memoryThresholdReached=" + r.isMemoryThresholdReached();
             }
 
             @Override
             public boolean done() {
-              return ohm.getState().isCritical() && 
r.memoryThresholdReached.get();
+              return ohm.getState().isCritical() && 
r.isMemoryThresholdReached();
             }
           };
         }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
index 2d15795..e476d45 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
@@ -68,7 +68,9 @@ import 
org.apache.geode.cache.query.internal.parse.OQLLexerTokenTypes;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.MemoryThresholdInfo;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -226,12 +228,15 @@ public class DefaultQueryService implements 
InternalQueryService {
     // 
UnsupportedOperationException(LocalizedStrings.DefaultQueryService_INDEX_CREATION_IS_NOT_SUPPORTED_FOR_REGIONS_WHICH_OVERFLOW_TO_DISK_THE_REGION_INVOLVED_IS_0.toLocalizedString(regionPath));
     // }
     // if its a pr the create index on all of the local buckets.
-    if (((LocalRegion) region).memoryThresholdReached.get()
-        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      LocalRegion lr = (LocalRegion) region;
-      throw new LowMemoryException(
-          
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_INDEX.toLocalizedString(region.getName()),
-          lr.getMemoryThresholdReachedMembers());
+    if (!MemoryThresholds.isLowMemoryExceptionDisabled()) {
+      InternalRegion internalRegion = (InternalRegion) region;
+      MemoryThresholdInfo info = internalRegion.getAtomicThresholdInfo();
+      if (info.isMemoryThresholdReached()) {
+        throw new LowMemoryException(
+            LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_INDEX
+                .toLocalizedString(region.getName()),
+            info.getMembersThatReachedThreshold());
+      }
     }
     if (region instanceof PartitionedRegion) {
       try {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
index 6a726ce..2a74e7e 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
@@ -1103,7 +1103,7 @@ public class CacheDistributionAdvisor extends 
DistributionAdvisor {
       logger.debug("CDA: removing profile {}", profile);
     }
     if (getAdvisee() instanceof LocalRegion && profile != null) {
-      ((LocalRegion) 
getAdvisee()).removeMemberFromCriticalList(profile.getDistributedMember());
+      ((LocalRegion) 
getAdvisee()).removeCriticalMember(profile.getDistributedMember());
     }
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 9b395fc..d9f5903 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1113,7 +1112,7 @@ public class DistributedRegion extends LocalRegion 
implements InternalDistribute
   /**
    * A reference counter to protected the memoryThresholdReached boolean
    */
-  private final Set<DistributedMember> memoryThresholdReachedMembers = new 
CopyOnWriteArraySet<>();
+  private final Set<DistributedMember> memoryThresholdReachedMembers = new 
HashSet<>();
 
   // TODO: cleanup getInitialImageAndRecovery
   private void getInitialImageAndRecovery(InputStream snapshotInputStream,
@@ -3751,42 +3750,35 @@ public class DistributedRegion extends LocalRegion 
implements InternalDistribute
       if (event.getState().isCritical() && 
!event.getPreviousState().isCritical()
           && (event.getType() == ResourceType.HEAP_MEMORY
               || (event.getType() == ResourceType.OFFHEAP_MEMORY && 
getOffHeap()))) {
-        setMemoryThresholdReachedCounterTrue(event.getMember());
+        addCriticalMember(event.getMember());
       } else if (!event.getState().isCritical() && 
event.getPreviousState().isCritical()
           && (event.getType() == ResourceType.HEAP_MEMORY
               || (event.getType() == ResourceType.OFFHEAP_MEMORY && 
getOffHeap()))) {
-        removeMemberFromCriticalList(event.getMember());
+        removeCriticalMember(event.getMember());
       }
     }
   }
 
   @Override
-  public void removeMemberFromCriticalList(DistributedMember member) {
+  public void removeCriticalMember(DistributedMember member) {
     if (logger.isDebugEnabled()) {
       logger.debug("DR: removing member {} from critical member list", member);
     }
     synchronized (this.memoryThresholdReachedMembers) {
       this.memoryThresholdReachedMembers.remove(member);
-      if (this.memoryThresholdReachedMembers.size() == 0) {
-        memoryThresholdReached.set(false);
+      if (this.memoryThresholdReachedMembers.isEmpty()) {
+        setMemoryThresholdReached(false);
       }
     }
   }
 
   @Override
-  public Set<DistributedMember> getMemoryThresholdReachedMembers() {
-    synchronized (this.memoryThresholdReachedMembers) {
-      return Collections.unmodifiableSet(this.memoryThresholdReachedMembers);
-    }
-  }
-
-  @Override
   public void initialCriticalMembers(boolean localMemoryIsCritical,
       Set<InternalDistributedMember> criticalMembers) {
     Set<InternalDistributedMember> others = 
getCacheDistributionAdvisor().adviseGeneric();
     for (InternalDistributedMember idm : criticalMembers) {
       if (others.contains(idm)) {
-        setMemoryThresholdReachedCounterTrue(idm);
+        addCriticalMember(idm);
       }
     }
   }
@@ -3794,12 +3786,23 @@ public class DistributedRegion extends LocalRegion 
implements InternalDistribute
   /**
    * @param idm member whose threshold has been exceeded
    */
-  private void setMemoryThresholdReachedCounterTrue(final DistributedMember 
idm) {
+  protected void addCriticalMember(final DistributedMember idm) {
     synchronized (this.memoryThresholdReachedMembers) {
-      this.memoryThresholdReachedMembers.add(idm);
-      if (this.memoryThresholdReachedMembers.size() > 0) {
-        memoryThresholdReached.set(true);
+      if (this.memoryThresholdReachedMembers.isEmpty()) {
+        setMemoryThresholdReached(true);
       }
+      this.memoryThresholdReachedMembers.add(idm);
+    }
+  }
+
+  @Override
+  public MemoryThresholdInfo getAtomicThresholdInfo() {
+    if (!isMemoryThresholdReached()) {
+      return MemoryThresholdInfo.getNotReached();
+    }
+    synchronized (memoryThresholdReachedMembers) {
+      return new MemoryThresholdInfo(isMemoryThresholdReached(),
+          new HashSet<DistributedMember>(memoryThresholdReachedMembers));
     }
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index 8b887ef..78a664e 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -408,4 +408,6 @@ public interface InternalRegion extends Region, 
HasCachePerfStats, RegionEntryCo
 
   default void handleWANEvent(EntryEventImpl event) {}
 
+  MemoryThresholdInfo getAtomicThresholdInfo();
+
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 482bf0c..92c2e3c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -488,7 +488,7 @@ public class LocalRegion extends AbstractRegion implements 
LoaderHelperFactory,
    * This boolean is true when a member who has this region is running low on 
memory. It is used to
    * reject region operations.
    */
-  public final AtomicBoolean memoryThresholdReached = new AtomicBoolean(false);
+  private final AtomicBoolean memoryThresholdReached = new 
AtomicBoolean(false);
 
   /**
    * Lock for updating PR MetaData on client side
@@ -1023,13 +1023,13 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
             if (!newRegion.getOffHeap()) {
               newRegion.initialCriticalMembers(
                   
this.cache.getInternalResourceManager().getHeapMonitor().getState().isCritical(),
-                  this.cache.getResourceAdvisor().adviseCritialMembers());
+                  this.cache.getResourceAdvisor().adviseCriticalMembers());
             } else {
               newRegion.initialCriticalMembers(
                   
this.cache.getInternalResourceManager().getHeapMonitor().getState().isCritical()
                       || 
this.cache.getInternalResourceManager().getOffHeapMonitor().getState()
                           .isCritical(),
-                  this.cache.getResourceAdvisor().adviseCritialMembers());
+                  this.cache.getResourceAdvisor().adviseCriticalMembers());
             }
 
             // synchronization would be done on ManagementAdapter.regionOpLock
@@ -2912,7 +2912,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
   }
 
   protected boolean isMemoryThresholdReachedForLoad() {
-    return this.memoryThresholdReached.get();
+    return isMemoryThresholdReached();
   }
 
   /**
@@ -5736,14 +5736,13 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
    * @throws LowMemoryException if the target member for this operation is sick
    */
   private void checkIfAboveThreshold(final Object key) throws 
LowMemoryException {
-    if (this.memoryThresholdReached.get()) {
-      Set<DistributedMember> membersThatReachedThreshold = 
getMemoryThresholdReachedMembers();
-
+    MemoryThresholdInfo info = getAtomicThresholdInfo();
+    if (info.isMemoryThresholdReached()) {
+      Set<DistributedMember> membersThatReachedThreshold = 
info.getMembersThatReachedThreshold();
       // #45603: trigger a background eviction since we're above the the 
critical
       // threshold
       
InternalResourceManager.getInternalResourceManager(this.cache).getHeapMonitor()
           .updateStateAndSendEvent();
-
       throw new LowMemoryException(
           
LocalizedStrings.ResourceManager_LOW_MEMORY_IN_0_FOR_PUT_1_MEMBER_2.toLocalizedString(
               getFullPath(), key, membersThatReachedThreshold),
@@ -5751,6 +5750,15 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
     }
   }
 
+  @Override
+  public MemoryThresholdInfo getAtomicThresholdInfo() {
+    if (!isMemoryThresholdReached()) {
+      return MemoryThresholdInfo.getNotReached();
+    }
+    return new MemoryThresholdInfo(isMemoryThresholdReached(),
+        Collections.singleton(this.cache.getMyId()));
+  }
+
   /**
    * Allows null as new value to accommodate create with a null value.
    *
@@ -10726,13 +10734,16 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
       final Function function, final Object args, final ResultCollector rc, 
final Set filter,
       final ServerToClientFunctionResultSender sender) {
 
-    if (function.optimizeForWrite() && this.memoryThresholdReached.get()
+    if (function.optimizeForWrite()
         && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      Set<DistributedMember> members = getMemoryThresholdReachedMembers();
-      throw new LowMemoryException(
-          LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-              .toLocalizedString(function.getId(), members),
-          members);
+      MemoryThresholdInfo info = getAtomicThresholdInfo();
+      if (info.isMemoryThresholdReached()) {
+        Set<DistributedMember> members = info.getMembersThatReachedThreshold();
+        throw new LowMemoryException(
+            
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
+                .toLocalizedString(function.getId(), members),
+            members);
+      }
     }
     final LocalResultCollector<?, ?> resultCollector =
         execution.getLocalResultCollector(function, rc);
@@ -10747,13 +10758,6 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
     return resultCollector;
   }
 
-  /**
-   * @return the set of members which are known to be critical
-   */
-  public Set<DistributedMember> getMemoryThresholdReachedMembers() {
-    return Collections.singleton(this.cache.getMyId());
-  }
-
   @Override
   public void onEvent(MemoryEvent event) {
     if (logger.isDebugEnabled()) {
@@ -10769,11 +10773,11 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
           && (event.getType() == ResourceType.HEAP_MEMORY
               || (event.getType() == ResourceType.OFFHEAP_MEMORY && 
getOffHeap()))) {
         // start rejecting operations
-        this.memoryThresholdReached.set(true);
+        setMemoryThresholdReached(true);
       } else if (!event.getState().isCritical() && 
event.getPreviousState().isCritical()
           && (event.getType() == ResourceType.HEAP_MEMORY
               || (event.getType() == ResourceType.OFFHEAP_MEMORY && 
getOffHeap()))) {
-        this.memoryThresholdReached.set(false);
+        setMemoryThresholdReached(false);
       }
     }
   }
@@ -10836,7 +10840,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
    * This method is meant to be overridden by DistributedRegion and 
PartitionedRegions to cleanup
    * CRITICAL state
    */
-  public void removeMemberFromCriticalList(DistributedMember member) {
+  public void removeCriticalMember(DistributedMember member) {
     // should not be called for LocalRegion
     Assert.assertTrue(false);
   }
@@ -10857,7 +10861,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
       Set<InternalDistributedMember> criticalMembers) {
     assert getScope().isLocal();
     if (localMemoryIsCritical) {
-      this.memoryThresholdReached.set(true);
+      setMemoryThresholdReached(true);
     }
   }
 
@@ -12252,4 +12256,13 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
   public Lock getClientMetaDataLock() {
     return clientMetaDataLock;
   }
+
+  public boolean isMemoryThresholdReached() {
+    return memoryThresholdReached.get();
+  }
+
+  protected void setMemoryThresholdReached(boolean reached) {
+    this.memoryThresholdReached.set(reached);
+  }
+
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/MemoryThresholdInfo.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/MemoryThresholdInfo.java
new file mode 100644
index 0000000..3735a0d
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/MemoryThresholdInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.geode.distributed.DistributedMember;
+
+public class MemoryThresholdInfo {
+  private final boolean memoryThresholdReached;
+  private final Set<DistributedMember> membersThatReachedThreshold;
+  private static final MemoryThresholdInfo NOT_REACHED = new 
MemoryThresholdInfo(false,
+      Collections.EMPTY_SET);
+
+  MemoryThresholdInfo(boolean memoryThresholdReached,
+      Set<DistributedMember> membersThatReachedThreshold) {
+    this.memoryThresholdReached = memoryThresholdReached;
+    this.membersThatReachedThreshold = membersThatReachedThreshold;
+  }
+
+  public Set<DistributedMember> getMembersThatReachedThreshold() {
+    return membersThatReachedThreshold;
+  }
+
+  public boolean isMemoryThresholdReached() {
+    return memoryThresholdReached;
+  }
+
+  static MemoryThresholdInfo getNotReached() {
+    return NOT_REACHED;
+  }
+
+  @Override
+  public String toString() {
+    return "MemoryThresholdInfo{" +
+        "memoryThresholdReached=" + memoryThresholdReached +
+        ", membersThatReachedThreshold=" + membersThatReachedThreshold +
+        '}';
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 77175a3..86b35dc 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -154,17 +154,13 @@ import 
org.apache.geode.distributed.internal.membership.MemberAttributes;
 import org.apache.geode.i18n.StringId;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.NanoTimer;
-import org.apache.geode.internal.SetUtils;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.BucketAdvisor.ServerBucketProfile;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import 
org.apache.geode.internal.cache.DestroyPartitionedRegionMessage.DestroyPartitionedRegionResponse;
 import 
org.apache.geode.internal.cache.PutAllPartialResultException.PutAllPartialResult;
-import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
 import 
org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
 import org.apache.geode.internal.cache.control.MemoryEvent;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.cache.eviction.EvictionController;
 import org.apache.geode.internal.cache.eviction.HeapEvictor;
 import org.apache.geode.internal.cache.execute.AbstractExecution;
@@ -3617,16 +3613,7 @@ public class PartitionedRegion extends LocalRegion
     InternalDistributedMember targetNode = null;
     if (function.optimizeForWrite()) {
       targetNode = createBucket(bucketId, 0, null /* retryTimeKeeper */);
-      HeapMemoryMonitor hmm =
-          ((InternalResourceManager) 
cache.getResourceManager()).getHeapMonitor();
-      if (hmm.isMemberHeapCritical(targetNode)
-          && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-        Set<DistributedMember> sm = Collections.singleton((DistributedMember) 
targetNode);
-        throw new LowMemoryException(
-            
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-                .toLocalizedString(function.getId(), sm),
-            sm);
-      }
+      
cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function, 
targetNode);
     } else {
       targetNode = getOrCreateNodeForBucketRead(bucketId);
     }
@@ -3796,16 +3783,8 @@ public class PartitionedRegion extends LocalRegion
     }
 
     Set<InternalDistributedMember> dest = memberToBuckets.keySet();
-    if (function.optimizeForWrite()
-        && 
cache.getInternalResourceManager().getHeapMonitor().containsHeapCriticalMembers(dest)
-        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      Set<InternalDistributedMember> hcm = 
cache.getResourceAdvisor().adviseCritialMembers();
-      Set<DistributedMember> sm = SetUtils.intersection(hcm, dest);
-      throw new LowMemoryException(
-          LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-              .toLocalizedString(function.getId(), sm),
-          sm);
-    }
+    
cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function,
+        Collections.unmodifiableSet(dest));
 
     boolean isSelf = false;
     execution.setExecutionNodes(dest);
@@ -9295,7 +9274,7 @@ public class PartitionedRegion extends LocalRegion
   }
 
   @Override
-  public void removeMemberFromCriticalList(DistributedMember member) {
+  public void removeCriticalMember(DistributedMember member) {
     if (logger.isDebugEnabled()) {
       logger.debug("PR: removing member {} from critical member list", member);
     }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
index f187f2e..5bbc27a 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
@@ -18,6 +18,8 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.lang.management.MemoryPoolMXBean;
 import java.lang.management.MemoryType;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -36,9 +38,11 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.CancelException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.LowMemoryException;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.SetUtils;
 import org.apache.geode.internal.cache.InternalCache;
 import 
org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
 import org.apache.geode.internal.cache.control.MemoryThresholds.MemoryState;
@@ -733,20 +737,54 @@ public class HeapMemoryMonitor implements 
NotificationListener, MemoryMonitor {
     });
   }
 
-  /**
-   * Given a set of members, determine if any member in the set is above 
critical threshold.
-   *
-   * @param members The set of members to check.
-   * @return True if the set contains a member above critical threshold, false 
otherwise
-   */
-  public boolean containsHeapCriticalMembers(final 
Set<InternalDistributedMember> members) {
-    if (members.contains(this.cache.getMyId()) && 
this.mostRecentEvent.getState().isCritical()) {
-      return true;
+  protected Set<DistributedMember> 
getHeapCriticalMembersFrom(Set<DistributedMember> members) {
+    Set<DistributedMember> criticalMembers = getCriticalMembers();
+    criticalMembers.retainAll(members);
+    return criticalMembers;
+  }
+
+  private Set<DistributedMember> getCriticalMembers() {
+    Set<DistributedMember> criticalMembers = new 
HashSet<>(resourceAdvisor.adviseCriticalMembers());
+    if (this.mostRecentEvent.getState().isCritical()) {
+      criticalMembers.add(cache.getMyId());
+    }
+    return criticalMembers;
+  }
+
+  public void checkForLowMemory(Function function, DistributedMember 
targetMember) {
+    Set<DistributedMember> targetMembers = Collections.singleton(targetMember);
+    checkForLowMemory(function, targetMembers);
+  }
+
+  public void checkForLowMemory(Function function, Set<DistributedMember> 
dest) {
+    LowMemoryException exception = createLowMemoryIfNeeded(function, dest);
+    if (exception != null) {
+      throw exception;
     }
+  }
+
+  public LowMemoryException createLowMemoryIfNeeded(Function function,
+      DistributedMember targetMember) {
+    Set<DistributedMember> targetMembers = Collections.singleton(targetMember);
+    return createLowMemoryIfNeeded(function, targetMembers);
+  }
 
-    return SetUtils.intersectsWith(members, 
this.resourceAdvisor.adviseCritialMembers());
+  public LowMemoryException createLowMemoryIfNeeded(Function function,
+      Set<DistributedMember> memberSet) {
+    if (function.optimizeForWrite()
+        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
+      Set<DistributedMember> criticalMembersFrom = 
getHeapCriticalMembersFrom(memberSet);
+      if (!criticalMembersFrom.isEmpty()) {
+        return new LowMemoryException(
+            
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
+                .toLocalizedString(function.getId(), criticalMembersFrom),
+            criticalMembersFrom);
+      }
+    }
+    return null;
   }
 
+
   /**
    * Determines if the given member is in a heap critical state.
    *
@@ -761,6 +799,16 @@ public class HeapMemoryMonitor implements 
NotificationListener, MemoryMonitor {
     return this.resourceAdvisor.isHeapCritical(member);
   }
 
+  protected MemoryEvent getMostRecentEvent() {
+    return mostRecentEvent;
+  }
+
+  protected HeapMemoryMonitor setMostRecentEvent(
+      MemoryEvent mostRecentEvent) {
+    this.mostRecentEvent = mostRecentEvent;
+    return this;
+  }
+
   class LocalHeapStatListener implements LocalStatListener {
     /*
      * (non-Javadoc)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/ResourceAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/ResourceAdvisor.java
index 46e8a82..84da640 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/ResourceAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/ResourceAdvisor.java
@@ -425,7 +425,7 @@ public class ResourceAdvisor extends DistributionAdvisor {
    *
    * @return a mutable set of members in the critical state otherwise {@link Collections#EMPTY_SET}
    */
-  public Set<InternalDistributedMember> adviseCritialMembers() {
+  public Set<InternalDistributedMember> adviseCriticalMembers() {
     return adviseFilter(new Filter() {
       @Override
       public boolean include(Profile profile) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java
index e48d79c..46ac4ea 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java
@@ -29,6 +29,8 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.MemoryThresholdInfo;
+import org.apache.geode.internal.cache.control.InternalResourceManager;
 import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
@@ -318,15 +320,15 @@ public class DistributedRegionFunctionExecutor extends 
AbstractExecution {
       }
     }
     if (!MemoryThresholds.isLowMemoryExceptionDisabled() && 
function.optimizeForWrite()) {
-      try {
-        region.checkIfAboveThreshold(null);
-      } catch (LowMemoryException ignore) {
-        Set<DistributedMember> htrm = 
region.getMemoryThresholdReachedMembers();
+      MemoryThresholdInfo info = region.getAtomicThresholdInfo();
+      if (info.isMemoryThresholdReached()) {
+        
InternalResourceManager.getInternalResourceManager(region.getCache()).getHeapMonitor()
+            .updateStateAndSendEvent();
+        Set<DistributedMember> criticalMembers = 
info.getMembersThatReachedThreshold();
         throw new LowMemoryException(
             
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-                .toLocalizedString(function.getId(), htrm),
-            htrm);
-
+                .toLocalizedString(function.getId(), criticalMembers),
+            criticalMembers);
       }
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
index f6f1098..16f74f8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
@@ -20,7 +20,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.geode.cache.LowMemoryException;
 import org.apache.geode.cache.TransactionDataNotColocatedException;
 import org.apache.geode.cache.TransactionException;
 import org.apache.geode.cache.execute.Execution;
@@ -34,10 +33,8 @@ import 
org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.SetUtils;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 public class MemberFunctionExecutor extends AbstractExecution {
@@ -161,7 +158,10 @@ public class MemberFunctionExecutor extends 
AbstractExecution {
   @Override
   public void validateExecution(final Function function, final Set dest) {
     final InternalCache cache = GemFireCacheImpl.getInstance();
-    if (cache != null && cache.getTxManager().getTXState() != null) {
+    if (cache == null) {
+      return;
+    }
+    if (cache.getTxManager().getTXState() != null) {
       if (dest.size() > 1) {
         throw new TransactionException(
             
LocalizedStrings.PartitionedRegion_TX_FUNCTION_ON_MORE_THAN_ONE_NODE
@@ -179,16 +179,7 @@ public class MemberFunctionExecutor extends 
AbstractExecution {
         }
       }
     }
-    if (function.optimizeForWrite() && cache != null
-        && 
cache.getInternalResourceManager().getHeapMonitor().containsHeapCriticalMembers(dest)
-        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      Set<InternalDistributedMember> hcm = 
cache.getResourceAdvisor().adviseCritialMembers();
-      Set<DistributedMember> sm = SetUtils.intersection(hcm, dest);
-      throw new LowMemoryException(
-          LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-              .toLocalizedString(new Object[] {function.getId(), sm}),
-          sm);
-    }
+    
cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function, 
dest);
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
index 66ccfe8..8be821b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache.execute;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -21,7 +22,6 @@ import java.util.Random;
 import java.util.Set;
 
 import org.apache.geode.cache.DataPolicy;
-import org.apache.geode.cache.LowMemoryException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.TransactionDataNotColocatedException;
 import org.apache.geode.cache.TransactionException;
@@ -34,13 +34,11 @@ import 
org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.SetUtils;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 public class MultiRegionFunctionExecutor extends AbstractExecution {
@@ -207,15 +205,9 @@ public class MultiRegionFunctionExecutor extends 
AbstractExecution {
               .toLocalizedString(function.getId()));
     }
     final InternalCache cache = GemFireCacheImpl.getInstance();
-    if (function.optimizeForWrite() && cache != null
-        && 
cache.getInternalResourceManager().getHeapMonitor().containsHeapCriticalMembers(dest)
-        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      Set<InternalDistributedMember> hcm = 
cache.getResourceAdvisor().adviseCritialMembers();
-      Set<DistributedMember> sm = SetUtils.intersection(hcm, dest);
-      throw new LowMemoryException(
-          LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-              .toLocalizedString(function.getId(), sm),
-          sm);
+    if (cache != null) {
+      
cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function,
+          Collections.unmodifiableSet(dest));
     }
     setExecutionNodes(dest);
 
@@ -367,7 +359,10 @@ public class MultiRegionFunctionExecutor extends 
AbstractExecution {
       cache = (InternalCache) r.getCache();
       break;
     }
-    if (cache != null && cache.getTxManager().getTXState() != null) {
+    if (cache == null) {
+      return;
+    }
+    if (cache.getTxManager().getTXState() != null) {
       if (targetMembers.size() > 1) {
         throw new TransactionException(
             
LocalizedStrings.PartitionedRegion_TX_FUNCTION_ON_MORE_THAN_ONE_NODE
@@ -385,15 +380,6 @@ public class MultiRegionFunctionExecutor extends 
AbstractExecution {
         }
       }
     }
-    if (function.optimizeForWrite() && 
cache.getInternalResourceManager().getHeapMonitor()
-        .containsHeapCriticalMembers(targetMembers)
-        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      Set<InternalDistributedMember> hcm = 
cache.getResourceAdvisor().adviseCritialMembers();
-      Set<DistributedMember> sm = SetUtils.intersection(hcm, targetMembers);
-      throw new LowMemoryException(
-          LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-              .toLocalizedString(function.getId(), sm),
-          sm);
-    }
+    
cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function, 
targetMembers);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
index 6e13ebc..b3ea80d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
@@ -17,7 +17,6 @@ package org.apache.geode.internal.cache.execute;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.geode.cache.LowMemoryException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.TransactionDataRebalancedException;
 import org.apache.geode.cache.TransactionException;
@@ -26,12 +25,9 @@ import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.distributed.DistributedMember;
-import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.SetUtils;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 public class PartitionedRegionFunctionExecutor extends AbstractExecution {
@@ -329,7 +325,7 @@ public class PartitionedRegionFunctionExecutor extends 
AbstractExecution {
   @Override
   public void validateExecution(Function function, Set targetMembers) {
     InternalCache cache = pr.getGemFireCache();
-    if (cache != null && cache.getTxManager().getTXState() != null) {
+    if (cache.getTxManager().getTXState() != null) {
       if (targetMembers.size() > 1) {
         throw new TransactionException(
             
LocalizedStrings.PartitionedRegion_TX_FUNCTION_ON_MORE_THAN_ONE_NODE
@@ -347,15 +343,6 @@ public class PartitionedRegionFunctionExecutor extends 
AbstractExecution {
         }
       }
     }
-    if (function.optimizeForWrite() && 
cache.getInternalResourceManager().getHeapMonitor()
-        .containsHeapCriticalMembers(targetMembers)
-        && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-      Set<InternalDistributedMember> hcm = 
cache.getResourceAdvisor().adviseCritialMembers();
-      Set<DistributedMember> sm = SetUtils.intersection(hcm, targetMembers);
-      throw new LowMemoryException(
-          LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-              .toLocalizedString(new Object[] {function.getId(), sm}),
-          sm);
-    }
+    
cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(function, 
targetMembers);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
index 7135e66..85a2473 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
@@ -454,7 +454,7 @@ public class PartitionedRegionRebalanceOp {
     int redundantCopies = leaderRegion.getRedundantCopies();
     int totalNumberOfBuckets = leaderRegion.getTotalNumberOfBuckets();
     Set<InternalDistributedMember> criticalMembers =
-        resourceManager.getResourceAdvisor().adviseCritialMembers();;
+        resourceManager.getResourceAdvisor().adviseCriticalMembers();;
     boolean removeOverRedundancy = true;
 
     debug("Building Model for rebalancing " + leaderRegion + ". 
redundantCopies=" + redundantCopies
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
index c6a9c30..6c8aa33 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
@@ -474,7 +474,7 @@ public class RegionAdvisor extends CacheDistributionAdvisor 
{
 
     } else {
       ResourceAdvisor advisor = 
getPartitionedRegion().getCache().getResourceAdvisor();
-      boolean sick = advisor.adviseCritialMembers().contains(member);
+      boolean sick = advisor.adviseCriticalMembers().contains(member);
       if (logger.isDebugEnabled()) {
         logger.debug("updateBucketStatus:({}):member:{}:sick:{}",
             getPartitionedRegion().bucketStringForLogs(bucketId), member, 
sick);
@@ -1822,7 +1822,7 @@ public class RegionAdvisor extends 
CacheDistributionAdvisor {
       logger.debug("RA: removing profile {}", profile);
     }
     if (getAdvisee() instanceof PartitionedRegion) {
-      ((PartitionedRegion) 
getAdvisee()).removeMemberFromCriticalList(profile.peerMemberId);
+      ((PartitionedRegion) 
getAdvisee()).removeCriticalMember(profile.peerMemberId);
     }
 
     if (this.buckets != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java
index 1bb0367..7415d37 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java
@@ -15,22 +15,15 @@
 package org.apache.geode.internal.cache.tier.sockets.command;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
 
-import org.apache.geode.cache.LowMemoryException;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultSender;
 import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
-import org.apache.geode.distributed.DistributedMember;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.cache.execute.FunctionContextImpl;
 import org.apache.geode.internal.cache.execute.FunctionStats;
 import 
org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
@@ -159,16 +152,8 @@ public class ExecuteFunction extends BaseCommand {
               + "with context :" + context.toString());
         }
 
-        HeapMemoryMonitor hmm =
-            ((InternalResourceManager) 
cache.getResourceManager()).getHeapMonitor();
-        if (functionObject.optimizeForWrite() && cache != null && 
hmm.getState().isCritical()
-            && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-          Set<DistributedMember> sm = 
Collections.<DistributedMember>singleton(cache.getMyId());
-          throw new LowMemoryException(
-              
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-                  .toLocalizedString(new Object[] {functionObject.getId(), 
sm}),
-              sm);
-        }
+        
cache.getInternalResourceManager().getHeapMonitor().checkForLowMemory(functionObject,
+            cache.getMyId());
         functionObject.execute(context);
         stats.endFunctionExecution(startExecution, functionObject.hasResult());
       } catch (FunctionException functionException) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java
index 23e2f17..2e0fa38 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java
@@ -15,22 +15,15 @@
 package org.apache.geode.internal.cache.tier.sockets.command;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
 
-import org.apache.geode.cache.LowMemoryException;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultSender;
 import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
-import org.apache.geode.distributed.DistributedMember;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.cache.execute.AbstractExecution;
 import org.apache.geode.internal.cache.execute.FunctionContextImpl;
 import org.apache.geode.internal.cache.execute.FunctionStats;
@@ -190,16 +183,9 @@ public class ExecuteFunction65 extends BaseCommand {
               context);
         }
 
-        HeapMemoryMonitor hmm =
-            ((InternalResourceManager) 
cache.getResourceManager()).getHeapMonitor();
-        if (functionObject.optimizeForWrite() && cache != null && 
hmm.getState().isCritical()
-            && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-          Set<DistributedMember> sm = 
Collections.singleton((DistributedMember) cache.getMyId());
-          Exception e = new LowMemoryException(
-              
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-                  .toLocalizedString(new Object[] {functionObject.getId(), 
sm}),
-              sm);
-
+        Exception e = cache.getInternalResourceManager().getHeapMonitor()
+            .createLowMemoryIfNeeded(functionObject, cache.getMyId());
+        if (e != null) {
           sendException(hasResult, clientMessage, e.getMessage(), 
serverConnection, e);
           return;
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java
index be4749f..6ef3576 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java
@@ -15,22 +15,18 @@
 package org.apache.geode.internal.cache.tier.sockets.command;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.geode.InternalGemFireError;
-import org.apache.geode.cache.LowMemoryException;
 import org.apache.geode.cache.client.internal.ConnectionImpl;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
-import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DistributionManager;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -39,9 +35,6 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.TXStateProxy;
-import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
-import org.apache.geode.internal.cache.control.MemoryThresholds;
 import org.apache.geode.internal.cache.execute.AbstractExecution;
 import org.apache.geode.internal.cache.execute.FunctionContextImpl;
 import org.apache.geode.internal.cache.execute.FunctionStats;
@@ -229,16 +222,9 @@ public class ExecuteFunction66 extends BaseCommand {
               context);
         }
 
-        HeapMemoryMonitor hmm =
-            ((InternalResourceManager) 
cache.getResourceManager()).getHeapMonitor();
-        if (functionObject.optimizeForWrite() && cache != null && 
hmm.getState().isCritical()
-            && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
-          Set<DistributedMember> sm = 
Collections.singleton((DistributedMember) cache.getMyId());
-          Exception e = new LowMemoryException(
-              
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
-                  .toLocalizedString(new Object[] {functionObject.getId(), 
sm}),
-              sm);
-
+        Exception e = cache.getInternalResourceManager().getHeapMonitor()
+            .createLowMemoryIfNeeded(functionObject, cache.getMyId());
+        if (e != null) {
           sendException(hasResult, clientMessage, e.getMessage(), 
serverConnection, e);
           return;
         }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
index 12f9dfe..2f58510 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
@@ -33,6 +34,7 @@ import org.junit.Test;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.event.BulkOperationHolder;
 import org.apache.geode.internal.cache.event.EventTracker;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
@@ -154,4 +156,34 @@ public class DistributedRegionJUnitTest extends 
AbstractDistributedRegionJUnitTe
     }
   }
 
+  @Test
+  public void testThatMemoryThresholdInfoRelectsStateOfRegion() {
+    InternalDistributedMember internalDM = 
mock(InternalDistributedMember.class);
+    DistributedRegion distRegion = prepare(true, false);
+    distRegion.addCriticalMember(internalDM);
+
+    MemoryThresholdInfo info = distRegion.getAtomicThresholdInfo();
+
+    assertThat(distRegion.isMemoryThresholdReached()).isTrue();
+    
assertThat(distRegion.getAtomicThresholdInfo().getMembersThatReachedThreshold())
+        .containsExactly(internalDM);
+    assertThat(info.isMemoryThresholdReached()).isTrue();
+    
assertThat(info.getMembersThatReachedThreshold()).containsExactly(internalDM);
+  }
+
+  @Test
+  public void testThatMemoryThresholdInfoDoesNotChangeWhenRegionChanges() {
+    InternalDistributedMember internalDM = 
mock(InternalDistributedMember.class);
+    DistributedRegion distRegion = prepare(true, false);
+
+    MemoryThresholdInfo info = distRegion.getAtomicThresholdInfo();
+    distRegion.addCriticalMember(internalDM);
+
+    assertThat(distRegion.isMemoryThresholdReached()).isTrue();
+    
assertThat(distRegion.getAtomicThresholdInfo().getMembersThatReachedThreshold())
+        .containsExactly(internalDM);
+    assertThat(info.isMemoryThresholdReached()).isFalse();
+    assertThat(info.getMembersThatReachedThreshold()).isEmpty();
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/MemoryThresholdInfoTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/MemoryThresholdInfoTest.java
new file mode 100644
index 0000000..5a60710
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/MemoryThresholdInfoTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class MemoryThresholdInfoTest {
+
+  @Test
+  public void getNotReachedReturnsIdenticalResults() {
+    MemoryThresholdInfo info1 = MemoryThresholdInfo.getNotReached();
+    MemoryThresholdInfo info2 = MemoryThresholdInfo.getNotReached();
+
+    assertThat(info1).isSameAs(info2);
+    assertThat(info1.getMembersThatReachedThreshold())
+        .isSameAs(info2.getMembersThatReachedThreshold());
+  }
+
+  @Test
+  public void getNotReachedReturnsEmptyMembersReached() {
+    MemoryThresholdInfo info1 = MemoryThresholdInfo.getNotReached();
+
+    assertThat(info1.getMembersThatReachedThreshold()).isEmpty();
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/control/HeapMemoryMonitorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/control/HeapMemoryMonitorTest.java
new file mode 100644
index 0000000..f5ddc52
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/control/HeapMemoryMonitorTest.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.control;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.geode.cache.LowMemoryException;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.DistributedSystem;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+
+/**
+ * Unit tests for the low-memory checks exposed by {@link HeapMemoryMonitor}:
+ * {@code getHeapCriticalMembersFrom}, {@code createLowMemoryIfNeeded} and
+ * {@code checkForLowMemory}.
+ *
+ * <p>
+ * The cache, the resource advisor and the function are Mockito mocks; the static
+ * {@code MemoryThresholds.isLowMemoryExceptionDisabled()} is stubbed through PowerMock.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({MemoryThresholds.class})
+public class HeapMemoryMonitorTest {
+
+  private HeapMemoryMonitor heapMonitor;
+  private Function function;
+  // Holds the single mocked remote member after setup(); individual tests may add to it.
+  private Set memberSet;
+  private DistributedMember member;
+  // The member returned by the mocked cache's getMyId().
+  private InternalDistributedMember myself;
+  private ResourceAdvisor resourceAdvisor;
+  // "Function: null" because the mocked Function returns null for its id.
+  private static final String LOW_MEMORY_REGEX =
+      "Function: null cannot be executed because the members.*are running low on memory";
+
+  /**
+   * Builds a monitor around a mocked cache whose advisor and local member id are mocks, and records
+   * a most-recent event whose current state is DISABLED so the local member starts out
+   * non-critical.
+   */
+  @Before
+  public void setup() {
+    InternalCache internalCache = mock(InternalCache.class);
+    DistributedSystem distributedSystem = mock(DistributedSystem.class);
+    function = mock(Function.class);
+    member = mock(InternalDistributedMember.class);
+    myself = mock(InternalDistributedMember.class);
+    resourceAdvisor = mock(ResourceAdvisor.class);
+
+    when(internalCache.getDistributedSystem()).thenReturn(distributedSystem);
+    when(internalCache.getDistributionAdvisor()).thenReturn(resourceAdvisor);
+    when(internalCache.getMyId()).thenReturn(myself);
+
+    heapMonitor = new HeapMemoryMonitor(null, internalCache, null);
+    memberSet = new HashSet<>();
+    memberSet.add(member);
+    heapMonitor.setMostRecentEvent(new MemoryEvent(InternalResourceManager.ResourceType.HEAP_MEMORY,
+        MemoryThresholds.MemoryState.DISABLED, MemoryThresholds.MemoryState.DISABLED, null, 0L,
+        true, null)); // myself is not critical
+  }
+
+  // ========== tests for getHeapCriticalMembersFrom ==========
+  @Test
+  public void getHeapCriticalMembersFrom_WithEmptyCriticalMembersReturnsEmptySet() {
+    getHeapCriticalMembersFrom_returnsEmptySet(Collections.emptySet(), memberSet);
+  }
+
+  @Test
+  public void getHeapCriticalMembersFrom_WithEmptyArgReturnsEmptySet() {
+    getHeapCriticalMembersFrom_returnsEmptySet(memberSet, Collections.emptySet());
+  }
+
+  @Test
+  public void getHeapCriticalMembersFromWithEmptySetsReturnsEmptySet() {
+    getHeapCriticalMembersFrom_returnsEmptySet(Collections.emptySet(), Collections.emptySet());
+  }
+
+  @Test
+  public void getHeapCriticalMembersFrom_WithDisjointSetsReturnsEmptySet() {
+    Set argSet = new HashSet();
+    argSet.add(mock(InternalDistributedMember.class));
+
+    getHeapCriticalMembersFrom_returnsEmptySet(memberSet, argSet);
+  }
+
+  @Test
+  public void getHeapCriticalMembersFrom_WithEqualSetsReturnsMember() {
+    getHeapCriticalMembersFrom_returnsNonEmptySet(memberSet, Collections.unmodifiableSet(memberSet),
+        new HashSet(memberSet));
+  }
+
+  /**
+   * Advisor reports {member1, member2, member4}; the argument is {member, member1, member3,
+   * member4}; only the members present in both are expected back.
+   */
+  @Test
+  public void getHeapCriticalMembersFrom_ReturnsMultipleMembers() {
+    DistributedMember member1 = mock(InternalDistributedMember.class);
+    DistributedMember member2 = mock(InternalDistributedMember.class);
+    DistributedMember member3 = mock(InternalDistributedMember.class);
+    DistributedMember member4 = mock(InternalDistributedMember.class);
+    Set advisorSet = new HashSet();
+    advisorSet.add(member1);
+    advisorSet.add(member2);
+    advisorSet.add(member4);
+    Set argSet = new HashSet(memberSet);
+    argSet.add(member1);
+    argSet.add(member3);
+    argSet.add(member4);
+    Set expectedResult = new HashSet();
+    expectedResult.add(member1);
+    expectedResult.add(member4);
+
+    getHeapCriticalMembersFrom_returnsNonEmptySet(advisorSet, argSet, expectedResult);
+  }
+
+  @Test
+  public void getHeapCriticalMembersFrom_DoesNotReturnMyselfWhenNotCritical() {
+    // Snapshot the expected/advised sets before "myself" is added to the argument set.
+    Set expectedResult = new HashSet(memberSet);
+    Set advisorSet = new HashSet(memberSet);
+    memberSet.add(myself);
+
+    getHeapCriticalMembersFrom_returnsNonEmptySet(advisorSet,
+        Collections.unmodifiableSet(memberSet),
+        expectedResult);
+  }
+
+  @Test
+  public void getHeapCriticalMembersFrom_IncludesMyselfWhenCritical() throws Exception {
+    Set advisorSet = new HashSet(memberSet);
+    // Replace the event recorded in setup() with one whose current state is CRITICAL.
+    heapMonitor.setMostRecentEvent(new MemoryEvent(InternalResourceManager.ResourceType.HEAP_MEMORY,
+        MemoryThresholds.MemoryState.DISABLED, MemoryThresholds.MemoryState.CRITICAL, null, 0L,
+        true, null));
+    memberSet.add(myself);
+
+    getHeapCriticalMembersFrom_returnsNonEmptySet(advisorSet,
+        Collections.unmodifiableSet(memberSet),
+        new HashSet(memberSet));
+  }
+
+  // ========== tests for createLowMemoryIfNeeded (with Set argument) ==========
+  @Test
+  public void createLowMemoryIfNeededWithSetArg_ReturnsNullWhenNotOptimizedForWrite()
+      throws Exception {
+    createLowMemoryIfNeededWithSetArg_returnsNull(false, false, memberSet);
+  }
+
+  @Test
+  public void createLowMemoryIfNeededWithSetArg_ReturnsNullWhenLowMemoryExceptionDisabled()
+      throws Exception {
+    createLowMemoryIfNeededWithSetArg_returnsNull(true, true, memberSet);
+  }
+
+  @Test
+  public void createLowMemoryIfNeededWithSetArg_ReturnsNullWhenNoCriticalMembers()
+      throws Exception {
+    createLowMemoryIfNeededWithSetArg_returnsNull(true, false, Collections.emptySet());
+  }
+
+  @Test
+  public void createLowMemoryIfNeededWithSetArg_ReturnsException() throws Exception {
+    setMocking(true, false, memberSet);
+
+    LowMemoryException exception = heapMonitor.createLowMemoryIfNeeded(function, memberSet);
+
+    assertLowMemoryException(exception);
+  }
+
+  // ========== tests for createLowMemoryIfNeeded (with DistributedMember argument) ==========
+  @Test
+  public void createLowMemoryIfNeededWithMemberArg_ReturnsNullWhenNotOptimizedForWrite()
+      throws Exception {
+    createLowMemoryIfNeededWithMemberArg_returnsNull(false, false, member);
+  }
+
+  @Test
+  public void createLowMemoryIfNeededWithMemberArg_ReturnsNullWhenLowMemoryExceptionDisabled()
+      throws Exception {
+    createLowMemoryIfNeededWithMemberArg_returnsNull(true, true, member);
+  }
+
+  @Test
+  public void createLowMemoryIfNeededWithMemberArg_ReturnsNullWhenNoCriticalMembers()
+      throws Exception {
+    createLowMemoryIfNeededWithMemberArg_returnsNull(true, false, member);
+  }
+
+  @Test
+  public void createLowMemoryIfNeededWithMemberArg_ReturnsException() throws Exception {
+    setMocking(true, false, memberSet);
+
+    LowMemoryException exception = heapMonitor.createLowMemoryIfNeeded(function, member);
+
+    assertLowMemoryException(exception);
+  }
+
+  // ========== tests for checkForLowMemory (with Set argument) ==========
+  @Test
+  public void checkForLowMemoryWithSetArg_DoesNotThrowWhenNotOptimizedForWrite() throws Exception {
+    checkForLowMemoryWithSetArg_doesNotThrow(false, false, memberSet);
+  }
+
+  @Test
+  public void checkForLowMemoryWithSetArg_DoesNotThrowWhenLowMemoryExceptionDisabled()
+      throws Exception {
+    checkForLowMemoryWithSetArg_doesNotThrow(true, true, memberSet);
+  }
+
+  @Test
+  public void checkForLowMemoryWithSetArg_DoesNotThrowWhenNoCriticalMembers() throws Exception {
+    checkForLowMemoryWithSetArg_doesNotThrow(true, false, Collections.emptySet());
+  }
+
+  @Test
+  public void checkForLowMemoryWithSetArg_ThrowsLowMemoryException() throws Exception {
+    setMocking(true, false, memberSet);
+
+    assertThatThrownBy(() -> heapMonitor.checkForLowMemory(function, memberSet))
+        .isExactlyInstanceOf(LowMemoryException.class);
+  }
+
+  // ========== tests for checkForLowMemory (with DistributedMember argument) ==========
+  @Test
+  public void checkForLowMemoryIfNeededWithMemberArg_ReturnsNullWhenNotOptimizedForWrite()
+      throws Exception {
+    checkForLowMemoryWithMemberArg_doesNotThrow(false, false, member);
+  }
+
+  @Test
+  public void checkForLowMemoryIfNeededWithMemberArg_ReturnsNullWhenLowMemoryExceptionDisabled()
+      throws Exception {
+    checkForLowMemoryWithMemberArg_doesNotThrow(true, true, member);
+  }
+
+  @Test
+  public void checkForLowMemoryIfNeededWithMemberArg_ReturnsNullWhenNoCriticalMembers()
+      throws Exception {
+    checkForLowMemoryWithMemberArg_doesNotThrow(true, false, member);
+  }
+
+  @Test
+  public void checkForLowMemoryWithMemberArg_ReturnsException() throws Exception {
+    setMocking(true, false, memberSet);
+
+    assertThatThrownBy(() -> heapMonitor.checkForLowMemory(function, member))
+        .isExactlyInstanceOf(LowMemoryException.class).hasMessageMatching(LOW_MEMORY_REGEX);
+  }
+
+  // ========== private methods ==========
+  /**
+   * Stubs the advisor's critical members, queries the monitor with {@code argSet} and asserts that
+   * nothing is returned.
+   */
+  private void getHeapCriticalMembersFrom_returnsEmptySet(Set adviseCriticalMembers, Set argSet) {
+    when(resourceAdvisor.adviseCriticalMembers()).thenReturn(adviseCriticalMembers);
+
+    Set<DistributedMember> criticalMembers = heapMonitor.getHeapCriticalMembersFrom(argSet);
+
+    assertThat(criticalMembers).isEmpty();
+  }
+
+  /**
+   * Stubs the advisor's critical members, queries the monitor with {@code argSet} and asserts that
+   * the result holds exactly the members of {@code expectedResult}.
+   */
+  private void getHeapCriticalMembersFrom_returnsNonEmptySet(Set adviseCriticalMembers, Set argSet,
+      Set expectedResult) {
+    when(resourceAdvisor.adviseCriticalMembers()).thenReturn(adviseCriticalMembers);
+
+    Set<DistributedMember> criticalMembers = heapMonitor.getHeapCriticalMembersFrom(argSet);
+
+    // Exact comparison: a plain containsAll would still pass if unexpected members (for example
+    // a non-critical "myself") were returned as well.
+    assertThat(criticalMembers).hasSameElementsAs(expectedResult);
+  }
+
+  /**
+   * Applies the given flags, reports {@code memberSetArg} as the advisor's critical members and
+   * asserts that no exception object is created for that same set.
+   */
+  private void createLowMemoryIfNeededWithSetArg_returnsNull(boolean optimizeForWrite,
+      boolean isLowMemoryExceptionDisabled, Set memberSetArg) throws Exception {
+    setMocking(optimizeForWrite, isLowMemoryExceptionDisabled, memberSetArg);
+
+    LowMemoryException exception = heapMonitor.createLowMemoryIfNeeded(function, memberSetArg);
+
+    assertThat(exception).isNull();
+  }
+
+  /**
+   * Applies the given flags and asserts that no exception object is created for
+   * {@code memberArg}. See {@link #criticalMembersWhenNotLow} for which members the advisor
+   * reports as critical.
+   */
+  private void createLowMemoryIfNeededWithMemberArg_returnsNull(boolean optimizeForWrite,
+      boolean isLowMemoryExceptionDisabled, DistributedMember memberArg) throws Exception {
+    Set criticalMembers =
+        criticalMembersWhenNotLow(optimizeForWrite, isLowMemoryExceptionDisabled, memberArg);
+    setMocking(optimizeForWrite, isLowMemoryExceptionDisabled, criticalMembers);
+
+    LowMemoryException exception = heapMonitor.createLowMemoryIfNeeded(function, memberArg);
+
+    assertThat(exception).isNull();
+  }
+
+  /**
+   * Applies the given flags, reports {@code memberSetArg} as the advisor's critical members and
+   * verifies that the check completes without throwing for that same set.
+   */
+  private void checkForLowMemoryWithSetArg_doesNotThrow(boolean optimizeForWrite,
+      boolean isLowMemoryExceptionDisabled, Set memberSetArg) throws Exception {
+    setMocking(optimizeForWrite, isLowMemoryExceptionDisabled, memberSetArg);
+
+    heapMonitor.checkForLowMemory(function, memberSetArg);
+  }
+
+  /**
+   * Applies the given flags and verifies that the check completes without throwing for
+   * {@code memberArg}. See {@link #criticalMembersWhenNotLow} for which members the advisor
+   * reports as critical.
+   */
+  private void checkForLowMemoryWithMemberArg_doesNotThrow(boolean optimizeForWrite,
+      boolean isLowMemoryExceptionDisabled, DistributedMember memberArg) throws Exception {
+    Set criticalMembers =
+        criticalMembersWhenNotLow(optimizeForWrite, isLowMemoryExceptionDisabled, memberArg);
+    setMocking(optimizeForWrite, isLowMemoryExceptionDisabled, criticalMembers);
+
+    heapMonitor.checkForLowMemory(function, memberArg);
+  }
+
+  /**
+   * Chooses the advisor's critical-member set for a "no low-memory exception" scenario.
+   *
+   * <p>
+   * When the flag combination itself is supposed to suppress the exception (function not optimized
+   * for write, or low-memory exceptions disabled), the target member is reported as critical so
+   * that the test exercises the flags rather than passing merely because nobody is critical. This
+   * mirrors the Set-argument tests, which also keep the target critical in those cases. When both
+   * flags allow the exception, the only remaining reason for "no exception" is an empty critical
+   * set.
+   *
+   * @return an empty set when the flags allow the exception, otherwise a singleton of
+   *         {@code memberArg}
+   */
+  private Set criticalMembersWhenNotLow(boolean optimizeForWrite,
+      boolean isLowMemoryExceptionDisabled, DistributedMember memberArg) {
+    if (optimizeForWrite && !isLowMemoryExceptionDisabled) {
+      return Collections.emptySet();
+    }
+    return Collections.singleton(memberArg);
+  }
+
+  /**
+   * Stubs the three inputs the tests vary: the function's optimizeForWrite flag, the static
+   * low-memory-exception-disabled flag, and the advisor's critical members.
+   */
+  private void setMocking(boolean optimizeForWrite, boolean isLowMemoryExceptionDisabled,
+      Set argSet) throws Exception {
+    when(function.optimizeForWrite()).thenReturn(optimizeForWrite);
+    setIsLowMemoryExceptionDisabled(isLowMemoryExceptionDisabled);
+    when(resourceAdvisor.adviseCriticalMembers()).thenReturn(argSet);
+  }
+
+  /**
+   * Asserts that {@code exception} is exactly a {@link LowMemoryException} whose message contains
+   * the expected low-memory text.
+   */
+  private void assertLowMemoryException(LowMemoryException exception) {
+    assertThat(exception).isExactlyInstanceOf(LowMemoryException.class);
+    assertThat(exception.getMessage()).containsPattern(LOW_MEMORY_REGEX);
+  }
+
+  /**
+   * Statically mocks {@code MemoryThresholds} so that {@code isLowMemoryExceptionDisabled()}
+   * returns the given value.
+   */
+  private void setIsLowMemoryExceptionDisabled(boolean isLowMemoryExceptionDisabled)
+      throws Exception {
+    PowerMockito.mockStatic(MemoryThresholds.class);
+    PowerMockito.when(MemoryThresholds.class, MemoryThresholds.isLowMemoryExceptionDisabled())
+        .thenReturn(isLowMemoryExceptionDisabled);
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66Test.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66Test.java
index a61e49f..fc234c2 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66Test.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66Test.java
@@ -45,6 +45,7 @@ import 
org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.ServerSideHandshake;
@@ -124,6 +125,7 @@ public class ExecuteFunction66Test {
     
when(this.cache.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
     
when(this.cache.getDistributedSystem()).thenReturn(mock(InternalDistributedSystem.class));
     
when(this.cache.getResourceManager()).thenReturn(this.internalResourceManager);
+    
when(this.cache.getInternalResourceManager()).thenReturn(this.internalResourceManager);
 
     when(this.callbackArgPart.getObject()).thenReturn(CALLBACK_ARG);
 
@@ -149,6 +151,8 @@ public class ExecuteFunction66Test {
     when(this.serverConnection.getAcceptor()).thenReturn(this.acceptor);
     
when(this.serverConnection.getHandshake()).thenReturn(mock(ServerSideHandshake.class));
 
+    
when(this.internalResourceManager.getHeapMonitor()).thenReturn(mock(HeapMemoryMonitor.class));
+
     PowerMockito.mockStatic(FunctionService.class);
     
PowerMockito.when(FunctionService.getFunction(eq(FUNCTION))).thenReturn(this.functionObject);
   }

Reply via email to