Repository: geode Updated Branches: refs/heads/develop 50686b0b4 -> 0862174c3
GEODE-2806: If a batch has been dispatched, it should still be deleted as planned, even if the bucket is no longer primary. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0862174c Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0862174c Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0862174c Branch: refs/heads/develop Commit: 0862174c30cad1536f2c105b783653bd0d4344e8 Parents: 50686b0 Author: zhouxh <gz...@pivotal.io> Authored: Fri Apr 21 09:57:05 2017 -0700 Committer: zhouxh <gz...@pivotal.io> Committed: Fri Apr 21 09:58:19 2017 -0700 ---------------------------------------------------------------------- .../parallel/ParallelGatewaySenderQueue.java | 58 ++++++++++---------- 1 file changed, 28 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/0862174c/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index cf4c5a9..9696b90 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1104,38 +1104,36 @@ public class ParallelGatewaySenderQueue implements RegionQueue { private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId, Object key) { boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary(); - if (isPrimary) { - BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId); - // TODO : Kishor : Make sure we dont need to initalize a 
bucket - // before destroying a key from it - try { - if (brq != null) { - brq.destroyKey(key); - } - stats.decQueueSize(); - } catch (EntryNotFoundException e) { - if (!this.sender.isBatchConflationEnabled() && logger.isDebugEnabled()) { - logger.debug( - "ParallelGatewaySenderQueue#remove: Got EntryNotFoundException while removing key {} for {} for bucket = {} for GatewaySender {}", - key, this, bucketId, this.sender); - } - } catch (ForceReattemptException e) { - if (logger.isDebugEnabled()) { - logger.debug("Bucket :{} moved to other member", bucketId); - } - } catch (PrimaryBucketException e) { - if (logger.isDebugEnabled()) { - logger.debug("Primary bucket :{} moved to other member", bucketId); - } - } catch (RegionDestroyedException e) { - if (logger.isDebugEnabled()) { - logger.debug( - "Caught RegionDestroyedException attempting to remove key {} from bucket {} in {}", - key, bucketId, prQ.getFullPath()); - } + BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId); + // TODO : Kishor : Make sure we dont need to initalize a bucket + // before destroying a key from it + try { + if (brq != null) { + brq.destroyKey(key); + } + stats.decQueueSize(); + } catch (EntryNotFoundException e) { + if (!this.sender.isBatchConflationEnabled() && logger.isDebugEnabled()) { + logger.debug( + "ParallelGatewaySenderQueue#remove: Got EntryNotFoundException while removing key {} for {} for bucket = {} for GatewaySender {}", + key, this, bucketId, this.sender); + } + } catch (ForceReattemptException e) { + if (logger.isDebugEnabled()) { + logger.debug("Bucket :{} moved to other member", bucketId); + } + } catch (PrimaryBucketException e) { + if (logger.isDebugEnabled()) { + logger.debug("Primary bucket :{} moved to other member", bucketId); + } + } catch (RegionDestroyedException e) { + if (logger.isDebugEnabled()) { + logger.debug( + "Caught RegionDestroyedException attempting to remove key {} from bucket {} in {}", key, + bucketId, prQ.getFullPath()); } - 
addRemovedEvent(prQ, bucketId, key); } + addRemovedEvent(prQ, bucketId, key); } public void resetLastPeeked() {