Repository: geode
Updated Branches:
  refs/heads/feature/GEM-1299 [created] 5f991020c


fix-1


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/5f991020
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/5f991020
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/5f991020

Branch: refs/heads/feature/GEM-1299
Commit: 5f991020cced9bfe5bdf75b275adf9e4fdd31bb6
Parents: 5891ed7
Author: zhouxh <gz...@pivotal.io>
Authored: Thu Apr 20 15:02:21 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Thu Apr 20 15:02:21 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/BucketRegionQueue.java |  4 +
 .../parallel/ParallelGatewaySenderQueue.java    | 58 ++++++-------
 .../lucene/internal/LuceneEventListener.java    | 16 ++++
 .../LuceneIndexForPartitionedRegion.java        | 90 +++++++++++++++++++-
 .../lucene/internal/LuceneServiceImpl.java      |  2 +
 5 files changed, 139 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/5f991020/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 7a21d12..bcc1d8d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -584,6 +584,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
     this.notifyEntriesRemoved();
   }
 
+  public Object firstEventSeqNum() {
+    return this.eventSeqNumQueue.peek();
+  }
+
   public boolean isReadyForPeek() {
     return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty()
         && !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary();

http://git-wip-us.apache.org/repos/asf/geode/blob/5f991020/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index cf4c5a9..9696b90 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1104,38 +1104,36 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId, Object key) {
     boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
-    if (isPrimary) {
-      BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId);
-      // TODO : Kishor : Make sure we dont need to initalize a bucket
-      // before destroying a key from it
-      try {
-        if (brq != null) {
-          brq.destroyKey(key);
-        }
-        stats.decQueueSize();
-      } catch (EntryNotFoundException e) {
-        if (!this.sender.isBatchConflationEnabled() && logger.isDebugEnabled()) {
-          logger.debug(
-              "ParallelGatewaySenderQueue#remove: Got EntryNotFoundException while removing key {} for {} for bucket = {} for GatewaySender {}",
-              key, this, bucketId, this.sender);
-        }
-      } catch (ForceReattemptException e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Bucket :{} moved to other member", bucketId);
-        }
-      } catch (PrimaryBucketException e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Primary bucket :{} moved to other member", bucketId);
-        }
-      } catch (RegionDestroyedException e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "Caught RegionDestroyedException attempting to remove key {} from bucket {} in {}",
-              key, bucketId, prQ.getFullPath());
-        }
+    BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId);
+    // TODO : Kishor : Make sure we dont need to initalize a bucket
+    // before destroying a key from it
+    try {
+      if (brq != null) {
+        brq.destroyKey(key);
+      }
+      stats.decQueueSize();
+    } catch (EntryNotFoundException e) {
+      if (!this.sender.isBatchConflationEnabled() && logger.isDebugEnabled()) {
+        logger.debug(
+            "ParallelGatewaySenderQueue#remove: Got EntryNotFoundException while removing key {} for {} for bucket = {} for GatewaySender {}",
+            key, this, bucketId, this.sender);
+      }
+    } catch (ForceReattemptException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Bucket :{} moved to other member", bucketId);
+      }
+    } catch (PrimaryBucketException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Primary bucket :{} moved to other member", bucketId);
+      }
+    } catch (RegionDestroyedException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Caught RegionDestroyedException attempting to remove key {} from bucket {} in {}", key,
+            bucketId, prQ.getFullPath());
       }
-      addRemovedEvent(prQ, bucketId, key);
     }
+    addRemovedEvent(prQ, bucketId, key);
   }
 
   public void resetLastPeeked() {

http://git-wip-us.apache.org/repos/asf/geode/blob/5f991020/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
index 0f55533..62983ef 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
@@ -31,7 +31,9 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
+import org.apache.geode.cache.lucene.internal.distributed.PokeLuceneAsyncQueueFunction;
 import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
 import org.apache.geode.cache.query.internal.DefaultQuery;
 import org.apache.geode.internal.cache.BucketNotFoundException;
@@ -111,12 +113,14 @@ public class LuceneEventListener implements AsyncEventListener {
       }
       return true;
     } catch (BucketNotFoundException | RegionDestroyedException | PrimaryBucketException e) {
+      redistributeEvents(events);
       logger.debug("Bucket not found while saving to lucene index: " + e.getMessage(), e);
       return false;
     } catch (CacheClosedException e) {
       logger.debug("Unable to save to lucene index, cache has been closed", e);
       return false;
     } catch (AlreadyClosedException e) {
+      redistributeEvents(events);
       logger.debug("Unable to commit, the lucene index is already closed", e);
       return false;
     } catch (IOException e) {
@@ -126,6 +130,18 @@ public class LuceneEventListener implements AsyncEventListener {
     }
   }
 
+  private void redistributeEvents(final List<AsyncEvent> events) {
+    for (AsyncEvent event : events) {
+      try {
+        FunctionService.onRegion(event.getRegion())
+            .withArgs(new Object[] {event.getRegion().getName(), event.getKey(), event})
+            .execute(PokeLuceneAsyncQueueFunction.ID);
+      } catch (RegionDestroyedException | PrimaryBucketException | CacheClosedException e) {
+        logger.debug("Unable to redistribute async event for :" + event.getKey() + " : " + event);
+      }
+    }
+  }
+
   public static void setExceptionObserver(LuceneExceptionObserver observer) {
     if (observer == null) {
       observer = exception -> {

http://git-wip-us.apache.org/repos/asf/geode/blob/5f991020/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index c39a4a8..dbd31ba 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -15,20 +15,28 @@
 
 package org.apache.geode.cache.lucene.internal;
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.geode.CancelException;
 import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.FixedPartitionResolver;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.PartitionResolver;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles;
+import org.apache.geode.cache.lucene.internal.distributed.PokeLuceneAsyncQueueFunction;
 import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
 import org.apache.geode.cache.lucene.internal.partition.BucketTargetingFixedResolver;
 import org.apache.geode.cache.lucene.internal.partition.BucketTargetingResolver;
@@ -39,14 +47,22 @@ import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.BucketRegionQueue;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PrimaryBucketException;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
 
 /* wrapper of IndexWriter */
 public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
   protected Region fileAndChunkRegion;
   protected final FileSystemStats fileSystemStats;
 
+  private StuckThreadCleaner stuckCleanerThread;
   public static final String FILES_REGION_SUFFIX = ".files";
 
   public LuceneIndexForPartitionedRegion(String indexName, String regionPath, InternalCache cache) {
@@ -166,7 +182,9 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
     return createRegion(regionName, attributes);
   }
 
-  public void close() {}
+  public void close() {
+    stuckCleanerThread.finish();
+  }
 
   @Override
   public void dumpFiles(final String directory) {
@@ -234,4 +252,74 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
       }
     }
   }
+
+  @Override
+  protected AsyncEventQueue createAEQ(Region dataRegion) {
+    AsyncEventQueueImpl queue = (AsyncEventQueueImpl) super.createAEQ(dataRegion);
+    startStuckCleaner(queue);
+    return queue;
+  }
+
+  private void startStuckCleaner(AsyncEventQueueImpl queue) {
+    stuckCleanerThread = new StuckThreadCleaner(queue);
+    Thread t = new Thread(stuckCleanerThread);
+    t.setDaemon(true);
+    t.start();
+  }
+
+  private static class StuckThreadCleaner implements Runnable {
+    private boolean done = false;
+    AsyncEventQueueImpl queue;
+
+    public StuckThreadCleaner(AsyncEventQueueImpl queue) {
+      this.queue = queue;
+    }
+
+    public void run() {
+      AbstractGatewaySender sender = (AbstractGatewaySender) queue.getSender();
+      List<ParallelGatewaySenderEventProcessor> processors =
+          ((ConcurrentParallelGatewaySenderEventProcessor) sender.getEventProcessor())
+              .getProcessors();
+
+      ConcurrentParallelGatewaySenderQueue prq =
+          (ConcurrentParallelGatewaySenderQueue) sender.getQueue();
+      PartitionedRegion pr = (PartitionedRegion) prq.getRegion();
+      HashMap lastPeekedEvents = new HashMap();
+
+      while (!done) {
+        try {
+          for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
+            if (!br.getBucketAdvisor().isPrimary()) {
+              AsyncEvent currentFirst = (AsyncEvent) ((BucketRegionQueue) br).firstEventSeqNum();
+              AsyncEvent lastPeek = (AsyncEvent) lastPeekedEvents.put(br, currentFirst);
+              if (currentFirst.equals(lastPeek)) {
+                redistributeEvents(lastPeek);
+              }
+            } else {
+              lastPeekedEvents.put(br, null);
+            }
+          }
+          Thread.sleep(10000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    public void finish() {
+      this.done = true;
+    }
+
+    private void redistributeEvents(final AsyncEvent event) {
+      try {
+        logger.info("JASON unsticking event:" + event.getKey() + ":" + event);
+        FunctionService.onRegion(event.getRegion())
+            .withArgs(new Object[] {event.getRegion().getName(), event.getKey(), event})
+            .execute(PokeLuceneAsyncQueueFunction.ID);
+      } catch (RegionDestroyedException | PrimaryBucketException | CacheClosedException e) {
+        logger.debug("Unable to redistribute async event for :" + event.getKey() + " : " + event);
+      }
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/5f991020/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index 935f37c..08c9172 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -19,6 +19,7 @@ import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.cache.lucene.internal.distributed.LuceneQueryFunction;
+import org.apache.geode.cache.lucene.internal.distributed.PokeLuceneAsyncQueueFunction;
 import org.apache.geode.cache.lucene.internal.management.LuceneServiceMBean;
 import org.apache.geode.cache.lucene.internal.management.ManagementIndexListener;
 import org.apache.geode.cache.lucene.internal.results.LuceneGetPageFunction;
@@ -101,6 +102,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
     FunctionService.registerFunction(new LuceneGetPageFunction());
     FunctionService.registerFunction(new WaitUntilFlushedFunction());
     FunctionService.registerFunction(new DumpDirectoryFiles());
+    FunctionService.registerFunction(new PokeLuceneAsyncQueueFunction());
     registerDataSerializables();
   }
 

Reply via email to