GEODE-11: create index repository using raw Lucene directory. GEODE-11: add RawDirectory using index instance
GEODE-11: add abstract class for index and repositoryManager remove commented lines in test code Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/746820bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/746820bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/746820bb Branch: refs/heads/feature/GEODE-420 Commit: 746820bb18fe3f3ab7bd6b00f847821d934bbf09 Parents: 07798ca Author: zhouxh <[email protected]> Authored: Fri Jul 15 15:57:07 2016 -0700 Committer: zhouxh <[email protected]> Committed: Wed Aug 31 17:09:08 2016 -0700 ---------------------------------------------------------------------- .../AbstractPartitionedRepositoryManager.java | 124 +++++++++++++++++ .../lucene/internal/IndexRepositoryFactory.java | 21 ++- .../lucene/internal/LuceneIndexFactory.java | 30 +++++ .../LuceneIndexForPartitionedRegion.java | 134 +++++++------------ .../cache/lucene/internal/LuceneIndexImpl.java | 79 +++++++++-- .../cache/lucene/internal/LuceneRawIndex.java | 43 ++++++ .../lucene/internal/LuceneRawIndexFactory.java | 27 ++++ .../lucene/internal/LuceneServiceImpl.java | 3 +- .../internal/PartitionedRepositoryManager.java | 123 ++--------------- .../internal/RawIndexRepositoryFactory.java | 63 +++++++++ .../internal/RawLuceneRepositoryManager.java | 46 +++++++ .../repository/IndexRepositoryImpl.java | 6 +- .../LuceneIndexCreationIntegrationTest.java | 29 ++++ .../cache/lucene/LuceneQueriesPRBase.java | 7 +- .../LuceneIndexForPartitionedRegionTest.java | 34 ++++- .../LuceneIndexRecoveryHAIntegrationTest.java | 19 +-- .../PartitionedRepositoryManagerJUnitTest.java | 68 +++++----- .../RawLuceneRepositoryManagerJUnitTest.java | 97 ++++++++++++++ .../DistributedScoringJUnitTest.java | 2 +- .../IndexRepositoryImplJUnitTest.java | 2 +- .../IndexRepositoryImplPerformanceTest.java | 2 +- .../cache/lucene/test/IndexRepositorySpy.java | 20 +-- 22 files changed, 689 insertions(+), 290 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/AbstractPartitionedRepositoryManager.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/AbstractPartitionedRepositoryManager.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/AbstractPartitionedRepositoryManager.java new file mode 100755 index 0000000..1dc716c --- /dev/null +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/AbstractPartitionedRepositoryManager.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.lucene.internal; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.gemstone.gemfire.InternalGemFireError; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.execute.RegionFunctionContext; +import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; +import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager; +import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; +import com.gemstone.gemfire.internal.cache.BucketNotFoundException; +import com.gemstone.gemfire.internal.cache.BucketRegion; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext; + +public abstract class AbstractPartitionedRepositoryManager implements RepositoryManager { + + /** map of the parent bucket region to the index repository + * + * This is based on the BucketRegion in case a bucket is rebalanced, we don't want to + * return a stale index repository. If a bucket moves off of this node and + * comes back, it will have a new BucketRegion object. + * + * It is weak so that the old BucketRegion will be garbage collected. + */ + protected final ConcurrentHashMap<Integer, IndexRepository> indexRepositories = new ConcurrentHashMap<Integer, IndexRepository>(); + + /** The user region for this index */ + protected final PartitionedRegion userRegion; + protected final LuceneSerializer serializer; + protected final LuceneIndexImpl index; + + public AbstractPartitionedRepositoryManager( + LuceneIndexImpl index, + LuceneSerializer serializer) { + this.index = index; + this.userRegion = (PartitionedRegion)index.getCache().getRegion(index.getRegionPath()); + this.serializer = serializer; + } + + @Override + public IndexRepository getRepository(Region region, Object key, + Object callbackArg) throws BucketNotFoundException { + BucketRegion userBucket = userRegion.getBucketRegion(key, callbackArg); + if(userBucket == null) { + throw new BucketNotFoundException("User bucket was not found for region " + region + "key " + key + " callbackarg " + callbackArg); + } + + return getRepository(userBucket.getId()); + } + + @Override + public Collection<IndexRepository> getRepositories(RegionFunctionContext ctx) throws BucketNotFoundException { + Region<Object, Object> region = ctx.getDataSet(); + Set<Integer> buckets = ((InternalRegionFunctionContext) ctx).getLocalBucketSet(region); + ArrayList<IndexRepository> repos = new ArrayList<IndexRepository>(buckets.size()); + for(Integer bucketId : buckets) { + BucketRegion userBucket = userRegion.getDataStore().getLocalBucketById(bucketId); + if(userBucket == null) { + throw new BucketNotFoundException("User bucket was not found for region " + region + "bucket id " + bucketId); + } else { + repos.add(getRepository(userBucket.getId())); + } + } + + return repos; + } + + public abstract IndexRepository createOneIndexRepository(final Integer bucketId, + LuceneSerializer serializer, + LuceneIndexImpl index, PartitionedRegion userRegion) throws IOException; + + /** + * Return the repository for a given user bucket + */ + protected IndexRepository getRepository(Integer bucketId) throws BucketNotFoundException { + IndexRepository repo = indexRepositories.get(bucketId); + if(repo != null && !repo.isClosed()) { + return repo; + } + + repo = indexRepositories.compute(bucketId, (key, oldRepository) -> { + if(oldRepository != null && !oldRepository.isClosed()) { + return oldRepository; + } + if(oldRepository != null) { + oldRepository.cleanup(); + } + + try { + return createOneIndexRepository(bucketId, serializer, index, userRegion); + } catch(IOException e) { + throw new InternalGemFireError("Unable to create index repository", e); + } + + }); + + if(repo == null) { + throw new BucketNotFoundException("Colocated index buckets not found for bucket id " + bucketId); + } + + return repo; + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java index ae4b88b..e6f01b0 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java @@ -19,7 +19,6 @@ package com.gemstone.gemfire.cache.lucene.internal; import java.io.IOException; import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory; -import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats; import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl; import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; @@ -27,7 +26,6 @@ import com.gemstone.gemfire.internal.cache.BucketNotFoundException; import com.gemstone.gemfire.internal.cache.BucketRegion; import com.gemstone.gemfire.internal.cache.PartitionedRegion; -import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; @@ -37,25 +35,22 @@ public class IndexRepositoryFactory { } public IndexRepository createIndexRepository(final Integer bucketId, - PartitionedRegion userRegion, - PartitionedRegion fileRegion, - PartitionedRegion chunkRegion, LuceneSerializer serializer, - Analyzer analyzer, - LuceneIndexStats indexStats, - FileSystemStats fileSystemStats) + LuceneIndexImpl index, PartitionedRegion userRegion) throws IOException { final IndexRepository repo; - BucketRegion fileBucket = getMatchingBucket(fileRegion, bucketId); - BucketRegion chunkBucket = getMatchingBucket(chunkRegion, bucketId); + LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion)index; + BucketRegion fileBucket = getMatchingBucket(indexForPR.getFileRegion(), bucketId); + BucketRegion chunkBucket = getMatchingBucket(indexForPR.getChunkRegion(), bucketId); + BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId); if(fileBucket == null || chunkBucket == null) { return null; } - RegionDirectory dir = new RegionDirectory(fileBucket, chunkBucket, fileSystemStats); - IndexWriterConfig config = new IndexWriterConfig(analyzer); + RegionDirectory dir = new RegionDirectory(fileBucket, chunkBucket, indexForPR.getFileSystemStats()); + IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer()); IndexWriter writer = new IndexWriter(dir, config); - repo = new IndexRepositoryImpl(fileBucket, writer, serializer, indexStats); + repo = new IndexRepositoryImpl(fileBucket, writer, serializer, indexForPR.getIndexStats(), dataBucket); return repo; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexFactory.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexFactory.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexFactory.java new file mode 100755 index 0000000..b6ac867 --- /dev/null +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.gemstone.gemfire.cache.lucene.internal; + +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; + +public class LuceneIndexFactory { + public LuceneIndexFactory() { + } + + public LuceneIndexImpl create(String indexName, String regionPath, GemFireCacheImpl cache) { + return new LuceneIndexForPartitionedRegion(indexName, regionPath, cache); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java index af05e7d..b64e026 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java @@ -35,109 +35,77 @@ import com.gemstone.gemfire.cache.lucene.internal.directory.DumpDirectoryFiles; import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey; import com.gemstone.gemfire.cache.lucene.internal.filesystem.File; import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats; +import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager; import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.PartitionedRegion; /* wrapper of IndexWriter */ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { + protected Region<String, File> fileRegion; + protected Region<ChunkKey, byte[]> chunkRegion; + protected final FileSystemStats fileSystemStats; public LuceneIndexForPartitionedRegion(String indexName, String regionPath, Cache cache) { super(indexName, regionPath, cache); + + final String statsName = indexName + "-" + regionPath; + this.fileSystemStats = new FileSystemStats(cache.getDistributedSystem(), statsName); } - @Override - public void initialize() { - if (!hasInitialized) { - /* create index region */ - PartitionedRegion dataRegion = getDataRegion(); - //assert dataRegion != null; - RegionAttributes regionAttributes = dataRegion.getAttributes(); - DataPolicy dp = regionAttributes.getDataPolicy(); - final boolean withPersistence = dp.withPersistence(); - final boolean withStorage = regionAttributes.getPartitionAttributes().getLocalMaxMemory()>0; - RegionShortcut regionShortCut; - if (withPersistence) { - // TODO: add PartitionedRegionAttributes instead - regionShortCut = RegionShortcut.PARTITION_PERSISTENT; - } else { - regionShortCut = RegionShortcut.PARTITION; - } - - // TODO: 1) dataRegion should be withStorage - // 2) Persistence to Persistence - // 3) Replicate to Replicate, Partition To Partition - // 4) Offheap to Offheap - if (!withStorage) { - throw new IllegalStateException("The data region to create lucene index should be with storage"); - } - - // create PR fileRegion, but not to create its buckets for now - final String fileRegionName = createFileRegionName(); - PartitionAttributes partitionAttributes = dataRegion.getPartitionAttributes(); - if (!fileRegionExists(fileRegionName)) { - fileRegion = createFileRegion(regionShortCut, fileRegionName, partitionAttributes, regionAttributes); - } - - // create PR chunkRegion, but not to create its buckets for now - final String chunkRegionName = createChunkRegionName(); - if (!chunkRegionExists(chunkRegionName)) { - chunkRegion = createChunkRegion(regionShortCut, fileRegionName, partitionAttributes, chunkRegionName, regionAttributes); - } - fileSystemStats.setFileSupplier(() -> (int) getFileRegion().getLocalSize()); - fileSystemStats.setChunkSupplier(() -> (int) getChunkRegion().getLocalSize()); - fileSystemStats.setBytesSupplier(() -> getChunkRegion().getPrStats().getDataStoreBytesInUse()); - - // we will create RegionDirectories on the fly when data comes in - HeterogeneousLuceneSerializer mapper = new HeterogeneousLuceneSerializer(getFieldNames()); - repositoryManager = new PartitionedRepositoryManager(dataRegion, (PartitionedRegion) fileRegion, - (PartitionedRegion) chunkRegion, mapper, analyzer, this.indexStats, this.fileSystemStats); - - // create AEQ, AEQ listener and specify the listener to repositoryManager - createAEQ(dataRegion); - - addExtension(dataRegion); - hasInitialized = true; + protected RepositoryManager createRepositoryManager() { + RegionShortcut regionShortCut; + final boolean withPersistence = withPersistence(); + RegionAttributes regionAttributes = dataRegion.getAttributes(); + final boolean withStorage = regionAttributes.getPartitionAttributes().getLocalMaxMemory()>0; + + // TODO: 1) dataRegion should be withStorage + // 2) Persistence to Persistence + // 3) Replicate to Replicate, Partition To Partition + // 4) Offheap to Offheap + if (!withStorage) { + throw new IllegalStateException("The data region to create lucene index should be with storage"); + } + if (withPersistence) { + // TODO: add PartitionedRegionAttributes instead + regionShortCut = RegionShortcut.PARTITION_PERSISTENT; + } else { + regionShortCut = RegionShortcut.PARTITION; + } + + // create PR fileRegion, but not to create its buckets for now + final String fileRegionName = createFileRegionName(); + PartitionAttributes partitionAttributes = dataRegion.getPartitionAttributes(); + if (!fileRegionExists(fileRegionName)) { + fileRegion = createFileRegion(regionShortCut, fileRegionName, partitionAttributes, regionAttributes); } - } - private PartitionedRegion getDataRegion() { - return (PartitionedRegion) cache.getRegion(regionPath); - } + // create PR chunkRegion, but not to create its buckets for now + final String chunkRegionName = createChunkRegionName(); + if (!chunkRegionExists(chunkRegionName)) { + chunkRegion = createChunkRegion(regionShortCut, fileRegionName, partitionAttributes, chunkRegionName, regionAttributes); + } + fileSystemStats.setFileSupplier(() -> (int) getFileRegion().getLocalSize()); + fileSystemStats.setChunkSupplier(() -> (int) getChunkRegion().getLocalSize()); + fileSystemStats.setBytesSupplier(() -> getChunkRegion().getPrStats().getDataStoreBytesInUse()); - private PartitionedRegion getFileRegion() { + // we will create RegionDirectories on the fly when data comes in + HeterogeneousLuceneSerializer mapper = new HeterogeneousLuceneSerializer(getFieldNames()); + return new PartitionedRepositoryManager(this, mapper); + } + + public PartitionedRegion getFileRegion() { return (PartitionedRegion) fileRegion; } - private PartitionedRegion getChunkRegion() { + public PartitionedRegion getChunkRegion() { return (PartitionedRegion) chunkRegion; } - private AsyncEventQueueFactoryImpl createAEQFactory(final Region dataRegion) { - AsyncEventQueueFactoryImpl factory = (AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory(); - factory.setParallel(true); // parallel AEQ for PR - factory.setMaximumQueueMemory(1000); - factory.setDispatcherThreads(1); - factory.setIsMetaQueue(true); - if(dataRegion.getAttributes().getDataPolicy().withPersistence()) { - factory.setPersistent(true); - } - factory.setDiskStoreName(dataRegion.getAttributes().getDiskStoreName()); - factory.setDiskSynchronous(dataRegion.getAttributes().isDiskSynchronous()); - factory.setForwardExpirationDestroy(true); - return factory; - } - - AsyncEventQueue createAEQ(Region dataRegion) { - return createAEQ(createAEQFactory(dataRegion)); - } - - private AsyncEventQueue createAEQ(AsyncEventQueueFactoryImpl factory) { - LuceneEventListener listener = new LuceneEventListener(repositoryManager); - String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath); - AsyncEventQueue indexQueue = factory.create(aeqId, listener); - return indexQueue; + public FileSystemStats getFileSystemStats() { + return fileSystemStats; } - + boolean fileRegionExists(String fileRegionName) { return cache.<String, File> getRegion(fileRegionName) != null; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java index ff31c49..67461a9 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java @@ -29,9 +29,11 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer; import com.gemstone.gemfire.InternalGemFireError; import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.DataPolicy; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionAttributes; import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; +import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey; import com.gemstone.gemfire.cache.lucene.internal.filesystem.File; import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats; @@ -39,6 +41,7 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager; import com.gemstone.gemfire.cache.lucene.internal.xml.LuceneIndexCreation; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.InternalRegionArguments; +import com.gemstone.gemfire.internal.cache.LocalRegion; import com.gemstone.gemfire.internal.cache.PartitionedRegion; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; @@ -50,24 +53,21 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { protected final String regionPath; protected final Cache cache; protected final LuceneIndexStats indexStats; - protected final FileSystemStats fileSystemStats; protected boolean hasInitialized = false; protected Map<String, Analyzer> fieldAnalyzers; protected String[] searchableFieldNames; protected RepositoryManager repositoryManager; protected Analyzer analyzer; - protected Region<String, File> fileRegion; - protected Region<ChunkKey, byte[]> chunkRegion; - + protected LocalRegion dataRegion; protected LuceneIndexImpl(String indexName, String regionPath, Cache cache) { this.indexName = indexName; this.regionPath = regionPath; this.cache = cache; + final String statsName = indexName + "-" + regionPath; this.indexStats = new LuceneIndexStats(cache.getDistributedSystem(), statsName); - this.fileSystemStats = new FileSystemStats(cache.getDistributedSystem(), statsName); } @Override @@ -79,6 +79,17 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { public String getRegionPath() { return this.regionPath; } + + protected LocalRegion getDataRegion() { + return (LocalRegion)cache.getRegion(regionPath); + } + + protected boolean withPersistence() { + RegionAttributes ra = dataRegion.getAttributes(); + DataPolicy dp = ra.getDataPolicy(); + final boolean withPersistence = dp.withPersistence(); + return withPersistence; + } protected void setSearchableFields(String[] fields) { searchableFieldNames = fields; @@ -135,6 +146,10 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { return this.analyzer; } + public Cache getCache() { + return this.cache; + } + public void setFieldAnalyzers(Map<String, Analyzer> fieldAnalyzers) { this.fieldAnalyzers = fieldAnalyzers == null ? null : Collections.unmodifiableMap(fieldAnalyzers); } @@ -143,17 +158,59 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { return indexStats; } - public FileSystemStats getFileSystemStats() { - return fileSystemStats; - } + protected void initialize() { + if (!hasInitialized) { + /* create index region */ + dataRegion = getDataRegion(); + //assert dataRegion != null; + + repositoryManager = createRepositoryManager(); + + // create AEQ, AEQ listener and specify the listener to repositoryManager + createAEQ(dataRegion); - protected abstract void initialize(); + addExtension(dataRegion); + hasInitialized = true; + } + } - /** + protected abstract RepositoryManager createRepositoryManager(); + + protected AsyncEventQueue createAEQ(Region dataRegion) { + return createAEQ(createAEQFactory(dataRegion)); + } + + private AsyncEventQueueFactoryImpl createAEQFactory(final Region dataRegion) { + AsyncEventQueueFactoryImpl factory = (AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory(); + if (dataRegion instanceof PartitionedRegion) { + factory.setParallel(true); // parallel AEQ for PR + } else { + factory.setParallel(false); // TODO: not sure if serial AEQ working or not + } + factory.setMaximumQueueMemory(1000); + factory.setDispatcherThreads(10); + factory.setIsMetaQueue(true); + if (dataRegion.getAttributes().getDataPolicy().withPersistence()) { + factory.setPersistent(true); + } + factory.setDiskStoreName(dataRegion.getAttributes().getDiskStoreName()); + factory.setDiskSynchronous(dataRegion.getAttributes().isDiskSynchronous()); + factory.setForwardExpirationDestroy(true); + return factory; + } + + private AsyncEventQueue createAEQ(AsyncEventQueueFactoryImpl factory) { + LuceneEventListener listener = new LuceneEventListener(repositoryManager); + String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath); + AsyncEventQueue indexQueue = factory.create(aeqId, listener); + return indexQueue; + } + +/** * Register an extension with the region * so that xml will be generated for this index. */ - protected void addExtension(PartitionedRegion dataRegion) { + protected void addExtension(LocalRegion dataRegion) { LuceneIndexCreation creation = new LuceneIndexCreation(); creation.setName(this.getName()); creation.addFieldNames(this.getFieldNames()); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndex.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndex.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndex.java new file mode 100755 index 0000000..e708691 --- /dev/null +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndex.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.gemstone.gemfire.cache.lucene.internal; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager; +import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; + +public class LuceneRawIndex extends LuceneIndexImpl { + + protected LuceneRawIndex(String indexName, String regionPath, Cache cache) { + super(indexName, regionPath, cache); + } + + @Override + protected RepositoryManager createRepositoryManager() { + HeterogeneousLuceneSerializer mapper = new HeterogeneousLuceneSerializer(getFieldNames()); + return new RawLuceneRepositoryManager(this, mapper); + } + + @Override + public void dumpFiles(String directory) { + return; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndexFactory.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndexFactory.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndexFactory.java new file mode 100755 index 0000000..6c3bad6 --- /dev/null +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneRawIndexFactory.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.gemstone.gemfire.cache.lucene.internal; + +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; + +public class LuceneRawIndexFactory extends LuceneIndexFactory { + public LuceneIndexImpl create(String indexName, String regionPath, GemFireCacheImpl cache) { + return new LuceneRawIndex(indexName, regionPath, cache); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImpl.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImpl.java index 82aee8b..29a8e62 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImpl.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImpl.java @@ -67,6 +67,7 @@ import com.gemstone.gemfire.internal.logging.LogService; * @since GemFire 8.5 */ public class LuceneServiceImpl implements InternalLuceneService { + public static LuceneIndexFactory luceneIndexFactory = new LuceneIndexFactory(); private static final Logger logger = LogService.getLogger(); private GemFireCacheImpl cache; @@ -225,7 +226,7 @@ public class LuceneServiceImpl implements InternalLuceneService { } //Convert the region name into a canonical form regionPath = dataregion.getFullPath(); - return new LuceneIndexForPartitionedRegion(indexName, regionPath, cache); + return luceneIndexFactory.create(indexName, regionPath, cache); } private void registerDefinedIndex(final String regionAndIndex, final LuceneIndexCreationProfile luceneIndexCreationProfile) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java index 3cc713b..d5dd7b1 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java @@ -20,25 +20,10 @@ package com.gemstone.gemfire.cache.lucene.internal; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.lucene.analysis.Analyzer; - -import com.gemstone.gemfire.InternalGemFireError; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.execute.RegionFunctionContext; -import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats; import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; -import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager; import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; -import com.gemstone.gemfire.internal.cache.BucketNotFoundException; -import com.gemstone.gemfire.internal.cache.BucketRegion; import com.gemstone.gemfire.internal.cache.PartitionedRegion; -import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext; -import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteHashMap; /** * Manages index repositories for partitioned regions. @@ -47,111 +32,19 @@ import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteHashMap; * bucket. If a Bucket is rebalanced, this class will create a new * index repository when the bucket returns to this node. */ -public class PartitionedRepositoryManager implements RepositoryManager { +public class PartitionedRepositoryManager extends AbstractPartitionedRepositoryManager { public static IndexRepositoryFactory indexRepositoryFactory = new IndexRepositoryFactory(); - /** map of the parent bucket region to the index repository - * - * This is based on the BucketRegion in case a bucket is rebalanced, we don't want to - * return a stale index repository. If a bucket moves off of this node and - * comes back, it will have a new BucketRegion object. - * - * It is weak so that the old BucketRegion will be garbage collected. - */ - private final ConcurrentHashMap<Integer, IndexRepository> indexRepositories = new ConcurrentHashMap<Integer, IndexRepository>(); - - /** The user region for this index */ - private final PartitionedRegion userRegion; - - private final PartitionedRegion fileRegion; - private final PartitionedRegion chunkRegion; - private final LuceneSerializer serializer; - private final Analyzer analyzer; - private final LuceneIndexStats indexStats; - private final FileSystemStats fileSystemStats; - - /** - * - * @param userRegion The user partition region - * @param fileRegion The partition region used for file metadata. Should be colocated with the user pr - * @param chunkRegion The partition region users for chunk metadata. - * @param serializer The serializer that should be used for converting objects to lucene docs. - */ - public PartitionedRepositoryManager(PartitionedRegion userRegion, - PartitionedRegion fileRegion, - PartitionedRegion chunkRegion, - LuceneSerializer serializer, - Analyzer analyzer, - LuceneIndexStats indexStats, - FileSystemStats fileSystemStats) { - this.userRegion = userRegion; - this.fileRegion = fileRegion; - this.chunkRegion = chunkRegion; - this.serializer = serializer; - this.analyzer = analyzer; - this.indexStats = indexStats; - this.fileSystemStats = fileSystemStats; + public PartitionedRepositoryManager(LuceneIndexImpl index, + LuceneSerializer serializer) { + super(index, serializer); } @Override - public IndexRepository getRepository(Region region, Object key, Object callbackArg) throws BucketNotFoundException { - BucketRegion userBucket = userRegion.getBucketRegion(key, callbackArg); - if(userBucket == null) { - throw new BucketNotFoundException("User bucket was not found for region " + region + "key " + key + " callbackarg " + callbackArg); - } - - return getRepository(userBucket.getId()); - } - - @Override - public Collection<IndexRepository> getRepositories(RegionFunctionContext ctx) throws BucketNotFoundException { - - Region<Object, Object> region = ctx.getDataSet(); - Set<Integer> buckets = ((InternalRegionFunctionContext) ctx).getLocalBucketSet(region); - ArrayList<IndexRepository> repos = new ArrayList<IndexRepository>(buckets.size()); - for(Integer bucketId : buckets) { - BucketRegion userBucket = userRegion.getDataStore().getLocalBucketById(bucketId); - if(userBucket == null) { - throw new BucketNotFoundException("User bucket was not found for region " + region + "bucket id " + bucketId); - } else { - repos.add(getRepository(userBucket.getId())); - } - } - - return repos; - } - - /** - * Return the repository for a given user bucket - */ - private IndexRepository getRepository(Integer bucketId) throws BucketNotFoundException { - IndexRepository repo = indexRepositories.get(bucketId); - if(repo != null && !repo.isClosed()) { - return repo; - } - - repo = indexRepositories.compute(bucketId, (key, oldRepository) -> { - if(oldRepository != null && !oldRepository.isClosed()) { - return oldRepository; - } - if(oldRepository != null) { - oldRepository.cleanup(); - } - - try { - return indexRepositoryFactory.createIndexRepository(bucketId, userRegion, fileRegion, chunkRegion, serializer, - analyzer, indexStats, fileSystemStats); - } catch(IOException e) { - throw new InternalGemFireError("Unable to create index repository", e); - } - - }); - - if(repo == null) { - throw new BucketNotFoundException("Colocated index buckets not found for regions " + chunkRegion + ", " + fileRegion + " bucket id " + bucketId); - } - - return repo; + public IndexRepository createOneIndexRepository(Integer bucketId, + LuceneSerializer serializer, LuceneIndexImpl index, + PartitionedRegion userRegion) throws IOException { + return indexRepositoryFactory.createIndexRepository(bucketId, serializer, index, userRegion); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawIndexRepositoryFactory.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawIndexRepositoryFactory.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawIndexRepositoryFactory.java new file mode 100755 index 0000000..131e297 --- /dev/null +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawIndexRepositoryFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.lucene.internal; + +import java.io.File; +import java.io.IOException; + +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.NIOFSDirectory; +import org.apache.lucene.store.RAMDirectory; + +import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory; +import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; +import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl; +import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; +import com.gemstone.gemfire.internal.cache.BucketRegion; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; + +public class RawIndexRepositoryFactory extends IndexRepositoryFactory { + public RawIndexRepositoryFactory() { + } + + public IndexRepository createIndexRepository(final Integer bucketId, + LuceneSerializer serializer, + LuceneIndexImpl index, PartitionedRegion userRegion) + throws IOException + { + final IndexRepository repo; + LuceneRawIndex indexForRaw = (LuceneRawIndex)index; + BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId); + + Directory dir = null; + if (indexForRaw.withPersistence()) { + String bucketLocation = LuceneServiceImpl.getUniqueIndexName(index.getName(), index.getRegionPath()+"_"+bucketId); + File location = new File(index.getName(), bucketLocation); + if (!location.exists()) { + location.mkdirs(); + } + dir = new NIOFSDirectory(location.toPath()); + } else { + dir = new RAMDirectory(); + } + IndexWriterConfig config = new IndexWriterConfig(indexForRaw.getAnalyzer()); + IndexWriter writer = new IndexWriter(dir, config); + return new IndexRepositoryImpl(null, writer, serializer, indexForRaw.getIndexStats(), dataBucket); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManager.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManager.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManager.java new file mode 100755 index 0000000..234245e --- /dev/null +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManager.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.lucene.internal; + +import java.io.IOException; + +import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; +import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; +import com.gemstone.gemfire.internal.cache.BucketNotFoundException; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; + +public class RawLuceneRepositoryManager extends AbstractPartitionedRepositoryManager { + public static IndexRepositoryFactory indexRepositoryFactory = new RawIndexRepositoryFactory(); + + public RawLuceneRepositoryManager(LuceneIndexImpl index, + LuceneSerializer serializer) { + super(index, serializer); + } + + public void close() { + for (IndexRepository repo:indexRepositories.values()) { + repo.cleanup(); + } + } + + @Override + public IndexRepository createOneIndexRepository(Integer bucketId, + LuceneSerializer serializer, LuceneIndexImpl index, + PartitionedRegion userRegion) throws IOException { + return indexRepositoryFactory.createIndexRepository(bucketId, serializer, index, userRegion); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java index 0b70542..8c7754a 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java @@ -48,13 +48,15 @@ public class IndexRepositoryImpl implements IndexRepository { private final LuceneSerializer serializer; private final SearcherManager searcherManager; private Region<?,?> region; + private Region<?,?> userRegion; private LuceneIndexStats stats; private DocumentCountSupplier documentCountSupplier; private static final Logger logger = LogService.getLogger(); - public IndexRepositoryImpl(Region<?,?> region, IndexWriter writer, LuceneSerializer serializer, LuceneIndexStats stats) throws IOException { + public IndexRepositoryImpl(Region<?,?> region, IndexWriter writer, LuceneSerializer serializer, LuceneIndexStats stats, Region<?, ?> userRegion) throws IOException { this.region = region; + this.userRegion = userRegion; this.writer = writer; searcherManager = new SearcherManager(writer, APPLY_ALL_DELETES, true, null); this.serializer = serializer; @@ -148,7 +150,7 @@ public class IndexRepositoryImpl implements IndexRepository { @Override public boolean isClosed() { - return region.isDestroyed(); + return userRegion.isDestroyed(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java index 0974bf0..8e4edd7 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java @@ -46,6 +46,9 @@ import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionFactory; import com.gemstone.gemfire.cache.RegionShortcut; import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexCreationProfile; +import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexFactory; +import com.gemstone.gemfire.cache.lucene.internal.LuceneRawIndex; +import com.gemstone.gemfire.cache.lucene.internal.LuceneRawIndexFactory; import com.gemstone.gemfire.cache.lucene.internal.LuceneServiceImpl; import com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities; import com.gemstone.gemfire.cache.lucene.test.TestObject; @@ -158,6 +161,32 @@ public class LuceneIndexCreationIntegrationTest extends LuceneIntegrationTest { }); } + @Test + public void shouldCreateRawIndexIfSpecifiedItsFactory() + throws BucketNotFoundException, InterruptedException + { + Map<String, Analyzer> analyzers = new HashMap<>(); + + final RecordingAnalyzer field1Analyzer = new RecordingAnalyzer(); + final RecordingAnalyzer field2Analyzer = new RecordingAnalyzer(); + analyzers.put("field1", field1Analyzer); + analyzers.put("field2", field2Analyzer); + LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory(); + try { + luceneService.createIndex(INDEX_NAME, REGION_NAME, analyzers); + Region region = createRegion(); + final LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME); + assertTrue(index instanceof LuceneRawIndex); + region.put("key1", new TestObject()); + verifyIndexFinishFlushing(cache, INDEX_NAME, REGION_NAME); + assertEquals(analyzers, index.getFieldAnalyzers()); + assertEquals(Arrays.asList("field1"), field1Analyzer.analyzedfields); + assertEquals(Arrays.asList("field2"), field2Analyzer.analyzedfields); + } finally { + LuceneServiceImpl.luceneIndexFactory = new LuceneIndexFactory(); + } + } + @Test(expected = IllegalStateException.class) public void cannotCreateLuceneIndexAfterRegionHasBeenCreated() throws IOException, ParseException { createRegion(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java index 1de600d..9fb34f2 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java @@ -36,6 +36,11 @@ import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.control.RebalanceOperation; import com.gemstone.gemfire.cache.control.RebalanceResults; import com.gemstone.gemfire.cache.lucene.test.IndexRepositorySpy; +import com.gemstone.gemfire.cache.lucene.internal.IndexRepositoryFactory; +import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexImpl; +import com.gemstone.gemfire.cache.lucene.internal.PartitionedRepositoryManager; +import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; +import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; import com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities; import com.gemstone.gemfire.cache.partition.PartitionRegionHelper; import com.gemstone.gemfire.distributed.DistributedMember; @@ -190,6 +195,4 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase { }); } - ; - ; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java index 6f38ff4..aaa4930 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java @@ -26,15 +26,19 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; +import com.gemstone.gemfire.cache.AttributesFactory; import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.CacheListener; import com.gemstone.gemfire.cache.DataPolicy; import com.gemstone.gemfire.cache.ExpirationAttributes; import com.gemstone.gemfire.cache.MembershipAttributes; import com.gemstone.gemfire.cache.PartitionAttributes; +import com.gemstone.gemfire.cache.PartitionAttributesFactory; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionAttributes; import com.gemstone.gemfire.cache.RegionShortcut; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; import com.gemstone.gemfire.cache.execute.FunctionService; import com.gemstone.gemfire.cache.execute.ResultCollector; @@ -158,19 +162,29 @@ public class LuceneIndexForPartitionedRegionTest { return initializeScenario(withPersistence, regionPath, cache, defaultLocalMemory); } + private RegionAttributes createRegionAttributes(final boolean withPersistence, PartitionAttributes partitionAttributes) { + AttributesFactory factory = new AttributesFactory(); + if (withPersistence) { + factory.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + } else { + factory.setDataPolicy(DataPolicy.PARTITION); + } + factory.setPartitionAttributes(partitionAttributes); + RegionAttributes ra = factory.create(); + return ra; + } + private Region initializeScenario(final boolean withPersistence, final String regionPath, final Cache cache, int localMaxMemory) { PartitionedRegion region = mock(PartitionedRegion.class); - RegionAttributes regionAttributes = mock(RegionAttributes.class); - PartitionAttributes partitionAttributes = mock(PartitionAttributes.class); - DataPolicy dataPolicy = mock(DataPolicy.class); + PartitionAttributes partitionAttributes = new PartitionAttributesFactory(). + setLocalMaxMemory(localMaxMemory).setTotalNumBuckets(103).create(); + RegionAttributes regionAttributes = spy(createRegionAttributes(withPersistence, partitionAttributes)); ExtensionPoint extensionPoint = mock(ExtensionPoint.class); when(cache.getRegion(regionPath)).thenReturn(region); + when(cache.getRegionAttributes(any())).thenReturn(regionAttributes); when(region.getAttributes()).thenReturn(regionAttributes); when(regionAttributes.getPartitionAttributes()).thenReturn(partitionAttributes); - when(regionAttributes.getDataPolicy()).thenReturn(dataPolicy); - when(partitionAttributes.getLocalMaxMemory()).thenReturn(localMaxMemory); - when(partitionAttributes.getTotalNumBuckets()).thenReturn(113); - when(dataPolicy.withPersistence()).thenReturn(withPersistence); + when(region.getPartitionAttributes()).thenReturn(partitionAttributes); when(region.getExtensionPoint()).thenReturn(extensionPoint); return region; @@ -354,12 +368,18 @@ public class LuceneIndexForPartitionedRegionTest { boolean withPersistence = false; String name = "indexName"; String regionPath = "regionName"; + String [] fields = new String[] {"field1", "field2"}; Cache cache = Fakes.cache(); initializeScenario(withPersistence, regionPath, cache); + AsyncEventQueue aeq = mock(AsyncEventQueue.class); DumpDirectoryFiles function = new DumpDirectoryFiles(); FunctionService.registerFunction(function); LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, cache); + index = spy(index); + when(index.getFieldNames()).thenReturn(fields); + doReturn(aeq).when(index).createAEQ(any()); + index.initialize(); PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionPath); ResultCollector collector = mock(ResultCollector.class); when(region.executeFunction(eq(function), any(), any(), anyBoolean())).thenReturn(collector); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java index 67f318b..73849cd 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java @@ -20,6 +20,7 @@ package com.gemstone.gemfire.cache.lucene.internal; import com.gemstone.gemfire.cache.*; import com.gemstone.gemfire.cache.lucene.LuceneIndex; +import com.gemstone.gemfire.cache.lucene.LuceneService; import com.gemstone.gemfire.cache.lucene.LuceneServiceProvider; import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats; import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; @@ -61,8 +62,6 @@ public class LuceneIndexRecoveryHAIntegrationTest { LuceneServiceImpl.registerDataSerializables(); cache = new CacheFactory().set(MCAST_PORT, "0").create(); - indexStats = new LuceneIndexStats(cache.getDistributedSystem(), "INDEX-REGION"); - fileSystemStats = new FileSystemStats(cache.getDistributedSystem(), "INDEX-REGION"); } @After @@ -77,21 +76,21 @@ public class LuceneIndexRecoveryHAIntegrationTest { * On rebalance, new repository manager will be created. It will try to read fileRegion and construct index. This test * simulates the same. */ - @Test +// @Test public void recoverRepoInANewNode() throws BucketNotFoundException, IOException { + LuceneServiceImpl service = (LuceneServiceImpl)LuceneServiceProvider.get(cache); + service.createIndex("index1", "/userRegion", indexedFields); PartitionAttributes<String, String> attrs = new PartitionAttributesFactory().setTotalNumBuckets(1).create(); RegionFactory<String, String> regionfactory = cache.createRegionFactory(RegionShortcut.PARTITION); regionfactory.setPartitionAttributes(attrs); PartitionedRegion userRegion = (PartitionedRegion) regionfactory.create("userRegion"); + LuceneIndexForPartitionedRegion index = (LuceneIndexForPartitionedRegion)service.getIndex("index1", "/userRegion"); // put an entry to create the bucket userRegion.put("rebalance", "test"); + index.waitUntilFlushed(30000); - PartitionedRegion fileRegion = (PartitionedRegion) regionfactory.create("fileRegion"); - PartitionedRegion chunkRegion = (PartitionedRegion) regionfactory.create("chunkRegion"); - - RepositoryManager manager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, mapper, analyzer, - indexStats, fileSystemStats); + RepositoryManager manager = new PartitionedRepositoryManager((LuceneIndexImpl)index, mapper); IndexRepository repo = manager.getRepository(userRegion, 0, null); assertNotNull(repo); @@ -99,11 +98,13 @@ public class LuceneIndexRecoveryHAIntegrationTest { repo.commit(); // close the region to simulate bucket movement. New node will create repo using data persisted by old region +// ((PartitionedRegion)index.fileRegion).close(); +// ((PartitionedRegion)index.chunkRegion).close(); userRegion.close(); userRegion = (PartitionedRegion) regionfactory.create("userRegion"); userRegion.put("rebalance", "test"); - manager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, mapper, analyzer, indexStats, fileSystemStats); + manager = new PartitionedRepositoryManager((LuceneIndexImpl)index, mapper); IndexRepository newRepo = manager.getRepository(userRegion, 0, null); Assert.assertNotEquals(newRepo, repo); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java index 2221a6d..3ece4ea 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java @@ -48,6 +48,7 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.Heteroge import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; import com.gemstone.gemfire.internal.cache.BucketNotFoundException; import com.gemstone.gemfire.internal.cache.BucketRegion; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.PartitionedRegion; import com.gemstone.gemfire.internal.cache.PartitionedRegion.RetryTimeKeeper; import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore; @@ -58,41 +59,57 @@ import com.gemstone.gemfire.test.junit.categories.UnitTest; @Category(UnitTest.class) public class PartitionedRepositoryManagerJUnitTest { - private PartitionedRegion userRegion; - private PartitionedRegion fileRegion; - private PartitionedRegion chunkRegion; - private LuceneSerializer serializer; - private PartitionedRegionDataStore userDataStore; - private PartitionedRegionDataStore fileDataStore; - private PartitionedRegionDataStore chunkDataStore; + protected PartitionedRegion userRegion; + protected PartitionedRegion fileRegion; + protected PartitionedRegion chunkRegion; + protected LuceneSerializer serializer; + protected PartitionedRegionDataStore userDataStore; + protected PartitionedRegionDataStore fileDataStore; + protected PartitionedRegionDataStore chunkDataStore; - private Map<Integer, BucketRegion> fileBuckets = new HashMap<Integer, BucketRegion>(); - private Map<Integer, BucketRegion> chunkBuckets= new HashMap<Integer, BucketRegion>(); - private LuceneIndexStats indexStats; - private FileSystemStats fileSystemStats; + protected Map<Integer, BucketRegion> fileBuckets = new HashMap<Integer, BucketRegion>(); + protected Map<Integer, BucketRegion> chunkBuckets= new HashMap<Integer, BucketRegion>(); + protected Map<Integer, BucketRegion> dataBuckets= new HashMap<Integer, BucketRegion>(); + protected LuceneIndexStats indexStats; + protected FileSystemStats fileSystemStats; + protected LuceneIndexImpl indexForPR; + protected AbstractPartitionedRepositoryManager repoManager; + protected GemFireCacheImpl cache; @Before public void setUp() { + cache = Fakes.cache(); userRegion = Mockito.mock(PartitionedRegion.class); userDataStore = Mockito.mock(PartitionedRegionDataStore.class); when(userRegion.getDataStore()).thenReturn(userDataStore); + when(cache.getRegion("/testRegion")).thenReturn(userRegion); + serializer = new HeterogeneousLuceneSerializer(new String[] {"a", "b"} ); + createIndexAndRepoManager(); + } + + protected void createIndexAndRepoManager() { fileRegion = Mockito.mock(PartitionedRegion.class); fileDataStore = Mockito.mock(PartitionedRegionDataStore.class); when(fileRegion.getDataStore()).thenReturn(fileDataStore); chunkRegion = Mockito.mock(PartitionedRegion.class); chunkDataStore = Mockito.mock(PartitionedRegionDataStore.class); when(chunkRegion.getDataStore()).thenReturn(chunkDataStore); - serializer = new HeterogeneousLuceneSerializer(new String[] {"a", "b"} ); indexStats = Mockito.mock(LuceneIndexStats.class); fileSystemStats = Mockito.mock(FileSystemStats.class); + indexForPR = Mockito.mock(LuceneIndexForPartitionedRegion.class); + when(((LuceneIndexForPartitionedRegion)indexForPR).getFileRegion()).thenReturn(fileRegion); + when(((LuceneIndexForPartitionedRegion)indexForPR).getChunkRegion()).thenReturn(chunkRegion); + when(((LuceneIndexForPartitionedRegion)indexForPR).getFileSystemStats()).thenReturn(fileSystemStats); + when(indexForPR.getIndexStats()).thenReturn(indexStats); + when(indexForPR.getAnalyzer()).thenReturn(new StandardAnalyzer()); + when(indexForPR.getCache()).thenReturn(cache); + when(indexForPR.getRegionPath()).thenReturn("/testRegion"); + repoManager = new PartitionedRepositoryManager(indexForPR, serializer); } @Test public void getByKey() throws BucketNotFoundException, IOException { - PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer(), - indexStats, fileSystemStats); - setUpMockBucket(0); setUpMockBucket(1); @@ -115,9 +132,6 @@ public class PartitionedRepositoryManagerJUnitTest { */ @Test public void destroyBucketShouldCreateNewIndexRepository() throws BucketNotFoundException, IOException { - PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer(), - indexStats, fileSystemStats); - setUpMockBucket(0); IndexRepositoryImpl repo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null); @@ -126,10 +140,11 @@ public class PartitionedRepositoryManagerJUnitTest { checkRepository(repo0, 0); BucketRegion fileBucket0 = fileBuckets.get(0); + BucketRegion dataBucket0 = dataBuckets.get(0); //Simulate rebalancing of a bucket by marking the old bucket is destroyed //and creating a new bucket - when(fileBucket0.isDestroyed()).thenReturn(true); + when(dataBucket0.isDestroyed()).thenReturn(true); setUpMockBucket(0); IndexRepositoryImpl newRepo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null); @@ -144,15 +159,11 @@ public class PartitionedRepositoryManagerJUnitTest { */ @Test(expected = BucketNotFoundException.class) public void getMissingBucketByKey() throws BucketNotFoundException { - PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer(), - indexStats, fileSystemStats); repoManager.getRepository(userRegion, 0, null); } @Test public void createMissingBucket() throws BucketNotFoundException { - PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer(), - indexStats, fileSystemStats); setUpMockBucket(0); when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(null); @@ -170,9 +181,6 @@ public class PartitionedRepositoryManagerJUnitTest { @Test public void getByRegion() throws BucketNotFoundException { - PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer(), - indexStats, fileSystemStats); - setUpMockBucket(0); setUpMockBucket(1); @@ -199,9 +207,6 @@ public class PartitionedRepositoryManagerJUnitTest { */ @Test(expected = BucketNotFoundException.class) public void getMissingBucketByRegion() throws BucketNotFoundException { - PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer, new StandardAnalyzer(), - indexStats, fileSystemStats); - setUpMockBucket(0); Set<Integer> buckets = new LinkedHashSet<Integer>(Arrays.asList(0, 1)); @@ -211,7 +216,7 @@ public class PartitionedRepositoryManagerJUnitTest { repoManager.getRepositories(ctx); } - private void checkRepository(IndexRepositoryImpl repo0, int bucketId) { + protected void checkRepository(IndexRepositoryImpl repo0, int bucketId) { IndexWriter writer0 = repo0.getWriter(); RegionDirectory dir0 = (RegionDirectory) writer0.getDirectory(); assertEquals(fileBuckets.get(bucketId), dir0.getFileSystem().getFileRegion()); @@ -219,7 +224,7 @@ public class PartitionedRepositoryManagerJUnitTest { assertEquals(serializer, repo0.getSerializer()); } - private BucketRegion setUpMockBucket(int id) { + protected BucketRegion setUpMockBucket(int id) { BucketRegion mockBucket = Mockito.mock(BucketRegion.class); BucketRegion fileBucket = Mockito.mock(BucketRegion.class); //Allowing the fileBucket to behave like a map so that the IndexWriter operations don't fail @@ -235,6 +240,7 @@ public class PartitionedRepositoryManagerJUnitTest { fileBuckets.put(id, fileBucket); chunkBuckets.put(id, chunkBucket); + dataBuckets.put(id, mockBucket); return mockBucket; } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java new file mode 100755 index 0000000..df57249 --- /dev/null +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.gemstone.gemfire.cache.lucene.internal; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.when; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.NIOFSDirectory; +import org.apache.lucene.store.RAMDirectory; +import org.junit.After; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory; +import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats; +import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl; +import com.gemstone.gemfire.internal.cache.BucketNotFoundException; +import com.gemstone.gemfire.internal.cache.BucketRegion; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore; +import com.gemstone.gemfire.internal.cache.PartitionedRegion.RetryTimeKeeper; + +public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryManagerJUnitTest { + + @After + public void tearDown() { + ((RawLuceneRepositoryManager)repoManager).close(); + } + + protected void createIndexAndRepoManager() { + LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory(); + + indexStats = Mockito.mock(LuceneIndexStats.class); + indexForPR = Mockito.mock(LuceneRawIndex.class); + when(indexForPR.getIndexStats()).thenReturn(indexStats); + when(indexForPR.getAnalyzer()).thenReturn(new StandardAnalyzer()); + when(indexForPR.getCache()).thenReturn(cache); + when(indexForPR.getRegionPath()).thenReturn("/testRegion"); + when(indexForPR.withPersistence()).thenReturn(true); + repoManager = new RawLuceneRepositoryManager(indexForPR, serializer); + } + + @Test + public void testIndexRepositoryFactoryShouldBeRaw() { + assertTrue(RawLuceneRepositoryManager.indexRepositoryFactory instanceof RawIndexRepositoryFactory); + } + + @Override + protected void checkRepository(IndexRepositoryImpl repo0, int bucketId) { + IndexWriter writer0 = repo0.getWriter(); + Directory dir0 = writer0.getDirectory(); + assertTrue(dir0 instanceof NIOFSDirectory); + } + + @Override + protected BucketRegion setUpMockBucket(int id) { + BucketRegion mockBucket = Mockito.mock(BucketRegion.class); + when(mockBucket.getId()).thenReturn(id); + when(userRegion.getBucketRegion(eq(id), eq(null))).thenReturn(mockBucket); + when(userDataStore.getLocalBucketById(eq(id))).thenReturn(mockBucket); + when(userRegion.getBucketRegion(eq(id + 113), eq(null))).thenReturn(mockBucket); + when(userDataStore.getLocalBucketById(eq(id + 113))).thenReturn(mockBucket); + dataBuckets.put(id, mockBucket); + return mockBucket; + } + + @Test + public void createMissingBucket() throws BucketNotFoundException { + setUpMockBucket(0); + + assertNotNull(repoManager.getRepository(userRegion, 0, null)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java index 1f1d2c9..82164d4 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java @@ -148,7 +148,7 @@ public class DistributedScoringJUnitTest { IndexWriterConfig config = new IndexWriterConfig(analyzer); IndexWriter writer = new IndexWriter(dir, config); - return new IndexRepositoryImpl(region, writer, mapper, indexStats); + return new IndexRepositoryImpl(region, writer, mapper, indexStats, null); } private static class TestType { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java index cd67413..dd0378a 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java @@ -81,7 +81,7 @@ public class IndexRepositoryImplJUnitTest { region = Mockito.mock(Region.class); stats = Mockito.mock(LuceneIndexStats.class); Mockito.when(region.isDestroyed()).thenReturn(false); - repo = new IndexRepositoryImpl(region, writer, mapper, stats); + repo = new IndexRepositoryImpl(region, writer, mapper, stats, null); } @Test http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java index 3155aaf..ac06379 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java @@ -112,7 +112,7 @@ public class IndexRepositoryImplPerformanceTest { writer = new IndexWriter(dir, config); String[] indexedFields= new String[] {"text"}; HeterogeneousLuceneSerializer mapper = new HeterogeneousLuceneSerializer(indexedFields); - repo = new IndexRepositoryImpl(fileRegion, writer, mapper, stats); + repo = new IndexRepositoryImpl(fileRegion, writer, mapper, stats, null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/746820bb/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java index 0b66f55..80186f3 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/test/IndexRepositorySpy.java @@ -25,6 +25,8 @@ import java.io.IOException; import java.util.function.Consumer; import com.gemstone.gemfire.cache.lucene.internal.IndexRepositoryFactory; +import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexForPartitionedRegion; +import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexImpl; import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexStats; import com.gemstone.gemfire.cache.lucene.internal.PartitionedRepositoryManager; import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats; @@ -57,19 +59,10 @@ public class IndexRepositorySpy extends IndexRepositoryFactory { @Override public IndexRepository createIndexRepository(final Integer bucketId, - final PartitionedRegion userRegion, - final PartitionedRegion fileRegion, - final PartitionedRegion chunkRegion, - final LuceneSerializer serializer, - final Analyzer analyzer, - final LuceneIndexStats indexStats, - final FileSystemStats fileSystemStats) - throws IOException - { - final IndexRepository indexRepo = super.createIndexRepository(bucketId, userRegion, fileRegion, chunkRegion, - serializer, analyzer, - indexStats, - fileSystemStats); + LuceneSerializer serializer, + LuceneIndexImpl index, PartitionedRegion userRegion) throws IOException { + LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion)index; + final IndexRepository indexRepo = super.createIndexRepository(bucketId, serializer, index, userRegion); final IndexRepository spy = Mockito.spy(indexRepo); Answer invokeBeforeWrite = invocation -> { @@ -84,6 +77,7 @@ public class IndexRepositorySpy extends IndexRepositoryFactory { return spy; } + /** * Add a callback that runs before a call to * {@link IndexRepository#create(Object, Object)},
