GEODE-1999: Fix offheap memory leak when exception is thrown during basicDestroy call to remove GatewaySenderEventImpl from the sender queue
Using try and finally to make sure the offheap reference will be released. Make similar changes for the parallel WAN queue as well. Also release offheap memory if a virtualPut failed to put the GatewaySenderEvent into the sender queue. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/08adacd2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/08adacd2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/08adacd2 Branch: refs/heads/feature/GEODE-1930 Commit: 08adacd2cfb93533ec016a82a0f71d7110e1819d Parents: 582694d Author: eshu <[email protected]> Authored: Thu Oct 13 10:44:53 2016 -0700 Committer: eshu <[email protected]> Committed: Thu Oct 13 10:44:53 2016 -0700 ---------------------------------------------------------------------- .../cache/AbstractBucketRegionQueue.java | 34 +++++------ .../geode/internal/cache/BucketRegionQueue.java | 59 +++++++++++--------- .../wan/serial/SerialGatewaySenderQueue.java | 26 ++++++--- 3 files changed, 68 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08adacd2/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java index 8fa8597..7ae1249 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java @@ -357,31 +357,31 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { boolean ifOld, Object expectedOldValue, boolean requireOldValue, long lastModified, boolean 
overwriteDestroyed) throws TimeoutException, CacheWriterException { - boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue, - requireOldValue, lastModified, overwriteDestroyed); - if (success) { - if (logger.isDebugEnabled()) { - logger.debug("Key : ----> {}", event.getKey()); + try { + boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue, + requireOldValue, lastModified, overwriteDestroyed); + if (success) { + if (logger.isDebugEnabled()) { + logger.debug("Key : ----> {}", event.getKey()); + } + } else { + GatewaySenderEventImpl.release(event.getRawNewValue()); } - //@Unretained Object ov = event.getRawOldValue(); - //if (ov instanceof GatewaySenderEventImpl) { - // ((GatewaySenderEventImpl)ov).release(); - //} - GatewaySenderEventImpl.release(event.getRawOldValue()); + return success; + } finally { + GatewaySenderEventImpl.release(event.getRawOldValue()); } - return success; } @Override protected void basicDestroy(final EntryEventImpl event, final boolean cacheWrite, Object expectedOldValue) throws EntryNotFoundException, CacheWriterException, TimeoutException { - super.basicDestroy(event, cacheWrite, expectedOldValue); - //@Unretained Object rov = event.getRawOldValue(); - //if (rov instanceof GatewaySenderEventImpl) { - // ((GatewaySenderEventImpl) rov).release(); - //} - GatewaySenderEventImpl.release(event.getRawOldValue()); + try { + super.basicDestroy(event, cacheWrite, expectedOldValue); + } finally { + GatewaySenderEventImpl.release(event.getRawOldValue()); + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08adacd2/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index 294b616..ecc659a 100644 --- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -257,34 +257,38 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { boolean ifOld, Object expectedOldValue, boolean requireOldValue, long lastModified, boolean overwriteDestroyed) throws TimeoutException, CacheWriterException { - boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue, - requireOldValue, lastModified, overwriteDestroyed); - - if (success) { - GatewaySenderEventImpl.release(event.getRawOldValue()); - - if (getPartitionedRegion().getColocatedWith() == null) { - return success; - } - - if (getPartitionedRegion().isConflationEnabled() && this.getBucketAdvisor().isPrimary()) { - Object object = event.getNewValue(); - Long key = (Long)event.getKey(); - if (object instanceof Conflatable) { - if (logger.isDebugEnabled()) { - logger.debug("Key :{} , Object : {} is conflatable", key, object); - } - // TODO: TO optimize by destroying on primary and secondary separately - // in case of conflation - conflateOldEntry((Conflatable)object, key); - } else { - if (logger.isDebugEnabled()) { - logger.debug("Object : {} is not conflatable", object); + try { + boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue, + requireOldValue, lastModified, overwriteDestroyed); + + if (success) { + if (getPartitionedRegion().getColocatedWith() == null) { + return success; + } + + if (getPartitionedRegion().isConflationEnabled() && this.getBucketAdvisor().isPrimary()) { + Object object = event.getNewValue(); + Long key = (Long)event.getKey(); + if (object instanceof Conflatable) { + if (logger.isDebugEnabled()) { + logger.debug("Key :{} , Object : {} is conflatable", key, object); + } + // TODO: TO optimize by destroying on primary and secondary separately + // in case of conflation + conflateOldEntry((Conflatable)object, key); + } else { + if 
(logger.isDebugEnabled()) { + logger.debug("Object : {} is not conflatable", object); + } } } + } else { + GatewaySenderEventImpl.release(event.getRawNewValue()); } + return success; + } finally { + GatewaySenderEventImpl.release(event.getRawOldValue()); } - return success; } private void conflateOldEntry(Conflatable object, Long tailKey) { @@ -357,9 +361,12 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { if (getPartitionedRegion().isConflationEnabled()) { removeIndex((Long)event.getKey()); } - super.basicDestroy(event, cacheWrite, expectedOldValue); + try { + super.basicDestroy(event, cacheWrite, expectedOldValue); + } finally { + GatewaySenderEventImpl.release(event.getRawOldValue()); + } - GatewaySenderEventImpl.release(event.getRawOldValue()); // Primary buckets should already remove the key while peeking if (!this.getBucketAdvisor().isPrimary()) { if (logger.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08adacd2/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java index 79b9d86..a22666c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java @@ -1301,22 +1301,32 @@ public class SerialGatewaySenderQueue implements RegionQueue { protected void basicDestroy(final EntryEventImpl event, final boolean cacheWrite, Object expectedOldValue) throws EntryNotFoundException, CacheWriterException, TimeoutException { - - super.basicDestroy(event, cacheWrite, expectedOldValue); - GatewaySenderEventImpl.release(event.getRawOldValue()); + try { + 
super.basicDestroy(event, cacheWrite, expectedOldValue); + } finally { + GatewaySenderEventImpl.release(event.getRawOldValue()); + } } @Override protected boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue, long lastModified, boolean overwriteDestroyed) throws TimeoutException, CacheWriterException { - boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue, - requireOldValue, lastModified, overwriteDestroyed); - - if (success) { + try { + boolean success = super.virtualPut(event, ifNew, ifOld, expectedOldValue, + requireOldValue, lastModified, overwriteDestroyed); + if (!success) { + //release offheap reference if GatewaySenderEventImpl is not put into + //the region queue + GatewaySenderEventImpl.release(event.getRawNewValue()); + } + return success; + } finally { + //GatewaySenderQueue probably only adding new events into the queue. + //Add the finally block just in case if there actually is an update + //in the sender queue or occurs in the the future. GatewaySenderEventImpl.release(event.getRawOldValue()); } - return success; } } }
