Repository: geode Updated Branches: refs/heads/feature/GEM-1299 [created] 5f991020c
fix-1 Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/5f991020 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/5f991020 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/5f991020 Branch: refs/heads/feature/GEM-1299 Commit: 5f991020cced9bfe5bdf75b275adf9e4fdd31bb6 Parents: 5891ed7 Author: zhouxh <gz...@pivotal.io> Authored: Thu Apr 20 15:02:21 2017 -0700 Committer: zhouxh <gz...@pivotal.io> Committed: Thu Apr 20 15:02:21 2017 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/BucketRegionQueue.java | 4 + .../parallel/ParallelGatewaySenderQueue.java | 58 ++++++------- .../lucene/internal/LuceneEventListener.java | 16 ++++ .../LuceneIndexForPartitionedRegion.java | 90 +++++++++++++++++++- .../lucene/internal/LuceneServiceImpl.java | 2 + 5 files changed, 139 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/5f991020/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index 7a21d12..bcc1d8d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -584,6 +584,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { this.notifyEntriesRemoved(); } + public Object firstEventSeqNum() { + return this.eventSeqNumQueue.peek(); + } + public boolean isReadyForPeek() { return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty() && !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary(); http://git-wip-us.apache.org/repos/asf/geode/blob/5f991020/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index cf4c5a9..9696b90 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1104,38 +1104,36 @@ public class ParallelGatewaySenderQueue implements RegionQueue { private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId, Object key) { boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary(); - if (isPrimary) { - BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId); - // TODO : Kishor : Make sure we dont need to initalize a bucket - // before destroying a key from it - try { - if (brq != null) { - brq.destroyKey(key); - } - stats.decQueueSize(); - } catch (EntryNotFoundException e) { - if (!this.sender.isBatchConflationEnabled() && logger.isDebugEnabled()) { - logger.debug( - "ParallelGatewaySenderQueue#remove: Got EntryNotFoundException while removing key {} for {} for bucket = {} for GatewaySender {}", - key, this, bucketId, this.sender); - } - } catch (ForceReattemptException e) { - if (logger.isDebugEnabled()) { - logger.debug("Bucket :{} moved to other member", bucketId); - } - } catch (PrimaryBucketException e) { - if (logger.isDebugEnabled()) { - logger.debug("Primary bucket :{} moved to other member", bucketId); - } - } catch (RegionDestroyedException e) { - if (logger.isDebugEnabled()) { - logger.debug( - "Caught RegionDestroyedException attempting to remove key {} from bucket {} in {}", - key, bucketId, prQ.getFullPath()); - } + BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId); + // TODO : Kishor : Make sure we dont need to initalize a bucket + // before destroying a key from it + try { + if (brq != null) { + brq.destroyKey(key); + } + stats.decQueueSize(); + } catch (EntryNotFoundException e) { + if (!this.sender.isBatchConflationEnabled() && logger.isDebugEnabled()) { + logger.debug( + "ParallelGatewaySenderQueue#remove: Got EntryNotFoundException while removing key {} for {} for bucket = {} for GatewaySender {}", + key, this, bucketId, this.sender); + } + } catch (ForceReattemptException e) { + if (logger.isDebugEnabled()) { + logger.debug("Bucket :{} moved to other member", bucketId); + } + } catch (PrimaryBucketException e) { + if (logger.isDebugEnabled()) { + logger.debug("Primary bucket :{} moved to other member", bucketId); + } + } catch (RegionDestroyedException e) { + if (logger.isDebugEnabled()) { + logger.debug( + "Caught RegionDestroyedException attempting to remove key {} from bucket {} in {}", key, + bucketId, prQ.getFullPath()); } - addRemovedEvent(prQ, bucketId, key); } + addRemovedEvent(prQ, bucketId, key); } public void resetLastPeeked() { http://git-wip-us.apache.org/repos/asf/geode/blob/5f991020/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java index 0f55533..62983ef 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java @@ -31,7 +31,9 @@ import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.asyncqueue.AsyncEvent; import org.apache.geode.cache.asyncqueue.AsyncEventListener; +import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.lucene.internal.repository.RepositoryManager; +import org.apache.geode.cache.lucene.internal.distributed.PokeLuceneAsyncQueueFunction; import org.apache.geode.cache.lucene.internal.repository.IndexRepository; import org.apache.geode.cache.query.internal.DefaultQuery; import org.apache.geode.internal.cache.BucketNotFoundException; @@ -111,12 +113,14 @@ public class LuceneEventListener implements AsyncEventListener { } return true; } catch (BucketNotFoundException | RegionDestroyedException | PrimaryBucketException e) { + redistributeEvents(events); logger.debug("Bucket not found while saving to lucene index: " + e.getMessage(), e); return false; } catch (CacheClosedException e) { logger.debug("Unable to save to lucene index, cache has been closed", e); return false; } catch (AlreadyClosedException e) { + redistributeEvents(events); logger.debug("Unable to commit, the lucene index is already closed", e); return false; } catch (IOException e) { @@ -126,6 +130,18 @@ public class LuceneEventListener implements AsyncEventListener { } } + private void redistributeEvents(final List<AsyncEvent> events) { + for (AsyncEvent event : events) { + try { + FunctionService.onRegion(event.getRegion()) + .withArgs(new Object[] {event.getRegion().getName(), event.getKey(), event}) + .execute(PokeLuceneAsyncQueueFunction.ID); + } catch (RegionDestroyedException | PrimaryBucketException | CacheClosedException e) { + logger.debug("Unable to redistribute async event for :" + event.getKey() + " : " + event); + } + } + } + public static void setExceptionObserver(LuceneExceptionObserver observer) { if (observer == null) { observer = exception -> { http://git-wip-us.apache.org/repos/asf/geode/blob/5f991020/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java index c39a4a8..dbd31ba 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java @@ -15,20 +15,28 @@ package org.apache.geode.cache.lucene.internal; +import java.util.HashMap; +import java.util.List; import java.util.Set; import org.apache.geode.CancelException; import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.FixedPartitionResolver; import org.apache.geode.cache.PartitionAttributes; import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.PartitionResolver; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.asyncqueue.AsyncEvent; +import org.apache.geode.cache.asyncqueue.AsyncEventQueue; +import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles; +import org.apache.geode.cache.lucene.internal.distributed.PokeLuceneAsyncQueueFunction; import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats; import org.apache.geode.cache.lucene.internal.partition.BucketTargetingFixedResolver; import org.apache.geode.cache.lucene.internal.partition.BucketTargetingResolver; @@ -39,14 +47,22 @@ import org.apache.geode.distributed.internal.DM; import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.BucketRegion; +import org.apache.geode.internal.cache.BucketRegionQueue; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PrimaryBucketException; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor; +import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; +import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor; /* wrapper of IndexWriter */ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { protected Region fileAndChunkRegion; protected final FileSystemStats fileSystemStats; + private StuckThreadCleaner stuckCleanerThread; public static final String FILES_REGION_SUFFIX = ".files"; public LuceneIndexForPartitionedRegion(String indexName, String regionPath, InternalCache cache) { @@ -166,7 +182,9 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { return createRegion(regionName, attributes); } - public void close() {} + public void close() { + stuckCleanerThread.finish(); + } @Override public void dumpFiles(final String directory) { @@ -234,4 +252,74 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { } } } + + @Override + protected AsyncEventQueue createAEQ(Region dataRegion) { + AsyncEventQueueImpl queue = (AsyncEventQueueImpl) super.createAEQ(dataRegion); + startStuckCleaner(queue); + return queue; + } + + private void startStuckCleaner(AsyncEventQueueImpl queue) { + stuckCleanerThread = new StuckThreadCleaner(queue); + Thread t = new Thread(stuckCleanerThread); + t.setDaemon(true); + t.start(); + } + + private static class StuckThreadCleaner implements Runnable { + private boolean done = false; + AsyncEventQueueImpl queue; + + public StuckThreadCleaner(AsyncEventQueueImpl queue) { + this.queue = queue; + } + + public void run() { + AbstractGatewaySender sender = (AbstractGatewaySender) queue.getSender(); + List<ParallelGatewaySenderEventProcessor> processors = + ((ConcurrentParallelGatewaySenderEventProcessor) sender.getEventProcessor()) + .getProcessors(); + + ConcurrentParallelGatewaySenderQueue prq = + (ConcurrentParallelGatewaySenderQueue) sender.getQueue(); + PartitionedRegion pr = (PartitionedRegion) prq.getRegion(); + HashMap lastPeekedEvents = new HashMap(); + + while (!done) { + try { + for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) { + if (!br.getBucketAdvisor().isPrimary()) { + AsyncEvent currentFirst = (AsyncEvent) ((BucketRegionQueue) br).firstEventSeqNum(); + AsyncEvent lastPeek = (AsyncEvent) lastPeekedEvents.put(br, currentFirst); + if (currentFirst.equals(lastPeek)) { + redistributeEvents(lastPeek); + } + } else { + lastPeekedEvents.put(br, null); + } + } + Thread.sleep(10000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + public void finish() { + this.done = true; + } + + private void redistributeEvents(final AsyncEvent event) { + try { + logger.info("JASON unsticking event:" + event.getKey() + ":" + event); + FunctionService.onRegion(event.getRegion()) + .withArgs(new Object[] {event.getRegion().getName(), event.getKey(), event}) + .execute(PokeLuceneAsyncQueueFunction.ID); + } catch (RegionDestroyedException | PrimaryBucketException | CacheClosedException e) { + logger.debug("Unable to redistribute async event for :" + event.getKey() + " : " + event); + } + } + + } } http://git-wip-us.apache.org/repos/asf/geode/blob/5f991020/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java index 935f37c..08c9172 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java @@ -19,6 +19,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; import org.apache.geode.cache.lucene.internal.distributed.LuceneQueryFunction; +import org.apache.geode.cache.lucene.internal.distributed.PokeLuceneAsyncQueueFunction; import org.apache.geode.cache.lucene.internal.management.LuceneServiceMBean; import org.apache.geode.cache.lucene.internal.management.ManagementIndexListener; import org.apache.geode.cache.lucene.internal.results.LuceneGetPageFunction; @@ -101,6 +102,7 @@ public class LuceneServiceImpl implements InternalLuceneService { FunctionService.registerFunction(new LuceneGetPageFunction()); FunctionService.registerFunction(new WaitUntilFlushedFunction()); FunctionService.registerFunction(new DumpDirectoryFiles()); + FunctionService.registerFunction(new PokeLuceneAsyncQueueFunction()); registerDataSerializables(); }