Added factory for RX I/O Scheduler.

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9b9a1b02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9b9a1b02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9b9a1b02

Branch: refs/heads/two-dot-o-dev
Commit: 9b9a1b02f877ee6f35d7c3a7ba74001a17c237f1
Parents: 48d8060
Author: Todd Nine <[email protected]>
Authored: Mon Apr 20 08:01:12 2015 -0600
Committer: Todd Nine <[email protected]>
Committed: Mon Apr 20 10:36:06 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   5 +-
 .../corepersistence/CpEntityManager.java        |   9 +-
 .../corepersistence/CpRelationManager.java      |  57 +-------
 .../index/AsyncIndexProvider.java               |  15 +--
 .../index/AsyncIndexService.java                |   5 +-
 .../index/InMemoryAsyncIndexService.java        |  34 ++---
 .../corepersistence/index/IndexServiceImpl.java |   9 +-
 .../index/SQSAsyncIndexService.java             |   4 +-
 .../EntityCollectionManagerFactoryImpl.java     |  15 ++-
 .../impl/EntityCollectionManagerImpl.java       |  38 +++---
 .../mvcc/stage/write/RollbackAction.java        |   2 -
 .../persistence/core/guice/CommonModule.java    |  12 ++
 .../persistence/core/rx/RxSchedulerFig.java     |  60 +++++++++
 .../persistence/core/rx/RxTaskScheduler.java    |  40 ++++++
 .../core/rx/RxTaskSchedulerImpl.java            | 129 +++++++++++++++++++
 .../index/impl/IndexRefreshCommandImpl.java     |  14 +-
 .../persistence/index/impl/IndexingUtils.java   |   6 +-
 .../persistence/index/usergrid-mappings.json    |   4 +-
 18 files changed, 314 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 4758456..6ebff53 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -34,6 +34,9 @@ import 
org.apache.usergrid.corepersistence.migration.MigrationModuleVersionPlugi
 import 
org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservableImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
+import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
@@ -144,7 +147,7 @@ public class CoreModule  extends AbstractModule {
          *****/
 
 
-        bind(IndexService.class).to(IndexServiceImpl.class);
+        bind(IndexService.class).to( IndexServiceImpl.class );
         //bind the queue provider
 
         bind( AsyncIndexService.class).toProvider( AsyncIndexProvider.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 844892a..6c3989d 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -620,9 +620,8 @@ public class CpEntityManager implements EntityManager {
         }
 
         // update in all containing collections and connection indexes
-        CpRelationManager rm = ( CpRelationManager ) getRelationManager( 
entity );
-        rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
-        timer.stop();
+
+        indexService.queueEntityIndexUpdate( applicationScope, cpEntity );
     }
 
 
@@ -1067,9 +1066,7 @@ public class CpEntityManager implements EntityManager {
 
         //Adding graphite metrics
 
-        // update in all containing collections and connection indexes
-        CpRelationManager rm = ( CpRelationManager ) getRelationManager( 
entityRef );
-        rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
+        indexService.queueEntityIndexUpdate( applicationScope, cpEntity );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index db9816e..be8605e 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -255,47 +255,6 @@ public class CpRelationManager implements RelationManager {
     }
 
 
-    public void updateContainingCollectionAndCollectionIndexes(
-        final org.apache.usergrid.persistence.model.entity.Entity cpEntity ) {
-
-
-        throw new UnsupportedOperationException( "Use the new interface" );
-
-//        final GraphManager gm = managerCache.getGraphManager( 
applicationScope );
-//
-//        // loop through all types of edge to target
-//
-//
-//        final ApplicationEntityIndex ei = managerCache.getEntityIndex( 
applicationScope );
-//
-//        final EntityIndexBatch entityIndexBatch = ei.createBatch();
-//
-//        final int count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( 
cpHeadEntity.getId(), null, null ) )
-//
-//            // for each edge type, emit all the edges of that type
-//            .flatMap( etype -> gm.loadEdgesToTarget(
-//                new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, 
Long.MAX_VALUE,
-//                    SearchByEdgeType.Order.DESCENDING, null ) ) )
-//
-//                //for each edge we receive index and add to the batch
-//            .doOnNext( edge -> {
-//                // reindex the entity in the source entity's collection or 
connection index
-//
-//                IndexEdge indexScope = generateScopeFromSource( edge );
-//
-//                entityIndexBatch.index( indexScope, cpEntity );
-//
-//            } ).doOnCompleted( () -> {
-//                    Timer.Context timeElasticIndexBatch = 
updateCollectionTimer.time();
-//                    entityIndexBatch.execute();
-//                    timeElasticIndexBatch.stop();
-//              } ).count().toBlocking().lastOrDefault( 0 );
-//
-//        //Adding graphite metrics
-//
-//
-//        logger.debug( "updateContainingCollectionsAndCollections() updated 
{} indexes", count );
-    }
 
 
     @Override
@@ -473,8 +432,6 @@ public class CpRelationManager implements RelationManager {
         gm.writeEdge( edge ).toBlocking().last();
 
 
-        //This is broken and needs fixed 
updateContainingCollectionAndCollectionIndexes See USERGRID-541
-
 
         //perform indexing
 
@@ -482,7 +439,7 @@ public class CpRelationManager implements RelationManager {
             logger.debug( "Wrote edge {}", edge );
         }
 
-        indexService.queueEntityIndexUpdate( applicationScope, 
memberEntity.getId(), memberEntity.getVersion() );
+        indexService.queueEntityIndexUpdate( applicationScope, memberEntity);
 
 
         if ( logger.isDebugEnabled() ) {
@@ -490,17 +447,7 @@ public class CpRelationManager implements RelationManager {
                 itemRef.getUuid().toString(), itemRef.getType(), collName
             } );
         }
-        //        logger.debug("With head entity scope is {}:{}:{}", new 
Object[] {
-        //            headEntityScope.getApplication().toString(),
-        //            headEntityScope.getOwner().toString(),
-        //            headEntityScope.getName()});
-
-        if ( connectBack && collection != null && 
collection.getLinkedCollection() != null ) {
-            throw new UnsupportedOperationException( "Implement me directly in 
graph " );
-//            getRelationManager( itemEntity )
-//                .addToCollection( collection.getLinkedCollection(), 
headEntity, cpHeadEntity, false );
-//            getRelationManager( itemEntity ).addToCollection( 
collection.getLinkedCollection(), headEntity, false );
-        }
+
 
         return itemEntity;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
index 8257c94..77b4990 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
@@ -20,7 +20,7 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 
@@ -37,24 +37,23 @@ public class AsyncIndexProvider implements 
Provider<AsyncIndexService> {
 
     private final QueryFig queryFig;
 
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
     private final QueueManagerFactory queueManagerFactory;
     private final MetricsFactory metricsFactory;
     private final IndexService indexService;
+    private final RxTaskScheduler rxTaskScheduler;
 
     private AsyncIndexService asyncIndexService;
 
 
     @Inject
-    public AsyncIndexProvider( final QueryFig queryFig,
-                               final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
-                               final QueueManagerFactory queueManagerFactory, 
final MetricsFactory metricsFactory,
-                               final IndexService indexService ) {
+    public AsyncIndexProvider( final QueryFig queryFig, final 
QueueManagerFactory queueManagerFactory, final
+    MetricsFactory metricsFactory,
+                               final IndexService indexService, final 
RxTaskScheduler rxTaskScheduler ) {
         this.queryFig = queryFig;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.queueManagerFactory = queueManagerFactory;
         this.metricsFactory = metricsFactory;
         this.indexService = indexService;
+        this.rxTaskScheduler = rxTaskScheduler;
     }
 
 
@@ -77,7 +76,7 @@ public class AsyncIndexProvider implements 
Provider<AsyncIndexService> {
 
         switch ( impl ) {
             case LOCAL:
-                return new InMemoryAsyncIndexService( indexService, 
entityCollectionManagerFactory );
+                return new InMemoryAsyncIndexService( indexService, 
rxTaskScheduler );
             case SQS:
                 return new SQSAsyncIndexService( queueManagerFactory, 
queryFig, metricsFactory );
             default:

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
index d1f2fb6..06310ae 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
@@ -40,8 +40,7 @@ public interface AsyncIndexService {
      * We will return a distributed future.  For SQS impls, this will return 
immediately, and the result will not be available.
      * After SQS is removed, the tests should be enhanced to ensure that we're 
processing our queues correctly.
      * @param applicationScope
-     * @param entityId
-     * @param version
+     * @param entity The entity to index
      */
-    void queueEntityIndexUpdate( final ApplicationScope applicationScope, 
final Id entityId, final UUID version );
+    void queueEntityIndexUpdate( final ApplicationScope applicationScope, 
final Entity entity);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
index fdc9c65..3e2a271 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
@@ -20,56 +20,40 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 import rx.Observable;
-import rx.schedulers.Schedulers;
 
 
 @Singleton
 public class InMemoryAsyncIndexService implements AsyncIndexService {
 
     private final IndexService indexService;
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+    private final RxTaskScheduler rxTaskScheduler;
 
 
     @Inject
-    public InMemoryAsyncIndexService( final IndexService indexService,
-                                      final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {this.indexService = indexService;
+    public InMemoryAsyncIndexService( final IndexService indexService, final 
RxTaskScheduler rxTaskScheduler ) {
+        this.indexService = indexService;
 
 
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.rxTaskScheduler = rxTaskScheduler;
     }
 
 
     @Override
-    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope, final Id entityId,
-                                        final UUID version ) {
-
-        final IndexEntityEvent event = new IndexEntityEvent( applicationScope, 
entityId, version );
+    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope, final Entity toIndex ) {
 
         //process the entity immediately
         //only process the same version, otherwise ignore
 
-        getEntity( applicationScope, entityId).filter( entity-> 
version.equals(entity.hasVersion() )).doOnNext( entity -> {
-           indexService.indexEntity( applicationScope, entity );
-        } ).subscribeOn( Schedulers.io() ).subscribe();
-
-
-    }
-
-    private Observable<Entity> getEntity( final ApplicationScope 
applicationScope, final Id entityId){
-
-        final EntityCollectionManager ecm = 
entityCollectionManagerFactory.createCollectionManager( applicationScope );
-        return ecm.load( entityId );
+        Observable.just( toIndex ).doOnNext( entity -> {
+            indexService.indexEntity( applicationScope, entity );
+        } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 2a7533a..873e2b6 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -83,8 +83,8 @@ public class IndexServiceImpl implements IndexService {
         //we always index in the target scope
         final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( 
gm, entityId );
 
-        //we may have to index
-        final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( 
edge -> generateScopeToTarget( edge ) );
+        //we may have to index  we're indexing from source->target here
+        final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( 
edge -> generateScopeFromSource( edge ) );
 
 
         //we might or might not need to index from target-> source
@@ -141,8 +141,9 @@ public class IndexServiceImpl implements IndexService {
 
         /**
          * An observable of sizes as we execute batches
+         *
+         * we're indexing from target->source here
          */
-        return edgesObservable.getEdgesFromSource( graphManager, entityId, 
linkedCollection )
-                              .map( edge -> generateScopeFromSource( edge ) );
+        return edgesObservable.getEdgesFromSource( graphManager, entityId, 
linkedCollection ).map( edge -> generateScopeToTarget( edge ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
index cad20bd..dfcb97a 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.QueueManager;
@@ -256,8 +257,7 @@ public class SQSAsyncIndexService implements 
AsyncIndexService {
 
 
     @Override
-    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope, final Id entityId,
-                                        final UUID version ) {
+    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope,  final Entity entity) {
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 9191c06..761c4b5 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -39,6 +39,7 @@ import 
org.apache.usergrid.persistence.collection.serialization.MvccEntitySerial
 import 
org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 import 
org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
@@ -74,6 +75,7 @@ public class EntityCollectionManagerFactoryImpl implements 
EntityCollectionManag
     private final TaskExecutor taskExecutor;
     private final EntityCacheFig entityCacheFig;
     private final MetricsFactory metricsFactory;
+    private final RxTaskScheduler rxTaskScheduler;
 
     private LoadingCache<ApplicationScope, EntityCollectionManager> ecmCache =
         CacheBuilder.newBuilder().maximumSize( 1000 )
@@ -84,7 +86,8 @@ public class EntityCollectionManagerFactoryImpl implements 
EntityCollectionManag
                                 writeStart, writeVerifyUnique,
                                 writeOptimisticVerify, writeCommit, rollback, 
markStart, markCommit,
                                 entitySerializationStrategy, 
uniqueValueSerializationStrategy,
-                                mvccLogEntrySerializationStrategy, 
keyspace,entityVersionTaskFactory, taskExecutor, scope, metricsFactory );
+                                mvccLogEntrySerializationStrategy, 
keyspace,entityVersionTaskFactory, taskExecutor, scope, metricsFactory,
+                                rxTaskScheduler );
 
 
                             final EntityCollectionManager proxy = new 
CachedEntityCollectionManager(entityCacheFig, target  );
@@ -95,8 +98,7 @@ public class EntityCollectionManagerFactoryImpl implements 
EntityCollectionManag
 
 
     @Inject
-    public EntityCollectionManagerFactoryImpl( final WriteStart writeStart,
-                                               final WriteUniqueVerify 
writeVerifyUnique,
+    public EntityCollectionManagerFactoryImpl( final WriteStart writeStart, 
final WriteUniqueVerify writeVerifyUnique,
                                                final WriteOptimisticVerify 
writeOptimisticVerify,
                                                final WriteCommit writeCommit, 
final RollbackAction rollback,
                                                final MarkStart markStart, 
final MarkCommit markCommit,
@@ -105,9 +107,9 @@ public class EntityCollectionManagerFactoryImpl implements 
EntityCollectionManag
                                                final 
MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                                                final Keyspace keyspace,
                                                final EntityVersionTaskFactory 
entityVersionTaskFactory,
-                                               @CollectionTaskExecutor final 
TaskExecutor taskExecutor,
-                                              final EntityCacheFig 
entityCacheFig,
-                                               MetricsFactory metricsFactory) {
+                                               @CollectionTaskExecutor final 
TaskExecutor taskExecutor, final
+                                                   EntityCacheFig 
entityCacheFig,
+                                               MetricsFactory metricsFactory, 
final RxTaskScheduler rxTaskScheduler ) {
 
         this.writeStart = writeStart;
         this.writeVerifyUnique = writeVerifyUnique;
@@ -124,6 +126,7 @@ public class EntityCollectionManagerFactoryImpl implements 
EntityCollectionManag
         this.taskExecutor = taskExecutor;
         this.entityCacheFig = entityCacheFig;
         this.metricsFactory = metricsFactory;
+        this.rxTaskScheduler = rxTaskScheduler;
     }
     @Override
     public EntityCollectionManager createCollectionManager(ApplicationScope 
applicationScope) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index f0b070c..6f10e86 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -49,6 +49,7 @@ import 
org.apache.usergrid.persistence.collection.serialization.UniqueValueSeria
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
 import 
org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
@@ -109,6 +110,8 @@ public class EntityCollectionManagerImpl implements 
EntityCollectionManager {
     private final EntityVersionTaskFactory entityVersionTaskFactory;
     private final TaskExecutor taskExecutor;
 
+    private final RxTaskScheduler rxTaskScheduler;
+
     private final Keyspace keyspace;
     private final Timer writeTimer;
     private final Meter writeMeter;
@@ -125,26 +128,21 @@ public class EntityCollectionManagerImpl implements 
EntityCollectionManager {
 
 
     @Inject
-    public EntityCollectionManagerImpl(
-        final WriteStart                    writeStart,
-        final WriteUniqueVerify                    writeVerifyUnique,
-        final WriteOptimisticVerify                writeOptimisticVerify,
-        final WriteCommit                          writeCommit,
-        final RollbackAction                       rollback,
-        final MarkStart                            markStart,
-        final MarkCommit                           markCommit,
-        final MvccEntitySerializationStrategy entitySerializationStrategy,
-        final UniqueValueSerializationStrategy     
uniqueValueSerializationStrategy,
-        final MvccLogEntrySerializationStrategy    
mvccLogEntrySerializationStrategy,
-        final Keyspace                             keyspace,
-        final EntityVersionTaskFactory entityVersionTaskFactory,
-        @CollectionTaskExecutor final TaskExecutor taskExecutor,
-        @Assisted final ApplicationScope applicationScope,
-        final MetricsFactory metricsFactory
-
-    ) {
+    public EntityCollectionManagerImpl( final WriteStart writeStart, final 
WriteUniqueVerify writeVerifyUnique,
+                                        final WriteOptimisticVerify 
writeOptimisticVerify, final WriteCommit
+                                                writeCommit,
+                                        final RollbackAction rollback, final 
MarkStart markStart,
+                                        final MarkCommit markCommit, final 
MvccEntitySerializationStrategy entitySerializationStrategy,
+                                        final UniqueValueSerializationStrategy 
uniqueValueSerializationStrategy,
+                                        final 
MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
+                                        final Keyspace keyspace, final 
EntityVersionTaskFactory entityVersionTaskFactory,
+                                        @CollectionTaskExecutor final 
TaskExecutor taskExecutor, @Assisted final ApplicationScope applicationScope,
+                                        final MetricsFactory metricsFactory,
+
+                                        final RxTaskScheduler rxTaskScheduler 
) {
         this.uniqueValueSerializationStrategy = 
uniqueValueSerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
+        this.rxTaskScheduler = rxTaskScheduler;
 
         ValidationUtils.validateApplicationScope( applicationScope );
 
@@ -453,13 +451,13 @@ public class EntityCollectionManagerImpl implements 
EntityCollectionManager {
                     public void call( final CollectionIoEvent<MvccEntity> 
mvccEntityCollectionIoEvent ) {
 
                         Observable<CollectionIoEvent<MvccEntity>> unique =
-                                Observable.just( mvccEntityCollectionIoEvent 
).subscribeOn( Schedulers.io() )
+                                Observable.just( mvccEntityCollectionIoEvent 
).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
                                           .doOnNext( writeVerifyUnique );
 
 
                         // optimistic verification
                         Observable<CollectionIoEvent<MvccEntity>> optimistic =
-                                Observable.just( mvccEntityCollectionIoEvent 
).subscribeOn( Schedulers.io() )
+                                Observable.just( mvccEntityCollectionIoEvent 
).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
                                           .doOnNext( writeOptimisticVerify );
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
index cd15c26..8342e55 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
@@ -49,7 +49,6 @@ public class RollbackAction implements Action1<Throwable> {
 
     private static final Logger log = LoggerFactory.getLogger( 
RollbackAction.class );
 
-    private final Scheduler scheduler;
     private final UniqueValueSerializationStrategy uniqueValueStrat;
     private final MvccLogEntrySerializationStrategy logEntryStrat;
 
@@ -58,7 +57,6 @@ public class RollbackAction implements Action1<Throwable> {
     public RollbackAction(MvccLogEntrySerializationStrategy logEntryStrat,
                            UniqueValueSerializationStrategy uniqueValueStrat ) 
{
 
-        scheduler = Schedulers.io();
         this.uniqueValueStrat = uniqueValueStrat;
         this.logEntryStrat = logEntryStrat;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index bc84b6b..f266643 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -36,6 +36,9 @@ import 
org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
 import 
org.apache.usergrid.persistence.core.migration.schema.MigrationManagerFig;
 import 
org.apache.usergrid.persistence.core.migration.schema.MigrationManagerImpl;
+import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Key;
@@ -86,6 +89,15 @@ public class CommonModule extends AbstractModule {
         //do multibindings for migrations
         //create the empty multibinder so other plugins can use it
          Multibinder.newSetBinder( binder(), MigrationPlugin.class);
+
+
+        /**
+         * RX java scheduler configuration
+         */
+
+        install ( new GuicyFigModule( RxSchedulerFig.class ));
+
+        bind( RxTaskScheduler.class).to( RxTaskSchedulerImpl.class );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
new file mode 100644
index 0000000..7986132
--- /dev/null
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.rx;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ *
+ */
+public interface RxSchedulerFig extends GuicyFig {
+
+
+    /**
+     * Amount of time in milliseconds to wait when ES rejects our request 
before retrying.  Provides simple
+     * backpressure
+     */
+    public static final String IO_SCHEDULER_THREADS = "scheduler.io.threads";
+
+
+    /**
+     * Amount of time in milliseconds to wait when ES rejects our request 
before retrying.  Provides simple
+     * backpressure
+     */
+    public static final String IO_SCHEDULER_NAME = "scheduler.io.poolName";
+
+
+
+
+    @Default( "100" )
+    @Key( IO_SCHEDULER_THREADS )
+    int getMaxIoThreads();
+
+    @Default( "Usergrid-RxIOPool" )
+    @Key(IO_SCHEDULER_NAME)
+    String getIoSchedulerName();
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
new file mode 100644
index 0000000..d6cc5e8
--- /dev/null
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.rx;
+
+
+import org.apache.usergrid.persistence.core.task.Task;
+
+import rx.Scheduler;
+
+
+/**
+ * An interface for returning task schedulers
+ */
+public interface RxTaskScheduler {
+
+    /**
+     * Get the scheduler for tasks that perform blocking I/O
+     * @return
+     */
+    Scheduler getAsyncIOScheduler();
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
new file mode 100644
index 0000000..219cde6
--- /dev/null
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.rx;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Scheduler;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * An implementation of the task scheduler that allows us to control the 
number of I/O threads
+ */
+@Singleton
+public class RxTaskSchedulerImpl implements RxTaskScheduler {
+
+    private static final Logger log = LoggerFactory.getLogger( 
RxTaskSchedulerImpl.class );
+
+    private final Scheduler scheduler;
+    private final String poolName;
+
+    @Inject
+    public RxTaskSchedulerImpl(final RxSchedulerFig schedulerFig){
+
+        this.poolName = schedulerFig.getIoSchedulerName();
+
+        final int poolSize = schedulerFig.getMaxIoThreads();
+
+
+        final BlockingQueue<Runnable> queue = new 
ArrayBlockingQueue<Runnable>(poolSize);
+
+
+        final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, 
poolSize );
+
+
+        this.scheduler = Schedulers.from(threadPool);
+
+
+    }
+
+
+    @Override
+    public Scheduler getAsyncIOScheduler() {
+        return scheduler;
+    }
+
+
+    /**
+     * Create a thread pool that will reject work if our audit tasks become 
overwhelmed
+     */
+    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+        public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final 
int maxPoolSize ) {
+
+            super( 1, maxPoolSize, 30, TimeUnit.SECONDS, queue, new 
CountingThreadFactory( ),
+                    new RejectedHandler() );
+        }
+    }
+
+
+    /**
+     * Thread factory that will name and count threads for easier debugging
+     */
+    private final class CountingThreadFactory implements ThreadFactory {
+
+        private final AtomicLong threadCounter = new AtomicLong();
+
+
+        @Override
+        public Thread newThread( final Runnable r ) {
+            final long newValue = threadCounter.incrementAndGet();
+
+            Thread t = new Thread( r, poolName + "-" + newValue );
+
+            t.setDaemon( true );
+
+            return t;
+        }
+    }
+
+
+    /**
+     * The handler that will handle rejected executions and signal the 
interface
+     */
+    private final class RejectedHandler implements RejectedExecutionHandler {
+
+
+        @Override
+        public void rejectedExecution( final Runnable r, final 
ThreadPoolExecutor executor ) {
+            log.warn( "{} task queue full, rejecting task {}", poolName, r );
+
+            //TODO T.N. do we want to run this on the caller thread?
+
+            throw new RejectedExecutionException( "Unable to run task, queue 
full" );
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 07150bb..c997e5a 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.index.AliasedEntityIndex;
@@ -48,7 +49,6 @@ import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 
 import rx.Observable;
-import rx.schedulers.Schedulers;
 import rx.util.async.Async;
 
 
@@ -64,12 +64,13 @@ public class IndexRefreshCommandImpl implements 
IndexRefreshCommand {
     private final IndexBufferConsumer producer;
     private final IndexFig indexFig;
     private final Timer timer;
-
+    private final RxTaskScheduler rxTaskScheduler;
 
     @Inject
     public IndexRefreshCommandImpl( IndexIdentifier indexIdentifier, 
EsProvider esProvider,
                                     IndexBufferConsumer producer, IndexFig 
indexFig, MetricsFactory metricsFactory,
-                                    final IndexCache indexCache ) {
+                                    final IndexCache indexCache, final 
RxTaskScheduler rxTaskScheduler ) {
+
 
 
         this.timer = metricsFactory.getTimer( IndexRefreshCommandImpl.class, 
"index.refresh.timer" );
@@ -78,6 +79,7 @@ public class IndexRefreshCommandImpl implements 
IndexRefreshCommand {
         this.producer = producer;
         this.indexFig = indexFig;
         this.indexCache = indexCache;
+        this.rxTaskScheduler = rxTaskScheduler;
     }
 
 
@@ -141,14 +143,14 @@ public class IndexRefreshCommandImpl implements 
IndexRefreshCommand {
                 logger.error( "Failed during refresh search for " + uuid, ee );
                 throw new RuntimeException( "Failed during refresh search for 
" + uuid, ee );
             }
-        }, Schedulers.io() ).call();
+        }, rxTaskScheduler.getAsyncIOScheduler() ).call();
 
 
         return future.doOnNext( found -> {
             if ( !found.hasFinished() ) {
-                logger.error(String.format("Couldn't find record during 
refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()));
+                logger.error("Couldn't find record during refresh uuid: {} 
took ms:{} ", uuid, found.getExecutionTime());
             }else{
-                logger.info(String.format("found record during refresh uuid: 
{} took ms:{} ", uuid, found.getExecutionTime()));
+                logger.info("found record during refresh uuid: {} took ms:{} 
", uuid, found.getExecutionTime());
             }
         } ).doOnCompleted(() -> {
             //clean up our data

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 7c33084..de5e29d 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -54,11 +54,11 @@ public class IndexingUtils {
 
     public static final String ENTITY_TYPE_FIELDNAME = "entityType";
 
-    public static final String EDGE_NODE_ID_FIELDNAME = "edgeNodeId";
+    public static final String EDGE_NODE_ID_FIELDNAME = "nodeId";
 
     public static final String EDGE_NAME_FIELDNAME = "edgeName";
 
-    public static final String EDGE_NODE_TYPE_FIELDNAME = "edgeType";
+    public static final String EDGE_NODE_TYPE_FIELDNAME = "entityNodeType";
 
     public static final String EDGE_TIMESTAMP_FIELDNAME = "edgeTimestamp";
 
@@ -107,8 +107,6 @@ public class IndexingUtils {
         idString( sb, scope.getNodeId() );
         sb.append( FIELD_SEPERATOR );
         sb.append( scope.getEdgeName() );
-        sb.append( FIELD_SEPERATOR );
-        sb.append( scope.getNodeType() );
         return sb.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
 
b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
index 4da1902..d4c34cd 100644
--- 
a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
+++ 
b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
@@ -21,7 +21,7 @@
         "index": "not_analyzed",
         "doc_values": true
       },
-      "edgeNodeId": {
+      "nodeId": {
         "type": "string",
         "index": "not_analyzed",
         "doc_values": true
@@ -31,7 +31,7 @@
         "index": "not_analyzed",
         "doc_values": true
       },
-      "edgeType": {
+      "entityNodeType": {
         "type": "string",
         "index": "not_analyzed",
         "doc_values": true

Reply via email to