Repository: geode
Updated Branches:
refs/heads/develop a6832ee2c -> 480a1e05c
GEODE-2828: AEQ created before the Lucene user regions
* The AEQ is now created before the Lucene user region
* A countdown latch prevents the index repository computation until the
user regions are ready
* Integration tests do not use a DummyExecutor because we need a
thread pool for the afterPrimary call.
This closes #481
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/480a1e05
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/480a1e05
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/480a1e05
Branch: refs/heads/develop
Commit: 480a1e05cd5bc332c1e5e2593c3468f640ded1c0
Parents: a6832ee
Author: nabarun <[email protected]>
Authored: Tue May 2 15:02:23 2017 -0700
Committer: nabarun <[email protected]>
Committed: Tue May 2 21:53:46 2017 -0700
----------------------------------------------------------------------
.../internal/LonerDistributionManager.java | 11 ++++-
.../internal/offheap/OffHeapRegionBase.java | 31 ++++++++++++--
.../AbstractPartitionedRepositoryManager.java | 18 +++++++-
.../lucene/internal/LuceneBucketListener.java | 4 +-
.../lucene/internal/LuceneEventListener.java | 4 --
.../LuceneIndexForPartitionedRegion.java | 18 ++++----
.../cache/lucene/internal/LuceneIndexImpl.java | 44 ++++++++++++--------
.../cache/lucene/internal/LuceneRawIndex.java | 10 ++++-
.../lucene/internal/LuceneRegionListener.java | 14 ++++++-
.../lucene/internal/LuceneServiceImpl.java | 34 +++++++--------
.../internal/LuceneEventListenerJUnitTest.java | 4 --
.../lucene/internal/LuceneIndexFactorySpy.java | 18 --------
.../LuceneIndexForPartitionedRegionTest.java | 22 ++++++----
.../PartitionedRepositoryManagerJUnitTest.java | 2 +
.../RawLuceneRepositoryManagerJUnitTest.java | 2 +
15 files changed, 143 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index e9068e6..fdb6a13 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -71,7 +71,14 @@ public class LonerDistributionManager implements DM {
// no threads needed
}
- protected void shutdown() {}
+ protected void shutdown() {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(20, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new InternalGemFireError("Interrupted while waiting for DM
shutdown");
+ }
+ }
private final InternalDistributedMember id;
@@ -94,7 +101,7 @@ public class LonerDistributionManager implements DM {
private ConcurrentMap<InternalDistributedMember, InternalDistributedMember>
canonicalIds =
new ConcurrentHashMap();
static private final DummyDMStats stats = new DummyDMStats();
- static private final DummyExecutor executor = new DummyExecutor();
+ static private final ExecutorService executor =
Executors.newCachedThreadPool();
@Override
public long cacheTimeMillis() {
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java
b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java
index 62766cc..c0c6085 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java
@@ -31,12 +31,17 @@ import org.apache.geode.pdx.PdxReader;
import org.apache.geode.pdx.PdxSerializable;
import org.apache.geode.pdx.PdxWriter;
import org.apache.geode.test.dunit.WaitCriterion;
+import org.awaitility.Awaitility;
+import org.junit.After;
import org.junit.Test;
+import java.io.File;
+import java.io.FilenameFilter;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
@@ -72,6 +77,22 @@ public abstract class OffHeapRegionBase {
return result;
}
+ @After
+ public void cleanUp() {
+ File dir = new File(".");
+ File[] files = dir.listFiles(new FilenameFilter() {
+
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith("BACKUP");
+ }
+
+ });
+ for (File file : files) {
+ file.delete();
+ }
+ }
+
private void closeCache(GemFireCacheImpl gfc, boolean keepOffHeapAllocated) {
gfc.close();
if (!keepOffHeapAllocated) {
@@ -200,7 +221,8 @@ public abstract class OffHeapRegionBase {
gfc.setCopyOnRead(true);
final MemoryAllocator ma = gfc.getOffHeapStore();
assertNotNull(ma);
- assertEquals(0, ma.getUsedMemory());
+ Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ .until(() -> assertEquals(0, ma.getUsedMemory()));
Compressor compressor = null;
if (compressed) {
compressor = SnappyCompressor.getDefaultInstance();
@@ -413,7 +435,8 @@ public abstract class OffHeapRegionBase {
assertTrue(ma.getUsedMemory() > 0);
try {
r.clear();
- assertEquals(0, ma.getUsedMemory());
+ Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ .until(() -> assertEquals(0, ma.getUsedMemory()));
} catch (UnsupportedOperationException ok) {
}
@@ -449,8 +472,8 @@ public abstract class OffHeapRegionBase {
}
r.destroyRegion();
- assertEquals(0, ma.getUsedMemory());
-
+ Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ .until(() -> assertEquals(0, ma.getUsedMemory()));
} finally {
if (r != null && !r.isDestroyed()) {
r.destroyRegion();
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
----------------------------------------------------------------------
diff --git
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
index 26bb488..867794d 100755
---
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
+++
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
@@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.Region;
@@ -47,18 +48,22 @@ public abstract class AbstractPartitionedRepositoryManager
implements Repository
new ConcurrentHashMap<Integer, IndexRepository>();
/** The user region for this index */
- protected final PartitionedRegion userRegion;
+ protected PartitionedRegion userRegion = null;
protected final LuceneSerializer serializer;
protected final LuceneIndexImpl index;
protected volatile boolean closed;
+ final private CountDownLatch isDataRegionReady = new CountDownLatch(1);
public AbstractPartitionedRepositoryManager(LuceneIndexImpl index,
LuceneSerializer serializer) {
this.index = index;
- this.userRegion = (PartitionedRegion)
index.getCache().getRegion(index.getRegionPath());
this.serializer = serializer;
this.closed = false;
}
+ public void setUserRegionForRepositoryManager() {
+ this.userRegion = (PartitionedRegion)
index.getCache().getRegion(index.getRegionPath());
+ }
+
@Override
public IndexRepository getRepository(Region region, Object key, Object
callbackArg)
throws BucketNotFoundException {
@@ -95,6 +100,11 @@ public abstract class AbstractPartitionedRepositoryManager
implements Repository
IndexRepository oldRepository) throws IOException;
protected IndexRepository computeRepository(Integer bucketId) {
+ try {
+ isDataRegionReady.await();
+ } catch (InterruptedException e) {
+ throw new InternalGemFireError("Uable to create index repository", e);
+ }
IndexRepository repo = indexRepositories.compute(bucketId, (key,
oldRepository) -> {
try {
if (closed) {
@@ -111,6 +121,10 @@ public abstract class AbstractPartitionedRepositoryManager
implements Repository
return repo;
}
+ protected void allowRepositoryComputation() {
+ isDataRegionReady.countDown();
+ }
+
/**
* Return the repository for a given user bucket
*/
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java
----------------------------------------------------------------------
diff --git
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java
index 32fb3fc..37871aa 100644
---
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java
+++
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java
@@ -24,10 +24,10 @@ import org.apache.lucene.store.AlreadyClosedException;
public class LuceneBucketListener extends PartitionListenerAdapter {
private static final Logger logger = LogService.getLogger();
- private PartitionedRepositoryManager lucenePartitionRepositoryManager;
+ private AbstractPartitionedRepositoryManager
lucenePartitionRepositoryManager;
private final DM dm;
- public LuceneBucketListener(PartitionedRepositoryManager
partitionedRepositoryManager,
+ public LuceneBucketListener(AbstractPartitionedRepositoryManager
partitionedRepositoryManager,
final DM dm) {
lucenePartitionRepositoryManager = partitionedRepositoryManager;
this.dm = dm;
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
----------------------------------------------------------------------
diff --git
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
index c3fa2ff..bc4a7da 100644
---
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
+++
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
@@ -27,7 +27,6 @@ import
org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.InternalGemFireError;
-import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
@@ -36,10 +35,7 @@ import
org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.internal.cache.BucketNotFoundException;
-import org.apache.geode.internal.cache.CacheObserverHolder;
import org.apache.geode.internal.cache.PrimaryBucketException;
-import org.apache.geode.internal.cache.partitioned.Bucket;
-import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.TestHook;
import org.apache.geode.internal.logging.LogService;
import org.apache.lucene.store.AlreadyClosedException;
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index c39a4a8..41505d7 100644
---
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -57,6 +57,15 @@ public class LuceneIndexForPartitionedRegion extends
LuceneIndexImpl {
}
protected RepositoryManager createRepositoryManager() {
+ HeterogeneousLuceneSerializer mapper = new
HeterogeneousLuceneSerializer(getFieldNames());
+ PartitionedRepositoryManager partitionedRepositoryManager =
+ new PartitionedRepositoryManager(this, mapper);
+ return partitionedRepositoryManager;
+ }
+
+ protected void createLuceneListenersAndFileChunkRegions(
+ AbstractPartitionedRepositoryManager partitionedRepositoryManager) {
+ partitionedRepositoryManager.setUserRegionForRepositoryManager();
RegionShortcut regionShortCut;
final boolean withPersistence = withPersistence();
RegionAttributes regionAttributes = dataRegion.getAttributes();
@@ -78,14 +87,6 @@ public class LuceneIndexForPartitionedRegion extends
LuceneIndexImpl {
// create PR fileAndChunkRegion, but not to create its buckets for now
final String fileRegionName = createFileRegionName();
PartitionAttributes partitionAttributes =
dataRegion.getPartitionAttributes();
-
-
- // create PR chunkRegion, but not to create its buckets for now
-
- // we will create RegionDirectories on the fly when data comes in
- HeterogeneousLuceneSerializer mapper = new
HeterogeneousLuceneSerializer(getFieldNames());
- PartitionedRepositoryManager partitionedRepositoryManager =
- new PartitionedRepositoryManager(this, mapper);
DM dm = this.cache.getInternalDistributedSystem().getDistributionManager();
LuceneBucketListener lucenePrimaryBucketListener =
new LuceneBucketListener(partitionedRepositoryManager, dm);
@@ -98,7 +99,6 @@ public class LuceneIndexForPartitionedRegion extends
LuceneIndexImpl {
fileSystemStats
.setBytesSupplier(() ->
getFileAndChunkRegion().getPrStats().getDataStoreBytesInUse());
- return partitionedRepositoryManager;
}
public PartitionedRegion getFileAndChunkRegion() {
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
----------------------------------------------------------------------
diff --git
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
index 36f6720..3393bcf 100644
---
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
+++
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
@@ -34,7 +34,6 @@ import
org.apache.geode.cache.lucene.internal.xml.LuceneIndexCreation;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.extension.Extension;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
@@ -48,6 +47,7 @@ public abstract class LuceneIndexImpl implements
InternalLuceneIndex {
protected final LuceneIndexStats indexStats;
protected boolean hasInitialized = false;
+ protected boolean hasInitializedAEQ = false;
protected Map<String, Analyzer> fieldAnalyzers;
protected String[] searchableFieldNames;
protected RepositoryManager repositoryManager;
@@ -131,30 +131,41 @@ public abstract class LuceneIndexImpl implements
InternalLuceneIndex {
if (!hasInitialized) {
/* create index region */
dataRegion = getDataRegion();
- // assert dataRegion != null;
-
- repositoryManager = createRepositoryManager();
-
- // create AEQ, AEQ listener and specify the listener to repositoryManager
- createAEQ(dataRegion);
-
+ createLuceneListenersAndFileChunkRegions(
+ (AbstractPartitionedRepositoryManager) repositoryManager);
addExtension(dataRegion);
hasInitialized = true;
}
}
+ protected void initializeAEQ(RegionAttributes attributes, String aeqId) {
+ if (!hasInitializedAEQ) {
+ repositoryManager = createRepositoryManager();
+ createAEQ(attributes, aeqId);
+ hasInitializedAEQ = true;
+ }
+ }
+
protected abstract RepositoryManager createRepositoryManager();
+ protected abstract void createLuceneListenersAndFileChunkRegions(
+ AbstractPartitionedRepositoryManager partitionedRepositoryManager);
+
protected AsyncEventQueue createAEQ(Region dataRegion) {
- return createAEQ(createAEQFactory(dataRegion));
+ String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath);
+ return createAEQ(createAEQFactory(dataRegion.getAttributes()), aeqId);
}
- private AsyncEventQueueFactoryImpl createAEQFactory(final Region dataRegion)
{
+ protected AsyncEventQueue createAEQ(RegionAttributes attributes, String
aeqId) {
+ return createAEQ(createAEQFactory(attributes), aeqId);
+ }
+
+ private AsyncEventQueueFactoryImpl createAEQFactory(final RegionAttributes
attributes) {
AsyncEventQueueFactoryImpl factory =
(AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory();
- if (dataRegion instanceof PartitionedRegion) {
- PartitionedRegion pr = (PartitionedRegion) dataRegion;
- if (pr.getPartitionAttributes().getLocalMaxMemory() == 0) {
+ if (attributes.getPartitionAttributes() != null) {
+
+ if (attributes.getPartitionAttributes().getLocalMaxMemory() == 0) {
// accessor will not create AEQ
return null;
}
@@ -165,22 +176,21 @@ public abstract class LuceneIndexImpl implements
InternalLuceneIndex {
factory.setMaximumQueueMemory(1000);
factory.setDispatcherThreads(10);
factory.setIsMetaQueue(true);
- if (dataRegion.getAttributes().getDataPolicy().withPersistence()) {
+ if (attributes.getDataPolicy().withPersistence()) {
factory.setPersistent(true);
}
- factory.setDiskStoreName(dataRegion.getAttributes().getDiskStoreName());
+ factory.setDiskStoreName(attributes.getDiskStoreName());
factory.setDiskSynchronous(true);
factory.setForwardExpirationDestroy(true);
return factory;
}
- private AsyncEventQueue createAEQ(AsyncEventQueueFactoryImpl factory) {
+ private AsyncEventQueue createAEQ(AsyncEventQueueFactoryImpl factory, String
aeqId) {
if (factory == null) {
return null;
}
LuceneEventListener listener = new LuceneEventListener(repositoryManager);
factory.setGatewayEventSubstitutionListener(new
LuceneEventSubstitutionFilter());
- String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath);
AsyncEventQueue indexQueue = factory.create(aeqId, listener);
return indexQueue;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
----------------------------------------------------------------------
diff --git
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
index 75ab5ca..ee2930d 100755
---
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
+++
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
@@ -27,7 +27,15 @@ public class LuceneRawIndex extends LuceneIndexImpl {
@Override
protected RepositoryManager createRepositoryManager() {
HeterogeneousLuceneSerializer mapper = new
HeterogeneousLuceneSerializer(getFieldNames());
- return new RawLuceneRepositoryManager(this, mapper);
+ RawLuceneRepositoryManager rawLuceneRepositoryManager =
+ new RawLuceneRepositoryManager(this, mapper);
+ return rawLuceneRepositoryManager;
+ }
+
+ @Override
+ protected void createLuceneListenersAndFileChunkRegions(
+ AbstractPartitionedRepositoryManager partitionedRepositoryManager) {
+ partitionedRepositoryManager.setUserRegionForRepositoryManager();
}
@Override
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java
----------------------------------------------------------------------
diff --git
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java
index f4e2a79..48462a0 100644
---
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java
+++
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java
@@ -23,6 +23,7 @@ import org.apache.geode.cache.EvictionAlgorithm;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.RegionListener;
@@ -43,6 +44,8 @@ public class LuceneRegionListener implements RegionListener {
private final String[] fields;
+ private LuceneIndexImpl luceneIndex;
+
public LuceneRegionListener(LuceneServiceImpl service, InternalCache cache,
String indexName,
String regionPath, String[] fields, Analyzer analyzer, Map<String,
Analyzer> fieldAnalyzers) {
this.service = service;
@@ -97,6 +100,9 @@ public class LuceneRegionListener implements RegionListener {
internalRegionArgs.addCacheServiceProfile(new
LuceneIndexCreationProfile(this.indexName,
this.regionPath, this.fields, this.analyzer, this.fieldAnalyzers));
+ luceneIndex = this.service.beforeDataRegionCreated(this.indexName,
this.regionPath, attrs,
+ this.analyzer, this.fieldAnalyzers, aeqId, this.fields);
+
// Add internal async event id
internalRegionArgs.addInternalAsyncEventQueueId(aeqId);
}
@@ -106,8 +112,12 @@ public class LuceneRegionListener implements
RegionListener {
@Override
public void afterCreate(Region region) {
if (region.getFullPath().equals(this.regionPath)) {
- this.service.afterDataRegionCreated(this.indexName, this.analyzer,
this.regionPath,
- this.fieldAnalyzers, this.fields);
+ this.service.afterDataRegionCreated(this.luceneIndex);
+ String aeqId = LuceneServiceImpl.getUniqueIndexName(this.indexName,
this.regionPath);
+ AsyncEventQueueImpl aeq = (AsyncEventQueueImpl)
cache.getAsyncEventQueue(aeqId);
+ AbstractPartitionedRepositoryManager repositoryManager =
+ (AbstractPartitionedRepositoryManager)
luceneIndex.getRepositoryManager();
+ repositoryManager.allowRepositoryComputation();
this.cache.removeRegionListener(this);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
----------------------------------------------------------------------
diff --git
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index 437a552..ebee59e 100644
---
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -31,10 +31,7 @@ import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.EvictionAlgorithm;
-import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.execute.Execution;
@@ -57,7 +54,6 @@ import org.apache.geode.internal.DSFIDFactory;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.cache.extension.Extensible;
import org.apache.geode.internal.cache.CacheService;
-import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.RegionListener;
import org.apache.geode.internal.cache.xmlcache.XmlGenerator;
import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -167,28 +163,28 @@ public class LuceneServiceImpl implements
InternalLuceneService {
*
* Public because this is called by the Xml parsing code
*/
- public void afterDataRegionCreated(final String indexName, final Analyzer
analyzer,
- final String dataRegionPath, final Map<String, Analyzer> fieldAnalyzers,
- final String... fields) {
- LuceneIndexImpl index = createIndexRegions(indexName, dataRegionPath);
- index.setSearchableFields(fields);
- index.setAnalyzer(analyzer);
- index.setFieldAnalyzers(fieldAnalyzers);
+ public void afterDataRegionCreated(LuceneIndexImpl index) {
index.initialize();
registerIndex(index);
if (this.managementListener != null) {
this.managementListener.afterIndexCreated(index);
}
+
}
- private LuceneIndexImpl createIndexRegions(String indexName, String
regionPath) {
- Region dataregion = this.cache.getRegion(regionPath);
- if (dataregion == null) {
- logger.info("Data region " + regionPath + " not found");
- return null;
- }
- // Convert the region name into a canonical form
- regionPath = dataregion.getFullPath();
+ public LuceneIndexImpl beforeDataRegionCreated(final String indexName, final
String regionPath,
+ RegionAttributes attributes, final Analyzer analyzer,
+ final Map<String, Analyzer> fieldAnalyzers, String aeqId, final
String... fields) {
+ LuceneIndexImpl index = createIndexObject(indexName, regionPath);
+ index.setSearchableFields(fields);
+ index.setAnalyzer(analyzer);
+ index.setFieldAnalyzers(fieldAnalyzers);
+ index.initializeAEQ(attributes, aeqId);
+ return index;
+
+ }
+
+ private LuceneIndexImpl createIndexObject(String indexName, String
regionPath) {
return luceneIndexFactory.create(indexName, regionPath, cache);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneEventListenerJUnitTest.java
----------------------------------------------------------------------
diff --git
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneEventListenerJUnitTest.java
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneEventListenerJUnitTest.java
index 801f6b6..88057e5 100644
---
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneEventListenerJUnitTest.java
+++
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneEventListenerJUnitTest.java
@@ -67,9 +67,7 @@ public class LuceneEventListenerJUnitTest {
Mockito.when(manager.getRepository(eq(region1), any(),
eq(callback1))).thenReturn(repo1);
Mockito.when(manager.getRepository(eq(region2), any(),
eq(null))).thenReturn(repo2);
-
LuceneEventListener listener = new LuceneEventListener(manager);
-
List<AsyncEvent> events = new ArrayList<AsyncEvent>();
int numEntries = 100;
@@ -115,7 +113,6 @@ public class LuceneEventListenerJUnitTest {
Logger log = Mockito.mock(Logger.class);
Mockito.when(manager.getRepository(any(), any(), any()))
.thenThrow(BucketNotFoundException.class);
-
LuceneEventListener listener = new LuceneEventListener(manager);
listener.logger = log;
AsyncEvent event = Mockito.mock(AsyncEvent.class);
@@ -128,7 +125,6 @@ public class LuceneEventListenerJUnitTest {
public void shouldThrowAndCaptureIOException() throws
BucketNotFoundException {
RepositoryManager manager = Mockito.mock(RepositoryManager.class);
Mockito.when(manager.getRepository(any(), any(),
any())).thenThrow(IOException.class);
-
AtomicReference<Throwable> lastException = new AtomicReference<>();
LuceneEventListener.setExceptionObserver(lastException::set);
LuceneEventListener listener = new LuceneEventListener(manager);
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java
----------------------------------------------------------------------
diff --git
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java
index 8b379a5..1a092d7 100644
---
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java
+++
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java
@@ -15,15 +15,11 @@
package org.apache.geode.cache.lucene.internal;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
import java.util.function.Consumer;
import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
-import org.apache.geode.internal.cache.BucketNotFoundException;
import org.apache.geode.internal.cache.InternalCache;
public class LuceneIndexFactorySpy extends LuceneIndexImplFactory {
@@ -59,19 +55,5 @@ public class LuceneIndexFactorySpy extends
LuceneIndexImplFactory {
super(indexName, regionPath, cache);
}
- @Override
- public RepositoryManager createRepositoryManager() {
- RepositoryManager repositoryManagerSpy =
Mockito.spy(super.createRepositoryManager());
- Answer getRepositoryAnswer = invocation -> {
- getRepositoryConsumer.accept(invocation.getArgumentAt(0,
Object.class));
- return invocation.callRealMethod();
- };
- try {
-
doAnswer(getRepositoryAnswer).when(repositoryManagerSpy).getRepositories(any());
- } catch (BucketNotFoundException e) {
- e.printStackTrace();
- }
- return repositoryManagerSpy;
- }
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
----------------------------------------------------------------------
diff --git
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
index 8e4c179..b2fdd84 100644
---
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
+++
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
@@ -194,7 +194,7 @@ public class LuceneIndexForPartitionedRegionTest {
Region region = initializeScenario(withPersistence, regionPath, cache, 0);
LuceneIndexForPartitionedRegion index =
new LuceneIndexForPartitionedRegion(name, regionPath, cache);
- LuceneIndexForPartitionedRegion spy = setupSpy(region, index);
+ LuceneIndexForPartitionedRegion spy = setupSpy(region, index, "aeq");
spy.initialize();
}
@@ -208,17 +208,18 @@ public class LuceneIndexForPartitionedRegionTest {
LuceneIndexForPartitionedRegion index =
new LuceneIndexForPartitionedRegion(name, regionPath, cache);
- LuceneIndexForPartitionedRegion spy = setupSpy(region, index);
+ LuceneIndexForPartitionedRegion spy = setupSpy(region, index, "aeq");
- verify(spy).createAEQ(eq(region));
+ verify(spy).createAEQ(eq(region.getAttributes()), eq("aeq"));
}
protected LuceneIndexForPartitionedRegion setupSpy(final Region region,
- final LuceneIndexForPartitionedRegion index) {
+ final LuceneIndexForPartitionedRegion index, final String aeq) {
index.setSearchableFields(new String[] {"field"});
LuceneIndexForPartitionedRegion spy = spy(index);
doReturn(null).when(spy).createFileRegion(any(), any(), any(), any(),
any());
- doReturn(null).when(spy).createAEQ(eq(region));
+ doReturn(null).when(spy).createAEQ(any(), any());
+ spy.initializeAEQ(region.getAttributes(), aeq);
spy.initialize();
return spy;
}
@@ -233,7 +234,7 @@ public class LuceneIndexForPartitionedRegionTest {
LuceneIndexForPartitionedRegion index =
new LuceneIndexForPartitionedRegion(name, regionPath, cache);
- LuceneIndexForPartitionedRegion spy = setupSpy(region, index);
+ LuceneIndexForPartitionedRegion spy = setupSpy(region, index, "aeq");
verify(spy).createFileRegion(eq(RegionShortcut.PARTITION),
eq(index.createFileRegionName()),
any(), any(), any());
@@ -272,7 +273,8 @@ public class LuceneIndexForPartitionedRegionTest {
index.setSearchableFields(new String[] {"field"});
LuceneIndexForPartitionedRegion spy = spy(index);
doReturn(null).when(spy).createFileRegion(any(), any(), any(), any(),
any());
- doReturn(null).when(spy).createAEQ(any());
+ doReturn(null).when(spy).createAEQ((RegionAttributes) any(), any());
+ spy.initializeAEQ(any(), any());
spy.initialize();
verify(spy).createFileRegion(eq(RegionShortcut.PARTITION_PERSISTENT),
@@ -292,7 +294,8 @@ public class LuceneIndexForPartitionedRegionTest {
index.setSearchableFields(new String[] {"field"});
LuceneIndexForPartitionedRegion spy = spy(index);
doReturn(null).when(spy).createFileRegion(any(), any(), any(), any(),
any());
- doReturn(null).when(spy).createAEQ(any());
+ doReturn(null).when(spy).createAEQ(any(), any());
+ spy.initializeAEQ(any(), any());
spy.initialize();
spy.initialize();
@@ -316,7 +319,8 @@ public class LuceneIndexForPartitionedRegionTest {
new LuceneIndexForPartitionedRegion(name, regionPath, cache);
index = spy(index);
when(index.getFieldNames()).thenReturn(fields);
- doReturn(aeq).when(index).createAEQ(any());
+ doReturn(aeq).when(index).createAEQ(any(), any());
+ index.initializeAEQ(cache.getRegionAttributes(regionPath), aeq.getId());
index.initialize();
PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionPath);
ResultCollector collector = mock(ResultCollector.class);
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
----------------------------------------------------------------------
diff --git
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
index 87317cc..30e64f2 100644
---
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
+++
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
@@ -113,6 +113,8 @@ public class PartitionedRepositoryManagerJUnitTest {
when(indexForPR.getCache()).thenReturn(cache);
when(indexForPR.getRegionPath()).thenReturn("/testRegion");
repoManager = new PartitionedRepositoryManager(indexForPR, serializer);
+ repoManager.setUserRegionForRepositoryManager();
+ repoManager.allowRepositoryComputation();
}
@Test
http://git-wip-us.apache.org/repos/asf/geode/blob/480a1e05/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
----------------------------------------------------------------------
diff --git
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
index bca7085..df31bb9 100755
---
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
+++
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
@@ -78,6 +78,8 @@ public class RawLuceneRepositoryManagerJUnitTest extends
PartitionedRepositoryMa
when(indexForPR.getRegionPath()).thenReturn("/testRegion");
when(indexForPR.withPersistence()).thenReturn(true);
repoManager = new RawLuceneRepositoryManager(indexForPR, serializer);
+ repoManager.setUserRegionForRepositoryManager();
+ repoManager.allowRepositoryComputation();
}
@Test