GEODE-11 Adding tests of closing a cache during index updates

Testing failover of lucene indexes by closing the cache while in the
middle of updating lucene indexes. Currently there are tests for closing
the cache before the index repository commit, and also during the commit
after a fixed number of updates to the underling index data regions.

I refactored the lucene tests to use 7 buckets, rather than 113, so they
take less time and are easier to debug.

I also removed a call to Thread.interrupt in the WAN code because it was
interrupting itself in my callback. We should never be using interrupt
in the product.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4a0de723
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4a0de723
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4a0de723

Branch: refs/heads/master
Commit: 4a0de723ce07977a7965b5e7a1bf57d15ec9456a
Parents: 23211dd
Author: Dan Smith <[email protected]>
Authored: Thu Jul 7 10:02:25 2016 -0700
Committer: Dan Smith <[email protected]>
Committed: Wed Jul 13 15:27:07 2016 -0700

----------------------------------------------------------------------
 .../AbstractGatewaySenderEventProcessor.java    |   1 -
 ...rentParallelGatewaySenderEventProcessor.java |   2 +-
 .../lucene/internal/IndexRepositoryFactory.java |   2 +-
 .../cache/lucene/LuceneQueriesPRBase.java       | 140 +++++--------------
 .../lucene/LuceneQueriesPeerPRDUnitTest.java    |   5 +-
 .../LuceneQueriesPeerPROverflowDUnitTest.java   |   1 +
 .../LuceneQueriesPeerPRRedundancyDUnitTest.java |  53 ++++++-
 .../cache/lucene/test/IndexRegionSpy.java       |  82 +++++++++++
 .../cache/lucene/test/IndexRepositorySpy.java   | 130 +++++++++++++++++
 9 files changed, 297 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 0e83557..b7b1ffc 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -1155,7 +1155,6 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
     dispatcher.stop();
 
     if (this.isAlive()) {
-      this.interrupt();
       if (logger.isDebugEnabled()) {
         logger.debug("{}: Joining with the dispatcher thread upto limit of 5 
seconds", this);
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 82a53d3..04015f7 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -273,7 +273,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor 
extends AbstractGatew
         }
       }
     } catch (InterruptedException e) {
-      throw new InternalGemFireException(e.getMessage());
+      throw new InternalGemFireException(e);
     } catch (RejectedExecutionException rejectedExecutionEx) {
       throw rejectedExecutionEx;
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
 
b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
index 12f12ad..ae4b88b 100644
--- 
a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
+++ 
b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
@@ -62,7 +62,7 @@ public class IndexRepositoryFactory {
   /**
    * Find the bucket in region2 that matches the bucket id from region1.
    */
-  private BucketRegion getMatchingBucket(PartitionedRegion region, Integer 
bucketId) {
+  protected BucketRegion getMatchingBucket(PartitionedRegion region, Integer 
bucketId) {
     //Force the bucket to be created if it is not already
     region.getOrCreateNodeForBucketWrite(bucketId, null);
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
 
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
index 889b16f..1de600d 100644
--- 
a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
+++ 
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
@@ -19,38 +19,26 @@
 
 package com.gemstone.gemfire.cache.lucene;
 
+import static com.gemstone.gemfire.cache.lucene.test.IndexRepositorySpy.doOnce;
 import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.*;
 import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
 
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.function.Consumer;
 import java.util.stream.IntStream;
 
-import org.apache.lucene.analysis.Analyzer;
 import org.junit.After;
-import org.junit.Ignore;
 import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
 
 import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.PartitionAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.control.RebalanceOperation;
 import com.gemstone.gemfire.cache.control.RebalanceResults;
-import com.gemstone.gemfire.cache.lucene.internal.IndexRepositoryFactory;
-import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexStats;
-import com.gemstone.gemfire.cache.lucene.internal.PartitionedRepositoryManager;
-import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
-import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
-import 
com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
+import com.gemstone.gemfire.cache.lucene.test.IndexRepositorySpy;
 import com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities;
 import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
 import com.gemstone.gemfire.distributed.DistributedMember;
-import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
 import com.gemstone.gemfire.test.dunit.VM;
 
@@ -61,6 +49,7 @@ import com.gemstone.gemfire.test.dunit.VM;
  *
  */
 public abstract class LuceneQueriesPRBase extends LuceneQueriesBase {
+  protected static final int NUM_BUCKETS = 7;
 
   @After
   public void cleanupRebalanceCallback() {
@@ -68,8 +57,6 @@ public abstract class LuceneQueriesPRBase extends 
LuceneQueriesBase {
     removeCallback(dataStore2);
   }
 
-
-
   @Test
   public void returnCorrectResultsWhenRebalanceHappensOnIndexUpdate() throws 
InterruptedException {
     addCallbackToTriggerRebalance(dataStore1);
@@ -95,46 +82,54 @@ public abstract class LuceneQueriesPRBase extends 
LuceneQueriesBase {
     putEntriesAndValidateQueryResults();
   }
 
-  protected void putEntriesAndValidateQueryResults() {
+  @Test
+  public void returnCorrectResultsWhenRebalanceHappensAfterUpdates() throws 
InterruptedException {
     SerializableRunnableIF createIndex = () -> {
       LuceneService luceneService = LuceneServiceProvider.get(getCache());
       luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
     };
     dataStore1.invoke(() -> initDataStore(createIndex));
     accessor.invoke(() -> initAccessor(createIndex));
-    dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
 
-    put113Entries();
+    putEntryInEachBucket();
 
     dataStore2.invoke(() -> initDataStore(createIndex));
-    dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache()));
+    assertTrue(waitForFlushBeforeExecuteTextSearch(accessor, 60000));
 
-    assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 60000));
+    rebalanceRegion(dataStore2);
 
-    executeTextSearch(accessor, "world", "text", 113);
+    executeTextSearch(accessor, "world", "text", NUM_BUCKETS);
   }
 
   @Test
-  public void returnCorrectResultsWhenRebalanceHappensAfterUpdates() throws 
InterruptedException {
+  public void returnCorrectResultsWhenRebalanceHappensWhileSenderIsPaused() 
throws InterruptedException {
     SerializableRunnableIF createIndex = () -> {
       LuceneService luceneService = LuceneServiceProvider.get(getCache());
       luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
     };
     dataStore1.invoke(() -> initDataStore(createIndex));
     accessor.invoke(() -> initAccessor(createIndex));
+    dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
 
-    put113Entries();
+    putEntryInEachBucket();
 
     dataStore2.invoke(() -> initDataStore(createIndex));
+    rebalanceRegion(dataStore2);
+    dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache()));
+
     assertTrue(waitForFlushBeforeExecuteTextSearch(accessor, 60000));
 
-    rebalanceRegion(dataStore2);
+    executeTextSearch(accessor, "world", "text", NUM_BUCKETS);
+  }
 
-    executeTextSearch(accessor, "world", "text", 113);
+  protected PartitionAttributes getPartitionAttributes() {
+    PartitionAttributesFactory factory = new PartitionAttributesFactory();
+    factory.setLocalMaxMemory(100);
+    factory.setTotalNumBuckets(NUM_BUCKETS);
+    return factory.create();
   }
 
-  @Test
-  public void returnCorrectResultsWhenRebalanceHappensWhileSenderIsPaused() 
throws InterruptedException {
+  protected void putEntriesAndValidateQueryResults() {
     SerializableRunnableIF createIndex = () -> {
       LuceneService luceneService = LuceneServiceProvider.get(getCache());
       luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
@@ -143,22 +138,21 @@ public abstract class LuceneQueriesPRBase extends 
LuceneQueriesBase {
     accessor.invoke(() -> initAccessor(createIndex));
     dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
 
-    put113Entries();
+    putEntryInEachBucket();
 
     dataStore2.invoke(() -> initDataStore(createIndex));
-    rebalanceRegion(dataStore2);
     dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache()));
 
-    assertTrue(waitForFlushBeforeExecuteTextSearch(accessor, 60000));
+    assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 60000));
 
-    executeTextSearch(accessor, "world", "text", 113);
+    executeTextSearch(accessor, "world", "text", NUM_BUCKETS);
   }
 
-  protected void put113Entries() {
+  protected void putEntryInEachBucket() {
     accessor.invoke(() -> {
       final Cache cache = getCache();
       Region<Object, Object> region = cache.getRegion(REGION_NAME);
-      IntStream.range(0,113).forEach(i -> region.put(i, new TestObject("hello 
world")));
+      IntStream.range(0,NUM_BUCKETS).forEach(i -> region.put(i, new 
TestObject("hello world")));
     });
   }
 
@@ -166,7 +160,7 @@ public abstract class LuceneQueriesPRBase extends 
LuceneQueriesBase {
     vm.invoke(() -> {
       IndexRepositorySpy spy = IndexRepositorySpy.injectSpy();
 
-      spy.beforeWrite(doOnce(key -> rebalanceRegion(vm)));
+      spy.beforeWriteIndexRepository(doOnce(key -> rebalanceRegion(vm)));
     });
   }
 
@@ -174,7 +168,7 @@ public abstract class LuceneQueriesPRBase extends 
LuceneQueriesBase {
     vm.invoke(() -> {
       IndexRepositorySpy spy = IndexRepositorySpy.injectSpy();
 
-      spy.beforeWrite(doOnce(key -> moveBucket(destination, key)));
+      spy.beforeWriteIndexRepository(doOnce(key -> moveBucket(destination, 
key)));
     });
   }
 
@@ -196,72 +190,6 @@ public abstract class LuceneQueriesPRBase extends 
LuceneQueriesBase {
     });
   }
 
-  protected static class IndexRepositorySpy extends IndexRepositoryFactory {
-
-    private Consumer<Object> beforeWrite = key -> {};
-
-    public static IndexRepositorySpy injectSpy() {
-      IndexRepositorySpy factory = new IndexRepositorySpy();
-      PartitionedRepositoryManager.indexRepositoryFactory = factory;
-      return factory;
-    }
-
-    public static void remove() {
-      PartitionedRepositoryManager.indexRepositoryFactory = new 
IndexRepositoryFactory();
-    }
-
-    private IndexRepositorySpy() {
-    }
-
-    @Override
-    public IndexRepository createIndexRepository(final Integer bucketId,
-                                                 final PartitionedRegion 
userRegion,
-                                                 final PartitionedRegion 
fileRegion,
-                                                 final PartitionedRegion 
chunkRegion,
-                                                 final LuceneSerializer 
serializer,
-                                                 final Analyzer analyzer,
-                                                 final LuceneIndexStats 
indexStats,
-                                                 final FileSystemStats 
fileSystemStats)
-      throws IOException
-    {
-      final IndexRepository indexRepo = super.createIndexRepository(bucketId, 
userRegion, fileRegion, chunkRegion,
-        serializer, analyzer,
-        indexStats,
-        fileSystemStats);
-      final IndexRepository spy = Mockito.spy(indexRepo);
-
-      Answer invokeBeforeWrite = invocation -> {
-        beforeWrite.accept(invocation.getArgumentAt(0, Object.class));
-        invocation.callRealMethod();
-        return null;
-      };
-      doAnswer(invokeBeforeWrite).when(spy).update(any(), any());
-      doAnswer(invokeBeforeWrite).when(spy).create(any(), any());
-      doAnswer(invokeBeforeWrite).when(spy).delete(any());
-
-      return spy;
-    }
-
-    /**
-     * Add a callback that runs before a call to
-     * {@link IndexRepository#create(Object, Object)}
-     */
-    public void beforeWrite(Consumer<Object> action) {
-      this.beforeWrite = action;
-    }
-  }
-
-  protected static <T> Consumer<T> doOnce(Consumer<T> consumer) {
-    return new Consumer<T>() {
-      boolean done;
-
-      @Override
-      public void accept(final T t) {
-        if (!done) {
-          done = true;
-          consumer.accept(t);
-        }
-      }
-    };
-  };
+  ;
+  ;
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
 
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
index 00b8254..702ac1f 100644
--- 
a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
+++ 
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
@@ -18,7 +18,6 @@ package com.gemstone.gemfire.cache.lucene;
 
 import static 
com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.REGION_NAME;
 
-import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.RegionShortcut;
 import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
 import com.gemstone.gemfire.test.junit.categories.DistributedTest;
@@ -30,10 +29,8 @@ public class LuceneQueriesPeerPRDUnitTest extends 
LuceneQueriesPRBase {
 
   @Override protected void initDataStore(final SerializableRunnableIF 
createIndex) throws Exception {
     createIndex.run();
-    PartitionAttributesFactory partitionAttributesFactory = new 
PartitionAttributesFactory();
-    partitionAttributesFactory.setLocalMaxMemory(100);
     getCache().createRegionFactory(RegionShortcut.PARTITION)
-      .setPartitionAttributes(partitionAttributesFactory.create())
+      .setPartitionAttributes(getPartitionAttributes())
       .create(REGION_NAME);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPROverflowDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPROverflowDUnitTest.java
 
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPROverflowDUnitTest.java
index 8fd0a08..3ea561a 100644
--- 
a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPROverflowDUnitTest.java
+++ 
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPROverflowDUnitTest.java
@@ -33,6 +33,7 @@ public class LuceneQueriesPeerPROverflowDUnitTest extends 
LuceneQueriesPRBase {
     createIndex.run();
     EvictionAttributes evicAttr = 
EvictionAttributes.createLRUEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK);
     getCache().createRegionFactory(RegionShortcut.PARTITION_OVERFLOW)
+      .setPartitionAttributes(getPartitionAttributes())
       .setEvictionAttributes(evicAttr)
       .create(REGION_NAME);
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
 
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
index 0a7bb67..dc86a7c 100644
--- 
a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
+++ 
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
@@ -16,13 +16,18 @@
  */
 package com.gemstone.gemfire.cache.lucene;
 
+import static com.gemstone.gemfire.cache.lucene.test.IndexRepositorySpy.*;
 import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.*;
 import static org.junit.Assert.*;
 
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.lucene.test.IndexRegionSpy;
+import com.gemstone.gemfire.cache.lucene.test.IndexRepositorySpy;
 import com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities;
-import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import 
com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
@@ -31,6 +36,7 @@ import 
com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessag
 import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
 import com.gemstone.gemfire.test.dunit.VM;
 import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+import com.jayway.awaitility.Awaitility;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -40,7 +46,9 @@ public class LuceneQueriesPeerPRRedundancyDUnitTest extends 
LuceneQueriesPRBase
 
   @Override protected void initDataStore(final SerializableRunnableIF 
createIndex) throws Exception {
     createIndex.run();
-    
getCache().createRegionFactory(RegionShortcut.PARTITION_REDUNDANT).create(REGION_NAME);
+    Region region = 
getCache().createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
+      .setPartitionAttributes(getPartitionAttributes())
+      .create(REGION_NAME);
   }
 
   @Override protected void initAccessor(final SerializableRunnableIF 
createIndex) throws Exception {
@@ -52,6 +60,37 @@ public class LuceneQueriesPeerPRRedundancyDUnitTest extends 
LuceneQueriesPRBase
     final DistributedMember member2 = dataStore2.invoke(() -> 
getCache().getDistributedSystem().getDistributedMember());
     addCallbackToMovePrimary(dataStore1, member2);
 
+    putEntriesAndValidateResultsWithRedundancy();
+  }
+
+  @Test
+  public void returnCorrectResultsWhenCloseCacheHappensOnIndexUpdate() throws 
InterruptedException {
+    dataStore1.invoke(() -> {
+      IndexRepositorySpy spy = IndexRepositorySpy.injectSpy();
+
+      spy.beforeWriteIndexRepository(doAfterN(key -> getCache().close(), 2));
+    });
+
+    putEntriesAndValidateResultsWithRedundancy();
+
+    //Wait until the cache is closed in datastore1
+    dataStore1.invoke(() -> Awaitility.await().atMost(60, 
TimeUnit.SECONDS).until(basicGetCache()::isClosed));
+  }
+
+  @Test
+  public void returnCorrectResultsWhenCloseCacheHappensOnPartialIndexWrite() 
throws InterruptedException {
+    final DistributedMember member2 = dataStore2.invoke(() -> 
getCache().getDistributedSystem().getDistributedMember());
+    dataStore1.invoke(() -> {
+      IndexRegionSpy.beforeWrite(getCache(), doAfterN(key -> 
getCache().close(), 100));
+    });
+
+    putEntriesAndValidateResultsWithRedundancy();
+
+    //Wait until the cache is closed in datastore1
+    dataStore1.invoke(() -> Awaitility.await().atMost(60, 
TimeUnit.SECONDS).until(basicGetCache()::isClosed));
+  }
+
+  private void putEntriesAndValidateResultsWithRedundancy() {
     SerializableRunnableIF createIndex = () -> {
       LuceneService luceneService = LuceneServiceProvider.get(getCache());
       luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
@@ -60,21 +99,23 @@ public class LuceneQueriesPeerPRRedundancyDUnitTest extends 
LuceneQueriesPRBase
     dataStore2.invoke(() -> initDataStore(createIndex));
     accessor.invoke(() -> initAccessor(createIndex));
     dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
+    dataStore2.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
 
-    put113Entries();
+    putEntryInEachBucket();
 
     dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache()));
+    dataStore2.invoke(() -> LuceneTestUtilities.resumeSender(getCache()));
 
-    assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 60000));
+    assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore2, 60000));
 
-    executeTextSearch(accessor, "world", "text", 113);
+    executeTextSearch(accessor, "world", "text", NUM_BUCKETS);
   }
 
   protected void addCallbackToMovePrimary(VM vm, final DistributedMember 
destination) {
     vm.invoke(() -> {
       IndexRepositorySpy spy = IndexRepositorySpy.injectSpy();
 
-      spy.beforeWrite(doOnce(key -> moveBucket(destination, key)));
+      spy.beforeWriteIndexRepository(doOnce(key -> moveBucket(destination, 
key)));
     });
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRegionSpy.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRegionSpy.java
 
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRegionSpy.java
new file mode 100644
index 0000000..1e16707
--- /dev/null
+++ 
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRegionSpy.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.test;
+
+import java.util.function.Consumer;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
+import com.gemstone.gemfire.internal.cache.RegionListener;
+
+/**
+ * Allows spying on operations that happen to an the regions underlying a 
lucene index.
+ */
+public class IndexRegionSpy {
+
+  public static void beforeWrite(Cache cache, final Consumer<Object> 
beforeWrite) {
+    GemFireCacheImpl gemfireCache = (GemFireCacheImpl) cache;
+    gemfireCache.addRegionListener(new SpyRegionListener(beforeWrite));
+  }
+
+  private static class SpyRegionListener implements RegionListener {
+
+    private final Consumer<Object> beforeWrite;
+
+    public SpyRegionListener(final Consumer<Object> beforeWrite) {
+      this.beforeWrite = beforeWrite;
+    }
+
+    @Override
+    public RegionAttributes beforeCreate(final Region parent,
+                                         final String regionName,
+                                         final RegionAttributes attrs,
+                                         final InternalRegionArguments 
internalRegionArgs)
+    {
+      return attrs;
+    }
+
+    @Override public void afterCreate(final Region region) {
+      if(region.getName().contains(".files") || 
region.getName().contains(".chunks")) {
+        region.getAttributesMutator().addCacheListener(new 
CacheListenerAdapter() {
+          @Override public void afterCreate(final EntryEvent event) {
+            beforeWrite.accept(event.getKey());
+          }
+
+          @Override public void afterDestroy(final EntryEvent event) {
+            beforeWrite.accept(event.getKey());
+          }
+
+          @Override public void afterInvalidate(final EntryEvent event) {
+            beforeWrite.accept(event.getKey());
+          }
+
+          @Override public void afterUpdate(final EntryEvent event) {
+            beforeWrite.accept(event.getKey());
+          }
+        });
+      }
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java
 
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java
new file mode 100644
index 0000000..0b66f55
--- /dev/null
+++ 
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import com.gemstone.gemfire.cache.lucene.internal.IndexRepositoryFactory;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexStats;
+import com.gemstone.gemfire.cache.lucene.internal.PartitionedRepositoryManager;
+import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import 
com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class IndexRepositorySpy extends IndexRepositoryFactory {
+
+  private Consumer<Object> beforeWrite = key -> {};
+
+  public static IndexRepositorySpy injectSpy() {
+    IndexRepositorySpy factory = new IndexRepositorySpy();
+    PartitionedRepositoryManager.indexRepositoryFactory = factory;
+    return factory;
+  }
+
+  public static void remove() {
+    PartitionedRepositoryManager.indexRepositoryFactory = new 
IndexRepositoryFactory();
+  }
+
+  private IndexRepositorySpy() {
+  }
+
+  @Override
+  public IndexRepository createIndexRepository(final Integer bucketId,
+                                               final PartitionedRegion 
userRegion,
+                                               final PartitionedRegion 
fileRegion,
+                                               final PartitionedRegion 
chunkRegion,
+                                               final LuceneSerializer 
serializer,
+                                               final Analyzer analyzer,
+                                               final LuceneIndexStats 
indexStats,
+                                               final FileSystemStats 
fileSystemStats)
+    throws IOException
+  {
+    final IndexRepository indexRepo = super.createIndexRepository(bucketId, 
userRegion, fileRegion, chunkRegion,
+      serializer, analyzer,
+      indexStats,
+      fileSystemStats);
+    final IndexRepository spy = Mockito.spy(indexRepo);
+
+    Answer invokeBeforeWrite = invocation -> {
+      beforeWrite.accept(invocation.getArgumentAt(0, Object.class));
+      return invocation.callRealMethod();
+    };
+
+    doAnswer(invokeBeforeWrite).when(spy).update(any(), any());
+    doAnswer(invokeBeforeWrite).when(spy).create(any(), any());
+    doAnswer(invokeBeforeWrite).when(spy).delete(any());
+
+    return spy;
+  }
+
+  /**
+   * Add a callback that runs before a call to
+   * {@link IndexRepository#create(Object, Object)},
+   * {@link IndexRepository#update(Object, Object)} or
+   * {@link IndexRepository#delete(Object)}
+   */
+  public void beforeWriteIndexRepository(Consumer<Object> action) {
+    this.beforeWrite = action;
+  }
+
+  /**
+   * Return a consumer that will invoke the passed in consumer only once
+   */
+  public static <T> Consumer<T> doOnce(Consumer<T> consumer) {
+    return new Consumer<T>() {
+      boolean done;
+
+      @Override
+      public void accept(final T t) {
+        if (!done) {
+          done = true;
+          consumer.accept(t);
+        }
+      }
+    };
+  }
+
+  /**
+   * Return a consumer that will invoke the passed in consumer only after
+   * it has been called exactly N times.
+   */
+  public static <T> Consumer<T> doAfterN(Consumer<T> consumer, int times) {
+    return new Consumer<T>() {
+      int count = 0;
+
+      @Override
+      public void accept(final T t) {
+        if (++count == times) {
+          consumer.accept(t);
+        }
+      }
+    };
+  }
+}

Reply via email to