fix-6
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/ea3420b3 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/ea3420b3 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/ea3420b3 Branch: refs/heads/feature/GEM-1299 Commit: ea3420b3efd9dc21fa29f0f7b70bef911d4e827e Parents: 80a95f6 Author: zhouxh <[email protected]> Authored: Wed Apr 26 23:23:13 2017 -0700 Committer: zhouxh <[email protected]> Committed: Wed Apr 26 23:26:22 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/geode/internal/cache/BucketRegion.java | 2 +- .../org/apache/geode/internal/cache/BucketRegionQueue.java | 6 +++++- .../java/org/apache/geode/internal/cache/LocalRegion.java | 2 +- .../cache/wan/parallel/ParallelGatewaySenderQueue.java | 1 + .../lucene/internal/LuceneIndexForPartitionedRegion.java | 9 +++++---- .../internal/distributed/PokeLuceneAsyncQueueFunction.java | 3 +++ 6 files changed, 16 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/ea3420b3/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index 136d7b9..cde7cf4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -668,7 +668,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { } } - protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { + public void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { // We don't need to clone the event for new Gateway 
Senders. // Preserve the bucket reference for resetting it later. LocalRegion bucketRegion = event.getRegion(); http://git-wip-us.apache.org/repos/asf/geode/blob/ea3420b3/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index bcc1d8d..56ae3f1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -384,7 +384,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { /** * Does a get that gets the value without fault values in from disk. */ - private Object optimalGet(Object k) { + public Object optimalGet(Object k) { // Get the object at that key (to remove the index). Object object = null; try { @@ -588,6 +588,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { return this.eventSeqNumQueue.peek(); } + public BlockingQueue getEventSeqNumQueue() { + return eventSeqNumQueue; + } + public boolean isReadyForPeek() { return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty() && !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary(); http://git-wip-us.apache.org/repos/asf/geode/blob/ea3420b3/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 45035d7..200640e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -6338,7 +6338,7 @@ public class 
LocalRegion extends AbstractRegion implements LoaderHelperFactory, return false; } - protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { + public void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { if (isPdxTypesRegion() || event.isConcurrencyConflict() /* usually concurrent cache modification problem */) { http://git-wip-us.apache.org/repos/asf/geode/blob/ea3420b3/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 9696b90..87feb21 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1133,6 +1133,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { bucketId, prQ.getFullPath()); } } + brq.getEventSeqNumQueue().add(key); addRemovedEvent(prQ, bucketId, key); } http://git-wip-us.apache.org/repos/asf/geode/blob/ea3420b3/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java index a60ca01..6e3dce0 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java @@ -290,16 +290,17 @@ public 
class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { try { for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) { if (!br.getBucketAdvisor().isPrimary()) { - AsyncEvent currentFirst = (AsyncEvent) ((BucketRegionQueue) br).firstEventSeqNum(); - AsyncEvent lastPeek = (AsyncEvent) lastPeekedEvents.put(br, currentFirst); + Long currentFirst = (Long) ((BucketRegionQueue) br).firstEventSeqNum(); + Long lastPeek = (Long) lastPeekedEvents.put(br, currentFirst); if (currentFirst != null && currentFirst.equals(lastPeek)) { - redistributeEvents(lastPeek); + redistributeEvents((AsyncEvent) ((BucketRegionQueue) br).optimalGet(currentFirst)); + lastPeekedEvents.put(br, ((BucketRegionQueue) br).firstEventSeqNum()); } } else { lastPeekedEvents.put(br, null); } } - Thread.sleep(10000); + Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } http://git-wip-us.apache.org/repos/asf/geode/blob/ea3420b3/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java index 992972b..10c6888 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java @@ -52,6 +52,7 @@ public class PokeLuceneAsyncQueueFunction implements Function, InternalEntity { PartitionedRegion pr = (PartitionedRegion) ctx.getDataSet(); Cache cache = pr.getCache(); String queueId = (String) pr.getAttributes().getAsyncEventQueueIds().iterator().next(); + // PR could have many AEQs, not just AEQ for lucene AsyncEventQueueImpl 
queue = (AsyncEventQueueImpl) cache.getAsyncEventQueue(queueId);
     // Get the GatewaySender
@@ -60,6 +61,8 @@ public class PokeLuceneAsyncQueueFunction implements Function, InternalEntity {
     // Update the shadow key
     BucketRegion br = pr.getBucketRegion(key);
     if (br.getBucketAdvisor().isPrimary()) {
+      // only do it for primary? how about failover again to secondary?
+      // why not br.notifyGatewaySender(operation, event);
       try {
         List<ParallelGatewaySenderEventProcessor> processors =
             ((ConcurrentParallelGatewaySenderEventProcessor) sender.getEventProcessor())
