Repository: geode Updated Branches: refs/heads/develop a6832ee2c -> 480a1e05c
GEODE-2828: AEQ created before the Lucene user regions * AEQ is being created before the Lucene user region * A countdown latch prevents the index repository computation until the user regions are ready * Integration tests do not use a Dummy executor because we need a thread pool for afterPrimary call. This closes #481 Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/480a1e05 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/480a1e05 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/480a1e05 Branch: refs/heads/develop Commit: 480a1e05cd5bc332c1e5e2593c3468f640ded1c0 Parents: a6832ee Author: nabarun <n...@pivotal.io> Authored: Tue May 2 15:02:23 2017 -0700 Committer: nabarun <n...@pivotal.io> Committed: Tue May 2 21:53:46 2017 -0700 ---------------------------------------------------------------------- .../internal/LonerDistributionManager.java | 11 ++++- .../internal/offheap/OffHeapRegionBase.java | 31 ++++++++++++-- .../AbstractPartitionedRepositoryManager.java | 18 +++++++- .../lucene/internal/LuceneBucketListener.java | 4 +- .../lucene/internal/LuceneEventListener.java | 4 -- .../LuceneIndexForPartitionedRegion.java | 18 ++++---- .../cache/lucene/internal/LuceneIndexImpl.java | 44 ++++++++++++-------- .../cache/lucene/internal/LuceneRawIndex.java | 10 ++++- .../lucene/internal/LuceneRegionListener.java | 14 ++++++- .../lucene/internal/LuceneServiceImpl.java | 34 +++++++-------- .../internal/LuceneEventListenerJUnitTest.java | 4 -- .../lucene/internal/LuceneIndexFactorySpy.java | 18 -------- .../LuceneIndexForPartitionedRegionTest.java | 22 ++++++---- .../PartitionedRepositoryManagerJUnitTest.java | 2 + .../RawLuceneRepositoryManagerJUnitTest.java | 2 + 15 files changed, 143 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java index e9068e6..fdb6a13 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java @@ -71,7 +71,14 @@ public class LonerDistributionManager implements DM { // no threads needed } - protected void shutdown() {} + protected void shutdown() { + executor.shutdown(); + try { + executor.awaitTermination(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new InternalGemFireError("Interrupted while waiting for DM shutdown"); + } + } private final InternalDistributedMember id; @@ -94,7 +101,7 @@ public class LonerDistributionManager implements DM { private ConcurrentMap<InternalDistributedMember, InternalDistributedMember> canonicalIds = new ConcurrentHashMap(); static private final DummyDMStats stats = new DummyDMStats(); - static private final DummyExecutor executor = new DummyExecutor(); + static private final ExecutorService executor = Executors.newCachedThreadPool(); @Override public long cacheTimeMillis() { http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java index 62766cc..c0c6085 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java +++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java @@ -31,12 +31,17 @@ import org.apache.geode.pdx.PdxReader; import org.apache.geode.pdx.PdxSerializable; import org.apache.geode.pdx.PdxWriter; import org.apache.geode.test.dunit.WaitCriterion; +import org.awaitility.Awaitility; +import org.junit.After; import org.junit.Test; +import java.io.File; +import java.io.FilenameFilter; import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.concurrent.TimeUnit; import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; @@ -72,6 +77,22 @@ public abstract class OffHeapRegionBase { return result; } + @After + public void cleanUp() { + File dir = new File("."); + File[] files = dir.listFiles(new FilenameFilter() { + + @Override + public boolean accept(File dir, String name) { + return name.startsWith("BACKUP"); + } + + }); + for (File file : files) { + file.delete(); + } + } + private void closeCache(GemFireCacheImpl gfc, boolean keepOffHeapAllocated) { gfc.close(); if (!keepOffHeapAllocated) { @@ -200,7 +221,8 @@ public abstract class OffHeapRegionBase { gfc.setCopyOnRead(true); final MemoryAllocator ma = gfc.getOffHeapStore(); assertNotNull(ma); - assertEquals(0, ma.getUsedMemory()); + Awaitility.await().atMost(60, TimeUnit.SECONDS) + .until(() -> assertEquals(0, ma.getUsedMemory())); Compressor compressor = null; if (compressed) { compressor = SnappyCompressor.getDefaultInstance(); @@ -413,7 +435,8 @@ public abstract class OffHeapRegionBase { assertTrue(ma.getUsedMemory() > 0); try { r.clear(); - assertEquals(0, ma.getUsedMemory()); + Awaitility.await().atMost(60, TimeUnit.SECONDS) + .until(() -> assertEquals(0, ma.getUsedMemory())); } catch (UnsupportedOperationException ok) { } @@ -449,8 +472,8 @@ public abstract class OffHeapRegionBase { } r.destroyRegion(); - assertEquals(0, ma.getUsedMemory()); - + Awaitility.await().atMost(60, TimeUnit.SECONDS) + .until(() -> assertEquals(0, ma.getUsedMemory())); } finally { if (r != null && !r.isDestroyed()) { r.destroyRegion(); http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java index 26bb488..867794d 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import org.apache.geode.InternalGemFireError; import org.apache.geode.cache.Region; @@ -47,18 +48,22 @@ public abstract class AbstractPartitionedRepositoryManager implements Repository new ConcurrentHashMap<Integer, IndexRepository>(); /** The user region for this index */ - protected final PartitionedRegion userRegion; + protected PartitionedRegion userRegion = null; protected final LuceneSerializer serializer; protected final LuceneIndexImpl index; protected volatile boolean closed; + final private CountDownLatch isDataRegionReady = new CountDownLatch(1); public AbstractPartitionedRepositoryManager(LuceneIndexImpl index, LuceneSerializer serializer) { this.index = index; - this.userRegion = (PartitionedRegion) index.getCache().getRegion(index.getRegionPath()); this.serializer = serializer; this.closed = false; } + public void setUserRegionForRepositoryManager() { + this.userRegion = (PartitionedRegion) index.getCache().getRegion(index.getRegionPath()); + } + @Override public IndexRepository getRepository(Region region, Object key, Object callbackArg) throws BucketNotFoundException { @@ -95,6 +100,11 @@ public abstract class AbstractPartitionedRepositoryManager implements Repository IndexRepository oldRepository) throws IOException; protected IndexRepository computeRepository(Integer bucketId) { + try { + isDataRegionReady.await(); + } catch (InterruptedException e) { + throw new InternalGemFireError("Uable to create index repository", e); + } IndexRepository repo = indexRepositories.compute(bucketId, (key, oldRepository) -> { try { if (closed) { @@ -111,6 +121,10 @@ public abstract class AbstractPartitionedRepositoryManager implements Repository return repo; } + protected void allowRepositoryComputation() { + isDataRegionReady.countDown(); + } + /** * Return the repository for a given user bucket */ http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java index 32fb3fc..37871aa 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java @@ -24,10 +24,10 @@ import org.apache.lucene.store.AlreadyClosedException; public class LuceneBucketListener extends PartitionListenerAdapter { private static final Logger logger = LogService.getLogger(); - private PartitionedRepositoryManager lucenePartitionRepositoryManager; + private AbstractPartitionedRepositoryManager lucenePartitionRepositoryManager; private final DM dm; - public LuceneBucketListener(PartitionedRepositoryManager partitionedRepositoryManager, + public LuceneBucketListener(AbstractPartitionedRepositoryManager partitionedRepositoryManager, final DM dm) { lucenePartitionRepositoryManager = partitionedRepositoryManager; this.dm = dm; http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java index c3fa2ff..bc4a7da 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java @@ -27,7 +27,6 @@ import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; import org.apache.logging.log4j.Logger; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.InternalGemFireError; -import org.apache.geode.cache.Operation; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.asyncqueue.AsyncEvent; @@ -36,10 +35,7 @@ import org.apache.geode.cache.lucene.internal.repository.RepositoryManager; import org.apache.geode.cache.lucene.internal.repository.IndexRepository; import org.apache.geode.cache.query.internal.DefaultQuery; import org.apache.geode.internal.cache.BucketNotFoundException; -import org.apache.geode.internal.cache.CacheObserverHolder; import org.apache.geode.internal.cache.PrimaryBucketException; -import org.apache.geode.internal.cache.partitioned.Bucket; -import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.TestHook; import org.apache.geode.internal.logging.LogService; import org.apache.lucene.store.AlreadyClosedException; http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java index c39a4a8..41505d7 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java @@ -57,6 +57,15 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { } protected RepositoryManager createRepositoryManager() { + HeterogeneousLuceneSerializer mapper = new HeterogeneousLuceneSerializer(getFieldNames()); + PartitionedRepositoryManager partitionedRepositoryManager = + new PartitionedRepositoryManager(this, mapper); + return partitionedRepositoryManager; + } + + protected void createLuceneListenersAndFileChunkRegions( + AbstractPartitionedRepositoryManager partitionedRepositoryManager) { + partitionedRepositoryManager.setUserRegionForRepositoryManager(); RegionShortcut regionShortCut; final boolean withPersistence = withPersistence(); RegionAttributes regionAttributes = dataRegion.getAttributes(); @@ -78,14 +87,6 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { // create PR fileAndChunkRegion, but not to create its buckets for now final String fileRegionName = createFileRegionName(); PartitionAttributes partitionAttributes = dataRegion.getPartitionAttributes(); - - - // create PR chunkRegion, but not to create its buckets for now - - // we will create RegionDirectories on the fly when data comes in - HeterogeneousLuceneSerializer mapper = new HeterogeneousLuceneSerializer(getFieldNames()); - PartitionedRepositoryManager partitionedRepositoryManager = - new PartitionedRepositoryManager(this, mapper); DM dm = this.cache.getInternalDistributedSystem().getDistributionManager(); LuceneBucketListener lucenePrimaryBucketListener = new LuceneBucketListener(partitionedRepositoryManager, dm); @@ -98,7 +99,6 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { fileSystemStats .setBytesSupplier(() -> getFileAndChunkRegion().getPrStats().getDataStoreBytesInUse()); - return partitionedRepositoryManager; } public PartitionedRegion getFileAndChunkRegion() { http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java index 36f6720..3393bcf 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java @@ -34,7 +34,6 @@ import org.apache.geode.cache.lucene.internal.xml.LuceneIndexCreation; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.InternalRegionArguments; import org.apache.geode.internal.cache.LocalRegion; -import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.extension.Extension; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; @@ -48,6 +47,7 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { protected final LuceneIndexStats indexStats; protected boolean hasInitialized = false; + protected boolean hasInitializedAEQ = false; protected Map<String, Analyzer> fieldAnalyzers; protected String[] searchableFieldNames; protected RepositoryManager repositoryManager; @@ -131,30 +131,41 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { if (!hasInitialized) { /* create index region */ dataRegion = getDataRegion(); - // assert dataRegion != null; - - repositoryManager = createRepositoryManager(); - - // create AEQ, AEQ listener and specify the listener to repositoryManager - createAEQ(dataRegion); - + createLuceneListenersAndFileChunkRegions( + (AbstractPartitionedRepositoryManager) repositoryManager); addExtension(dataRegion); hasInitialized = true; } } + protected void initializeAEQ(RegionAttributes attributes, String aeqId) { + if (!hasInitializedAEQ) { + repositoryManager = createRepositoryManager(); + createAEQ(attributes, aeqId); + hasInitializedAEQ = true; + } + } + protected abstract RepositoryManager createRepositoryManager(); + protected abstract void createLuceneListenersAndFileChunkRegions( + AbstractPartitionedRepositoryManager partitionedRepositoryManager); + protected AsyncEventQueue createAEQ(Region dataRegion) { - return createAEQ(createAEQFactory(dataRegion)); + String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath); + return createAEQ(createAEQFactory(dataRegion.getAttributes()), aeqId); } - private AsyncEventQueueFactoryImpl createAEQFactory(final Region dataRegion) { + protected AsyncEventQueue createAEQ(RegionAttributes attributes, String aeqId) { + return createAEQ(createAEQFactory(attributes), aeqId); + } + + private AsyncEventQueueFactoryImpl createAEQFactory(final RegionAttributes attributes) { AsyncEventQueueFactoryImpl factory = (AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory(); - if (dataRegion instanceof PartitionedRegion) { - PartitionedRegion pr = (PartitionedRegion) dataRegion; - if (pr.getPartitionAttributes().getLocalMaxMemory() == 0) { + if (attributes.getPartitionAttributes() != null) { + + if (attributes.getPartitionAttributes().getLocalMaxMemory() == 0) { // accessor will not create AEQ return null; } @@ -165,22 +176,21 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { factory.setMaximumQueueMemory(1000); factory.setDispatcherThreads(10); factory.setIsMetaQueue(true); - if (dataRegion.getAttributes().getDataPolicy().withPersistence()) { + if (attributes.getDataPolicy().withPersistence()) { factory.setPersistent(true); } - factory.setDiskStoreName(dataRegion.getAttributes().getDiskStoreName()); + factory.setDiskStoreName(attributes.getDiskStoreName()); factory.setDiskSynchronous(true); factory.setForwardExpirationDestroy(true); return factory; } - private AsyncEventQueue createAEQ(AsyncEventQueueFactoryImpl factory) { + private AsyncEventQueue createAEQ(AsyncEventQueueFactoryImpl factory, String aeqId) { if (factory == null) { return null; } LuceneEventListener listener = new LuceneEventListener(repositoryManager); factory.setGatewayEventSubstitutionListener(new LuceneEventSubstitutionFilter()); - String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath); AsyncEventQueue indexQueue = factory.create(aeqId, listener); return indexQueue; } http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java index 75ab5ca..ee2930d 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java @@ -27,7 +27,15 @@ public class LuceneRawIndex extends LuceneIndexImpl { @Override protected RepositoryManager createRepositoryManager() { HeterogeneousLuceneSerializer mapper = new HeterogeneousLuceneSerializer(getFieldNames()); - return new RawLuceneRepositoryManager(this, mapper); + RawLuceneRepositoryManager rawLuceneRepositoryManager = + new RawLuceneRepositoryManager(this, mapper); + return rawLuceneRepositoryManager; + } + + @Override + protected void createLuceneListenersAndFileChunkRegions( + AbstractPartitionedRepositoryManager partitionedRepositoryManager) { + partitionedRepositoryManager.setUserRegionForRepositoryManager(); } @Override http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java index f4e2a79..48462a0 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java @@ -23,6 +23,7 @@ import org.apache.geode.cache.EvictionAlgorithm; import org.apache.geode.cache.EvictionAttributes; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.InternalRegionArguments; import org.apache.geode.internal.cache.RegionListener; @@ -43,6 +44,8 @@ public class LuceneRegionListener implements RegionListener { private final String[] fields; + private LuceneIndexImpl luceneIndex; + public LuceneRegionListener(LuceneServiceImpl service, InternalCache cache, String indexName, String regionPath, String[] fields, Analyzer analyzer, Map<String, Analyzer> fieldAnalyzers) { this.service = service; @@ -97,6 +100,9 @@ public class LuceneRegionListener implements RegionListener { internalRegionArgs.addCacheServiceProfile(new LuceneIndexCreationProfile(this.indexName, this.regionPath, this.fields, this.analyzer, this.fieldAnalyzers)); + luceneIndex = this.service.beforeDataRegionCreated(this.indexName, this.regionPath, attrs, + this.analyzer, this.fieldAnalyzers, aeqId, this.fields); + // Add internal async event id internalRegionArgs.addInternalAsyncEventQueueId(aeqId); } @@ -106,8 +112,12 @@ public class LuceneRegionListener implements RegionListener { @Override public void afterCreate(Region region) { if (region.getFullPath().equals(this.regionPath)) { - this.service.afterDataRegionCreated(this.indexName, this.analyzer, this.regionPath, - this.fieldAnalyzers, this.fields); + this.service.afterDataRegionCreated(this.luceneIndex); + String aeqId = LuceneServiceImpl.getUniqueIndexName(this.indexName, this.regionPath); + AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId); + AbstractPartitionedRepositoryManager repositoryManager = + (AbstractPartitionedRepositoryManager) luceneIndex.getRepositoryManager(); + repositoryManager.allowRepositoryComputation(); this.cache.removeRegionListener(this); } } http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java index 437a552..ebee59e 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java @@ -31,10 +31,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; import org.apache.lucene.analysis.standard.StandardAnalyzer; -import org.apache.geode.cache.AttributesFactory; import org.apache.geode.cache.Cache; -import org.apache.geode.cache.EvictionAlgorithm; -import org.apache.geode.cache.EvictionAttributes; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.execute.Execution; @@ -57,7 +54,6 @@ import org.apache.geode.internal.DSFIDFactory; import org.apache.geode.internal.DataSerializableFixedID; import org.apache.geode.internal.cache.extension.Extensible; import org.apache.geode.internal.cache.CacheService; -import org.apache.geode.internal.cache.InternalRegionArguments; import org.apache.geode.internal.cache.RegionListener; import org.apache.geode.internal.cache.xmlcache.XmlGenerator; import org.apache.geode.internal.i18n.LocalizedStrings; @@ -167,28 +163,28 @@ public class LuceneServiceImpl implements InternalLuceneService { * * Public because this is called by the Xml parsing code */ - public void afterDataRegionCreated(final String indexName, final Analyzer analyzer, - final String dataRegionPath, final Map<String, Analyzer> fieldAnalyzers, - final String... fields) { - LuceneIndexImpl index = createIndexRegions(indexName, dataRegionPath); - index.setSearchableFields(fields); - index.setAnalyzer(analyzer); - index.setFieldAnalyzers(fieldAnalyzers); + public void afterDataRegionCreated(LuceneIndexImpl index) { index.initialize(); registerIndex(index); if (this.managementListener != null) { this.managementListener.afterIndexCreated(index); } + } - private LuceneIndexImpl createIndexRegions(String indexName, String regionPath) { - Region dataregion = this.cache.getRegion(regionPath); - if (dataregion == null) { - logger.info("Data region " + regionPath + " not found"); - return null; - } - // Convert the region name into a canonical form - regionPath = dataregion.getFullPath(); + public LuceneIndexImpl beforeDataRegionCreated(final String indexName, final String regionPath, + RegionAttributes attributes, final Analyzer analyzer, + final Map<String, Analyzer> fieldAnalyzers, String aeqId, final String... fields) { + LuceneIndexImpl index = createIndexObject(indexName, regionPath); + index.setSearchableFields(fields); + index.setAnalyzer(analyzer); + index.setFieldAnalyzers(fieldAnalyzers); + index.initializeAEQ(attributes, aeqId); + return index; + + } + + private LuceneIndexImpl createIndexObject(String indexName, String regionPath) { return luceneIndexFactory.create(indexName, regionPath, cache); } http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneEventListenerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneEventListenerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneEventListenerJUnitTest.java index 801f6b6..88057e5 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneEventListenerJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneEventListenerJUnitTest.java @@ -67,9 +67,7 @@ public class LuceneEventListenerJUnitTest { Mockito.when(manager.getRepository(eq(region1), any(), eq(callback1))).thenReturn(repo1); Mockito.when(manager.getRepository(eq(region2), any(), eq(null))).thenReturn(repo2); - LuceneEventListener listener = new LuceneEventListener(manager); - List<AsyncEvent> events = new ArrayList<AsyncEvent>(); int numEntries = 100; @@ -115,7 +113,6 @@ public class LuceneEventListenerJUnitTest { Logger log = Mockito.mock(Logger.class); Mockito.when(manager.getRepository(any(), any(), any())) .thenThrow(BucketNotFoundException.class); - LuceneEventListener listener = new LuceneEventListener(manager); listener.logger = log; AsyncEvent event = Mockito.mock(AsyncEvent.class); @@ -128,7 +125,6 @@ public class LuceneEventListenerJUnitTest { public void shouldThrowAndCaptureIOException() throws BucketNotFoundException { RepositoryManager manager = Mockito.mock(RepositoryManager.class); Mockito.when(manager.getRepository(any(), any(), any())).thenThrow(IOException.class); - AtomicReference<Throwable> lastException = new AtomicReference<>(); LuceneEventListener.setExceptionObserver(lastException::set); LuceneEventListener listener = new LuceneEventListener(manager); http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java index 8b379a5..1a092d7 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java @@ -15,15 +15,11 @@ package org.apache.geode.cache.lucene.internal; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; import java.util.function.Consumer; import org.mockito.Mockito; -import org.mockito.stubbing.Answer; -import org.apache.geode.cache.lucene.internal.repository.RepositoryManager; -import org.apache.geode.internal.cache.BucketNotFoundException; import org.apache.geode.internal.cache.InternalCache; public class LuceneIndexFactorySpy extends LuceneIndexImplFactory { @@ -59,19 +55,5 @@ public class LuceneIndexFactorySpy extends LuceneIndexImplFactory { super(indexName, regionPath, cache); } - @Override - public RepositoryManager createRepositoryManager() { - RepositoryManager repositoryManagerSpy = Mockito.spy(super.createRepositoryManager()); - Answer getRepositoryAnswer = invocation -> { - getRepositoryConsumer.accept(invocation.getArgumentAt(0, Object.class)); - return invocation.callRealMethod(); - }; - try { - doAnswer(getRepositoryAnswer).when(repositoryManagerSpy).getRepositories(any()); - } catch (BucketNotFoundException e) { - e.printStackTrace(); - } - return repositoryManagerSpy; - } } } http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java index 8e4c179..b2fdd84 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java @@ -194,7 +194,7 @@ public class LuceneIndexForPartitionedRegionTest { Region region = initializeScenario(withPersistence, regionPath, cache, 0); LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, cache); - LuceneIndexForPartitionedRegion spy = setupSpy(region, index); + LuceneIndexForPartitionedRegion spy = setupSpy(region, index, "aeq"); spy.initialize(); } @@ -208,17 +208,18 @@ public class LuceneIndexForPartitionedRegionTest { LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, cache); - LuceneIndexForPartitionedRegion spy = setupSpy(region, index); + LuceneIndexForPartitionedRegion spy = setupSpy(region, index, "aeq"); - verify(spy).createAEQ(eq(region)); + verify(spy).createAEQ(eq(region.getAttributes()), eq("aeq")); } protected LuceneIndexForPartitionedRegion setupSpy(final Region region, - final LuceneIndexForPartitionedRegion index) { + final LuceneIndexForPartitionedRegion index, final String aeq) { index.setSearchableFields(new String[] {"field"}); LuceneIndexForPartitionedRegion spy = spy(index); doReturn(null).when(spy).createFileRegion(any(), any(), any(), any(), any()); - doReturn(null).when(spy).createAEQ(eq(region)); + doReturn(null).when(spy).createAEQ(any(), any()); + spy.initializeAEQ(region.getAttributes(), aeq); spy.initialize(); return spy; } @@ -233,7 +234,7 @@ public class LuceneIndexForPartitionedRegionTest { LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, cache); - LuceneIndexForPartitionedRegion spy = setupSpy(region, index); + LuceneIndexForPartitionedRegion spy = setupSpy(region, index, "aeq"); verify(spy).createFileRegion(eq(RegionShortcut.PARTITION), eq(index.createFileRegionName()), any(), any(), any()); @@ -272,7 +273,8 @@ public class LuceneIndexForPartitionedRegionTest { index.setSearchableFields(new String[] {"field"}); LuceneIndexForPartitionedRegion spy = spy(index); doReturn(null).when(spy).createFileRegion(any(), any(), any(), any(), any()); - doReturn(null).when(spy).createAEQ(any()); + doReturn(null).when(spy).createAEQ((RegionAttributes) any(), any()); + spy.initializeAEQ(any(), any()); spy.initialize(); verify(spy).createFileRegion(eq(RegionShortcut.PARTITION_PERSISTENT), @@ -292,7 +294,8 @@ public class LuceneIndexForPartitionedRegionTest { index.setSearchableFields(new String[] {"field"}); LuceneIndexForPartitionedRegion spy = spy(index); doReturn(null).when(spy).createFileRegion(any(), any(), any(), any(), any()); - doReturn(null).when(spy).createAEQ(any()); + doReturn(null).when(spy).createAEQ(any(), any()); + spy.initializeAEQ(any(), any()); spy.initialize(); spy.initialize(); @@ -316,7 +319,8 @@ public class LuceneIndexForPartitionedRegionTest { new LuceneIndexForPartitionedRegion(name, regionPath, cache); index = spy(index); when(index.getFieldNames()).thenReturn(fields); - doReturn(aeq).when(index).createAEQ(any()); + doReturn(aeq).when(index).createAEQ(any(), any()); + index.initializeAEQ(cache.getRegionAttributes(regionPath), aeq.getId()); index.initialize(); PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionPath); ResultCollector collector = mock(ResultCollector.class); http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java index 87317cc..30e64f2 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java @@ -113,6 +113,8 @@ public class PartitionedRepositoryManagerJUnitTest { when(indexForPR.getCache()).thenReturn(cache); when(indexForPR.getRegionPath()).thenReturn("/testRegion"); repoManager = new PartitionedRepositoryManager(indexForPR, serializer); + repoManager.setUserRegionForRepositoryManager(); + repoManager.allowRepositoryComputation(); } @Test http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java index bca7085..df31bb9 100755 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java @@ -78,6 +78,8 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa when(indexForPR.getRegionPath()).thenReturn("/testRegion"); when(indexForPR.withPersistence()).thenReturn(true); repoManager = new RawLuceneRepositoryManager(indexForPR, serializer); + repoManager.setUserRegionForRepositoryManager(); + repoManager.allowRepositoryComputation(); } @Test