Hiding the Lucene async event queue from the list of queues. I added a new flag to the queue, isMetaQueue, which controls whether a queue will show up in getAsyncEventQueues. This also has the effect of preventing the queue from showing up in a generated XML file.
This is necessary because our queue needs to be constructed with an async event listener that has a reference to the index. If the queue is generated and added to the xml, it will end up trying to create a listener before the index is created. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/d88ef883 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/d88ef883 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/d88ef883 Branch: refs/heads/develop Commit: d88ef883b23eb8dd465376ccb5038d789f797bb6 Parents: abe9d47 Author: Dan Smith <[email protected]> Authored: Wed Oct 7 15:04:39 2015 -0700 Committer: Dan Smith <[email protected]> Committed: Thu Oct 8 10:26:02 2015 -0700 ---------------------------------------------------------------------- .../internal/AsyncEventQueueFactoryImpl.java | 9 +++++++-- .../asyncqueue/internal/AsyncEventQueueImpl.java | 3 +++ .../gemfire/internal/cache/GemFireCacheImpl.java | 14 ++++++++++++-- .../internal/cache/wan/AbstractGatewaySender.java | 7 +++++++ .../internal/cache/wan/GatewaySenderAttributes.java | 5 +++++ .../internal/LuceneIndexForPartitionedRegion.java | 7 ++++++- .../lucene/internal/LuceneServiceImplJUnitTest.java | 6 +++++- 7 files changed, 45 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java index f413218..caef0fc 100644 --- 
a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java @@ -184,8 +184,9 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory { // AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncQueueId)); addAsyncEventListener(listener); GatewaySender sender = create(AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncQueueId)); - asyncEventQueue = new AsyncEventQueueImpl(sender, listener); - ((GemFireCacheImpl) cache).addAsyncEventQueue(asyncEventQueue); + AsyncEventQueueImpl queue = new AsyncEventQueueImpl(sender, listener); + asyncEventQueue = queue; + ((GemFireCacheImpl) cache).addAsyncEventQueue(queue); } else if (this.cache instanceof CacheCreation) { asyncEventQueue = new AsyncEventQueueCreation(asyncQueueId, attrs, listener); ((CacheCreation) cache).addAsyncEventQueue(asyncEventQueue); @@ -282,4 +283,8 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory { this.attrs.isHDFSQueue = isHDFSQueue; return this; } + public AsyncEventQueueFactory setIsMetaQueue(boolean isMetaQueue) { + this.attrs.isMetaQueue = isMetaQueue; + return this; + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java index 71e8d2a..9a7698a 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java @@ -180,6 +180,9 @@ public class 
AsyncEventQueueImpl implements AsyncEventQueue { return sender.isParallel(); } + public boolean isMetaQueue() { + return ((AbstractGatewaySender)sender).getIsMetaQueue(); + } public void destroy() { ((AbstractGatewaySender)this.sender).destroy(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java index 6fe639b..cf93dac 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java @@ -387,6 +387,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer * {@link #allGatewaySendersLock} */ private volatile Set<GatewaySender> allGatewaySenders = Collections.emptySet(); + + /** + * The list of all async event queues added to the cache. + * CopyOnWriteArrayList is used to allow concurrent add, remove and retrieval operations. + */ + private volatile Set<AsyncEventQueue> allVisibleAsyncEventQueues = new CopyOnWriteArraySet<AsyncEventQueue>(); /** * The list of all async event queues added to the cache. 
@@ -3881,8 +3887,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer } } - public void addAsyncEventQueue(AsyncEventQueue asyncQueue) { + public void addAsyncEventQueue(AsyncEventQueueImpl asyncQueue) { this.allAsyncEventQueues.add(asyncQueue); + if(!asyncQueue.isMetaQueue()) { + this.allVisibleAsyncEventQueues.add(asyncQueue); + } system .handleResourceEvent(ResourceEvent.ASYNCEVENTQUEUE_CREATE, asyncQueue); } @@ -3925,7 +3934,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer } public Set<AsyncEventQueue> getAsyncEventQueues() { - return this.allAsyncEventQueues; + return this.allVisibleAsyncEventQueues; } public AsyncEventQueue getAsyncEventQueue(String id) { @@ -3949,6 +3958,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer // using gateway senders lock since async queue uses a gateway sender synchronized (allGatewaySendersLock) { this.allAsyncEventQueues.remove(asyncQueue); + this.allVisibleAsyncEventQueues.remove(asyncQueue); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java index 3bd2992..e49708f 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java @@ -152,6 +152,8 @@ public abstract class AbstractGatewaySender implements GatewaySender, protected boolean isHDFSQueue; + protected boolean isMetaQueue; + private int parallelismForReplicatedRegion; protected AbstractGatewaySenderEventProcessor eventProcessor; @@ -252,6 
+254,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, this.myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager().getDistributedSystemId(); this.serialNumber = DistributionAdvisor.createSerialNumber(); this.isHDFSQueue = attrs.isHDFSQueue(); + this.isMetaQueue = attrs.isMetaQueue(); if (!(this.cache instanceof CacheCreation)) { this.stopper = new Stopper(cache.getCancelCriterion()); this.senderAdvisor = GatewaySenderAdvisor.createGatewaySenderAdvisor(this); @@ -476,6 +479,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, return this.isHDFSQueue; } + public boolean getIsMetaQueue() { + return this.isMetaQueue; + } + public InternalDistributedSystem getSystem() { return (InternalDistributedSystem)this.cache.getDistributedSystem(); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java index 1d0b4f1..2df11aa 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java @@ -22,6 +22,7 @@ public class GatewaySenderAttributes { public static final boolean DEFAULT_IS_BUCKETSORTED = true; public static final boolean DEFAULT_IS_HDFSQUEUE = false; + public static final boolean DEFAULT_IS_META_QUEUE = false; public int socketBufferSize = GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE; @@ -73,6 +74,7 @@ public class GatewaySenderAttributes { public boolean isBucketSorted = GatewaySenderAttributes.DEFAULT_IS_BUCKETSORTED; public boolean isHDFSQueue = 
GatewaySenderAttributes.DEFAULT_IS_HDFSQUEUE; + public boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE; public int getSocketBufferSize() { return this.socketBufferSize; @@ -183,4 +185,7 @@ public class GatewaySenderAttributes { public boolean isHDFSQueue() { return this.isHDFSQueue; } + public boolean isMetaQueue() { + return this.isMetaQueue; + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java index 60085e4..f9e2c1d 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java @@ -14,9 +14,11 @@ import com.gemstone.gemfire.cache.DataPolicy; import com.gemstone.gemfire.cache.PartitionAttributesFactory; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionAttributes; +import com.gemstone.gemfire.cache.RegionFactory; import com.gemstone.gemfire.cache.RegionShortcut; import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory; +import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; import com.gemstone.gemfire.cache.execute.RegionFunctionContext; import com.gemstone.gemfire.cache.lucene.LuceneIndex; @@ -26,7 +28,9 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; import 
com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer; import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; import com.gemstone.gemfire.cache.partition.PartitionRegionHelper; +import com.gemstone.gemfire.internal.cache.InternalRegionArguments; import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.RegionFactoryImpl; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; /* wrapper of IndexWriter */ @@ -102,13 +106,14 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { repositoryManager = new PartitionedRepositoryManager(dataRegion, (PartitionedRegion)fileRegion, (PartitionedRegion)chunkRegion, mapper, analyzer); // create AEQ, AEQ listner and specify the listener to repositoryManager - AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); + AsyncEventQueueFactoryImpl factory = (AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory(); if (withPersistence) { factory.setPersistent(true); } factory.setParallel(true); // parallel AEQ for PR factory.setMaximumQueueMemory(1000); factory.setDispatcherThreads(1); + factory.setIsMetaQueue(true); LuceneEventListener listener = new LuceneEventListener(repositoryManager); String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java index 5ec2725..eff2813 100644 --- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java +++ 
b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java @@ -6,6 +6,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -123,7 +124,7 @@ public class LuceneServiceImplJUnitTest { @Test public void testCreateIndexForPR() throws IOException, ParseException { getService(); - createPR("PR1", false); + LocalRegion userRegion = createPR("PR1", false); LuceneIndexImpl index1 = (LuceneIndexImpl)service.createIndex("index1", "PR1", "field1", "field2", "field3"); assertTrue(index1 instanceof LuceneIndexForPartitionedRegion); LuceneIndexForPartitionedRegion index1PR = (LuceneIndexForPartitionedRegion)index1; @@ -146,6 +147,9 @@ public class LuceneServiceImplJUnitTest { String aeqId = LuceneServiceImpl.getUniqueIndexName(index1.getName(), index1.getRegionPath()); AsyncEventQueueImpl aeq = (AsyncEventQueueImpl)cache.getAsyncEventQueue(aeqId); assertTrue(aeq != null); + + //Make sure our queue doesn't show up in the list of async event queues + assertEquals(Collections.emptySet(), cache.getAsyncEventQueues()); } @Test
