Updated tests and providers for testing all async index services and index 
service itself


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f4d0d1a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f4d0d1a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f4d0d1a9

Branch: refs/heads/two-dot-o-dev
Commit: f4d0d1a95d8a0f5fc27eae732e61cab3d4940783
Parents: 2a58270
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Apr 22 14:23:35 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Apr 22 14:23:35 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   8 +-
 .../corepersistence/CpEntityManager.java        |   6 +-
 .../corepersistence/CpEntityManagerFactory.java |   6 +-
 .../corepersistence/CpRelationManager.java      |   6 +-
 .../index/AsyncIndexProvider.java               |  24 +-
 .../index/AsyncIndexService.java                |  42 +++
 .../index/AsyncReIndexService.java              |  42 ---
 .../index/InMemoryAsyncIndexService.java        |  88 +++++
 .../index/InMemoryAsyncReIndexService.java      | 108 ------
 .../index/IndexProcessorFig.java                |  86 +++++
 .../corepersistence/index/IndexService.java     |   6 +-
 .../corepersistence/index/IndexServiceImpl.java |   9 +-
 .../corepersistence/index/QueryFig.java         | 103 ------
 .../index/ReIndexServiceImpl.java               |  12 +-
 .../index/SQSAsyncIndexService.java             | 325 +++++++++++++++++++
 .../index/SQSAsyncReIndexService.java           | 269 ---------------
 .../corepersistence/util/CpNamingUtils.java     |   8 +-
 .../util/SerializableMapper.java                |   4 +-
 .../index/AsyncIndexServiceTest.java            | 203 ++++++++++++
 .../index/InMemoryAsycIndexServiceTest.java     |  64 ++++
 .../corepersistence/index/IndexServiceTest.java | 233 ++++++++++++-
 .../index/SQSAsyncIndexServiceTest.java         | 143 +++-----
 .../impl/migration/EntityIdScope.java           |  40 ++-
 .../migration/data/MigrationRelationship.java   |  29 +-
 .../migration/data/VersionedMigrationSet.java   |  31 +-
 .../persistence/core/rx/RxSchedulerFig.java     |   4 +-
 .../core/scope/ApplicationScope.java            |   4 +
 .../core/scope/ApplicationScopeImpl.java        |  21 +-
 .../data/VersionedMigrationSetTest.java         |   8 +-
 .../usergrid/persistence/model/entity/Id.java   |   3 +
 .../persistence/model/entity/SimpleId.java      |  37 ++-
 .../index/impl/IndexOperationMessage.java       |   8 -
 32 files changed, 1259 insertions(+), 721 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 51972d8..06fe058 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -22,10 +22,10 @@ import 
org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler;
 import org.apache.usergrid.corepersistence.index.AsyncIndexProvider;
-import org.apache.usergrid.corepersistence.index.AsyncReIndexService;
+import org.apache.usergrid.corepersistence.index.AsyncIndexService;
 import org.apache.usergrid.corepersistence.index.IndexService;
 import org.apache.usergrid.corepersistence.index.IndexServiceImpl;
-import org.apache.usergrid.corepersistence.index.QueryFig;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
 import org.apache.usergrid.corepersistence.migration.AppInfoMigrationPlugin;
 import org.apache.usergrid.corepersistence.migration.CoreMigration;
 import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
@@ -151,9 +151,9 @@ public class CoreModule  extends AbstractModule {
         bind( IndexService.class ).to( IndexServiceImpl.class );
         //bind the queue provider
 
-        bind( AsyncReIndexService.class).toProvider( AsyncIndexProvider.class 
);
+        bind( AsyncIndexService.class).toProvider( AsyncIndexProvider.class );
 
-        install( new GuicyFigModule( QueryFig.class ) );
+        install( new GuicyFigModule( IndexProcessorFig.class ) );
 
 
         install( new GuicyFigModule( ApplicationIdCacheFig.class ) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 72ca955..23514c8 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -37,7 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import org.apache.usergrid.corepersistence.index.AsyncReIndexService;
+import org.apache.usergrid.corepersistence.index.AsyncIndexService;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.AggregateCounter;
@@ -175,7 +175,7 @@ public class CpEntityManager implements EntityManager {
 
     private final CounterUtils counterUtils;
 
-    private final AsyncReIndexService indexService;
+    private final AsyncIndexService indexService;
 
     private boolean skipAggregateCounters;
     private MetricsFactory metricsFactory;
@@ -215,7 +215,7 @@ public class CpEntityManager implements EntityManager {
      * @param metricsFactory
      * @param applicationId
      */
-    public CpEntityManager(final CassandraService cass, final CounterUtils 
counterUtils, final AsyncReIndexService indexService, final ManagerCache 
managerCache, final MetricsFactory metricsFactory, final UUID applicationId ) {
+    public CpEntityManager(final CassandraService cass, final CounterUtils 
counterUtils, final AsyncIndexService indexService, final ManagerCache 
managerCache, final MetricsFactory metricsFactory, final UUID applicationId ) {
 
         Preconditions.checkNotNull( cass, "cass must not be null" );
         Preconditions.checkNotNull( counterUtils, "counterUtils must not be 
null" );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 46db3f8..f08bce4 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -33,7 +33,7 @@ import org.springframework.context.ApplicationContextAware;
 
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.usergrid.corepersistence.index.AsyncReIndexService;
+import org.apache.usergrid.corepersistence.index.AsyncIndexService;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.exception.ConflictException;
@@ -122,7 +122,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
     private Injector injector;
     private final EntityIndex entityIndex;
     private final MetricsFactory metricsFactory;
-    private final AsyncReIndexService indexService;
+    private final AsyncIndexService indexService;
 
     public CpEntityManagerFactory( final CassandraService cassandraService, 
final CounterUtils counterUtils,
                                    final Injector injector) {
@@ -134,7 +134,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
         this.entityIndexFactory = 
injector.getInstance(EntityIndexFactory.class);
         this.managerCache = injector.getInstance( ManagerCache.class );
         this.metricsFactory = injector.getInstance( MetricsFactory.class );
-        this.indexService = injector.getInstance( AsyncReIndexService.class );
+        this.indexService = injector.getInstance( AsyncIndexService.class );
         this.applicationIdCache = 
injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
             getManagementEntityManager() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 8f125ad..ea554a3 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import org.apache.usergrid.corepersistence.index.AsyncReIndexService;
+import org.apache.usergrid.corepersistence.index.AsyncIndexService;
 import 
org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl;
 import 
org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl;
 import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor;
@@ -118,13 +118,13 @@ public class CpRelationManager implements RelationManager 
{
 
     private final ApplicationScope applicationScope;
 
-    private final AsyncReIndexService indexService;
+    private final AsyncIndexService indexService;
 
     private MetricsFactory metricsFactory;
     private Timer updateCollectionTimer;
 
 
-    public CpRelationManager( final MetricsFactory metricsFactory, final 
ManagerCache managerCache, final AsyncReIndexService indexService, final 
EntityManager em, final UUID applicationId, final EntityRef headEntity) {
+    public CpRelationManager( final MetricsFactory metricsFactory, final 
ManagerCache managerCache, final AsyncIndexService indexService, final 
EntityManager em, final UUID applicationId, final EntityRef headEntity) {
 
 
         Assert.notNull( em, "Entity manager cannot be null" );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
index 18df824..0043166 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
 import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
@@ -35,9 +34,9 @@ import com.google.inject.Singleton;
  * A provider to allow users to configure their queue impl via properties
  */
 @Singleton
-public class AsyncIndexProvider implements Provider<AsyncReIndexService> {
+public class AsyncIndexProvider implements Provider<AsyncIndexService> {
 
-    private final QueryFig queryFig;
+    private final IndexProcessorFig indexProcessorFig;
 
     private final QueueManagerFactory queueManagerFactory;
     private final MetricsFactory metricsFactory;
@@ -45,15 +44,15 @@ public class AsyncIndexProvider implements 
Provider<AsyncReIndexService> {
     private final RxTaskScheduler rxTaskScheduler;
     private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
 
-    private AsyncReIndexService asyncIndexService;
+    private AsyncIndexService asyncIndexService;
 
 
     @Inject
-    public AsyncIndexProvider( final QueryFig queryFig, final 
QueueManagerFactory queueManagerFactory,
+    public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, 
final QueueManagerFactory queueManagerFactory,
                                final MetricsFactory metricsFactory, final 
IndexService indexService,
                                final RxTaskScheduler rxTaskScheduler,
                                final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
-        this.queryFig = queryFig;
+        this.indexProcessorFig = indexProcessorFig;
         this.queueManagerFactory = queueManagerFactory;
         this.metricsFactory = metricsFactory;
         this.indexService = indexService;
@@ -64,7 +63,7 @@ public class AsyncIndexProvider implements 
Provider<AsyncReIndexService> {
 
     @Override
     @Singleton
-    public AsyncReIndexService get() {
+    public AsyncIndexService get() {
         if ( asyncIndexService == null ) {
             asyncIndexService = getIndexService();
         }
@@ -74,17 +73,18 @@ public class AsyncIndexProvider implements 
Provider<AsyncReIndexService> {
     }
 
 
-    private AsyncReIndexService getIndexService() {
-        final String value = queryFig.getQueueImplementation();
+    private AsyncIndexService getIndexService() {
+        final String value = indexProcessorFig.getQueueImplementation();
 
         final Implementations impl = Implementations.valueOf( value );
 
         switch ( impl ) {
             case LOCAL:
-                return new InMemoryAsyncReIndexService( indexService, 
rxTaskScheduler,
-                    entityCollectionManagerFactory, metricsFactory );
+                return new InMemoryAsyncIndexService( indexService, 
rxTaskScheduler,
+                    entityCollectionManagerFactory );
             case SQS:
-                return new SQSAsyncReIndexService( queueManagerFactory, 
queryFig, metricsFactory );
+                return new SQSAsyncIndexService( queueManagerFactory, 
indexProcessorFig, metricsFactory, indexService,
+                    entityCollectionManagerFactory, rxTaskScheduler );
             default:
                 throw new IllegalArgumentException( "Configuration value of " 
+ getErrorValues() + " are allowed" );
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
new file mode 100644
index 0000000..dac92c8
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ * Low level queue service for indexing entities
+ */
+public interface AsyncIndexService extends ReIndexAction {
+
+
+    /**
+     * Queue an entity to be indexed.  This will start processing immediately. 
For implementations that are realtime (akka, in memory)
+     * We will return a distributed future.  For SQS impls, this will return 
immediately, and the result will not be available.
+     * After SQS is removed, the tests should be enhanced to ensure that we're 
processing our queues correctly.
+     * @param applicationScope
+     * @param entity The entity to index
+     */
+    void queueEntityIndexUpdate( final ApplicationScope applicationScope, 
final Entity entity);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java
deleted file mode 100644
index c6eedd7..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-
-/**
- * Low level queue service for indexing entities
- */
-public interface AsyncReIndexService extends ReIndexAction {
-
-
-    /**
-     * Queue an entity to be indexed.  This will start processing immediately. 
For implementations that are realtime (akka, in memory)
-     * We will return a distributed future.  For SQS impls, this will return 
immediately, and the result will not be available.
-     * After SQS is removed, the tests should be enhanced to ensure that we're 
processing our queues correctly.
-     * @param applicationScope
-     * @param entity The entity to index
-     */
-    void queueEntityIndexUpdate( final ApplicationScope applicationScope, 
final Entity entity);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
new file mode 100644
index 0000000..e8d178c
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+@Singleton
+public class InMemoryAsyncIndexService implements AsyncIndexService {
+
+    private static final Logger log = LoggerFactory.getLogger( 
InMemoryAsyncIndexService.class );
+
+    private final IndexService indexService;
+    private final RxTaskScheduler rxTaskScheduler;
+    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+
+
+    @Inject
+    public InMemoryAsyncIndexService( final IndexService indexService, final 
RxTaskScheduler rxTaskScheduler,
+                                      final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
+        this.indexService = indexService;
+        this.rxTaskScheduler = rxTaskScheduler;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
+
+
+    @Override
+    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope, final Entity entity ) {
+
+        //process the entity immediately
+        //only process the same version, otherwise ignore
+
+
+        log.debug( "Indexing entity {} in app scope {} ", entity, 
applicationScope );
+
+        final Observable<IndexOperationMessage> edgeObservable = 
indexService.indexEntity( applicationScope, entity );
+
+        //start it in the background on an i/o thread
+        edgeObservable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() 
).subscribe();
+    }
+
+
+    @Override
+    public void index( final EntityIdScope entityIdScope ) {
+
+        final ApplicationScope applicationScope = 
entityIdScope.getApplicationScope();
+
+        final Id entityId = entityIdScope.getId();
+
+        //load the entity
+        entityCollectionManagerFactory.createCollectionManager( 
applicationScope ).load( entityId )
+            //perform indexing on the task scheduler and start it
+            .flatMap( entity -> indexService.indexEntity( applicationScope, 
entity ) )
+            .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java
deleted file mode 100644
index 4908945..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.codahale.metrics.Timer;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-
-
-@Singleton
-public class InMemoryAsyncReIndexService implements AsyncReIndexService {
-
-    private static final Logger log = LoggerFactory.getLogger( 
InMemoryAsyncReIndexService.class );
-    private final IndexService indexService;
-    private final RxTaskScheduler rxTaskScheduler;
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
-    private final Timer timer;
-
-
-    @Inject
-    public InMemoryAsyncReIndexService( final IndexService indexService, final 
RxTaskScheduler rxTaskScheduler,
-                                        final EntityCollectionManagerFactory 
entityCollectionManagerFactory, final
-                                        MetricsFactory metricsFactory ) {
-        this.indexService = indexService;
-        this.rxTaskScheduler = rxTaskScheduler;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-
-        timer = metricsFactory.getTimer( InMemoryAsyncReIndexService.class, 
"IndexTimer" );
-    }
-
-
-    @Override
-    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope, final Entity entity ) {
-
-        //process the entity immediately
-        //only process the same version, otherwise ignore
-
-
-        log.debug( "Indexing entity {} in app scope {} ", entity, 
applicationScope );
-
-        final Observable<IndexOperationMessage> edgeObservable = 
indexService.indexEntity( applicationScope, entity );
-
-
-
-        edgeObservable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() 
).subscribe();
-
-         //now start it
-//        final Timer.Context time = timer.time();
-//
-//        edgeObservable.connect();
-//
-//        time.stop();
-
-
-    }
-
-
-    @Override
-    public void index( final EntityIdScope entityIdScope ) {
-
-        final ApplicationScope applicationScope = 
entityIdScope.getApplicationScope();
-
-        final Id entityId = entityIdScope.getId();
-
-        final Entity entity =
-            entityCollectionManagerFactory.createCollectionManager( 
applicationScope ).load( entityId ).toBlocking()
-                                          .lastOrDefault( null );
-
-
-        if ( entity == null ) {
-            log.warn( "Could not find entity with id {} in app scope {} ", 
entityId, applicationScope );
-        }
-
-        indexService.indexEntity( applicationScope, entity );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
new file mode 100644
index 0000000..1e8abff
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * Application id cache fig
+ */
+public interface IndexProcessorFig extends GuicyFig {
+
+
+    /**
+     * Amount of time in milliseconds to wait when ES rejects our request 
before retrying.  Provides simple
+     * backpressure
+     */
+    String FAILURE_REJECTED_RETRY_WAIT_TIME = 
"elasticsearch.rejected_retry_wait";
+
+    /**
+     * The number of worker threads to consume from the queue
+     */
+    String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
+
+
+    /**
+     * The queue implementation to use.  Values come from 
<class>QueueProvider.Implementations</class>
+     */
+    String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
+
+
+    /**
+     * The queue implementation to use.  Values come from 
<class>QueueProvider.Implementations</class>
+     */
+    String ELASTICSEARCH_QUEUE_OFFER_TIMEOUT = 
"elasticsearch.queue.offer_timeout";
+
+    /**
+     * Amount of time to wait when reading from the queue
+     */
+    String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
+
+
+
+    @Default( "1000" )
+    @Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
+    long getFailureRetryTime();
+
+    //give us 60 seconds to process the message
+    @Default( "60" )
+    @Key( INDEX_QUEUE_READ_TIMEOUT )
+    int getIndexQueueTimeout();
+
+    @Default( "1" )
+    @Key( ELASTICSEARCH_WORKER_COUNT )
+    int getWorkerCount();
+
+    @Default( "LOCAL" )
+    @Key( ELASTICSEARCH_QUEUE_IMPL )
+    String getQueueImplementation();
+
+
+    @Default("30000")
+    @Key("elasticsearch.reindex.sample.interval")
+    long getReIndexSampleInterval();
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
index 5e9392b..18ab2b7 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -41,9 +41,9 @@ public interface IndexService {
      * @param applicationScope The scope of the entity
      * @param entity The entity
      *
-     * @return A ConnectableObservable with every edge in the batch to index 
the entity.  Note that this a cold observable
-     * and must be subscribed to, then "connect" in order to perform the 
operation.  This also makes no assumptions on scheduling.  It is up to the 
caller
-     * to assign the correct scheduler to the observable
+     * @return An Observable with executed batch future as an observable.  
Note that this a cold observable
+     * and must be subscribed to in order to perform the index operations.  
This also makes no assumptions on scheduling.  It is up to the caller
+     * to assign the correct scheduler to the observable based on their 
threading needs
      */
     Observable<IndexOperationMessage> indexEntity( final ApplicationScope 
applicationScope, final Entity entity );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 87e22f4..81bf6cb 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -99,7 +99,7 @@ public class IndexServiceImpl implements IndexService {
 
 
         //we might or might not need to index from target-> source
-        final Observable<IndexEdge> targetSizes = getIndexEdgesToTarget( gm, 
entityId );
+        final Observable<IndexEdge> targetSizes = getIndexEdgesAsTarget( gm, 
entityId );
 
 
         //merge the edges together
@@ -129,12 +129,13 @@ public class IndexServiceImpl implements IndexService {
 
 
     /**
-     * Get index edges to the target
+     * Get index edges to the target.  Used in only certain entity types, such 
as roles, users, groups etc
+     * where we doubly index on both directions of the edge
      *
      * @param graphManager The graph manager
-     * @param entityId The entitie's id
+     * @param entityId The entity's id
      */
-    private Observable<IndexEdge> getIndexEdgesToTarget( final GraphManager 
graphManager, final Id entityId ) {
+    private Observable<IndexEdge> getIndexEdgesAsTarget( final GraphManager 
graphManager, final Id entityId ) {
 
         final String collectionName = InflectionUtils.pluralize( 
entityId.getType() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java
deleted file mode 100644
index 82ed496..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.index;
-
-
-import org.safehaus.guicyfig.Default;
-import org.safehaus.guicyfig.GuicyFig;
-import org.safehaus.guicyfig.Key;
-
-
-/**
- * Application id cache fig
- */
-public interface QueryFig extends GuicyFig {
-
-
-    /**
-     * Amount of time in milliseconds to wait when ES rejects our request 
before retrying.  Provides simple
-     * backpressure
-     */
-    public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = 
"elasticsearch.rejected_retry_wait";
-
-    /**
-     * The number of worker threads to consume from the queue
-     */
-    public static final String ELASTICSEARCH_WORKER_COUNT = 
"elasticsearch.worker_count";
-
-    /**
-     * The queue implementation to use.  Values come from 
<class>QueueProvider.Implementations</class>
-     */
-    public static final String ELASTICSEARCH_QUEUE_IMPL = 
"elasticsearch.queue_impl";
-
-
-    /**
-     * The queue implementation to use.  Values come from 
<class>QueueProvider.Implementations</class>
-     */
-    public static final String ELASTICSEARCH_QUEUE_OFFER_TIMEOUT = 
"elasticsearch.queue.offer_timeout";
-
-    /**
-     * Amount of time to wait when reading from the queue
-     */
-    public static final String INDEX_QUEUE_READ_TIMEOUT = 
"elasticsearch.queue_read_timeout";
-
-    /**
-     * Amount of time to wait when reading from the queue in milliseconds
-     */
-    public static final String INDEX_QUEUE_TRANSACTION_TIMEOUT = 
"elasticsearch.queue_transaction_timeout";
-
-
-    String INDEX_QUEUE_SIZE = "elasticsearch.queue_size";
-
-
-    @Default( "1000" )
-    @Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
-    long getFailureRetryTime();
-
-    //give us 60 seconds to process the message
-    @Default( "60" )
-    @Key( INDEX_QUEUE_READ_TIMEOUT )
-    int getIndexQueueTimeout();
-
-    @Default( "2" )
-    @Key( ELASTICSEARCH_WORKER_COUNT )
-    int getWorkerCount();
-
-    @Default( "LOCAL" )
-    @Key( ELASTICSEARCH_QUEUE_IMPL )
-    String getQueueImplementation();
-
-    @Default( "1000" )
-    @Key( ELASTICSEARCH_QUEUE_OFFER_TIMEOUT )
-    long getQueueOfferTimeout();
-
-    /**
-     * size of the buffer to build up before you send results
-     */
-    @Default( "1000" )
-    @Key( INDEX_QUEUE_SIZE )
-    int getIndexQueueSize();
-
-
-    @Default("30000")
-    @Key("elasticsearch.reindex.sample.interval")
-    long getReIndexSampleInterval();
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 3553c87..5bf7957 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -60,20 +60,20 @@ public class ReIndexServiceImpl implements ReIndexService {
 
     private final AllApplicationsObservable allApplicationsObservable;
     private final AllEntityIdsObservable allEntityIdsObservable;
-    private final QueryFig queryFig;
+    private final IndexProcessorFig indexProcessorFig;
     private final RxTaskScheduler rxTaskScheduler;
     private final MapManager mapManager;
-    private final AsyncReIndexService indexService;
+    private final AsyncIndexService indexService;
 
 
     @Inject
     public ReIndexServiceImpl( final AllEntityIdsObservable 
allEntityIdsObservable,
                                final MapManagerFactory mapManagerFactory,
-                               final AllApplicationsObservable 
allApplicationsObservable, final QueryFig queryFig,
-                               final RxTaskScheduler rxTaskScheduler, final 
AsyncReIndexService indexService ) {
+                               final AllApplicationsObservable 
allApplicationsObservable, final IndexProcessorFig indexProcessorFig,
+                               final RxTaskScheduler rxTaskScheduler, final 
AsyncIndexService indexService ) {
         this.allEntityIdsObservable = allEntityIdsObservable;
         this.allApplicationsObservable = allApplicationsObservable;
-        this.queryFig = queryFig;
+        this.indexProcessorFig = indexProcessorFig;
         this.rxTaskScheduler = rxTaskScheduler;
         this.indexService = indexService;
 
@@ -108,7 +108,7 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         //start our sampler and state persistence
         //take a sample every sample interval to allow us to resume state with 
minimal loss
-        runningReIndex.sample( queryFig.getReIndexSampleInterval(), 
TimeUnit.MILLISECONDS,
+        runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), 
TimeUnit.MILLISECONDS,
             rxTaskScheduler.getAsyncIOScheduler() )
             .doOnNext( edge -> {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
new file mode 100644
index 0000000..d060123
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.Subscriber;
+import rx.Subscription;
+import rx.schedulers.Schedulers;
+
+
+@Singleton
+public class SQSAsyncIndexService implements AsyncIndexService {
+
+
+    private static final Logger log = LoggerFactory.getLogger( 
SQSAsyncIndexService.class );
+
+    /**
+     * Set our TTL to 1 month.  This is high, but in the event of a bug, we 
want these entries to get removed
+     */
+    public static final int TTL = 60 * 60 * 24 * 30;
+
+
+    private static final int MAX_TAKE = 10;
+
+    private static final String QUEUE_NAME = "es_queue";
+
+    private final QueueManager queue;
+    private final IndexProcessorFig indexProcessorFig;
+    private final IndexService indexService;
+    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+    private final RxTaskScheduler rxTaskScheduler;
+
+    private final Timer readTimer;
+    private final Timer writeTimer;
+    private final Timer messageProcessingTimer;
+
+    private final Object mutex = new Object();
+
+
+    private final Counter indexErrorCounter;
+    private final AtomicLong counter = new AtomicLong();
+    private final AtomicLong inFlight = new AtomicLong();
+
+    //the actively running subscription
+    private List<Subscription> subscriptions = new ArrayList<>();
+
+
+    @Inject
+    public SQSAsyncIndexService( final QueueManagerFactory 
queueManagerFactory, final IndexProcessorFig indexProcessorFig,
+                                 final MetricsFactory metricsFactory, final 
IndexService indexService,
+                                 final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
+                                 final RxTaskScheduler rxTaskScheduler ) {
+
+        this.indexService = indexService;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.rxTaskScheduler = rxTaskScheduler;
+
+        final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME );
+        this.queue = queueManagerFactory.getQueueManager( queueScope );
+        this.indexProcessorFig = indexProcessorFig;
+
+        this.writeTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, 
"write" );
+        this.readTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, 
"read" );
+        this.messageProcessingTimer = metricsFactory.getTimer( 
SQSAsyncIndexService.class, "message.processing" );
+        this.indexErrorCounter = metricsFactory.getCounter( 
SQSAsyncIndexService.class, "error" );
+
+
+        //wire up the gauge of inflight messages
+        metricsFactory.addGauge( SQSAsyncIndexService.class, "inflight.meter", 
new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return inFlight.longValue();
+            }
+        } );
+
+        start();
+    }
+
+
+    /**
+     * Offer the EntityIdScope to SQS
+     */
+    private void offer( final EntityIdScope operation ) {
+        final Timer.Context timer = this.writeTimer.time();
+
+        try {
+            //signal to SQS
+            this.queue.sendMessage( operation );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to queue message", e );
+        }
+        finally {
+            timer.stop();
+        }
+    }
+
+
+    /**
+     * Take messages from SQS
+     */
+    public List<QueueMessage> take() {
+
+        //SQS doesn't support more than 10
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
+            return queue.getMessages( MAX_TAKE, 
indexProcessorFig.getIndexQueueTimeout(), 
indexProcessorFig.getIndexQueueTimeout(),
+                EntityIdScope.class );
+        }
+        //stop our timer
+        finally {
+            timer.stop();
+        }
+    }
+
+
+    /**
+     * Ack messages in SQS
+     */
+    public void ack( final List<QueueMessage> messages ) {
+
+        /**
+         * No op
+         */
+        if ( messages.size() == 0 ) {
+            return;
+        }
+
+        queue.commitMessages( messages );
+    }
+
+
+    @Override
+    public void index( final EntityIdScope entityIdScope ) {
+        //queue the re-inex operation
+        offer( entityIdScope );
+    }
+
+
+    @Override
+    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope, final Entity entity ) {
+
+        //create our scope
+        final EntityIdScope entityIdScope = new EntityIdScope( 
applicationScope, entity.getId() );
+
+        //send it to SQS  for indexing
+        index( entityIdScope );
+    }
+
+
+    /**
+     * Index an entity and return an observable of the queue message on success
+     */
+    private Observable<IndexOperationMessage> indexEntity( final QueueMessage 
queueMessage ) {
+        final EntityIdScope entityIdScope = ( EntityIdScope ) 
queueMessage.getBody();
+        final ApplicationScope applicationScope = 
entityIdScope.getApplicationScope();
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
+
+
+        //run the index operation from the entity
+        return entityCollectionManager.load( entityIdScope.getId() )
+            //invoke the indexing and take the last value
+            .flatMap( entity -> indexService.indexEntity( applicationScope, 
entity ).last() );
+    }
+
+
+    /**
+     * Do the indexing for a list of queue messages
+     */
+    private void doIndex( final List<QueueMessage> queueMessages ) {
+        //create parallel observables to process all 10 messages
+        final Observable<Long> observable = Observable.from( queueMessages 
).flatMap( ( QueueMessage queueMessage ) -> {
+            return indexEntity( queueMessage ).subscribeOn( 
rxTaskScheduler.getAsyncIOScheduler() );
+        }, MAX_TAKE ).countLong()
+
+            //remove our in flight
+            .doOnNext( count -> inFlight.addAndGet( -1 * count ) )
+
+                //do on completed ack messages.  Assumes no expections were 
thrown
+            .doOnCompleted( () -> ack( queueMessages ) );
+
+        //wrap with our timer and fire
+        ObservableTimer.time( observable, messageProcessingTimer ).subscribe();
+    }
+
+
+    /**
+     * Loop throught and start the workers
+     */
+    public void start() {
+        final int count = indexProcessorFig.getWorkerCount();
+
+        for ( int i = 0; i < count; i++ ) {
+            startWorker();
+        }
+    }
+
+
+    /**
+     * Stop the workers
+     */
+    public void stop() {
+        synchronized ( mutex ) {
+            //stop consuming
+
+            for ( final Subscription subscription : subscriptions ) {
+                subscription.unsubscribe();
+            }
+        }
+    }
+
+
+    private void startWorker() {
+        synchronized ( mutex ) {
+
+            Observable<List<QueueMessage>> consumer =
+                Observable.create( new 
Observable.OnSubscribe<List<QueueMessage>>() {
+                        @Override
+                        public void call( final Subscriber<? super 
List<QueueMessage>> subscriber ) {
+
+                            //name our thread so it's easy to see
+                            Thread.currentThread().setName( "QueueConsumer_" + 
counter.incrementAndGet() );
+
+                            List<QueueMessage> drainList = null;
+
+                            do {
+                                Timer.Context timer = readTimer.time();
+
+                                try {
+                                    drainList = take();
+
+                                    //emit our list in it's entirity to hand 
off to a worker pool
+                                    subscriber.onNext( drainList );
+
+                                    //take since  we're in flight
+                                    inFlight.addAndGet( drainList.size() );
+                                }
+
+                                catch ( Throwable t ) {
+                                    final long sleepTime = 
indexProcessorFig.getFailureRetryTime();
+
+                                    log.error( "Failed to dequeue.  Sleeping 
for {} milliseconds", sleepTime, t );
+
+                                    if ( drainList != null ) {
+                                        inFlight.addAndGet( -1 * 
drainList.size() );
+                                    }
+
+
+                                    try {
+                                        Thread.sleep( sleepTime );
+                                    }
+                                    catch ( InterruptedException ie ) {
+                                        //swallow
+                                    }
+
+                                    indexErrorCounter.inc();
+                                }
+
+                                finally{
+                                    timer.stop();
+                                }
+                            }
+                            while ( true );
+                        }
+                    } )
+                    //this won't block our read loop, just reads and proceeds
+                    .doOnNext( messages -> doIndex( messages ) ).subscribeOn( 
Schedulers.newThread() );
+
+            //start in the background
+
+            final Subscription subscription = consumer.subscribe();
+
+            subscriptions.add( subscription );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java
deleted file mode 100644
index 60a804c..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-@Singleton
-public class SQSAsyncReIndexService implements AsyncReIndexService {
-
-
-    private static final Logger logger = LoggerFactory.getLogger( 
SQSAsyncReIndexService.class );
-
-    /** Hacky, copied from CPEntityManager b/c we can't access it here */
-    public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( 
"b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
-
-
-    /**
-     * Set our TTL to 1 month.  This is high, but in the event of a bug, we 
want these entries to get removed
-     */
-    public static final int TTL = 60 * 60 * 24 * 30;
-
-    /**
-     * The name to put in the map
-     */
-    public static final String MAP_NAME = "esqueuedata";
-
-
-    private static final String QUEUE_NAME = "es_queue";
-
-    private static SmileFactory SMILE_FACTORY = new SmileFactory();
-
-    static {
-        SMILE_FACTORY.delegateToTextual( true );
-    }
-
-
-    private final QueueManager queue;
-    private final QueryFig queryFig;
-    private final ObjectMapper mapper;
-    private final Meter readMeter;
-    private final Timer readTimer;
-    private final Meter writeMeter;
-    private final Timer writeTimer;
-
-
-    @Inject
-    public SQSAsyncReIndexService( final QueueManagerFactory 
queueManagerFactory, final QueryFig queryFig,
-                                   final MetricsFactory metricsFactory ) {
-        final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME );
-
-        this.queue = queueManagerFactory.getQueueManager( queueScope );
-        this.queryFig = queryFig;
-
-        this.writeTimer = metricsFactory.getTimer( 
SQSAsyncReIndexService.class, "write.timer" );
-        this.writeMeter = metricsFactory.getMeter( 
SQSAsyncReIndexService.class, "write.meter" );
-
-        this.readTimer = metricsFactory.getTimer( 
SQSAsyncReIndexService.class, "read.timer" );
-        this.readMeter = metricsFactory.getMeter( 
SQSAsyncReIndexService.class, "read.meter" );
-
-        this.mapper = new ObjectMapper( SMILE_FACTORY );
-        //pretty print, disabling for speed
-        //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
-
-    }
-
-
-    public void offer( final IndexEntityEvent operation ) {
-        final Timer.Context timer = this.writeTimer.time();
-        this.writeMeter.mark();
-
-        final UUID identifier = UUIDGenerator.newTimeUUID();
-
-        try {
-
-            final String payLoad = toString( operation );
-
-            //signal to SQS
-            this.queue.sendMessage( identifier );
-        }
-        catch ( IOException e ) {
-            throw new RuntimeException( "Unable to queue message", e );
-        }
-        finally {
-            timer.stop();
-        }
-    }
-
-
-    public List<IndexEntityEvent> take( final int takeSize, final long 
timeout, final TimeUnit timeUnit ) {
-
-        //SQS doesn't support more than 10
-
-        final int actualTake = Math.min( 10, takeSize );
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-
-            List<QueueMessage> messages = queue
-                .getMessages( actualTake, queryFig.getIndexQueueTimeout(), ( 
int ) timeUnit.toMillis( timeout ),
-                    String.class );
-
-
-            final List<IndexEntityEvent> response = new ArrayList<>( 
messages.size() );
-
-            final List<String> mapEntries = new ArrayList<>( messages.size() );
-
-
-            if ( messages.size() == 0 ) {
-                return Collections.emptyList();
-            }
-
-            //add all our keys  for a single round trip
-            for ( final QueueMessage message : messages ) {
-                mapEntries.add( message.getBody().toString() );
-            }
-
-
-            //load them into our response
-            for ( final QueueMessage message : messages ) {
-
-                final String payload = getBody( message );
-
-                //now see if the key was there
-
-
-                final IndexEntityEvent messageBody;
-
-                try {
-                    messageBody = fromString( payload );
-                }
-                catch ( IOException e ) {
-                    logger.error( "Unable to deserialize message from string.  
This is a bug", e );
-                    throw new RuntimeException( "Unable to deserialize message 
from string.  This is a bug", e );
-                }
-
-                SqsIndexOperationMessage operation = new 
SqsIndexOperationMessage( message, messageBody );
-
-                response.add( operation );
-            }
-
-            readMeter.mark( response.size() );
-            return response;
-        }
-        //stop our timer
-        finally {
-            timer.stop();
-        }
-    }
-
-
-    public void ack( final List<IndexEntityEvent> messages ) {
-
-        //nothing to do
-        if ( messages.size() == 0 ) {
-            return;
-        }
-
-        List<QueueMessage> toAck = new ArrayList<>( messages.size() );
-
-        for ( IndexEntityEvent ioe : messages ) {
-
-
-            final SqsIndexOperationMessage sqsIndexOperationMessage = ( 
SqsIndexOperationMessage ) ioe;
-
-            toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() );
-        }
-
-        queue.commitMessages( toAck );
-    }
-
-
-    /** Read the object from Base64 string. */
-    private IndexEntityEvent fromString( String s ) throws IOException {
-        IndexEntityEvent o = mapper.readValue( s, IndexEntityEvent.class );
-        return o;
-    }
-
-
-    /** Write the object to a Base64 string. */
-    private String toString( IndexEntityEvent o ) throws IOException {
-        return mapper.writeValueAsString( o );
-    }
-
-
-    private String getBody( final QueueMessage message ) {
-        return message.getBody().toString();
-    }
-
-
-    @Override
-    public void index( final EntityIdScope entityIdScope ) {
-
-    }
-
-
-    /**
-     * The message that subclasses our IndexOperationMessage.  holds a pointer 
to the original message
-     */
-    public class SqsIndexOperationMessage extends IndexEntityEvent {
-
-        private final QueueMessage message;
-
-
-        public SqsIndexOperationMessage( final QueueMessage message, final 
IndexEntityEvent source ) {
-            super( source.getApplicationScope(), source.getEntityId(), 
source.getEntityVersion() );
-            this.message = message;
-        }
-
-
-        /**
-         * Get the message from our queue
-         */
-        public QueueMessage getMessage() {
-            return message;
-        }
-    }
-
-
-    @Override
-    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope,  final Entity entity) {
-        throw new UnsupportedOperationException( "Implement index rebuild" );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index c42ad10..364b071 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -99,8 +99,7 @@ public class CpNamingUtils {
      * Get the index scope for the edge from the source
      */
     public static IndexEdge generateScopeFromSource( final Edge edge ) {
-        return new IndexEdgeImpl( edge.getSourceNode(), edge.getType(), 
SearchEdge.NodeType.SOURCE,
-            edge.getTimestamp() );
+        return new IndexEdgeImpl( edge.getSourceNode(), edge.getType(), 
SearchEdge.NodeType.SOURCE, edge.getTimestamp() );
     }
 
 
@@ -108,8 +107,7 @@ public class CpNamingUtils {
      * Get the index scope for the edge from the source
      */
     public static IndexEdge generateScopeToTarget( final Edge edge ) {
-        return new IndexEdgeImpl( edge.getTargetNode(), edge.getType(), 
SearchEdge.NodeType.TARGET,
-            edge.getTimestamp() );
+        return new IndexEdgeImpl( edge.getTargetNode(), edge.getType(), 
SearchEdge.NodeType.TARGET, edge.getTimestamp() );
     }
 
 
@@ -123,6 +121,8 @@ public class CpNamingUtils {
 
     /**
      *
+     * TODO move sourceId to ApplicationScope
+     *
      * @param sourceId
      * @param collectionName
      * @param entityId

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
index 1ad4115..19ecf6d 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 
 import 
org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -46,6 +47,7 @@ public class SerializableMapper {
 
     static{
         MAPPER.enableDefaultTypingAsProperty( 
ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
+        SMILE_FACTORY.delegateToTextual( true );
     }
 
     /**
@@ -66,9 +68,9 @@ public class SerializableMapper {
 
     /**
      * Write the value as a string
+     * @param <T>
      * @param serialized
      * @param clazz
-     * @param <T>
      * @return
      */
     public static <T extends Serializable> T fromString(final String 
serialized, final Class<T> clazz){

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
new file mode 100644
index 0000000..faef848
--- /dev/null
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.index.impl.EsRunner;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+
+import com.google.inject.Inject;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+@RunWith( EsRunner.class )
+@UseModules( { TestIndexModule.class } )
+@NotThreadSafe
+public abstract class AsyncIndexServiceTest {
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    public EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+    @Inject
+    public GraphManagerFactory graphManagerFactory;
+
+
+    @Inject
+    public EntityIndexFactory entityIndexFactory;
+
+
+    private AsyncIndexService asyncIndexService;
+
+
+    /**
+     * Get the async index service
+     */
+    protected abstract AsyncIndexService getAsyncIndexService();
+
+
+    @Before
+    public void setup() {
+        asyncIndexService = getAsyncIndexService();
+    }
+
+
+    @Test( timeout = 60000 )
+    public void testMessageIndexing() throws InterruptedException {
+
+
+        ApplicationScope applicationScope =
+            new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), 
"application" ) );
+
+
+        final Entity testEntity = new Entity( createId( "thing" ), 
UUIDGenerator.newTimeUUID() );
+        testEntity.setField( new StringField( "string", "foo" ) );
+
+
+        //write the entity before indexing
+        final EntityCollectionManager collectionManager =
+            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
+
+        collectionManager.write( testEntity ).toBlocking().last();
+
+        final GraphManager graphManager = 
graphManagerFactory.createEdgeManager( applicationScope );
+
+        //create our collection edge
+        final Edge collectionEdge =
+            CpNamingUtils.createCollectionEdge( 
applicationScope.getApplication(), "things", testEntity.getId() );
+        graphManager.writeEdge( collectionEdge ).toBlocking().last();
+
+
+        /**
+         * Write 10k edges 10 at a time in parallel
+         */
+        final Edge connectionSearch = Observable.range( 0, 10000 ).flatMap( 
integer -> {
+            final Id connectingId = createId( "connecting" );
+            final Edge edge = CpNamingUtils.createConnectionEdge( 
connectingId, "likes", testEntity.getId() );
+
+            return graphManager.writeEdge( edge ).subscribeOn( Schedulers.io() 
);
+        }, 10 ).toBlocking().last();
+
+
+        asyncIndexService.queueEntityIndexUpdate( applicationScope, testEntity 
);
+
+
+        //        Thread.sleep( 1000000000000l );
+
+        final ApplicationEntityIndex applicationEntityIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope 
);
+
+        final SearchEdge collectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
+
+
+        //query until it's available
+        final CandidateResults collectionResults = getResults( 
applicationEntityIndex, collectionSearchEdge,
+            SearchTypes.fromTypes( testEntity.getId().getType() ), "select *", 
100, 1, 100 );
+
+        assertEquals( 1, collectionResults.size() );
+
+        assertEquals( testEntity.getId(), collectionResults.get( 0 ).getId() );
+
+
+        final SearchEdge connectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
+
+
+        //query until it's available
+        final CandidateResults connectionResults = getResults( 
applicationEntityIndex, connectionSearchEdge,
+            SearchTypes.fromTypes( testEntity.getId().getType() ), "select *", 
100, 1, 100 );
+
+        assertEquals( 1, connectionResults.size() );
+
+        assertEquals( testEntity.getId(), connectionResults.get( 0 ).getId() );
+    }
+
+
+    private CandidateResults getResults( final ApplicationEntityIndex 
applicationEntityIndex,
+                                         final SearchEdge searchEdge, final 
SearchTypes searchTypes, final String ql,
+                                         final int count, final int 
expectedSize, final int attempts ) {
+
+
+        for ( int i = 0; i < attempts; i++ ) {
+            final CandidateResults candidateResults =
+                applicationEntityIndex.search( searchEdge, searchTypes, 
"select *", 100 );
+
+            if ( candidateResults.size() == expectedSize ) {
+                return candidateResults;
+            }
+
+            try {
+                Thread.sleep( 100 );
+            }
+            catch ( InterruptedException e ) {
+                //swallow
+            }
+        }
+
+        fail( "Could not find candidates of size " + expectedSize + "after " + 
attempts + " attempts" );
+
+        //we'll never reach this, required for compile
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f4d0d1a9/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
new file mode 100644
index 0000000..43968de
--- /dev/null
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.junit.Rule;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.index.impl.EsRunner;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+
+import com.google.inject.Inject;
+
+import net.jcip.annotations.NotThreadSafe;
+
+
+@RunWith( EsRunner.class )
+@UseModules( { TestIndexModule.class } )
+@NotThreadSafe
+public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest {
+
+    @Rule
+    public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule();
+
+
+    @Inject
+    public IndexService indexService;
+
+    @Inject
+    public RxTaskScheduler rxTaskScheduler;
+
+
+    @Override
+    protected AsyncIndexService getAsyncIndexService() {
+        return  new InMemoryAsyncIndexService( indexService, rxTaskScheduler, 
entityCollectionManagerFactory  );
+    }
+
+
+
+
+
+}

Reply via email to