GEODE-1677: Events are now added to tmpQueuedEvents while the shadow PR is being created
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9103a3db Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9103a3db Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9103a3db Branch: refs/heads/master Commit: 9103a3db8642b9b9b4e00aa2d9a8ae3a0ddbc906 Parents: 54f305b Author: Barry Oglesby <[email protected]> Authored: Wed Jul 13 17:33:05 2016 -0700 Committer: Barry Oglesby <[email protected]> Committed: Thu Jul 21 13:47:54 2016 -0700 ---------------------------------------------------------------------- ...rentParallelGatewaySenderEventProcessor.java | 2 +- .../ConcurrentParallelGatewaySenderQueue.java | 23 ++++++++++++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9103a3db/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java index 04015f7..8b6a700 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java @@ -107,7 +107,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew createProcessors(sender.getDispatcherThreads(), targetRs); // this.queue = parallelQueue; - this.queue = new ConcurrentParallelGatewaySenderQueue(this.processors); + this.queue = new 
ConcurrentParallelGatewaySenderQueue(sender, this.processors); setDaemon(true); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9103a3db/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java index f995ba4..ccdf42a 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java @@ -27,6 +27,7 @@ import com.gemstone.gemfire.internal.cache.DistributedRegion; import com.gemstone.gemfire.internal.cache.ForceReattemptException; import com.gemstone.gemfire.internal.cache.PartitionedRegion; import com.gemstone.gemfire.internal.cache.RegionQueue; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor; import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue; @@ -53,10 +54,13 @@ import com.gemstone.gemfire.internal.size.SingleObjectSizer; */ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue { + private final AbstractGatewaySender sender; + private final ParallelGatewaySenderEventProcessor processors[]; - public ConcurrentParallelGatewaySenderQueue( + public ConcurrentParallelGatewaySenderQueue(AbstractGatewaySender sender, ParallelGatewaySenderEventProcessor pro[]) { + this.sender = sender; this.processors = pro; } @@ -168,9 +172,20 @@ public class 
ConcurrentParallelGatewaySenderQueue implements RegionQueue { } public void addShadowPartitionedRegionForUserPR(PartitionedRegion pr) { - for(int i =0; i< processors.length; i++){ - processors[i].addShadowPartitionedRegionForUserPR(pr); - } + // Reset enqueuedAllTempQueueEvents if the sender is running + // This is done so that any events received while the shadow PR is added are queued in the tmpQueuedEvents + // instead of blocking the distribute call which could cause a deadlock. See GEM-801. + if (this.sender.isRunning()) { + this.sender.setEnqueuedAllTempQueueEvents(false); + } + this.sender.getLifeCycleLock().writeLock().lock(); + try { + for (int i = 0; i < processors.length; i++) { + processors[i].addShadowPartitionedRegionForUserPR(pr); + } + } finally { + this.sender.getLifeCycleLock().writeLock().unlock(); + } } private ParallelGatewaySenderEventProcessor getPGSProcessor(int bucketId) {
