GEODE-11 Adding tests of closing a cache during index updates Testing failover of lucene indexes by closing the cache while in the middle of updating lucene indexes. Currently there are tests for closing the cache before the index repository commit, and also during the commit after a fixed number of updates to the underling index data regions.
I refactored the lucene tests to use 7 buckets, rather than 113, so they take less time and are easier to debug. I also removed a call to Thread.interrupt in the WAN code because it was interrupting itself in my callback. We should never be using interrupt in the product. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4a0de723 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4a0de723 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4a0de723 Branch: refs/heads/master Commit: 4a0de723ce07977a7965b5e7a1bf57d15ec9456a Parents: 23211dd Author: Dan Smith <[email protected]> Authored: Thu Jul 7 10:02:25 2016 -0700 Committer: Dan Smith <[email protected]> Committed: Wed Jul 13 15:27:07 2016 -0700 ---------------------------------------------------------------------- .../AbstractGatewaySenderEventProcessor.java | 1 - ...rentParallelGatewaySenderEventProcessor.java | 2 +- .../lucene/internal/IndexRepositoryFactory.java | 2 +- .../cache/lucene/LuceneQueriesPRBase.java | 140 +++++-------------- .../lucene/LuceneQueriesPeerPRDUnitTest.java | 5 +- .../LuceneQueriesPeerPROverflowDUnitTest.java | 1 + .../LuceneQueriesPeerPRRedundancyDUnitTest.java | 53 ++++++- .../cache/lucene/test/IndexRegionSpy.java | 82 +++++++++++ .../cache/lucene/test/IndexRepositorySpy.java | 130 +++++++++++++++++ 9 files changed, 297 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 0e83557..b7b1ffc 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -1155,7 +1155,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { dispatcher.stop(); if (this.isAlive()) { - this.interrupt(); if (logger.isDebugEnabled()) { logger.debug("{}: Joining with the dispatcher thread upto limit of 5 seconds", this); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java index 82a53d3..04015f7 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java @@ -273,7 +273,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew } } } catch (InterruptedException e) { - throw new InternalGemFireException(e.getMessage()); + throw new InternalGemFireException(e); } catch (RejectedExecutionException rejectedExecutionEx) { throw rejectedExecutionEx; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java index 12f12ad..ae4b88b 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java @@ -62,7 +62,7 @@ public class IndexRepositoryFactory { /** * Find the bucket in region2 that matches the bucket id from region1. */ - private BucketRegion getMatchingBucket(PartitionedRegion region, Integer bucketId) { + protected BucketRegion getMatchingBucket(PartitionedRegion region, Integer bucketId) { //Force the bucket to be created if it is not already region.getOrCreateNodeForBucketWrite(bucketId, null); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java index 889b16f..1de600d 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java @@ -19,38 +19,26 @@ package com.gemstone.gemfire.cache.lucene; +import static com.gemstone.gemfire.cache.lucene.test.IndexRepositorySpy.doOnce; import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.*; import static org.junit.Assert.*; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; -import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.function.Consumer; import java.util.stream.IntStream; -import org.apache.lucene.analysis.Analyzer; import org.junit.After; -import org.junit.Ignore; import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.stubbing.Answer; import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.PartitionAttributes; +import com.gemstone.gemfire.cache.PartitionAttributesFactory; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.control.RebalanceOperation; import com.gemstone.gemfire.cache.control.RebalanceResults; -import com.gemstone.gemfire.cache.lucene.internal.IndexRepositoryFactory; -import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexStats; -import com.gemstone.gemfire.cache.lucene.internal.PartitionedRepositoryManager; -import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats; -import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; -import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; +import com.gemstone.gemfire.cache.lucene.test.IndexRepositorySpy; import com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities; import com.gemstone.gemfire.cache.partition.PartitionRegionHelper; import com.gemstone.gemfire.distributed.DistributedMember; -import com.gemstone.gemfire.internal.cache.BucketNotFoundException; -import com.gemstone.gemfire.internal.cache.PartitionedRegion; import com.gemstone.gemfire.test.dunit.SerializableRunnableIF; import com.gemstone.gemfire.test.dunit.VM; @@ -61,6 +49,7 @@ import com.gemstone.gemfire.test.dunit.VM; * */ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase { + protected static final int NUM_BUCKETS = 7; @After public void cleanupRebalanceCallback() { @@ -68,8 +57,6 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase { removeCallback(dataStore2); } - - @Test public void returnCorrectResultsWhenRebalanceHappensOnIndexUpdate() throws InterruptedException { addCallbackToTriggerRebalance(dataStore1); @@ -95,46 +82,54 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase { putEntriesAndValidateQueryResults(); } - protected void putEntriesAndValidateQueryResults() { + @Test + public void returnCorrectResultsWhenRebalanceHappensAfterUpdates() throws InterruptedException { SerializableRunnableIF createIndex = () -> { LuceneService luceneService = LuceneServiceProvider.get(getCache()); luceneService.createIndex(INDEX_NAME, REGION_NAME, "text"); }; dataStore1.invoke(() -> initDataStore(createIndex)); accessor.invoke(() -> initAccessor(createIndex)); - dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache())); - put113Entries(); + putEntryInEachBucket(); dataStore2.invoke(() -> initDataStore(createIndex)); - dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache())); + assertTrue(waitForFlushBeforeExecuteTextSearch(accessor, 60000)); - assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 60000)); + rebalanceRegion(dataStore2); - executeTextSearch(accessor, "world", "text", 113); + executeTextSearch(accessor, "world", "text", NUM_BUCKETS); } @Test - public void returnCorrectResultsWhenRebalanceHappensAfterUpdates() throws InterruptedException { + public void returnCorrectResultsWhenRebalanceHappensWhileSenderIsPaused() throws InterruptedException { SerializableRunnableIF createIndex = () -> { LuceneService luceneService = LuceneServiceProvider.get(getCache()); luceneService.createIndex(INDEX_NAME, REGION_NAME, "text"); }; dataStore1.invoke(() -> initDataStore(createIndex)); accessor.invoke(() -> initAccessor(createIndex)); + dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache())); - put113Entries(); + putEntryInEachBucket(); dataStore2.invoke(() -> initDataStore(createIndex)); + rebalanceRegion(dataStore2); + dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache())); + assertTrue(waitForFlushBeforeExecuteTextSearch(accessor, 60000)); - rebalanceRegion(dataStore2); + executeTextSearch(accessor, "world", "text", NUM_BUCKETS); + } - executeTextSearch(accessor, "world", "text", 113); + protected PartitionAttributes getPartitionAttributes() { + PartitionAttributesFactory factory = new PartitionAttributesFactory(); + factory.setLocalMaxMemory(100); + factory.setTotalNumBuckets(NUM_BUCKETS); + return factory.create(); } - @Test - public void returnCorrectResultsWhenRebalanceHappensWhileSenderIsPaused() throws InterruptedException { + protected void putEntriesAndValidateQueryResults() { SerializableRunnableIF createIndex = () -> { LuceneService luceneService = LuceneServiceProvider.get(getCache()); luceneService.createIndex(INDEX_NAME, REGION_NAME, "text"); @@ -143,22 +138,21 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase { accessor.invoke(() -> initAccessor(createIndex)); dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache())); - put113Entries(); + putEntryInEachBucket(); dataStore2.invoke(() -> initDataStore(createIndex)); - rebalanceRegion(dataStore2); dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache())); - assertTrue(waitForFlushBeforeExecuteTextSearch(accessor, 60000)); + assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 60000)); - executeTextSearch(accessor, "world", "text", 113); + executeTextSearch(accessor, "world", "text", NUM_BUCKETS); } - protected void put113Entries() { + protected void putEntryInEachBucket() { accessor.invoke(() -> { final Cache cache = getCache(); Region<Object, Object> region = cache.getRegion(REGION_NAME); - IntStream.range(0,113).forEach(i -> region.put(i, new TestObject("hello world"))); + IntStream.range(0,NUM_BUCKETS).forEach(i -> region.put(i, new TestObject("hello world"))); }); } @@ -166,7 +160,7 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase { vm.invoke(() -> { IndexRepositorySpy spy = IndexRepositorySpy.injectSpy(); - spy.beforeWrite(doOnce(key -> rebalanceRegion(vm))); + spy.beforeWriteIndexRepository(doOnce(key -> rebalanceRegion(vm))); }); } @@ -174,7 +168,7 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase { vm.invoke(() -> { IndexRepositorySpy spy = IndexRepositorySpy.injectSpy(); - spy.beforeWrite(doOnce(key -> moveBucket(destination, key))); + spy.beforeWriteIndexRepository(doOnce(key -> moveBucket(destination, key))); }); } @@ -196,72 +190,6 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase { }); } - protected static class IndexRepositorySpy extends IndexRepositoryFactory { - - private Consumer<Object> beforeWrite = key -> {}; - - public static IndexRepositorySpy injectSpy() { - IndexRepositorySpy factory = new IndexRepositorySpy(); - PartitionedRepositoryManager.indexRepositoryFactory = factory; - return factory; - } - - public static void remove() { - PartitionedRepositoryManager.indexRepositoryFactory = new IndexRepositoryFactory(); - } - - private IndexRepositorySpy() { - } - - @Override - public IndexRepository createIndexRepository(final Integer bucketId, - final PartitionedRegion userRegion, - final PartitionedRegion fileRegion, - final PartitionedRegion chunkRegion, - final LuceneSerializer serializer, - final Analyzer analyzer, - final LuceneIndexStats indexStats, - final FileSystemStats fileSystemStats) - throws IOException - { - final IndexRepository indexRepo = super.createIndexRepository(bucketId, userRegion, fileRegion, chunkRegion, - serializer, analyzer, - indexStats, - fileSystemStats); - final IndexRepository spy = Mockito.spy(indexRepo); - - Answer invokeBeforeWrite = invocation -> { - beforeWrite.accept(invocation.getArgumentAt(0, Object.class)); - invocation.callRealMethod(); - return null; - }; - doAnswer(invokeBeforeWrite).when(spy).update(any(), any()); - doAnswer(invokeBeforeWrite).when(spy).create(any(), any()); - doAnswer(invokeBeforeWrite).when(spy).delete(any()); - - return spy; - } - - /** - * Add a callback that runs before a call to - * {@link IndexRepository#create(Object, Object)} - */ - public void beforeWrite(Consumer<Object> action) { - this.beforeWrite = action; - } - } - - protected static <T> Consumer<T> doOnce(Consumer<T> consumer) { - return new Consumer<T>() { - boolean done; - - @Override - public void accept(final T t) { - if (!done) { - done = true; - consumer.accept(t); - } - } - }; - }; + ; + ; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java index 00b8254..702ac1f 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java @@ -18,7 +18,6 @@ package com.gemstone.gemfire.cache.lucene; import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.REGION_NAME; -import com.gemstone.gemfire.cache.PartitionAttributesFactory; import com.gemstone.gemfire.cache.RegionShortcut; import com.gemstone.gemfire.test.dunit.SerializableRunnableIF; import com.gemstone.gemfire.test.junit.categories.DistributedTest; @@ -30,10 +29,8 @@ public class LuceneQueriesPeerPRDUnitTest extends LuceneQueriesPRBase { @Override protected void initDataStore(final SerializableRunnableIF createIndex) throws Exception { createIndex.run(); - PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory(); - partitionAttributesFactory.setLocalMaxMemory(100); getCache().createRegionFactory(RegionShortcut.PARTITION) - .setPartitionAttributes(partitionAttributesFactory.create()) + .setPartitionAttributes(getPartitionAttributes()) .create(REGION_NAME); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPROverflowDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPROverflowDUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPROverflowDUnitTest.java index 8fd0a08..3ea561a 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPROverflowDUnitTest.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPROverflowDUnitTest.java @@ -33,6 +33,7 @@ public class LuceneQueriesPeerPROverflowDUnitTest extends LuceneQueriesPRBase { createIndex.run(); EvictionAttributes evicAttr = EvictionAttributes.createLRUEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK); getCache().createRegionFactory(RegionShortcut.PARTITION_OVERFLOW) + .setPartitionAttributes(getPartitionAttributes()) .setEvictionAttributes(evicAttr) .create(REGION_NAME); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java index 0a7bb67..dc86a7c 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java @@ -16,13 +16,18 @@ */ package com.gemstone.gemfire.cache.lucene; +import static com.gemstone.gemfire.cache.lucene.test.IndexRepositorySpy.*; import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.*; import static org.junit.Assert.*; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionShortcut; +import com.gemstone.gemfire.cache.lucene.test.IndexRegionSpy; +import com.gemstone.gemfire.cache.lucene.test.IndexRepositorySpy; import com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities; -import com.gemstone.gemfire.cache.partition.PartitionRegionHelper; import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; import com.gemstone.gemfire.internal.cache.PartitionedRegion; @@ -31,6 +36,7 @@ import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessag import com.gemstone.gemfire.test.dunit.SerializableRunnableIF; import com.gemstone.gemfire.test.dunit.VM; import com.gemstone.gemfire.test.junit.categories.DistributedTest; +import com.jayway.awaitility.Awaitility; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -40,7 +46,9 @@ public class LuceneQueriesPeerPRRedundancyDUnitTest extends LuceneQueriesPRBase @Override protected void initDataStore(final SerializableRunnableIF createIndex) throws Exception { createIndex.run(); - getCache().createRegionFactory(RegionShortcut.PARTITION_REDUNDANT).create(REGION_NAME); + Region region = getCache().createRegionFactory(RegionShortcut.PARTITION_REDUNDANT) + .setPartitionAttributes(getPartitionAttributes()) + .create(REGION_NAME); } @Override protected void initAccessor(final SerializableRunnableIF createIndex) throws Exception { @@ -52,6 +60,37 @@ public class LuceneQueriesPeerPRRedundancyDUnitTest extends LuceneQueriesPRBase final DistributedMember member2 = dataStore2.invoke(() -> getCache().getDistributedSystem().getDistributedMember()); addCallbackToMovePrimary(dataStore1, member2); + putEntriesAndValidateResultsWithRedundancy(); + } + + @Test + public void returnCorrectResultsWhenCloseCacheHappensOnIndexUpdate() throws InterruptedException { + dataStore1.invoke(() -> { + IndexRepositorySpy spy = IndexRepositorySpy.injectSpy(); + + spy.beforeWriteIndexRepository(doAfterN(key -> getCache().close(), 2)); + }); + + putEntriesAndValidateResultsWithRedundancy(); + + //Wait until the cache is closed in datastore1 + dataStore1.invoke(() -> Awaitility.await().atMost(60, TimeUnit.SECONDS).until(basicGetCache()::isClosed)); + } + + @Test + public void returnCorrectResultsWhenCloseCacheHappensOnPartialIndexWrite() throws InterruptedException { + final DistributedMember member2 = dataStore2.invoke(() -> getCache().getDistributedSystem().getDistributedMember()); + dataStore1.invoke(() -> { + IndexRegionSpy.beforeWrite(getCache(), doAfterN(key -> getCache().close(), 100)); + }); + + putEntriesAndValidateResultsWithRedundancy(); + + //Wait until the cache is closed in datastore1 + dataStore1.invoke(() -> Awaitility.await().atMost(60, TimeUnit.SECONDS).until(basicGetCache()::isClosed)); + } + + private void putEntriesAndValidateResultsWithRedundancy() { SerializableRunnableIF createIndex = () -> { LuceneService luceneService = LuceneServiceProvider.get(getCache()); luceneService.createIndex(INDEX_NAME, REGION_NAME, "text"); @@ -60,21 +99,23 @@ public class LuceneQueriesPeerPRRedundancyDUnitTest extends LuceneQueriesPRBase dataStore2.invoke(() -> initDataStore(createIndex)); accessor.invoke(() -> initAccessor(createIndex)); dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache())); + dataStore2.invoke(() -> LuceneTestUtilities.pauseSender(getCache())); - put113Entries(); + putEntryInEachBucket(); dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache())); + dataStore2.invoke(() -> LuceneTestUtilities.resumeSender(getCache())); - assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 60000)); + assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore2, 60000)); - executeTextSearch(accessor, "world", "text", 113); + executeTextSearch(accessor, "world", "text", NUM_BUCKETS); } protected void addCallbackToMovePrimary(VM vm, final DistributedMember destination) { vm.invoke(() -> { IndexRepositorySpy spy = IndexRepositorySpy.injectSpy(); - spy.beforeWrite(doOnce(key -> moveBucket(destination, key))); + spy.beforeWriteIndexRepository(doOnce(key -> moveBucket(destination, key))); }); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRegionSpy.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRegionSpy.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRegionSpy.java new file mode 100644 index 0000000..1e16707 --- /dev/null +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRegionSpy.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.gemstone.gemfire.cache.lucene.test; + +import java.util.function.Consumer; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.EntryEvent; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionAttributes; +import com.gemstone.gemfire.cache.util.CacheListenerAdapter; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.InternalRegionArguments; +import com.gemstone.gemfire.internal.cache.RegionListener; + +/** + * Allows spying on operations that happen to an the regions underlying a lucene index. + */ +public class IndexRegionSpy { + + public static void beforeWrite(Cache cache, final Consumer<Object> beforeWrite) { + GemFireCacheImpl gemfireCache = (GemFireCacheImpl) cache; + gemfireCache.addRegionListener(new SpyRegionListener(beforeWrite)); + } + + private static class SpyRegionListener implements RegionListener { + + private final Consumer<Object> beforeWrite; + + public SpyRegionListener(final Consumer<Object> beforeWrite) { + this.beforeWrite = beforeWrite; + } + + @Override + public RegionAttributes beforeCreate(final Region parent, + final String regionName, + final RegionAttributes attrs, + final InternalRegionArguments internalRegionArgs) + { + return attrs; + } + + @Override public void afterCreate(final Region region) { + if(region.getName().contains(".files") || region.getName().contains(".chunks")) { + region.getAttributesMutator().addCacheListener(new CacheListenerAdapter() { + @Override public void afterCreate(final EntryEvent event) { + beforeWrite.accept(event.getKey()); + } + + @Override public void afterDestroy(final EntryEvent event) { + beforeWrite.accept(event.getKey()); + } + + @Override public void afterInvalidate(final EntryEvent event) { + beforeWrite.accept(event.getKey()); + } + + @Override public void afterUpdate(final EntryEvent event) { + beforeWrite.accept(event.getKey()); + } + }); + } + + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4a0de723/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java new file mode 100644 index 0000000..0b66f55 --- /dev/null +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.gemstone.gemfire.cache.lucene.test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +import java.io.IOException; +import java.util.function.Consumer; + +import com.gemstone.gemfire.cache.lucene.internal.IndexRepositoryFactory; +import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexStats; +import com.gemstone.gemfire.cache.lucene.internal.PartitionedRepositoryManager; +import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats; +import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; +import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; +import com.gemstone.gemfire.internal.cache.BucketRegion; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; + +import org.apache.lucene.analysis.Analyzer; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class IndexRepositorySpy extends IndexRepositoryFactory { + + private Consumer<Object> beforeWrite = key -> {}; + + public static IndexRepositorySpy injectSpy() { + IndexRepositorySpy factory = new IndexRepositorySpy(); + PartitionedRepositoryManager.indexRepositoryFactory = factory; + return factory; + } + + public static void remove() { + PartitionedRepositoryManager.indexRepositoryFactory = new IndexRepositoryFactory(); + } + + private IndexRepositorySpy() { + } + + @Override + public IndexRepository createIndexRepository(final Integer bucketId, + final PartitionedRegion userRegion, + final PartitionedRegion fileRegion, + final PartitionedRegion chunkRegion, + final LuceneSerializer serializer, + final Analyzer analyzer, + final LuceneIndexStats indexStats, + final FileSystemStats fileSystemStats) + throws IOException + { + final IndexRepository indexRepo = super.createIndexRepository(bucketId, userRegion, fileRegion, chunkRegion, + serializer, analyzer, + indexStats, + fileSystemStats); + final IndexRepository spy = Mockito.spy(indexRepo); + + Answer invokeBeforeWrite = invocation -> { + beforeWrite.accept(invocation.getArgumentAt(0, Object.class)); + return invocation.callRealMethod(); + }; + + doAnswer(invokeBeforeWrite).when(spy).update(any(), any()); + doAnswer(invokeBeforeWrite).when(spy).create(any(), any()); + doAnswer(invokeBeforeWrite).when(spy).delete(any()); + + return spy; + } + + /** + * Add a callback that runs before a call to + * {@link IndexRepository#create(Object, Object)}, + * {@link IndexRepository#update(Object, Object)} or + * {@link IndexRepository#delete(Object)} + */ + public void beforeWriteIndexRepository(Consumer<Object> action) { + this.beforeWrite = action; + } + + /** + * Return a consumer that will invoke the passed in consumer only once + */ + public static <T> Consumer<T> doOnce(Consumer<T> consumer) { + return new Consumer<T>() { + boolean done; + + @Override + public void accept(final T t) { + if (!done) { + done = true; + consumer.accept(t); + } + } + }; + } + + /** + * Return a consumer that will invoke the passed in consumer only after + * it has been called exactly N times. + */ + public static <T> Consumer<T> doAfterN(Consumer<T> consumer, int times) { + return new Consumer<T>() { + int count = 0; + + @Override + public void accept(final T t) { + if (++count == times) { + consumer.accept(t); + } + } + }; + } +}
