Start of rewire of the rebuild process

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3480a363
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3480a363
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3480a363

Branch: refs/heads/USERGRID-593
Commit: 3480a3637cc3cc6c79504642c3c44fbcf5878efd
Parents: ee1a66f
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Apr 20 19:52:51 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Apr 20 19:52:51 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   7 +-
 .../corepersistence/CpEntityManager.java        |  85 +-----
 .../corepersistence/CpEntityManagerFactory.java | 119 +++-----
 .../corepersistence/CpRelationManager.java      |   9 +-
 .../index/AsyncIndexProvider.java               |  12 +-
 .../index/AsyncIndexService.java                |  49 ----
 .../index/AsyncReIndexService.java              |  42 +++
 .../index/InMemoryAsyncIndexService.java        |  92 -------
 .../index/InMemoryAsyncReIndexService.java      |  89 ++++++
 .../corepersistence/index/ReIndexAction.java    |  33 +++
 .../corepersistence/index/ReIndexService.java   |  39 ++-
 .../index/ReIndexServiceImpl.java               |  43 ++-
 .../index/SQSAsyncIndexService.java             | 270 -------------------
 .../index/SQSAsyncReIndexService.java           | 269 ++++++++++++++++++
 .../rx/impl/AllEntityIdsObservable.java         |   5 +-
 .../rx/impl/AllEntityIdsObservableImpl.java     |   4 +-
 .../corepersistence/util/CpNamingUtils.java     |   6 +-
 .../usergrid/persistence/EntityManager.java     |   5 -
 .../persistence/EntityManagerFactory.java       |  11 +-
 .../index/SQSAsyncIndexServiceTest.java         |  18 +-
 .../PerformanceEntityRebuildIndexTest.java      |  18 +-
 .../cassandra/EntityManagerFactoryImplIT.java   |  15 +-
 .../graph/serialization/EdgesObservable.java    |   5 +-
 .../serialization/impl/EdgesObservableImpl.java |   6 +-
 24 files changed, 584 insertions(+), 667 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index bb936b5..51972d8 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -22,7 +22,7 @@ import 
org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler;
 import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler;
 import org.apache.usergrid.corepersistence.index.AsyncIndexProvider;
-import org.apache.usergrid.corepersistence.index.AsyncIndexService;
+import org.apache.usergrid.corepersistence.index.AsyncReIndexService;
 import org.apache.usergrid.corepersistence.index.IndexService;
 import org.apache.usergrid.corepersistence.index.IndexServiceImpl;
 import org.apache.usergrid.corepersistence.index.QueryFig;
@@ -37,9 +37,6 @@ import 
org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservableImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
-import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
@@ -154,7 +151,7 @@ public class CoreModule  extends AbstractModule {
         bind( IndexService.class ).to( IndexServiceImpl.class );
         //bind the queue provider
 
-        bind( AsyncIndexService.class).toProvider( AsyncIndexProvider.class );
+        bind( AsyncReIndexService.class).toProvider( AsyncIndexProvider.class 
);
 
         install( new GuicyFigModule( QueryFig.class ) );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 6c3989d..72ca955 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -37,8 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import org.apache.usergrid.corepersistence.index.AsyncIndexService;
-import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.corepersistence.index.AsyncReIndexService;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.AggregateCounter;
@@ -49,7 +48,6 @@ import org.apache.usergrid.persistence.ConnectionRef;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityFactory;
 import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.IndexBucketLocator;
 import org.apache.usergrid.persistence.Query;
@@ -68,7 +66,6 @@ import org.apache.usergrid.persistence.cassandra.CounterUtils;
 import org.apache.usergrid.persistence.cassandra.util.TraceParticipant;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.FieldSet;
-import 
org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerifyException;
 import 
org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -81,7 +78,6 @@ import 
org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE
 import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import 
org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import 
org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
-import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.index.query.CounterResolution;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.map.MapManager;
@@ -179,7 +175,7 @@ public class CpEntityManager implements EntityManager {
 
     private final CounterUtils counterUtils;
 
-    private final AsyncIndexService indexService;
+    private final AsyncReIndexService indexService;
 
     private boolean skipAggregateCounters;
     private MetricsFactory metricsFactory;
@@ -219,7 +215,7 @@ public class CpEntityManager implements EntityManager {
      * @param metricsFactory
      * @param applicationId
      */
-    public CpEntityManager(final CassandraService cass, final CounterUtils 
counterUtils, final AsyncIndexService indexService, final ManagerCache 
managerCache, final MetricsFactory metricsFactory, final UUID applicationId ) {
+    public CpEntityManager(final CassandraService cass, final CounterUtils 
counterUtils, final AsyncReIndexService indexService, final ManagerCache 
managerCache, final MetricsFactory metricsFactory, final UUID applicationId ) {
 
         Preconditions.checkNotNull( cass, "cass must not be null" );
         Preconditions.checkNotNull( counterUtils, "counterUtils must not be 
null" );
@@ -804,8 +800,8 @@ public class CpEntityManager implements EntityManager {
 
         StringField uniqueLookupRepairField =  new StringField( propertyName, 
aliasType.toString());
 
-        Observable<FieldSet> fieldSetObservable = ecm.getEntitiesFromFields( 
Inflector.getInstance().singularize( collectionType ),
-            Arrays.<Field>asList( uniqueLookupRepairField ) );
+        Observable<FieldSet> fieldSetObservable = ecm.getEntitiesFromFields(
+            Inflector.getInstance().singularize( collectionType ), 
Arrays.<Field>asList( uniqueLookupRepairField ) );
 
         if(fieldSetObservable == null){
             logger.debug( "Couldn't return the observable based on unique 
entities." );
@@ -1736,7 +1732,7 @@ public class CpEntityManager implements EntityManager {
         long timestamp = cass.createTimestamp();
         Mutator<ByteBuffer> batch = createMutator( 
cass.getApplicationKeyspace( applicationId ), be);
         CassandraPersistenceUtils.addDeleteToMutator( batch, 
ApplicationCF.ENTITY_DICTIONARIES,
-                getRolePermissionsKey( roleName ), permission, timestamp );
+            getRolePermissionsKey( roleName ), permission, timestamp );
         //Adding graphite metrics
         Timer.Context timeRevokeRolePermission = 
entRevokeRolePermissionsTimer.time();
         CassandraPersistenceUtils.batchExecute( batch, 
CassandraService.RETRY_COUNT );
@@ -1747,8 +1743,8 @@ public class CpEntityManager implements EntityManager {
     @Override
     public Set<String> getRolePermissions( String roleName ) throws Exception {
         roleName = roleName.toLowerCase();
-        return cass.getAllColumnNames( cass.getApplicationKeyspace( 
applicationId ),
-                ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( 
roleName ) );
+        return cass.getAllColumnNames( cass.getApplicationKeyspace( 
applicationId ), ApplicationCF.ENTITY_DICTIONARIES,
+            getRolePermissionsKey( roleName ) );
     }
 
 //TODO: does this need graphite monitoring
@@ -1830,8 +1826,8 @@ public class CpEntityManager implements EntityManager {
     @Override
     public Set<String> getGroupRolePermissions( UUID groupId, String roleName 
) throws Exception {
         roleName = roleName.toLowerCase();
-        return cass.getAllColumnNames( cass.getApplicationKeyspace( 
applicationId ),
-                ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( 
groupId, roleName ) );
+        return cass.getAllColumnNames( cass.getApplicationKeyspace( 
applicationId ), ApplicationCF.ENTITY_DICTIONARIES,
+            getRolePermissionsKey( groupId, roleName ) );
     }
 
 
@@ -1840,7 +1836,7 @@ public class CpEntityManager implements EntityManager {
         roleName = roleName.toLowerCase();
         removeFromDictionary( new SimpleEntityRef( Group.ENTITY_TYPE, groupId 
), DICTIONARY_ROLENAMES, roleName );
         cass.deleteRow( cass.getApplicationKeyspace( applicationId ), 
ApplicationCF.ENTITY_DICTIONARIES,
-                SimpleRoleRef.getIdForGroupIdAndRoleName( groupId, roleName ) 
);
+            SimpleRoleRef.getIdForGroupIdAndRoleName( groupId, roleName ) );
     }
 
 
@@ -1931,7 +1927,7 @@ public class CpEntityManager implements EntityManager {
     @Override
     public EntityRef getGroupRoleRef( UUID groupId, String roleName ) throws 
Exception {
         Results results = this.searchCollection( new SimpleEntityRef( 
Group.ENTITY_TYPE, groupId ),
-                Schema.defaultCollectionName( Role.ENTITY_TYPE ), 
Query.fromQL( "roleName = '" + roleName + "'" ) );
+            Schema.defaultCollectionName( Role.ENTITY_TYPE ), Query.fromQL( 
"roleName = '" + roleName + "'" ) );
         Iterator<Entity> iterator = results.iterator();
         EntityRef roleRef = null;
         while ( iterator.hasNext() ) {
@@ -2830,64 +2826,7 @@ public class CpEntityManager implements EntityManager {
 
 
 
-    /**
-     * Completely reindex the named collection in the application associated 
with this EntityManager.
-     */
-    @Override
-    public void reindexCollection(
-        final EntityManagerFactory.ProgressObserver po, String collectionName, 
boolean reverse) throws Exception {
-
-        CpWalker walker = new CpWalker( );
-
-        walker.walkCollections( this, getApplication(), collectionName, 
reverse, new CpVisitor() {
-
-                @Override
-                public void visitCollectionEntry( EntityManager em, String 
collName, Entity entity ) {
-
-                    try {
-                        em.update( entity );
-                        po.onProgress( entity );
-                    }
-                    catch ( WriteOptimisticVerifyException wo ) {
-                        // swallow this, it just means this was already 
updated, which accomplishes our task
-                        logger.warn( "Someone beat us to updating entity {} in 
collection {}.  Ignoring.",
-                            entity.getName(), collName );
-                    }
-                    catch ( Exception ex ) {
-                        logger.error( "Error repersisting entity", ex );
-                    }
-                }
-            } );
-    }
-
-
-    /**
-     * Completely reindex the application associated with this EntityManager.
-     */
-    public void reindex( final EntityManagerFactory.ProgressObserver po ) 
throws Exception {
-
-        CpWalker walker = new CpWalker( );
-
-        walker.walkCollections( this, getApplication(), null, false, new 
CpVisitor() {
-
-            @Override
-            public void visitCollectionEntry( EntityManager em, String 
collName, Entity entity ) {
 
-            try {
-                em.update( entity );
-                po.onProgress( entity );
-            }
-            catch ( WriteOptimisticVerifyException wo ) {
-                //swallow this, it just means this was already updated, which 
accomplishes our task.
-                logger.warn( "Someone beat us to updating entity {} in 
collection {}.  Ignoring.",
-                    entity.getName(), collName );
-            }
-            catch ( Exception ex ) {
-                logger.error( "Error repersisting entity", ex );
-            }
-            }
-        } );
-    }
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index b6d20b7..46db3f8 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -15,24 +15,16 @@
  */
 package org.apache.usergrid.corepersistence;
 
-import com.google.common.base.*;
-import com.google.common.base.Optional;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.google.inject.TypeLiteral;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.usergrid.persistence.index.IndexRefreshCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeansException;
@@ -41,11 +33,19 @@ import org.springframework.context.ApplicationContextAware;
 
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.usergrid.corepersistence.index.AsyncIndexService;
-import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.corepersistence.index.AsyncReIndexService;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.exception.ConflictException;
-import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.AbstractEntity;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.apache.usergrid.persistence.cassandra.CounterUtils;
 import org.apache.usergrid.persistence.cassandra.Setup;
@@ -59,7 +59,6 @@ import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.entities.Application;
 import 
org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException;
 import 
org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
-import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -67,17 +66,27 @@ import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.index.IndexRefreshCommand;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.utils.UUIDUtils;
-import rx.Observable;
 
-import java.util.*;
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+
+import rx.Observable;
 
 import static java.lang.String.CASE_INSENSITIVE_ORDER;
-import static org.apache.usergrid.persistence.Schema.*;
+
+import static org.apache.usergrid.persistence.Schema.PROPERTY_APPLICATION_ID;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 
 
 /**
@@ -113,7 +122,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
     private Injector injector;
     private final EntityIndex entityIndex;
     private final MetricsFactory metricsFactory;
-    private final AsyncIndexService indexService;
+    private final AsyncReIndexService indexService;
 
     public CpEntityManagerFactory( final CassandraService cassandraService, 
final CounterUtils counterUtils,
                                    final Injector injector) {
@@ -125,7 +134,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
         this.entityIndexFactory = 
injector.getInstance(EntityIndexFactory.class);
         this.managerCache = injector.getInstance( ManagerCache.class );
         this.metricsFactory = injector.getInstance( MetricsFactory.class );
-        this.indexService = injector.getInstance( AsyncIndexService.class );
+        this.indexService = injector.getInstance( AsyncReIndexService.class );
         this.applicationIdCache = 
injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
             getManagementEntityManager() );
 
@@ -307,14 +316,9 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
         
migrateAppInfo(applicationId,CpNamingUtils.DELETED_APPLICATION_INFO,CpNamingUtils.APPLICATION_INFO)
             .toBlocking().lastOrDefault(null);
 
-        this.rebuildApplicationIndexes(applicationId, new ProgressObserver() {
-            @Override
-            public void onProgress(EntityRef entity) {
-                logger.info("Restored entity {}:{}", entity.getType(), 
entity.getUuid());
-            }
-        });
+        throw new UnsupportedOperationException( "Implement index rebuild" );
 
-        return managementEm.get(new 
SimpleEntityRef(CpNamingUtils.APPLICATION_INFO,applicationId));
+//        return managementEm.get(new 
SimpleEntityRef(CpNamingUtils.APPLICATION_INFO,applicationId));
     }
 
     @Override
@@ -661,44 +665,6 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
     }
 
 
-    public void rebuildAllIndexes( ProgressObserver po ) throws Exception {
-
-        logger.info("\n\nRebuilding all indexes\n");
-
-        rebuildInternalIndexes( po );
-
-        Map<String, UUID> appMap = getApplications();
-
-        logger.info("About to rebuild indexes for {} applications", 
appMap.keySet().size());
-
-        for ( UUID appUuid : appMap.values() ) {
-            try {
-                rebuildApplicationIndexes(appUuid, po);
-            } catch ( Exception e) {
-                logger.error("Error rebuilding index for app " + appUuid + " 
continuing...", e );
-            }
-        }
-    }
-
-
-    @Override
-    public void rebuildInternalIndexes( ProgressObserver po ) throws Exception 
{
-
-        // TODO: remove this after appinfo migration done
-        rebuildApplicationIndexes( CpNamingUtils.SYSTEM_APP_ID, po);
-        rebuildApplicationIndexes( CpNamingUtils.MANAGEMENT_APPLICATION_ID, po 
);
-    }
-
-
-    @Override
-    public void rebuildApplicationIndexes( UUID appId, ProgressObserver po ) 
throws Exception {
-
-        EntityManager em = getEntityManager( appId );
-        em.reindex( po );
-
-        logger.info("\n\nRebuilt index for applicationId {} \n", appId);
-    }
-
 
 
     @Override
@@ -716,18 +682,19 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
     }
 
     @Override
-    public void rebuildCollectionIndex(
-        UUID appId, String collectionName, boolean reverse, ProgressObserver 
po ) throws Exception  {
-
-        EntityManager em = getEntityManager( appId );
-
-        //explicitly invoke create index, we don't know if it exists or not in 
ES during a rebuild.
-        Application app = em.getApplication();
-
-        em.reindexCollection(po, collectionName, reverse);
-
-        logger.info("\n\nRebuilt index for application {} id {} collection 
{}\n",
-            new Object[]{app.getName(), appId, collectionName});
+    public ReIndexService.IndexResponse rebuildCollectionIndex( Optional<UUID> 
appId, Optional<String> collection )   {
+
+        throw new UnsupportedOperationException( "Implement me" );
+//
+//        EntityManager em = getEntityManager( appId );
+//
+//        //explicitly invoke create index, we don't know if it exists or not 
in ES during a rebuild.
+//        Application app = em.getApplication();
+//
+//        em.reindexCollection(po, collectionName, reverse);
+//
+//        logger.info("\n\nRebuilt index for application {} id {} collection 
{}\n",
+//            new Object[]{app.getName(), appId, collectionName});
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index a3d8172..8f125ad 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -30,8 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import org.apache.usergrid.corepersistence.index.AsyncIndexService;
-import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.corepersistence.index.AsyncReIndexService;
 import 
org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl;
 import 
org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl;
 import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor;
@@ -43,7 +42,6 @@ import org.apache.usergrid.persistence.ConnectionRef;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.IndexBucketLocator;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.Query.Level;
 import org.apache.usergrid.persistence.RelationManager;
@@ -80,7 +78,6 @@ import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 
 import rx.Observable;
-import rx.functions.Action1;
 import rx.functions.Func1;
 
 import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionEdge;
@@ -121,13 +118,13 @@ public class CpRelationManager implements RelationManager 
{
 
     private final ApplicationScope applicationScope;
 
-    private final AsyncIndexService indexService;
+    private final AsyncReIndexService indexService;
 
     private MetricsFactory metricsFactory;
     private Timer updateCollectionTimer;
 
 
-    public CpRelationManager( final MetricsFactory metricsFactory, final 
ManagerCache managerCache, final AsyncIndexService indexService, final 
EntityManager em, final UUID applicationId, final EntityRef headEntity) {
+    public CpRelationManager( final MetricsFactory metricsFactory, final 
ManagerCache managerCache, final AsyncReIndexService indexService, final 
EntityManager em, final UUID applicationId, final EntityRef headEntity) {
 
 
         Assert.notNull( em, "Entity manager cannot be null" );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
index d00ef8e..2c48c13 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
@@ -35,7 +35,7 @@ import com.google.inject.Singleton;
  * A provider to allow users to configure their queue impl via properties
  */
 @Singleton
-public class AsyncIndexProvider implements Provider<AsyncIndexService> {
+public class AsyncIndexProvider implements Provider<AsyncReIndexService> {
 
     private final QueryFig queryFig;
 
@@ -46,7 +46,7 @@ public class AsyncIndexProvider implements 
Provider<AsyncIndexService> {
     private final AllEntityIdsObservable allEntitiesObservable;
     private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
 
-    private AsyncIndexService asyncIndexService;
+    private AsyncReIndexService asyncIndexService;
 
 
     @Inject
@@ -67,7 +67,7 @@ public class AsyncIndexProvider implements 
Provider<AsyncIndexService> {
 
     @Override
     @Singleton
-    public AsyncIndexService get() {
+    public AsyncReIndexService get() {
         if ( asyncIndexService == null ) {
             asyncIndexService = getIndexService();
         }
@@ -77,17 +77,17 @@ public class AsyncIndexProvider implements 
Provider<AsyncIndexService> {
     }
 
 
-    private AsyncIndexService getIndexService() {
+    private AsyncReIndexService getIndexService() {
         final String value = queryFig.getQueueImplementation();
 
         final Implementations impl = Implementations.valueOf( value );
 
         switch ( impl ) {
             case LOCAL:
-                return new InMemoryAsyncIndexService( indexService, 
rxTaskScheduler,
+                return new InMemoryAsyncReIndexService( indexService, 
rxTaskScheduler,
                     entityCollectionManagerFactory );
             case SQS:
-                return new SQSAsyncIndexService( queueManagerFactory, 
queryFig, metricsFactory );
+                return new SQSAsyncReIndexService( queueManagerFactory, 
queryFig, metricsFactory );
             default:
                 throw new IllegalArgumentException( "Configuration value of " 
+ getErrorValues() + " are allowed" );
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
deleted file mode 100644
index 8b5ced1..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-
-import rx.Observable;
-
-
-/**
- * Low level queue service for indexing entities
- */
-public interface AsyncIndexService extends ReIndexService.IndexAction {
-
-
-    /**
-     * Queue an entity to be indexed.  This will start processing immediately. 
For implementations that are realtime (akka, in memory)
-     * We will return a distributed future.  For SQS impls, this will return 
immediately, and the result will not be available.
-     * After SQS is removed, the tests should be enhanced to ensure that we're 
processing our queues correctly.
-     * @param applicationScope
-     * @param entity The entity to index
-     */
-    void queueEntityIndexUpdate( final ApplicationScope applicationScope, 
final Entity entity);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java
new file mode 100644
index 0000000..c6eedd7
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ * Low level queue service for indexing entities
+ */
+public interface AsyncReIndexService extends ReIndexAction {
+
+
+    /**
+     * Queue an entity to be indexed.  Realtime implementations (akka, in memory) start processing immediately;
+     * SQS implementations return immediately and the result is not available.  The method is void: no future is returned.
+     * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly.
+     * @param applicationScope The application scope to index the entity in
+     * @param entity The entity to index
+     */
+    void queueEntityIndexUpdate( final ApplicationScope applicationScope, 
final Entity entity);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
deleted file mode 100644
index 0efb964..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-
-
-@Singleton
-public class InMemoryAsyncIndexService implements AsyncIndexService {
-
-    private static final Logger log = 
LoggerFactory.getLogger(InMemoryAsyncIndexService.class);
-    private final IndexService indexService;
-    private final RxTaskScheduler rxTaskScheduler;
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
-
-
-    @Inject
-    public InMemoryAsyncIndexService( final IndexService indexService, final 
RxTaskScheduler rxTaskScheduler,
-                                      final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
-        this.indexService = indexService;
-        this.rxTaskScheduler = rxTaskScheduler;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-    }
-
-
-    @Override
-    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope, final Entity toIndex ) {
-
-        //process the entity immediately
-        //only process the same version, otherwise ignore
-
-        Observable.just( toIndex ).doOnNext( entity -> {
-            log.debug( "Indexing entity {} in app scope {} ", entity, 
applicationScope );
-            indexService.indexEntity( applicationScope, entity );
-        } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
-    }
-
-
-
-    @Override
-    public void index( final EntityIdScope entityIdScope ) {
-
-        final ApplicationScope applicationScope = 
entityIdScope.getApplicationScope();
-
-        final Id entityId = entityIdScope.getId();
-
-        final Entity
-            entity = entityCollectionManagerFactory.createCollectionManager( 
applicationScope ).load(
-            entityId ).toBlocking().lastOrDefault( null );
-
-
-        if(entity == null){
-            log.warn( "Could not find entity with id {} in app scope {} ", 
entityId, applicationScope );
-        }
-
-        indexService.indexEntity(applicationScope, entity  );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java
new file mode 100644
index 0000000..5ebda87
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+/** In-memory {@link AsyncReIndexService} that hands entities directly to the {@link IndexService}. */
+@Singleton
+public class InMemoryAsyncReIndexService implements AsyncReIndexService {
+
+    private static final Logger log = LoggerFactory.getLogger(InMemoryAsyncReIndexService.class);
+    private final IndexService indexService;
+    private final RxTaskScheduler rxTaskScheduler;
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+
+    @Inject
+    public InMemoryAsyncReIndexService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler,
+                                        final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+        this.indexService = indexService;
+        this.rxTaskScheduler = rxTaskScheduler;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
+
+    /**
+     * Indexes the given entity, subscribing on the scheduler returned by {@code getAsyncIOScheduler()}.
+     */
+    @Override
+    public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity toIndex ) {
+        //process the entity immediately
+        Observable.just( toIndex ).doOnNext( entity -> {
+            log.debug( "Indexing entity {} in app scope {} ", entity, applicationScope );
+            indexService.indexEntity( applicationScope, entity );
+        } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+    }
+
+    /**
+     * Loads the entity referenced by the scope (blocking) and indexes it.  An entity that cannot be loaded
+     * is logged and skipped instead of being handed to the indexer as null.
+     */
+    @Override
+    public void index( final EntityIdScope entityIdScope ) {
+        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+        final Id entityId = entityIdScope.getId();
+
+        final Entity entity = entityCollectionManagerFactory.createCollectionManager( applicationScope )
+            .load( entityId ).toBlocking().lastOrDefault( null );
+
+        //lastOrDefault( null ) yields null when nothing was loaded: there is nothing to index
+        if ( entity == null ) {
+            log.warn( "Could not find entity with id {} in app scope {} ", entityId, applicationScope );
+            return;
+        }
+
+        indexService.indexEntity( applicationScope, entity );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
new file mode 100644
index 0000000..086b2aa
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+
+
+/**
+ * Callback that performs an index operation for a single entity id scope during bulk re-index operations
+ */
+@FunctionalInterface
+public interface ReIndexAction {
+    /** @param entityIdScope the application scope and entity id to index */
+    void index( final EntityIdScope entityIdScope );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index dca6cac..91409fe 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -20,10 +20,12 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;
 
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
@@ -31,6 +33,7 @@ import com.google.common.base.Optional;
 
 import rx.Observable;
 import rx.Observer;
+import rx.observables.ConnectableObservable;
 
 
 /**
@@ -40,14 +43,14 @@ public interface ReIndexService {
 
 
     /**
-     * Reindex all applications using the cursor provided
-     *
-     * @param startTimestamp The timestamp to start seeking from
-     *
-     * @return a cursor that can be used to resume the operation on the next 
run
+     * Perform an index rebuild
+     * @param appId
+     * @param collection
+     * @return
      */
-    IndexResponse reIndex( final rx.Observable<ApplicationScope> 
applicationScopes, final Optional<String> cursor,
-                    final Optional<Long> startTimestamp, final IndexAction 
indexAction );
+    IndexResponse rebuildIndex( final Optional<UUID> appId, final 
Optional<String> collection, final Optional<String> collectionName, final 
Optional<String> cursor,
+                        final Optional<Long> startTimestamp );
+
 
 
     /**
@@ -55,10 +58,10 @@ public interface ReIndexService {
      */
     class IndexResponse {
         final String cursor;
-        final Observable<Long> indexedEdgecount;
+        final ConnectableObservable<EdgeScope> indexedEdgecount;
 
 
-        public IndexResponse( final String cursor, final Observable<Long> 
indexedEdgecount ) {
+        public IndexResponse( final String cursor, final 
ConnectableObservable<EdgeScope> indexedEdgecount ) {
             this.cursor = cursor;
             this.indexedEdgecount = indexedEdgecount;
         }
@@ -74,23 +77,13 @@ public interface ReIndexService {
 
 
         /**
-         * Return the observable long count of all edges indexed
+         * Return the observable of all edges to be indexed.
+         *
+         * Note that after subscribing "connect" will need to be called to 
ensure that processing begins
          * @return
          */
-        public Observable<Long> getCount() {
+        public ConnectableObservable<EdgeScope> getCount() {
             return indexedEdgecount;
         }
     }
-
-
-
-
-    /**
-     * Callback to perform an index operation based on an scope during bulk 
re-index operations
-     */
-    @FunctionalInterface
-    interface IndexAction {
-
-        void index( final EntityIdScope entityIdScope );
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 5c022e1..3553c87 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -20,8 +20,10 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -37,11 +39,16 @@ import 
org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
 
 import rx.Observable;
 import rx.observables.ConnectableObservable;
 
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
 
+
+@Singleton
 public class ReIndexServiceImpl implements ReIndexService {
 
     private static final MapScope RESUME_MAP_SCOPTE =
@@ -51,48 +58,52 @@ public class ReIndexServiceImpl implements ReIndexService {
     private static final int INDEX_TTL = 60 * 60 * 24 * 10;
 
 
+    private final AllApplicationsObservable allApplicationsObservable;
     private final AllEntityIdsObservable allEntityIdsObservable;
     private final QueryFig queryFig;
     private final RxTaskScheduler rxTaskScheduler;
     private final MapManager mapManager;
+    private final AsyncReIndexService indexService;
 
 
+    @Inject
     public ReIndexServiceImpl( final AllEntityIdsObservable 
allEntityIdsObservable,
-                               final MapManagerFactory mapManagerFactory, 
final QueryFig queryFig,
-                               final RxTaskScheduler rxTaskScheduler ) {
+                               final MapManagerFactory mapManagerFactory,
+                               final AllApplicationsObservable 
allApplicationsObservable, final QueryFig queryFig,
+                               final RxTaskScheduler rxTaskScheduler, final 
AsyncReIndexService indexService ) {
         this.allEntityIdsObservable = allEntityIdsObservable;
+        this.allApplicationsObservable = allApplicationsObservable;
         this.queryFig = queryFig;
         this.rxTaskScheduler = rxTaskScheduler;
+        this.indexService = indexService;
 
         this.mapManager = mapManagerFactory.createMapManager( 
RESUME_MAP_SCOPTE );
     }
 
 
-    @Override
-    public IndexResponse reIndex( final Observable<ApplicationScope> 
applicationScopes, final Optional<String> cursor,
-                                  final Optional<Long> startTimestamp, final 
IndexAction indexAction ) {
 
+    @Override
+    public IndexResponse rebuildIndex( final Optional<UUID> appId, final 
Optional<String> collection,
+                                       final Optional<String> collectionName, 
final Optional<String> cursor,
+                                       final Optional<Long> startTimestamp ) {
 
         //load our last emitted Scope if a cursor is present
         if ( cursor.isPresent() ) {
             throw new UnsupportedOperationException( "Build this" );
         }
 
+
+        final Observable<ApplicationScope>  applicationScopes = 
appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : 
allApplicationsObservable.getData();
+
         final String newCursor = StringUtils.sanitizeUUID( 
UUIDGenerator.newTimeUUID() );
 
         //create an observable that loads each entity and indexes it, start it 
running with publish
         final ConnectableObservable<EdgeScope> runningReIndex =
-            allEntityIdsObservable.getEdgesToEntities( applicationScopes, 
startTimestamp )
+            allEntityIdsObservable.getEdgesToEntities( applicationScopes, 
collectionName, startTimestamp )
 
                 //for each edge, create our scope and index on it
-                .doOnNext( edge -> indexAction
-                    .index( new EntityIdScope( edge.getApplicationScope(), 
edge.getEdge().getTargetNode() ) ) )
+                .doOnNext( edge -> indexService.index( new EntityIdScope( 
edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
 
-                .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() 
).publish();
-
-
-        //count our longs
-        final Observable<Long> indexedCount = runningReIndex.countLong();
 
 
         //start our sampler and state persistence
@@ -107,7 +118,11 @@ public class ReIndexServiceImpl implements ReIndexService {
             } ).subscribe();
 
 
-        return new IndexResponse( newCursor, indexedCount );
+        //start pushing to both
+        runningReIndex.connect();
+
+
+        return new IndexResponse( newCursor, runningReIndex );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
deleted file mode 100644
index 6d06637..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-@Singleton
-public class SQSAsyncIndexService implements AsyncIndexService {
-
-
-    private static final Logger logger = LoggerFactory.getLogger( 
SQSAsyncIndexService.class );
-
-    /** Hacky, copied from CPEntityManager b/c we can't access it here */
-    public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( 
"b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
-
-
-    /**
-     * Set our TTL to 1 month.  This is high, but in the event of a bug, we 
want these entries to get removed
-     */
-    public static final int TTL = 60 * 60 * 24 * 30;
-
-    /**
-     * The name to put in the map
-     */
-    public static final String MAP_NAME = "esqueuedata";
-
-
-    private static final String QUEUE_NAME = "es_queue";
-
-    private static SmileFactory SMILE_FACTORY = new SmileFactory();
-
-    static {
-        SMILE_FACTORY.delegateToTextual( true );
-    }
-
-
-    private final QueueManager queue;
-    private final QueryFig queryFig;
-    private final ObjectMapper mapper;
-    private final Meter readMeter;
-    private final Timer readTimer;
-    private final Meter writeMeter;
-    private final Timer writeTimer;
-
-
-    @Inject
-    public SQSAsyncIndexService( final QueueManagerFactory 
queueManagerFactory, final QueryFig queryFig,
-                                 final MetricsFactory metricsFactory ) {
-        final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME );
-
-        this.queue = queueManagerFactory.getQueueManager( queueScope );
-        this.queryFig = queryFig;
-
-        this.writeTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, 
"write.timer" );
-        this.writeMeter = metricsFactory.getMeter( SQSAsyncIndexService.class, 
"write.meter" );
-
-        this.readTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, 
"read.timer" );
-        this.readMeter = metricsFactory.getMeter( SQSAsyncIndexService.class, 
"read.meter" );
-
-        this.mapper = new ObjectMapper( SMILE_FACTORY );
-        //pretty print, disabling for speed
-        //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
-
-    }
-
-
-    public void offer( final IndexEntityEvent operation ) {
-        final Timer.Context timer = this.writeTimer.time();
-        this.writeMeter.mark();
-
-        final UUID identifier = UUIDGenerator.newTimeUUID();
-
-        try {
-
-            final String payLoad = toString( operation );
-
-            //signal to SQS
-            this.queue.sendMessage( identifier );
-        }
-        catch ( IOException e ) {
-            throw new RuntimeException( "Unable to queue message", e );
-        }
-        finally {
-            timer.stop();
-        }
-    }
-
-
-    public List<IndexEntityEvent> take( final int takeSize, final long 
timeout, final TimeUnit timeUnit ) {
-
-        //SQS doesn't support more than 10
-
-        final int actualTake = Math.min( 10, takeSize );
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-
-            List<QueueMessage> messages = queue
-                .getMessages( actualTake, queryFig.getIndexQueueTimeout(), ( 
int ) timeUnit.toMillis( timeout ),
-                    String.class );
-
-
-            final List<IndexEntityEvent> response = new ArrayList<>( 
messages.size() );
-
-            final List<String> mapEntries = new ArrayList<>( messages.size() );
-
-
-            if ( messages.size() == 0 ) {
-                return Collections.emptyList();
-            }
-
-            //add all our keys  for a single round trip
-            for ( final QueueMessage message : messages ) {
-                mapEntries.add( message.getBody().toString() );
-            }
-
-
-            //load them into our response
-            for ( final QueueMessage message : messages ) {
-
-                final String payload = getBody( message );
-
-                //now see if the key was there
-
-
-                final IndexEntityEvent messageBody;
-
-                try {
-                    messageBody = fromString( payload );
-                }
-                catch ( IOException e ) {
-                    logger.error( "Unable to deserialize message from string.  
This is a bug", e );
-                    throw new RuntimeException( "Unable to deserialize message 
from string.  This is a bug", e );
-                }
-
-                SqsIndexOperationMessage operation = new 
SqsIndexOperationMessage( message, messageBody );
-
-                response.add( operation );
-            }
-
-            readMeter.mark( response.size() );
-            return response;
-        }
-        //stop our timer
-        finally {
-            timer.stop();
-        }
-    }
-
-
-    public void ack( final List<IndexEntityEvent> messages ) {
-
-        //nothing to do
-        if ( messages.size() == 0 ) {
-            return;
-        }
-
-        List<QueueMessage> toAck = new ArrayList<>( messages.size() );
-
-        for ( IndexEntityEvent ioe : messages ) {
-
-
-            final SqsIndexOperationMessage sqsIndexOperationMessage = ( 
SqsIndexOperationMessage ) ioe;
-
-            toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() );
-        }
-
-        queue.commitMessages( toAck );
-    }
-
-
-    /** Read the object from Base64 string. */
-    private IndexEntityEvent fromString( String s ) throws IOException {
-        IndexEntityEvent o = mapper.readValue( s, IndexEntityEvent.class );
-        return o;
-    }
-
-
-    /** Write the object to a Base64 string. */
-    private String toString( IndexEntityEvent o ) throws IOException {
-        return mapper.writeValueAsString( o );
-    }
-
-
-    private String getBody( final QueueMessage message ) {
-        return message.getBody().toString();
-    }
-
-
-    @Override
-    public void index( final EntityIdScope entityIdScope ) {
-
-    }
-
-
-    /**
-     * The message that subclasses our IndexOperationMessage.  holds a pointer 
to the original message
-     */
-    public class SqsIndexOperationMessage extends IndexEntityEvent {
-
-        private final QueueMessage message;
-
-
-        public SqsIndexOperationMessage( final QueueMessage message, final 
IndexEntityEvent source ) {
-            super( source.getApplicationScope(), source.getEntityId(), 
source.getEntityVersion() );
-            this.message = message;
-        }
-
-
-        /**
-         * Get the message from our queue
-         */
-        public QueueMessage getMessage() {
-            return message;
-        }
-    }
-
-
-    @Override
-    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope,  final Entity entity) {
-           throw new UnsupportedOperationException( "Implement me" );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java
new file mode 100644
index 0000000..60a804c
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+@Singleton
+public class SQSAsyncReIndexService implements AsyncReIndexService {
+
+
+    private static final Logger logger = LoggerFactory.getLogger( 
SQSAsyncReIndexService.class );
+
+    /** Hacky, copied from CPEntityManager b/c we can't access it here */
+    public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( 
"b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
+
+
+    /**
+     * Set our TTL to 1 month.  This is high, but in the event of a bug, we 
want these entries to get removed
+     */
+    public static final int TTL = 60 * 60 * 24 * 30;
+
+    /**
+     * The name to put in the map
+     */
+    public static final String MAP_NAME = "esqueuedata";
+
+
+    private static final String QUEUE_NAME = "es_queue";
+
+    private static SmileFactory SMILE_FACTORY = new SmileFactory();
+
+    static {
+        SMILE_FACTORY.delegateToTextual( true );
+    }
+
+
+    private final QueueManager queue;
+    private final QueryFig queryFig;
+    private final ObjectMapper mapper;
+    private final Meter readMeter;
+    private final Timer readTimer;
+    private final Meter writeMeter;
+    private final Timer writeTimer;
+
+
+    @Inject
+    public SQSAsyncReIndexService( final QueueManagerFactory 
queueManagerFactory, final QueryFig queryFig,
+                                   final MetricsFactory metricsFactory ) {
+        final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME );
+
+        this.queue = queueManagerFactory.getQueueManager( queueScope );
+        this.queryFig = queryFig;
+
+        this.writeTimer = metricsFactory.getTimer( 
SQSAsyncReIndexService.class, "write.timer" );
+        this.writeMeter = metricsFactory.getMeter( 
SQSAsyncReIndexService.class, "write.meter" );
+
+        this.readTimer = metricsFactory.getTimer( 
SQSAsyncReIndexService.class, "read.timer" );
+        this.readMeter = metricsFactory.getMeter( 
SQSAsyncReIndexService.class, "read.meter" );
+
+        this.mapper = new ObjectMapper( SMILE_FACTORY );
+        //pretty print, disabling for speed
+        //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
+
+    }
+
+
+    public void offer( final IndexEntityEvent operation ) {
+        final Timer.Context timer = this.writeTimer.time();
+        this.writeMeter.mark();
+
+        final UUID identifier = UUIDGenerator.newTimeUUID();
+
+        try {
+
+            final String payLoad = toString( operation );
+
+            //signal to SQS
+            this.queue.sendMessage( identifier );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to queue message", e );
+        }
+        finally {
+            timer.stop();
+        }
+    }
+
+
+    public List<IndexEntityEvent> take( final int takeSize, final long 
timeout, final TimeUnit timeUnit ) {
+
+        //SQS doesn't support receiving more than 10 messages per request
+
+        final int actualTake = Math.min( 10, takeSize );
+
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
+
+            List<QueueMessage> messages = queue
+                .getMessages( actualTake, queryFig.getIndexQueueTimeout(), ( 
int ) timeUnit.toMillis( timeout ),
+                    String.class );
+
+
+            final List<IndexEntityEvent> response = new ArrayList<>( 
messages.size() );
+
+            final List<String> mapEntries = new ArrayList<>( messages.size() );
+
+
+            if ( messages.size() == 0 ) {
+                return Collections.emptyList();
+            }
+
+            //collect all the message bodies up front (the list is not used below)
+            for ( final QueueMessage message : messages ) {
+                mapEntries.add( message.getBody().toString() );
+            }
+
+
+            //load them into our response
+            for ( final QueueMessage message : messages ) {
+
+                final String payload = getBody( message );
+
+                //now deserialize the payload into an event
+
+
+                final IndexEntityEvent messageBody;
+
+                try {
+                    messageBody = fromString( payload );
+                }
+                catch ( IOException e ) {
+                    logger.error( "Unable to deserialize message from string.  
This is a bug", e );
+                    throw new RuntimeException( "Unable to deserialize message 
from string.  This is a bug", e );
+                }
+
+                SqsIndexOperationMessage operation = new 
SqsIndexOperationMessage( message, messageBody );
+
+                response.add( operation );
+            }
+
+            readMeter.mark( response.size() );
+            return response;
+        }
+        //stop our timer
+        finally {
+            timer.stop();
+        }
+    }
+
+
+    public void ack( final List<IndexEntityEvent> messages ) {
+
+        //nothing to do
+        if ( messages.size() == 0 ) {
+            return;
+        }
+
+        List<QueueMessage> toAck = new ArrayList<>( messages.size() );
+
+        for ( IndexEntityEvent ioe : messages ) {
+
+
+            final SqsIndexOperationMessage sqsIndexOperationMessage = ( 
SqsIndexOperationMessage ) ioe;
+
+            toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() );
+        }
+
+        queue.commitMessages( toAck );
+    }
+
+
+    /** Deserialize an IndexEntityEvent from its string form using the Jackson mapper. */
+    private IndexEntityEvent fromString( String s ) throws IOException {
+        IndexEntityEvent o = mapper.readValue( s, IndexEntityEvent.class );
+        return o;
+    }
+
+
+    /** Serialize an IndexEntityEvent to a string using the Jackson mapper. */
+    private String toString( IndexEntityEvent o ) throws IOException {
+        return mapper.writeValueAsString( o );
+    }
+
+
+    private String getBody( final QueueMessage message ) {
+        return message.getBody().toString();
+    }
+
+
+    @Override
+    public void index( final EntityIdScope entityIdScope ) {
+
+    }
+
+
+    /**
+     * A message that subclasses IndexEntityEvent.  Holds a pointer 
to the original queue message
+     */
+    public class SqsIndexOperationMessage extends IndexEntityEvent {
+
+        private final QueueMessage message;
+
+
+        public SqsIndexOperationMessage( final QueueMessage message, final 
IndexEntityEvent source ) {
+            super( source.getApplicationScope(), source.getEntityId(), 
source.getEntityVersion() );
+            this.message = message;
+        }
+
+
+        /**
+         * Get the message from our queue
+         */
+        public QueueMessage getMessage() {
+            return message;
+        }
+    }
+
+
+    @Override
+    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope,  final Entity entity) {
+        throw new UnsupportedOperationException( "Implement index rebuild" );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
index c805a59..b9e5373 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
@@ -43,9 +43,10 @@ public interface AllEntityIdsObservable {
     /**
      * Get all edges that represent edges to entities in the system
      * @param appScopes
-     * @param startTime The time to
+     * @param edgeType The edge type to use (if specified)
+     * @param startTime The time to start with
      * @return
      */
-    Observable<EdgeScope> getEdgesToEntities(final 
Observable<ApplicationScope> appScopes, final Optional<Long> startTime);
+    Observable<EdgeScope> getEdgesToEntities(final 
Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final 
Optional<Long> startTime);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
index f9df1f5..257fab1 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
@@ -81,12 +81,12 @@ public class AllEntityIdsObservableImpl implements 
AllEntityIdsObservable {
 
 
     @Override
-    public Observable<EdgeScope> getEdgesToEntities( final 
Observable<ApplicationScope> appScopes, final Optional<Long> startTime) {
+    public Observable<EdgeScope> getEdgesToEntities( final 
Observable<ApplicationScope> appScopes, final Optional<String> edgeType,  final 
Optional<Long> startTime) {
 
         return appScopes.flatMap( applicationScope -> {
             final GraphManager gm = graphManagerFactory.createEdgeManager( 
applicationScope );
 
-            return edgesObservable.edgesFromSourceAscending( gm, 
applicationScope.getApplication(), startTime ).map( edge -> new 
EdgeScope(applicationScope, edge ));
+            return edgesObservable.edgesFromSourceAscending( gm, 
applicationScope.getApplication(),edgeType,  startTime ).map( edge -> new 
EdgeScope(applicationScope, edge ));
         } );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 67cc0ca..c42ad10 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -205,9 +205,7 @@ public class CpNamingUtils {
     public static ApplicationScope getApplicationScope( UUID applicationId ) {
 
         // We can always generate a scope, it doesn't matter if  the 
application exists yet or not.
-        final ApplicationScopeImpl scope = new ApplicationScopeImpl( 
generateApplicationId( applicationId ) );
-
-        return scope;
+        return  new ApplicationScopeImpl( generateApplicationId( applicationId 
) );
     }
 
 
@@ -229,6 +227,8 @@ public class CpNamingUtils {
         return  generateApplicationId( MANAGEMENT_APPLICATION_ID );
     }
 
+
+
     /**
      * Get the map scope for the applicationId to store entity uuid to type 
mapping
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java 
b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index fc8b3d5..23b6d6b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -692,11 +692,6 @@ public interface EntityManager {
     /** For testing purposes */
     public void flushManagerCaches();
 
-    void reindexCollection(
-        EntityManagerFactory.ProgressObserver po, String collectionName, 
boolean reverse) throws Exception;
-
-    public void reindex( final EntityManagerFactory.ProgressObserver po ) 
throws Exception;
-
 
     public Entity getUniqueEntityFromAlias( String aliasType, String 
aliasValue );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
index e70cd0d..b3f4b62 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
@@ -21,6 +21,8 @@ import java.util.Map;
 import java.util.UUID;
 
 import com.google.common.base.Optional;
+
+import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.index.IndexRefreshCommand;
 import rx.Observable;
@@ -163,12 +165,6 @@ public interface EntityManagerFactory {
 
     public IndexRefreshCommand.IndexRefreshCommandInfo refreshIndex();
 
-    public void rebuildAllIndexes( ProgressObserver po ) throws Exception;
-
-    public void rebuildInternalIndexes( ProgressObserver po ) throws Exception;
-
-    public void rebuildApplicationIndexes( UUID appId, ProgressObserver po ) 
throws Exception;
-
 
     /**
      * Perform a realtime count of every entity in the system.  This can be 
slow as it traverses the entire system graph
@@ -178,8 +174,7 @@ public interface EntityManagerFactory {
     /** For testing purposes */
     public void flushEntityManagerCaches();
 
-    void rebuildCollectionIndex(
-        UUID appId, String collection, boolean reverse, ProgressObserver po) 
throws Exception;
+    ReIndexService.IndexResponse rebuildCollectionIndex( Optional<UUID> appId, 
Optional<String> collection );
 
     /**
      * Add a new index to the application for scale

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
index 0977caa..5e9a5a1 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java
@@ -20,19 +20,9 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.usergrid.corepersistence.TestIndexModule;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.impl.DeIndexRequest;
 import org.apache.usergrid.persistence.index.impl.EsRunner;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
-import org.apache.usergrid.persistence.index.impl.IndexRequest;
-import org.apache.usergrid.persistence.index.impl.SearchEdgeImpl;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
+
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -41,10 +31,8 @@ import org.junit.runner.RunWith;
 import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import 
org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider;
 
 import com.google.inject.Inject;
 
@@ -80,11 +68,11 @@ public class SQSAsyncIndexServiceTest {
     public MetricsFactory metricsFactory;
 
 
-    private SQSAsyncIndexService bufferQueueSQS;
+    private SQSAsyncReIndexService bufferQueueSQS;
 
     @Before
     public void setup(){
-        bufferQueueSQS = new SQSAsyncIndexService( queueManagerFactory, 
queryFig, metricsFactory );
+        bufferQueueSQS = new SQSAsyncReIndexService( queueManagerFactory, 
queryFig, metricsFactory );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index b0fc245..a293a0c 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -213,11 +213,12 @@ public class PerformanceEntityRebuildIndexTest extends 
AbstractCoreIT {
 
         try {
 
-            // do it forwards
-            setup.getEmf().rebuildCollectionIndex( em.getApplicationId(), 
"catherders", false, po );
-
-            // and backwards, just to make sure both cases are covered
-            setup.getEmf().rebuildCollectionIndex( em.getApplicationId(), 
"catherders", true, po );
+           fail( "Implement index rebuild" );
+//            // do it forwards
+//            setup.getEmf().rebuildCollectionIndex( em.getApplicationId(), 
"catherders", false, po );
+//
+//            // and backwards, just to make sure both cases are covered
+//            setup.getEmf().rebuildCollectionIndex( em.getApplicationId(), 
"catherders", true, po );
 
             reporter.report();
             registry.remove( meterName );
@@ -354,9 +355,10 @@ public class PerformanceEntityRebuildIndexTest extends 
AbstractCoreIT {
 
         try {
 
-            setup.getEmf().rebuildInternalIndexes( po );
-
-            setup.getEmf().rebuildApplicationIndexes( em.getApplicationId(), 
po );
+            fail( "Implement index rebuild" );
+//            setup.getEmf().rebuildInternalIndexes( po );
+//
+//            setup.getEmf().rebuildApplicationIndexes( em.getApplicationId(), 
po );
 
             reporter.report();
             registry.remove( meterName );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
 
b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
index 1a0a13b..2452606 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
@@ -52,6 +52,8 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 
 @NotThreadSafe
 public class EntityManagerFactoryImplIT extends AbstractCoreIT {
@@ -175,12 +177,13 @@ public class EntityManagerFactoryImplIT extends 
AbstractCoreIT {
 
         // restore the app
         emf.restoreApplication(deletedAppId);
-        emf.rebuildAllIndexes(new EntityManagerFactory.ProgressObserver() {
-            @Override
-            public void onProgress(EntityRef entity) {
-                logger.debug("Reindexing {}:{}", entity.getType(), 
entity.getUuid());
-            }
-        });
+        fail( "Implement index rebuild" );
+//        emf.rebuildAllIndexes(new EntityManagerFactory.ProgressObserver() {
+//            @Override
+//            public void onProgress(EntityRef entity) {
+//                logger.debug("Reindexing {}:{}", entity.getType(), 
entity.getUuid());
+//            }
+//        });
         this.app.refreshIndex();
 
         // test to see that app now works and is happy

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 9e7b8e6..9f0bd60 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -45,10 +45,11 @@ public interface EdgesObservable {
      * Return an observable of all edges from a source node.  Ordered 
ascending, from the startTimestamp if specified
      * @param gm
      * @param sourceNode
-     * @param startTimestamp
+     * @param edgeType The edge type if specified.  Otherwise all types will 
be used
+     * @param startTimestamp The start timestamp if specified, otherwise 
Long.MIN will be used
      * @return
      */
-    Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id 
sourceNode,
+    Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id 
sourceNode,final Optional<String> edgeType,
                                                final Optional<Long> 
startTimestamp );
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 859ca2e..ca9fb03 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -71,10 +71,12 @@ public class EdgesObservableImpl implements EdgesObservable 
{
 
 
     @Override
-    public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, 
final Id sourceNode,
+    public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, 
final Id sourceNode, final Optional<String> edgeTypeInput,
                                                       final Optional<Long> 
startTimestamp ) {
 
-        final Observable<String> edgeTypes =
+
+
+        final Observable<String> edgeTypes = edgeTypeInput.isPresent()? 
Observable.just( edgeTypeInput.get() ):
                   gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( 
sourceNode, null, null ) );
 
 

Reply via email to