GEODE-1999: Fix offheap memory leak when exception is thrown during 
basicDestroy call to remove GatewaySenderEventImpl from the sender queue

Use try and finally to make sure the offheap reference will be released.
Make similar changes for the parallel WAN queue as well.
Also release offheap memory if virtualPut fails to put the 
GatewaySenderEvent into the sender queue.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/08adacd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/08adacd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/08adacd2

Branch: refs/heads/feature/GEODE-1930
Commit: 08adacd2cfb93533ec016a82a0f71d7110e1819d
Parents: 582694d
Author: eshu <[email protected]>
Authored: Thu Oct 13 10:44:53 2016 -0700
Committer: eshu <[email protected]>
Committed: Thu Oct 13 10:44:53 2016 -0700

----------------------------------------------------------------------
 .../cache/AbstractBucketRegionQueue.java        | 34 +++++------
 .../geode/internal/cache/BucketRegionQueue.java | 59 +++++++++++---------
 .../wan/serial/SerialGatewaySenderQueue.java    | 26 ++++++---
 3 files changed, 68 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08adacd2/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index 8fa8597..7ae1249 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -357,31 +357,31 @@ public abstract class AbstractBucketRegionQueue extends 
BucketRegion {
       boolean ifOld, Object expectedOldValue, boolean requireOldValue,
       long lastModified, boolean overwriteDestroyed) throws TimeoutException,
       CacheWriterException {
-    boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue,
-        requireOldValue, lastModified, overwriteDestroyed);
-    if (success) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Key : ----> {}", event.getKey());
+    try {
+      boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue,
+          requireOldValue, lastModified, overwriteDestroyed);
+      if (success) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Key : ----> {}", event.getKey());
+        }      
+      } else {
+        GatewaySenderEventImpl.release(event.getRawNewValue());
       }
-      //@Unretained Object ov = event.getRawOldValue();
-      //if (ov instanceof GatewaySenderEventImpl) {
-      //  ((GatewaySenderEventImpl)ov).release();
-      //}
-       GatewaySenderEventImpl.release(event.getRawOldValue());
+      return success;
+    } finally {
+      GatewaySenderEventImpl.release(event.getRawOldValue());
     }
-    return success;
     
   }
   @Override
   protected void basicDestroy(final EntryEventImpl event,
       final boolean cacheWrite, Object expectedOldValue)
       throws EntryNotFoundException, CacheWriterException, TimeoutException {
-    super.basicDestroy(event, cacheWrite, expectedOldValue);
-    //@Unretained Object rov = event.getRawOldValue();
-    //if (rov instanceof GatewaySenderEventImpl) {
-    //  ((GatewaySenderEventImpl) rov).release();
-    //}
-       GatewaySenderEventImpl.release(event.getRawOldValue());
+    try {
+      super.basicDestroy(event, cacheWrite, expectedOldValue);
+    } finally {
+      GatewaySenderEventImpl.release(event.getRawOldValue());
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08adacd2/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 294b616..ecc659a 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -257,34 +257,38 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
       boolean ifOld, Object expectedOldValue, boolean requireOldValue,
       long lastModified, boolean overwriteDestroyed) throws TimeoutException,
       CacheWriterException {
-    boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue,
-        requireOldValue, lastModified, overwriteDestroyed);
-
-    if (success) {
-      GatewaySenderEventImpl.release(event.getRawOldValue());
-
-      if (getPartitionedRegion().getColocatedWith() == null) {
-        return success;
-      }
-
-      if (getPartitionedRegion().isConflationEnabled() && 
this.getBucketAdvisor().isPrimary()) {
-        Object object = event.getNewValue();
-        Long key = (Long)event.getKey();
-        if (object instanceof Conflatable) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Key :{} , Object : {} is conflatable", key, object);
-          }
-          // TODO: TO optimize by destroying on primary and secondary 
separately
-          // in case of conflation
-          conflateOldEntry((Conflatable)object, key);
-        } else {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Object : {} is not conflatable", object);
+    try {
+      boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue,
+          requireOldValue, lastModified, overwriteDestroyed);
+  
+      if (success) {
+        if (getPartitionedRegion().getColocatedWith() == null) {
+          return success;
+        }
+  
+        if (getPartitionedRegion().isConflationEnabled() && 
this.getBucketAdvisor().isPrimary()) {
+          Object object = event.getNewValue();
+          Long key = (Long)event.getKey();
+          if (object instanceof Conflatable) {
+            if (logger.isDebugEnabled()) {
+              logger.debug("Key :{} , Object : {} is conflatable", key, 
object);
+            }
+            // TODO: TO optimize by destroying on primary and secondary 
separately
+            // in case of conflation
+            conflateOldEntry((Conflatable)object, key);
+          } else {
+            if (logger.isDebugEnabled()) {
+              logger.debug("Object : {} is not conflatable", object);
+            }
           }
         }
+      } else {
+        GatewaySenderEventImpl.release(event.getRawNewValue());
       }
+      return success;
+    } finally {
+      GatewaySenderEventImpl.release(event.getRawOldValue());
     }
-    return success;
   }
 
   private void conflateOldEntry(Conflatable object, Long tailKey) {
@@ -357,9 +361,12 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
     if (getPartitionedRegion().isConflationEnabled()) {
       removeIndex((Long)event.getKey());
     }
-    super.basicDestroy(event, cacheWrite, expectedOldValue);
+    try {
+      super.basicDestroy(event, cacheWrite, expectedOldValue);
+    } finally {
+      GatewaySenderEventImpl.release(event.getRawOldValue());
+    }
 
-    GatewaySenderEventImpl.release(event.getRawOldValue());
     // Primary buckets should already remove the key while peeking
     if (!this.getBucketAdvisor().isPrimary()) {
       if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08adacd2/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 79b9d86..a22666c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -1301,22 +1301,32 @@ public class SerialGatewaySenderQueue implements 
RegionQueue {
     protected void basicDestroy(final EntryEventImpl event,
         final boolean cacheWrite, Object expectedOldValue)
         throws EntryNotFoundException, CacheWriterException, TimeoutException {
-
-      super.basicDestroy(event, cacheWrite, expectedOldValue);
-      GatewaySenderEventImpl.release(event.getRawOldValue());
+      try {
+        super.basicDestroy(event, cacheWrite, expectedOldValue);
+      } finally {
+        GatewaySenderEventImpl.release(event.getRawOldValue());
+      }
     }
     @Override
     protected boolean virtualPut(EntryEventImpl event, boolean ifNew,
         boolean ifOld, Object expectedOldValue, boolean requireOldValue,
         long lastModified, boolean overwriteDestroyed) throws TimeoutException,
         CacheWriterException {
-      boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue,
-          requireOldValue, lastModified, overwriteDestroyed);
-
-      if (success) {
+      try {
+        boolean success = super.virtualPut(event, ifNew, ifOld, 
expectedOldValue,
+            requireOldValue, lastModified, overwriteDestroyed);
+        if (!success) {
+          //release offheap reference if GatewaySenderEventImpl is not put 
into 
+          //the region queue
+          GatewaySenderEventImpl.release(event.getRawNewValue());
+        }
+        return success;
+      } finally {
+        //GatewaySenderQueue probably only adding new events into the queue.
+        //Add the finally block just in case if there actually is an update 
+        //in the sender queue or occurs in the the future.
         GatewaySenderEventImpl.release(event.getRawOldValue());
       }
-      return success;
     }
   }
 }

Reply via email to