Hiding the Lucene async event queue from the list of queues.

I added a new flag to the queue, isMetaQueue, which controls whether a
queue will show up in the set returned by getAsyncEventQueues. This also
has the effect of preventing the queue from showing up in a generated
XML file.

This is necessary because the Lucene queue needs to be constructed with
an async event listener that has a reference to the index. If the queue
is written to the generated XML, it will end up trying to create a
listener before the index is created.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/d88ef883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/d88ef883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/d88ef883

Branch: refs/heads/develop
Commit: d88ef883b23eb8dd465376ccb5038d789f797bb6
Parents: abe9d47
Author: Dan Smith <[email protected]>
Authored: Wed Oct 7 15:04:39 2015 -0700
Committer: Dan Smith <[email protected]>
Committed: Thu Oct 8 10:26:02 2015 -0700

----------------------------------------------------------------------
 .../internal/AsyncEventQueueFactoryImpl.java          |  9 +++++++--
 .../asyncqueue/internal/AsyncEventQueueImpl.java      |  3 +++
 .../gemfire/internal/cache/GemFireCacheImpl.java      | 14 ++++++++++++--
 .../internal/cache/wan/AbstractGatewaySender.java     |  7 +++++++
 .../internal/cache/wan/GatewaySenderAttributes.java   |  5 +++++
 .../internal/LuceneIndexForPartitionedRegion.java     |  7 ++++++-
 .../lucene/internal/LuceneServiceImplJUnitTest.java   |  6 +++++-
 7 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
 
b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
index f413218..caef0fc 100644
--- 
a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
+++ 
b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
@@ -184,8 +184,9 @@ public class AsyncEventQueueFactoryImpl implements 
AsyncEventQueueFactory {
           //  
AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncQueueId));
       addAsyncEventListener(listener);
       GatewaySender sender = 
create(AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncQueueId));
-      asyncEventQueue = new AsyncEventQueueImpl(sender, listener);
-      ((GemFireCacheImpl) cache).addAsyncEventQueue(asyncEventQueue);
+      AsyncEventQueueImpl queue = new AsyncEventQueueImpl(sender, listener);
+      asyncEventQueue = queue;
+      ((GemFireCacheImpl) cache).addAsyncEventQueue(queue);
     } else if (this.cache instanceof CacheCreation) {
       asyncEventQueue = new AsyncEventQueueCreation(asyncQueueId, attrs, 
listener);
       ((CacheCreation) cache).addAsyncEventQueue(asyncEventQueue);
@@ -282,4 +283,8 @@ public class AsyncEventQueueFactoryImpl implements 
AsyncEventQueueFactory {
     this.attrs.isHDFSQueue = isHDFSQueue;
     return this;
   }
+  public AsyncEventQueueFactory setIsMetaQueue(boolean isMetaQueue) {
+    this.attrs.isMetaQueue = isMetaQueue;
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git 
a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
 
b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
index 71e8d2a..9a7698a 100644
--- 
a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
+++ 
b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -180,6 +180,9 @@ public class AsyncEventQueueImpl implements AsyncEventQueue 
{
     return sender.isParallel();
   }
 
+  public boolean isMetaQueue() {
+    return ((AbstractGatewaySender)sender).getIsMetaQueue();
+  }
 
   public void destroy() {
     ((AbstractGatewaySender)this.sender).destroy();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git 
a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
 
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 6fe639b..cf93dac 100644
--- 
a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ 
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -387,6 +387,12 @@ public class GemFireCacheImpl implements InternalCache, 
ClientCache, HasCachePer
    * {@link #allGatewaySendersLock}
    */
   private volatile Set<GatewaySender> allGatewaySenders = 
Collections.emptySet();
+  
+  /**
+   * The set of async event queues added to the cache that are not meta queues. 
+   * CopyOnWriteArraySet is used to allow concurrent add, remove and 
retrieval operations.
+   */
+  private volatile Set<AsyncEventQueue> allVisibleAsyncEventQueues = new 
CopyOnWriteArraySet<AsyncEventQueue>();
 
   /**
    * The list of all async event queues added to the cache. 
@@ -3881,8 +3887,11 @@ public class GemFireCacheImpl implements InternalCache, 
ClientCache, HasCachePer
     }
   }
 
-  public void addAsyncEventQueue(AsyncEventQueue asyncQueue) {
+  public void addAsyncEventQueue(AsyncEventQueueImpl asyncQueue) {
     this.allAsyncEventQueues.add(asyncQueue);
+    if(!asyncQueue.isMetaQueue()) {
+      this.allVisibleAsyncEventQueues.add(asyncQueue);
+    }
     system
         .handleResourceEvent(ResourceEvent.ASYNCEVENTQUEUE_CREATE, asyncQueue);
   }
@@ -3925,7 +3934,7 @@ public class GemFireCacheImpl implements InternalCache, 
ClientCache, HasCachePer
   }
 
   public Set<AsyncEventQueue> getAsyncEventQueues() {
-    return this.allAsyncEventQueues;
+    return this.allVisibleAsyncEventQueues;
   }
   
   public AsyncEventQueue getAsyncEventQueue(String id) {
@@ -3949,6 +3958,7 @@ public class GemFireCacheImpl implements InternalCache, 
ClientCache, HasCachePer
     // using gateway senders lock since async queue uses a gateway sender
     synchronized (allGatewaySendersLock) {
       this.allAsyncEventQueues.remove(asyncQueue);
+      this.allVisibleAsyncEventQueues.remove(asyncQueue);
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git 
a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
 
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
index 3bd2992..e49708f 100644
--- 
a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
@@ -152,6 +152,8 @@ public abstract class AbstractGatewaySender implements 
GatewaySender,
   
   protected boolean isHDFSQueue;
   
+  protected boolean isMetaQueue;
+  
   private int parallelismForReplicatedRegion;
   
   protected AbstractGatewaySenderEventProcessor eventProcessor;
@@ -252,6 +254,7 @@ public abstract class AbstractGatewaySender implements 
GatewaySender,
     this.myDSId = 
InternalDistributedSystem.getAnyInstance().getDistributionManager().getDistributedSystemId();
     this.serialNumber = DistributionAdvisor.createSerialNumber();
     this.isHDFSQueue = attrs.isHDFSQueue();
+    this.isMetaQueue = attrs.isMetaQueue();
     if (!(this.cache instanceof CacheCreation)) {
       this.stopper = new Stopper(cache.getCancelCriterion());
       this.senderAdvisor = 
GatewaySenderAdvisor.createGatewaySenderAdvisor(this);
@@ -476,6 +479,10 @@ public abstract class AbstractGatewaySender implements 
GatewaySender,
     return this.isHDFSQueue;
   }
   
+  public boolean getIsMetaQueue() {
+    return this.isMetaQueue;
+  }
+  
   public InternalDistributedSystem getSystem() {
     return (InternalDistributedSystem)this.cache.getDistributedSystem();
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
----------------------------------------------------------------------
diff --git 
a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
 
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
index 1d0b4f1..2df11aa 100644
--- 
a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
+++ 
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
@@ -22,6 +22,7 @@ public class GatewaySenderAttributes {
 
   public static final boolean DEFAULT_IS_BUCKETSORTED = true;
   public static final boolean DEFAULT_IS_HDFSQUEUE = false;
+  public static final boolean DEFAULT_IS_META_QUEUE = false;
 
 
   public int socketBufferSize = GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE;
@@ -73,6 +74,7 @@ public class GatewaySenderAttributes {
   public boolean isBucketSorted = 
GatewaySenderAttributes.DEFAULT_IS_BUCKETSORTED;
   
   public boolean isHDFSQueue = GatewaySenderAttributes.DEFAULT_IS_HDFSQUEUE;
+  public boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE;
   
   public int getSocketBufferSize() {
     return this.socketBufferSize;
@@ -183,4 +185,7 @@ public class GatewaySenderAttributes {
   public boolean isHDFSQueue() {
     return this.isHDFSQueue;
   }
+  public boolean isMetaQueue() {
+    return this.isMetaQueue;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git 
a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
 
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index 60085e4..f9e2c1d 100644
--- 
a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ 
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -14,9 +14,11 @@ import com.gemstone.gemfire.cache.DataPolicy;
 import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionFactory;
 import com.gemstone.gemfire.cache.RegionShortcut;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import 
com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
 import com.gemstone.gemfire.cache.lucene.LuceneIndex;
@@ -26,7 +28,9 @@ import 
com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
 import 
com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer;
 import 
com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
 import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.RegionFactoryImpl;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 
 /* wrapper of IndexWriter */
@@ -102,13 +106,14 @@ public class LuceneIndexForPartitionedRegion extends 
LuceneIndexImpl {
       repositoryManager = new PartitionedRepositoryManager(dataRegion, 
(PartitionedRegion)fileRegion, (PartitionedRegion)chunkRegion, mapper, 
analyzer);
       
       // create AEQ, AEQ listner and specify the listener to repositoryManager
-      AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+      AsyncEventQueueFactoryImpl factory = (AsyncEventQueueFactoryImpl) 
cache.createAsyncEventQueueFactory();
       if (withPersistence) {
         factory.setPersistent(true);
       }
       factory.setParallel(true); // parallel AEQ for PR
       factory.setMaximumQueueMemory(1000);
       factory.setDispatcherThreads(1);
+      factory.setIsMetaQueue(true);
       
       LuceneEventListener listener = new 
LuceneEventListener(repositoryManager);
       String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), 
regionPath);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
 
b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
index 5ec2725..eff2813 100644
--- 
a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
+++ 
b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
@@ -6,6 +6,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -123,7 +124,7 @@ public class LuceneServiceImplJUnitTest {
   @Test
   public void testCreateIndexForPR() throws IOException, ParseException {
     getService();
-    createPR("PR1", false);
+    LocalRegion userRegion = createPR("PR1", false);
     LuceneIndexImpl index1 = (LuceneIndexImpl)service.createIndex("index1", 
"PR1", "field1", "field2", "field3");
     assertTrue(index1 instanceof LuceneIndexForPartitionedRegion);
     LuceneIndexForPartitionedRegion index1PR = 
(LuceneIndexForPartitionedRegion)index1;
@@ -146,6 +147,9 @@ public class LuceneServiceImplJUnitTest {
     String aeqId = LuceneServiceImpl.getUniqueIndexName(index1.getName(), 
index1.getRegionPath());
     AsyncEventQueueImpl aeq = 
(AsyncEventQueueImpl)cache.getAsyncEventQueue(aeqId);
     assertTrue(aeq != null);
+    
+    //Make sure our queue doesn't show up in the list of async event queues 
+    assertEquals(Collections.emptySet(), cache.getAsyncEventQueues());
   }
 
   @Test

Reply via email to