Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev ba9e9dbc7 -> a59133bb6


Refactored the index service into an async event service. These events will 
affect more than just indexing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a59133bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a59133bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a59133bb

Branch: refs/heads/two-dot-o-dev
Commit: a59133bb6061454540e5506bd0bc81793055a314
Parents: ba9e9db
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Apr 27 16:46:27 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Apr 27 16:46:27 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   6 +-
 .../corepersistence/CpEntityManager.java        |   8 +-
 .../corepersistence/CpEntityManagerFactory.java |   9 +-
 .../corepersistence/CpRelationManager.java      |  10 +-
 .../asyncevents/AsyncEventService.java          |  71 ++++
 .../asyncevents/AsyncIndexProvider.java         | 122 +++++++
 .../asyncevents/InMemoryAsyncEventService.java  | 116 ++++++
 .../asyncevents/SQSAsyncEventService.java       | 349 +++++++++++++++++++
 .../index/AsyncIndexProvider.java               | 120 -------
 .../index/AsyncIndexService.java                |  42 ---
 .../index/InMemoryAsyncIndexService.java        |  95 -----
 .../index/ReIndexServiceImpl.java               |   5 +-
 .../index/SQSAsyncIndexService.java             | 325 -----------------
 .../index/AsyncIndexServiceTest.java            |  14 +-
 .../index/InMemoryAsycIndexServiceTest.java     |   8 +-
 .../index/SQSAsyncEventServiceTest.java         |  84 +++++
 .../index/SQSAsyncIndexServiceTest.java         | 108 ------
 17 files changed, 766 insertions(+), 726 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index ca20e23..b5544cb 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -21,8 +21,8 @@ import org.safehaus.guicyfig.GuicyFigModule;
 import org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler;
-import org.apache.usergrid.corepersistence.index.AsyncIndexProvider;
-import org.apache.usergrid.corepersistence.index.AsyncIndexService;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncIndexProvider;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.index.IndexService;
 import org.apache.usergrid.corepersistence.index.IndexServiceImpl;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
@@ -152,7 +152,7 @@ public class CoreModule  extends AbstractModule {
         bind( IndexService.class ).to( IndexServiceImpl.class );
         //bind the queue provider
 
-        bind( AsyncIndexService.class).toProvider( AsyncIndexProvider.class );
+        bind( AsyncEventService.class).toProvider( AsyncIndexProvider.class );
 
         install( new GuicyFigModule( IndexProcessorFig.class ) );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 2d419b6..6ffefe3 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -37,7 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import org.apache.usergrid.corepersistence.index.AsyncIndexService;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -66,7 +66,6 @@ import 
org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
 import org.apache.usergrid.persistence.cassandra.CounterUtils;
 import org.apache.usergrid.persistence.cassandra.util.TraceParticipant;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.FieldSet;
 import 
org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
@@ -80,7 +79,6 @@ import 
org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE
 import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import 
org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import 
org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.index.query.CounterResolution;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.map.MapManager;
@@ -178,7 +176,7 @@ public class CpEntityManager implements EntityManager {
 
     private final CounterUtils counterUtils;
 
-    private final AsyncIndexService indexService;
+    private final AsyncEventService indexService;
 
     private PipelineBuilderFactory pipelineBuilderFactory;
 
@@ -222,7 +220,7 @@ public class CpEntityManager implements EntityManager {
      * @param entityCollectionManagerFactory
      * @param graphManagerFactory
      */
-    public CpEntityManager( final CassandraService cass, final CounterUtils 
counterUtils, final AsyncIndexService indexService, final ManagerCache 
managerCache,
+    public CpEntityManager( final CassandraService cass, final CounterUtils 
counterUtils, final AsyncEventService indexService, final ManagerCache 
managerCache,
                             final MetricsFactory metricsFactory,final 
PipelineBuilderFactory pipelineBuilderFactory , final UUID applicationId ) {
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index b242401..6c375ef 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -33,7 +33,7 @@ import org.springframework.context.ApplicationContextAware;
 
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.usergrid.corepersistence.index.AsyncIndexService;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -51,7 +51,6 @@ import 
org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.apache.usergrid.persistence.cassandra.CounterUtils;
 import org.apache.usergrid.persistence.cassandra.Setup;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import 
org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
@@ -63,7 +62,6 @@ import 
org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsExcept
 import 
org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
@@ -75,7 +73,6 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.utils.UUIDUtils;
 
-import com.amazonaws.services.elastictranscoder.model.Pipeline;
 import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -126,7 +123,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
     private Injector injector;
     private final EntityIndex entityIndex;
     private final MetricsFactory metricsFactory;
-    private final AsyncIndexService indexService;
+    private final AsyncEventService indexService;
     private final PipelineBuilderFactory pipelineBuilderFactory;
 
     public CpEntityManagerFactory( final CassandraService cassandraService, 
final CounterUtils counterUtils,
@@ -139,7 +136,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
         this.entityIndexFactory = 
injector.getInstance(EntityIndexFactory.class);
         this.managerCache = injector.getInstance( ManagerCache.class );
         this.metricsFactory = injector.getInstance( MetricsFactory.class );
-        this.indexService = injector.getInstance( AsyncIndexService.class );
+        this.indexService = injector.getInstance( AsyncEventService.class );
         this.pipelineBuilderFactory = injector.getInstance( 
PipelineBuilderFactory.class );
         this.applicationIdCache = 
injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
             getManagementEntityManager() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index e7ef0ff..aa0056c 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import org.apache.usergrid.corepersistence.index.AsyncIndexService;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
 import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder;
 import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
@@ -50,14 +50,12 @@ import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.SimpleRoleRef;
 import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.entities.Group;
 import org.apache.usergrid.persistence.entities.User;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
@@ -68,7 +66,6 @@ import 
org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -85,7 +82,6 @@ import rx.functions.Func1;
 import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionEdge;
 import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge;
 import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchByEdge;
-import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchEdge;
 import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource;
 import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
 import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
@@ -121,14 +117,14 @@ public class CpRelationManager implements RelationManager 
{
 
     private final ApplicationScope applicationScope;
 
-    private final AsyncIndexService indexService;
+    private final AsyncEventService indexService;
 
     private MetricsFactory metricsFactory;
     private Timer updateCollectionTimer;
 
 
     public CpRelationManager( final MetricsFactory metricsFactory, final 
ManagerCache managerCache,
-                              final PipelineBuilderFactory 
pipelineBuilderFactory, final AsyncIndexService indexService,
+                              final PipelineBuilderFactory 
pipelineBuilderFactory, final AsyncEventService indexService,
                               final EntityManager em, final UUID 
applicationId, final EntityRef headEntity ) {
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
new file mode 100644
index 0000000..9fbed39
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import org.apache.usergrid.corepersistence.index.ReIndexAction;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Low level queue service for entity events.  Events handed to this service are intended to be
+ * fire and forget, and to be handled asynchronously.
+ */
+public interface AsyncEventService extends ReIndexAction {
+
+
+    /**
+     * Queue an entity to be indexed.  Should be fired when an entity is updated.  Nothing is returned,
+     * so callers get no handle on the outcome of the indexing through this method.
+     *
+     * TODO: after SQS is removed, the tests should be enhanced to ensure that we're processing our
+     * queues correctly.
+     *
+     * @param applicationScope The application scope to index in
+     * @param entity The entity to index
+     */
+    void queueEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
+
+
+    /**
+     * Fired when a new edge is added to an entity, such as initial entity creation, adding to a
+     * collection, or creating a connection.
+     *
+     * @param applicationScope The application scope of the event
+     * @param entity The entity the edge was added to
+     * @param newEdge The edge that was added
+     */
+    void queueNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge );
+
+    /**
+     * Queue the deletion of an edge.
+     *
+     * @param applicationScope The application scope of the event
+     * @param edge The edge to delete
+     */
+    void queueDeleteEdge( ApplicationScope applicationScope, Edge edge );
+
+    /**
+     * The entity has been deleted, queue its cleanup.
+     *
+     * @param applicationScope The application scope of the event
+     * @param entityId The id of the deleted entity
+     */
+    void queueEntityDelete( ApplicationScope applicationScope, Id entityId );
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
new file mode 100644
index 0000000..9f801b4
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.IndexService;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+
+/**
+ * A provider to allow users to configure their queue impl via properties.  The implementation is
+ * chosen from the value of {@link IndexProcessorFig#getQueueImplementation()}, which must match the
+ * name of one of the {@link Implementations} constants exactly.
+ */
+@Singleton
+public class AsyncIndexProvider implements Provider<AsyncEventService> {
+
+    private final IndexProcessorFig indexProcessorFig;
+
+    private final QueueManagerFactory queueManagerFactory;
+    private final MetricsFactory metricsFactory;
+    private final IndexService indexService;
+    private final RxTaskScheduler rxTaskScheduler;
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+    /** Created on the first call to {@link #get()} and reused by every later call. */
+    private AsyncEventService asyncEventService;
+
+
+    @Inject
+    public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig,
+                               final QueueManagerFactory queueManagerFactory,
+                               final MetricsFactory metricsFactory, final IndexService indexService,
+                               final RxTaskScheduler rxTaskScheduler,
+                               final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+        this.indexProcessorFig = indexProcessorFig;
+        this.queueManagerFactory = queueManagerFactory;
+        this.metricsFactory = metricsFactory;
+        this.indexService = indexService;
+        this.rxTaskScheduler = rxTaskScheduler;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
+
+
+    /**
+     * Returns the configured service, building it on first use.  The method is synchronized so that
+     * concurrent first calls cannot each construct their own service instance.
+     *
+     * @return the single {@link AsyncEventService} created by this provider
+     * @throws IllegalArgumentException if the configured queue implementation is not supported
+     */
+    @Override
+    @Singleton
+    public synchronized AsyncEventService get() {
+        if ( asyncEventService == null ) {
+            asyncEventService = getIndexService();
+        }
+
+
+        return asyncEventService;
+    }
+
+
+    /**
+     * Builds the service selected by the configuration.
+     */
+    private AsyncEventService getIndexService() {
+        final String value = indexProcessorFig.getQueueImplementation();
+
+        final Implementations impl = parseImplementation( value );
+
+        switch ( impl ) {
+            case LOCAL:
+                return new InMemoryAsyncEventService( indexService, rxTaskScheduler,
+                    entityCollectionManagerFactory, indexProcessorFig.resolveSynchronously() );
+            case SQS:
+                return new SQSAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory,
+                    indexService, entityCollectionManagerFactory, rxTaskScheduler );
+            default:
+                // NOTE(review): TEST is declared in Implementations but has no case here, so it ends
+                // up in this branch - confirm whether it is meant to be supported
+                throw unsupportedValue( value, null );
+        }
+    }
+
+
+    /**
+     * Converts the configured string into an {@link Implementations} constant.  A missing or unknown
+     * value is reported with the list of known values instead of the bare "No enum constant" error
+     * that {@code valueOf} raises on its own.
+     */
+    private Implementations parseImplementation( final String value ) {
+        if ( value == null ) {
+            throw unsupportedValue( null, null );
+        }
+
+        try {
+            return Implementations.valueOf( value );
+        }
+        catch ( IllegalArgumentException e ) {
+            throw unsupportedValue( value, e );
+        }
+    }
+
+
+    /**
+     * Creates the exception used for every unsupported configuration value, keeping the original
+     * cause when there is one.
+     */
+    private IllegalArgumentException unsupportedValue( final String value, final Throwable cause ) {
+        return new IllegalArgumentException(
+            "Unsupported queue implementation '" + value + "'. Only values of " + getErrorValues()
+                + " are allowed", cause );
+    }
+
+
+    /**
+     * Returns every {@link Implementations} constant, separated by ", ".
+     */
+    private String getErrorValues() {
+        final StringBuilder values = new StringBuilder();
+
+        for ( final Implementations impl : Implementations.values() ) {
+            if ( values.length() > 0 ) {
+                values.append( ", " );
+            }
+
+            values.append( impl );
+        }
+
+        return values.toString();
+    }
+
+
+    /**
+     * Different implementations
+     */
+    public enum Implementations {
+        TEST,
+        LOCAL,
+        SQS;
+
+
+        public String asString() {
+            return toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
new file mode 100644
index 0000000..2d842fb
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.exception.NotImplementedException;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+/**
+ * {@link AsyncEventService} implementation that passes events directly to the {@link IndexService}
+ * inside this process.  Only entity index updates and re-indexing are implemented; the edge and
+ * delete events throw {@link NotImplementedException}.
+ */
+@Singleton
+public class InMemoryAsyncEventService implements AsyncEventService {
+
+    private static final Logger log = LoggerFactory.getLogger( InMemoryAsyncEventService.class );
+
+    private final IndexService indexService;
+    private final RxTaskScheduler rxTaskScheduler;
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final boolean resolveSynchronously;
+
+
+    /**
+     * @param indexService Performs the actual indexing
+     * @param rxTaskScheduler Supplies the scheduler used for background work
+     * @param entityCollectionManagerFactory Used to load entities by id when re-indexing
+     * @param resolveSynchronously When true, {@link #queueEntityIndexUpdate} blocks the caller until
+     * indexing has finished instead of running it in the background
+     */
+    @Inject
+    public InMemoryAsyncEventService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler,
+                                      final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                      boolean resolveSynchronously ) {
+        this.indexService = indexService;
+        this.rxTaskScheduler = rxTaskScheduler;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.resolveSynchronously = resolveSynchronously;
+    }
+
+
+    /**
+     * Indexes the entity right away, either on the calling thread or on the async I/O scheduler
+     * depending on {@code resolveSynchronously}.
+     */
+    @Override
+    public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) {
+
+        log.debug( "Indexing entity {} in app scope {} ", entity, applicationScope );
+
+        final Observable<IndexOperationMessage> indexObservable =
+            indexService.indexEntity( applicationScope, entity );
+
+        if ( resolveSynchronously ) {
+            //block the caller until the last index message has been emitted
+            indexObservable.toBlocking().last();
+        }
+        else {
+            //start it in the background on an i/o thread.  subscribeOn() only returns a new
+            //Observable, so without the subscribe() call the indexing would never be started
+            indexObservable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+        }
+    }
+
+
+    /**
+     * Not implemented by this service.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) {
+        throw new NotImplementedException( "Implement me" );
+    }
+
+
+    /**
+     * Not implemented by this service.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) {
+        throw new NotImplementedException( "Implement me" );
+    }
+
+
+    /**
+     * Not implemented by this service.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
+        throw new NotImplementedException( "Implement me" );
+    }
+
+
+    /**
+     * Loads the entity identified by the scope and indexes it on the async I/O scheduler.  Returns
+     * without waiting for the work to finish.
+     */
+    @Override
+    public void index( final EntityIdScope entityIdScope ) {
+
+        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+
+        final Id entityId = entityIdScope.getId();
+
+        //load the entity
+        entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId )
+            //perform indexing on the task scheduler and start it
+            .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) )
+            .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
new file mode 100644
index 0000000..415e5e8
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.exception.NotImplementedException;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.Subscriber;
+import rx.Subscription;
+import rx.schedulers.Schedulers;
+
+
+@Singleton
+public class SQSAsyncEventService implements AsyncEventService {
+
+
+    private static final Logger log = LoggerFactory.getLogger( 
SQSAsyncEventService.class );
+
+    /**
+     * Set our TTL to 1 month.  This is high, but in the event of a bug, we 
want these entries to get removed
+     */
+    public static final int TTL = 60 * 60 * 24 * 30;
+
+
+    private static final int MAX_TAKE = 10;
+
+    private static final String QUEUE_NAME = "es_queue";
+
+    private final QueueManager queue;
+    private final IndexProcessorFig indexProcessorFig;
+    private final IndexService indexService;
+    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+    private final RxTaskScheduler rxTaskScheduler;
+
+    private final Timer readTimer;
+    private final Timer writeTimer;
+    private final Timer messageProcessingTimer;
+
+    private final Object mutex = new Object();
+
+
+    private final Counter indexErrorCounter;
+    private final AtomicLong counter = new AtomicLong();
+    private final AtomicLong inFlight = new AtomicLong();
+
+    //the actively running subscription
+    private List<Subscription> subscriptions = new ArrayList<>();
+
+
+    @Inject
+    public SQSAsyncEventService( final QueueManagerFactory queueManagerFactory,
+                                 final IndexProcessorFig indexProcessorFig, 
final MetricsFactory metricsFactory,
+                                 final IndexService indexService,
+                                 final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
+                                 final RxTaskScheduler rxTaskScheduler ) {
+
+        this.indexService = indexService;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.rxTaskScheduler = rxTaskScheduler;
+
+        final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME );
+        this.queue = queueManagerFactory.getQueueManager( queueScope );
+        this.indexProcessorFig = indexProcessorFig;
+
+        this.writeTimer = metricsFactory.getTimer( SQSAsyncEventService.class, 
"write" );
+        this.readTimer = metricsFactory.getTimer( SQSAsyncEventService.class, 
"read" );
+        this.messageProcessingTimer = metricsFactory.getTimer( 
SQSAsyncEventService.class, "message.processing" );
+        this.indexErrorCounter = metricsFactory.getCounter( 
SQSAsyncEventService.class, "error" );
+
+
+        //wire up the gauge of inflight messages
+        metricsFactory.addGauge( SQSAsyncEventService.class, "inflight.meter", 
new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return inFlight.longValue();
+            }
+        } );
+
+        start();
+    }
+
+
+    /**
+     * Offer the EntityIdScope to SQS
+     */
+    private void offer( final EntityIdScope operation ) {
+        final Timer.Context timer = this.writeTimer.time();
+
+        try {
+            //signal to SQS
+            this.queue.sendMessage( operation );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to queue message", e );
+        }
+        finally {
+            timer.stop();
+        }
+    }
+
+
+    /**
+     * Take messages from SQS
+     */
+    public List<QueueMessage> take() {
+
+        //SQS doesn't support more than 10
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
+            return queue.getMessages( MAX_TAKE, 
indexProcessorFig.getIndexQueueTimeout(), 
indexProcessorFig.getIndexQueueTimeout(),
+                EntityIdScope.class );
+        }
+        //stop our timer
+        finally {
+            timer.stop();
+        }
+    }
+
+
+    /**
+     * Ack messages in SQS
+     */
+    public void ack( final List<QueueMessage> messages ) {
+
+        /**
+         * No op
+         */
+        if ( messages.size() == 0 ) {
+            return;
+        }
+
+        queue.commitMessages( messages );
+    }
+
+
+    @Override
+    public void index( final EntityIdScope entityIdScope ) {
+        //queue the re-inex operation
+        offer( entityIdScope );
+    }
+
+
+    @Override
+    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope, final Entity entity ) {
+
+        //create our scope
+        final EntityIdScope entityIdScope = new EntityIdScope( 
applicationScope, entity.getId() );
+
+        //send it to SQS  for indexing
+        index( entityIdScope );
+    }
+
+
+    @Override
+    public void queueNewEdge( final ApplicationScope applicationScope, final 
Entity entity, final Edge newEdge ) {
+       throw new NotImplementedException( "Implement me" );
+    }
+
+
+    @Override
+    public void queueDeleteEdge( final ApplicationScope applicationScope, 
final Edge edge ) {
+        throw new NotImplementedException( "Implement me" );
+    }
+
+
+    @Override
+    public void queueEntityDelete( final ApplicationScope applicationScope, 
final Id entityId ) {
+        throw new NotImplementedException( "Implement me" );
+    }
+
+
+    /**
+     * Index an entity and return an observable of the queue message on success
+     */
+    private Observable<IndexOperationMessage> indexEntity( final QueueMessage 
queueMessage ) {
+        final EntityIdScope entityIdScope = ( EntityIdScope ) 
queueMessage.getBody();
+        final ApplicationScope applicationScope = 
entityIdScope.getApplicationScope();
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
+
+
+        //run the index operation from the entity
+        return entityCollectionManager.load( entityIdScope.getId() )
+            //invoke the indexing and take the last value
+            .flatMap( entity -> indexService.indexEntity( applicationScope, 
entity ).last() );
+    }
+
+
+    /**
+     * Do the indexing for a list of queue messages
+     */
+    private void doIndex( final List<QueueMessage> queueMessages ) {
+        //create parallel observables to process all 10 messages
+        final Observable<Long> observable = Observable.from( queueMessages 
).flatMap( ( QueueMessage queueMessage ) -> {
+            return indexEntity( queueMessage ).subscribeOn( 
rxTaskScheduler.getAsyncIOScheduler() );
+        }, MAX_TAKE ).countLong()
+
+            //remove our in flight
+            .doOnNext( count -> inFlight.addAndGet( -1 * count ) )
+
+                //do on completed ack messages.  Assumes no expections were 
thrown
+            .doOnCompleted( () -> ack( queueMessages ) );
+
+        //wrap with our timer and fire
+        ObservableTimer.time( observable, messageProcessingTimer ).subscribe();
+    }
+
+
+    /**
+     * Loop throught and start the workers
+     */
+    public void start() {
+        final int count = indexProcessorFig.getWorkerCount();
+
+        for ( int i = 0; i < count; i++ ) {
+            startWorker();
+        }
+    }
+
+
+    /**
+     * Stop the workers
+     */
+    public void stop() {
+        synchronized ( mutex ) {
+            //stop consuming
+
+            for ( final Subscription subscription : subscriptions ) {
+                subscription.unsubscribe();
+            }
+        }
+    }
+
+
+    private void startWorker() {
+        synchronized ( mutex ) {
+
+            Observable<List<QueueMessage>> consumer =
+                Observable.create( new 
Observable.OnSubscribe<List<QueueMessage>>() {
+                        @Override
+                        public void call( final Subscriber<? super 
List<QueueMessage>> subscriber ) {
+
+                            //name our thread so it's easy to see
+                            Thread.currentThread().setName( "QueueConsumer_" + 
counter.incrementAndGet() );
+
+                            List<QueueMessage> drainList = null;
+
+                            do {
+                                Timer.Context timer = readTimer.time();
+
+                                try {
+                                    drainList = take();
+
+                                    //emit our list in it's entirity to hand 
off to a worker pool
+                                    subscriber.onNext( drainList );
+
+                                    //take since  we're in flight
+                                    inFlight.addAndGet( drainList.size() );
+                                }
+
+                                catch ( Throwable t ) {
+                                    final long sleepTime = 
indexProcessorFig.getFailureRetryTime();
+
+                                    log.error( "Failed to dequeue.  Sleeping 
for {} milliseconds", sleepTime, t );
+
+                                    if ( drainList != null ) {
+                                        inFlight.addAndGet( -1 * 
drainList.size() );
+                                    }
+
+
+                                    try {
+                                        Thread.sleep( sleepTime );
+                                    }
+                                    catch ( InterruptedException ie ) {
+                                        //swallow
+                                    }
+
+                                    indexErrorCounter.inc();
+                                }
+
+                                finally{
+                                    timer.stop();
+                                }
+                            }
+                            while ( true );
+                        }
+                    } )
+                    //this won't block our read loop, just reads and proceeds
+                    .doOnNext( messages -> doIndex( messages ) ).subscribeOn( 
Schedulers.newThread() );
+
+            //start in the background
+
+            final Subscription subscription = consumer.subscribe();
+
+            subscriptions.add( subscription );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
deleted file mode 100644
index c4f34de..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.Singleton;
-
-
-/**
- * A provider to allow users to configure their queue impl via properties
- */
-@Singleton
-public class AsyncIndexProvider implements Provider<AsyncIndexService> {
-
-    private final IndexProcessorFig indexProcessorFig;
-
-    private final QueueManagerFactory queueManagerFactory;
-    private final MetricsFactory metricsFactory;
-    private final IndexService indexService;
-    private final RxTaskScheduler rxTaskScheduler;
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
-
-    private AsyncIndexService asyncIndexService;
-
-
-    @Inject
-    public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, 
final QueueManagerFactory queueManagerFactory,
-                               final MetricsFactory metricsFactory, final 
IndexService indexService,
-                               final RxTaskScheduler rxTaskScheduler,
-                               final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
-        this.indexProcessorFig = indexProcessorFig;
-        this.queueManagerFactory = queueManagerFactory;
-        this.metricsFactory = metricsFactory;
-        this.indexService = indexService;
-        this.rxTaskScheduler = rxTaskScheduler;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-    }
-
-
-    @Override
-    @Singleton
-    public AsyncIndexService get() {
-        if ( asyncIndexService == null ) {
-            asyncIndexService = getIndexService();
-        }
-
-
-        return asyncIndexService;
-    }
-
-
-    private AsyncIndexService getIndexService() {
-        final String value = indexProcessorFig.getQueueImplementation();
-
-        final Implementations impl = Implementations.valueOf( value );
-
-        switch ( impl ) {
-            case LOCAL:
-                return new InMemoryAsyncIndexService( indexService, 
rxTaskScheduler,
-                    entityCollectionManagerFactory, 
indexProcessorFig.resolveSynchronously());
-            case SQS:
-                return new SQSAsyncIndexService( queueManagerFactory, 
indexProcessorFig, metricsFactory, indexService,
-                    entityCollectionManagerFactory, rxTaskScheduler );
-            default:
-                throw new IllegalArgumentException( "Configuration value of " 
+ getErrorValues() + " are allowed" );
-        }
-    }
-
-
-    private String getErrorValues() {
-        String values = "";
-
-        for ( final Implementations impl : Implementations.values() ) {
-            values += impl + ", ";
-        }
-
-        values = values.substring( 0, values.length() - 2 );
-
-        return values;
-    }
-
-
-    /**
-     * Different implementations
-     */
-    public static enum Implementations {
-        TEST,
-        LOCAL,
-        SQS;
-
-
-        public String asString() {
-            return toString();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
deleted file mode 100644
index dac92c8..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-
-/**
- * Low level queue service for indexing entities
- */
-public interface AsyncIndexService extends ReIndexAction {
-
-
-    /**
-     * Queue an entity to be indexed.  This will start processing immediately. 
For implementations that are realtime (akka, in memory)
-     * We will return a distributed future.  For SQS impls, this will return 
immediately, and the result will not be available.
-     * After SQS is removed, the tests should be enhanced to ensure that we're 
processing our queues correctly.
-     * @param applicationScope
-     * @param entity The entity to index
-     */
-    void queueEntityIndexUpdate( final ApplicationScope applicationScope, 
final Entity entity);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
deleted file mode 100644
index 3f59b0c..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.Observer;
-
-
-@Singleton
-public class InMemoryAsyncIndexService implements AsyncIndexService {
-
-    private static final Logger log = LoggerFactory.getLogger( 
InMemoryAsyncIndexService.class );
-
-    private final IndexService indexService;
-    private final RxTaskScheduler rxTaskScheduler;
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
-    private final boolean resolveSynchronously;
-
-
-    @Inject
-    public InMemoryAsyncIndexService( final IndexService indexService, final 
RxTaskScheduler rxTaskScheduler,
-                                      final EntityCollectionManagerFactory 
entityCollectionManagerFactory, boolean resolveSynchronously ) {
-        this.indexService = indexService;
-        this.rxTaskScheduler = rxTaskScheduler;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.resolveSynchronously = resolveSynchronously;
-    }
-
-
-    @Override
-    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope, final Entity entity) {
-
-        //process the entity immediately
-        //only process the same version, otherwise ignore
-
-
-        log.debug( "Indexing entity {} in app scope {} ", entity, 
applicationScope );
-
-        final Observable<IndexOperationMessage> edgeObservable = 
indexService.indexEntity( applicationScope, entity );
-
-        //start it in the background on an i/o thread
-        if(!resolveSynchronously){
-            edgeObservable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() 
);
-        }else {
-            edgeObservable.toBlocking().last();
-        }
-    }
-
-
-    @Override
-    public void index( final EntityIdScope entityIdScope ) {
-
-        final ApplicationScope applicationScope = 
entityIdScope.getApplicationScope();
-
-        final Id entityId = entityIdScope.getId();
-
-        //load the entity
-        entityCollectionManagerFactory.createCollectionManager( 
applicationScope ).load( entityId )
-            //perform indexing on the task scheduler and start it
-            .flatMap( entity -> indexService.indexEntity( applicationScope, 
entity ) )
-            .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 5bf7957..be5bcab 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.index;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
@@ -63,14 +64,14 @@ public class ReIndexServiceImpl implements ReIndexService {
     private final IndexProcessorFig indexProcessorFig;
     private final RxTaskScheduler rxTaskScheduler;
     private final MapManager mapManager;
-    private final AsyncIndexService indexService;
+    private final AsyncEventService indexService;
 
 
     @Inject
     public ReIndexServiceImpl( final AllEntityIdsObservable 
allEntityIdsObservable,
                                final MapManagerFactory mapManagerFactory,
                                final AllApplicationsObservable 
allApplicationsObservable, final IndexProcessorFig indexProcessorFig,
-                               final RxTaskScheduler rxTaskScheduler, final 
AsyncIndexService indexService ) {
+                               final RxTaskScheduler rxTaskScheduler, final 
AsyncEventService indexService ) {
         this.allEntityIdsObservable = allEntityIdsObservable;
         this.allApplicationsObservable = allApplicationsObservable;
         this.indexProcessorFig = indexProcessorFig;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
deleted file mode 100644
index d060123..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Timer;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.Subscriber;
-import rx.Subscription;
-import rx.schedulers.Schedulers;
-
-
-@Singleton
-public class SQSAsyncIndexService implements AsyncIndexService {
-
-
-    private static final Logger log = LoggerFactory.getLogger( 
SQSAsyncIndexService.class );
-
-    /**
-     * Set our TTL to 1 month.  This is high, but in the event of a bug, we 
want these entries to get removed
-     */
-    public static final int TTL = 60 * 60 * 24 * 30;
-
-
-    private static final int MAX_TAKE = 10;
-
-    private static final String QUEUE_NAME = "es_queue";
-
-    private final QueueManager queue;
-    private final IndexProcessorFig indexProcessorFig;
-    private final IndexService indexService;
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
-    private final RxTaskScheduler rxTaskScheduler;
-
-    private final Timer readTimer;
-    private final Timer writeTimer;
-    private final Timer messageProcessingTimer;
-
-    private final Object mutex = new Object();
-
-
-    private final Counter indexErrorCounter;
-    private final AtomicLong counter = new AtomicLong();
-    private final AtomicLong inFlight = new AtomicLong();
-
-    //the actively running subscription
-    private List<Subscription> subscriptions = new ArrayList<>();
-
-
-    @Inject
-    public SQSAsyncIndexService( final QueueManagerFactory 
queueManagerFactory, final IndexProcessorFig indexProcessorFig,
-                                 final MetricsFactory metricsFactory, final 
IndexService indexService,
-                                 final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
-                                 final RxTaskScheduler rxTaskScheduler ) {
-
-        this.indexService = indexService;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.rxTaskScheduler = rxTaskScheduler;
-
-        final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME );
-        this.queue = queueManagerFactory.getQueueManager( queueScope );
-        this.indexProcessorFig = indexProcessorFig;
-
-        this.writeTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, 
"write" );
-        this.readTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, 
"read" );
-        this.messageProcessingTimer = metricsFactory.getTimer( 
SQSAsyncIndexService.class, "message.processing" );
-        this.indexErrorCounter = metricsFactory.getCounter( 
SQSAsyncIndexService.class, "error" );
-
-
-        //wire up the gauge of inflight messages
-        metricsFactory.addGauge( SQSAsyncIndexService.class, "inflight.meter", 
new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                return inFlight.longValue();
-            }
-        } );
-
-        start();
-    }
-
-
-    /**
-     * Offer the EntityIdScope to SQS
-     */
-    private void offer( final EntityIdScope operation ) {
-        final Timer.Context timer = this.writeTimer.time();
-
-        try {
-            //signal to SQS
-            this.queue.sendMessage( operation );
-        }
-        catch ( IOException e ) {
-            throw new RuntimeException( "Unable to queue message", e );
-        }
-        finally {
-            timer.stop();
-        }
-    }
-
-
-    /**
-     * Take messages from SQS
-     */
-    public List<QueueMessage> take() {
-
-        //SQS doesn't support more than 10
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return queue.getMessages( MAX_TAKE, 
indexProcessorFig.getIndexQueueTimeout(), 
indexProcessorFig.getIndexQueueTimeout(),
-                EntityIdScope.class );
-        }
-        //stop our timer
-        finally {
-            timer.stop();
-        }
-    }
-
-
-    /**
-     * Ack messages in SQS
-     */
-    public void ack( final List<QueueMessage> messages ) {
-
-        /**
-         * No op
-         */
-        if ( messages.size() == 0 ) {
-            return;
-        }
-
-        queue.commitMessages( messages );
-    }
-
-
-    @Override
-    public void index( final EntityIdScope entityIdScope ) {
-        //queue the re-index operation
-        offer( entityIdScope );
-    }
-
-
-    @Override
-    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope, final Entity entity ) {
-
-        //create our scope
-        final EntityIdScope entityIdScope = new EntityIdScope( 
applicationScope, entity.getId() );
-
-        //send it to SQS  for indexing
-        index( entityIdScope );
-    }
-
-
-    /**
-     * Index an entity and return an observable of the queue message on success
-     */
-    private Observable<IndexOperationMessage> indexEntity( final QueueMessage 
queueMessage ) {
-        final EntityIdScope entityIdScope = ( EntityIdScope ) 
queueMessage.getBody();
-        final ApplicationScope applicationScope = 
entityIdScope.getApplicationScope();
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
-
-
-        //run the index operation from the entity
-        return entityCollectionManager.load( entityIdScope.getId() )
-            //invoke the indexing and take the last value
-            .flatMap( entity -> indexService.indexEntity( applicationScope, 
entity ).last() );
-    }
-
-
-    /**
-     * Do the indexing for a list of queue messages
-     */
-    private void doIndex( final List<QueueMessage> queueMessages ) {
-        //create parallel observables to process all 10 messages
-        final Observable<Long> observable = Observable.from( queueMessages 
).flatMap( ( QueueMessage queueMessage ) -> {
-            return indexEntity( queueMessage ).subscribeOn( 
rxTaskScheduler.getAsyncIOScheduler() );
-        }, MAX_TAKE ).countLong()
-
-            //remove our in flight
-            .doOnNext( count -> inFlight.addAndGet( -1 * count ) )
-
-                //do on completed ack messages.  Assumes no exceptions were 
thrown
-            .doOnCompleted( () -> ack( queueMessages ) );
-
-        //wrap with our timer and fire
-        ObservableTimer.time( observable, messageProcessingTimer ).subscribe();
-    }
-
-
-    /**
-     * Loop through and start the workers
-     */
-    public void start() {
-        final int count = indexProcessorFig.getWorkerCount();
-
-        for ( int i = 0; i < count; i++ ) {
-            startWorker();
-        }
-    }
-
-
-    /**
-     * Stop the workers
-     */
-    public void stop() {
-        synchronized ( mutex ) {
-            //stop consuming
-
-            for ( final Subscription subscription : subscriptions ) {
-                subscription.unsubscribe();
-            }
-        }
-    }
-
-
-    private void startWorker() {
-        synchronized ( mutex ) {
-
-            Observable<List<QueueMessage>> consumer =
-                Observable.create( new 
Observable.OnSubscribe<List<QueueMessage>>() {
-                        @Override
-                        public void call( final Subscriber<? super 
List<QueueMessage>> subscriber ) {
-
-                            //name our thread so it's easy to see
-                            Thread.currentThread().setName( "QueueConsumer_" + 
counter.incrementAndGet() );
-
-                            List<QueueMessage> drainList = null;
-
-                            do {
-                                Timer.Context timer = readTimer.time();
-
-                                try {
-                                    drainList = take();
-
-                                    //emit our list in its entirety to hand 
off to a worker pool
-                                    subscriber.onNext( drainList );
-
-                                    //take since  we're in flight
-                                    inFlight.addAndGet( drainList.size() );
-                                }
-
-                                catch ( Throwable t ) {
-                                    final long sleepTime = 
indexProcessorFig.getFailureRetryTime();
-
-                                    log.error( "Failed to dequeue.  Sleeping 
for {} milliseconds", sleepTime, t );
-
-                                    if ( drainList != null ) {
-                                        inFlight.addAndGet( -1 * 
drainList.size() );
-                                    }
-
-
-                                    try {
-                                        Thread.sleep( sleepTime );
-                                    }
-                                    catch ( InterruptedException ie ) {
-                                        //swallow
-                                    }
-
-                                    indexErrorCounter.inc();
-                                }
-
-                                finally{
-                                    timer.stop();
-                                }
-                            }
-                            while ( true );
-                        }
-                    } )
-                    //this won't block our read loop, just reads and proceeds
-                    .doOnNext( messages -> doIndex( messages ) ).subscribeOn( 
Schedulers.newThread() );
-
-            //start in the background
-
-            final Subscription subscription = consumer.subscribe();
-
-            subscriptions.add( subscription );
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index faef848..8dfa971 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -28,13 +28,11 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
 import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.test.UseModules;
@@ -43,7 +41,6 @@ import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.SearchEdge;
 import org.apache.usergrid.persistence.index.SearchTypes;
@@ -53,7 +50,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.field.StringField;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 
 import com.google.inject.Inject;
 
@@ -89,18 +85,18 @@ public abstract class AsyncIndexServiceTest {
     public EntityIndexFactory entityIndexFactory;
 
 
-    private AsyncIndexService asyncIndexService;
+    private AsyncEventService asyncEventService;
 
 
     /**
      * Get the async index service
      */
-    protected abstract AsyncIndexService getAsyncIndexService();
+    protected abstract AsyncEventService getAsyncEventService();
 
 
     @Before
     public void setup() {
-        asyncIndexService = getAsyncIndexService();
+        asyncEventService = getAsyncEventService();
     }
 
 
@@ -141,7 +137,7 @@ public abstract class AsyncIndexServiceTest {
         }, 10 ).toBlocking().last();
 
 
-        asyncIndexService.queueEntityIndexUpdate( applicationScope, testEntity 
);
+        asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity 
);
 
 
         //        Thread.sleep( 1000000000000l );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
index e3c59c0..2860c89 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
@@ -24,12 +24,12 @@ import org.junit.Rule;
 import org.junit.runner.RunWith;
 
 import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import 
org.apache.usergrid.corepersistence.asyncevents.InMemoryAsyncEventService;
 import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.index.impl.EsRunner;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 
 import com.google.inject.Inject;
 
@@ -53,8 +53,8 @@ public class InMemoryAsycIndexServiceTest extends 
AsyncIndexServiceTest {
 
 
     @Override
-    protected AsyncIndexService getAsyncIndexService() {
-        return  new InMemoryAsyncIndexService( indexService, rxTaskScheduler, 
entityCollectionManagerFactory,false  );
+    protected AsyncEventService getAsyncEventService() {
+        return  new InMemoryAsyncEventService( indexService, rxTaskScheduler, 
entityCollectionManagerFactory,false  );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncEventServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncEventServiceTest.java
new file mode 100644
index 0000000..dff88cb
--- /dev/null
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncEventServiceTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.junit.Rule;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.SQSAsyncEventService;
+import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.index.impl.EsRunner;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+
+import com.google.inject.Inject;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+@RunWith( EsRunner.class )
+@UseModules( { TestIndexModule.class } )
+@NotThreadSafe
+public class SQSAsyncEventServiceTest extends AsyncIndexServiceTest {
+
+
+
+    @Rule
+    public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule();
+
+
+
+    @Inject
+    public QueueManagerFactory queueManagerFactory;
+
+    @Inject
+    public IndexProcessorFig indexProcessorFig;
+
+
+    @Inject
+    public MetricsFactory metricsFactory;
+
+    @Inject
+    public IndexService indexService;
+
+    @Inject
+    public RxTaskScheduler rxTaskScheduler;
+
+
+    @Override
+    protected AsyncEventService getAsyncEventService() {
+        return  new SQSAsyncEventService( queueManagerFactory, 
indexProcessorFig, metricsFactory, indexService,
+                    entityCollectionManagerFactory, rxTaskScheduler );
+    }
+
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a59133bb/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
deleted file mode 100644
index aa95ae8..0000000
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import java.util.UUID;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.corepersistence.TestIndexModule;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
-import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.index.impl.EsRunner;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.apache.usergrid.persistence.model.field.StringField;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-
-import com.google.inject.Inject;
-
-import net.jcip.annotations.NotThreadSafe;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-
-@RunWith( EsRunner.class )
-@UseModules( { TestIndexModule.class } )
-@NotThreadSafe
-public class SQSAsyncIndexServiceTest extends AsyncIndexServiceTest {
-
-
-
-    @Rule
-    public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule();
-
-
-
-    @Inject
-    public QueueManagerFactory queueManagerFactory;
-
-    @Inject
-    public IndexProcessorFig indexProcessorFig;
-
-
-    @Inject
-    public MetricsFactory metricsFactory;
-
-    @Inject
-    public IndexService indexService;
-
-    @Inject
-    public RxTaskScheduler rxTaskScheduler;
-
-
-    @Override
-    protected AsyncIndexService getAsyncIndexService() {
-        return  new SQSAsyncIndexService( queueManagerFactory, 
indexProcessorFig, metricsFactory, indexService,
-                    entityCollectionManagerFactory, rxTaskScheduler );
-    }
-
-
-
-
-
-}

Reply via email to