Initial setup of command pattern and wiring into legacy code

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6098d9b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6098d9b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6098d9b6

Branch: refs/heads/two-dot-o-dev
Commit: 6098d9b60a2de306beb02d34f1bf34a42c67285f
Parents: 472ccaf
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Apr 23 16:20:12 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Apr 23 22:49:05 2015 -0600

----------------------------------------------------------------------
 stack/core/pom.xml                              |  12 --
 .../corepersistence/CpEntityManager.java        |  29 +--
 .../corepersistence/CpEntityManagerFactory.java |  21 +-
 .../corepersistence/CpRelationManager.java      |  71 +++++--
 .../usergrid/corepersistence/CpWalker.java      |   4 +-
 .../corepersistence/command/CommandBuilder.java | 111 +++++++++++
 .../command/cursor/CursorSerializer.java        |  86 +++++++++
 .../command/cursor/RequestCursor.java           |  45 +++++
 .../command/cursor/ResponseCursor.java          |  56 ++++++
 .../command/read/AbstractCommand.java           | 111 +++++++++++
 .../command/read/CollectCommand.java            |  36 ++++
 .../corepersistence/command/read/Command.java   |  56 ++++++
 .../command/read/TraverseCommand.java           |  30 +++
 .../command/read/entity/EntityLoadCommand.java  | 191 +++++++++++++++++++
 .../read/graph/AbstractReadGraphCommand.java    | 110 +++++++++++
 .../read/graph/ReadGraphCollectionCommand.java  |  49 +++++
 .../read/graph/ReadGraphConnectionCommand.java  |  49 +++++
 .../results/AbstractGraphQueryExecutor.java     | 127 ++++++++++++
 .../results/CollectionGraphQueryExecutor.java   |  60 ++++++
 .../CollectionResultsLoaderFactoryImpl.java     |   1 -
 .../results/ConnectionGraphQueryExecutor.java   |  58 ++++++
 .../rx/ApplicationObservable.java               |  42 ++--
 .../rx/EdgesFromSourceObservable.java           |   4 +-
 .../rx/EdgesToTargetObservable.java             |   4 +-
 .../usergrid/persistence/EntityManager.java     |   7 +-
 .../cassandra/EntityManagerImpl.java            |  45 +++--
 .../org/apache/usergrid/CoreApplication.java    |   3 +
 .../persistence/collection/EntitySet.java       |  14 +-
 .../serialization/impl/EntitySetImpl.java       |  10 +-
 .../persistence/core/rx/OrderedMerge.java       |  28 +--
 .../apache/usergrid/persistence/graph/Edge.java |   4 +
 .../persistence/graph/SearchByEdgeType.java     |   2 +-
 .../graph/impl/SimpleSearchByEdgeType.java      |  26 ++-
 .../impl/stage/NodeDeleteListenerImpl.java      |   5 +-
 .../impl/shard/DirectedEdgeMeta.java            |   7 +-
 .../persistence/graph/GraphManagerLoadTest.java |   6 +-
 .../graph/GraphManagerShardConsistencyIT.java   |   3 +-
 .../graph/GraphManagerStressTest.java           |   8 +-
 .../usergrid/persistence/model/entity/Id.java   |   3 +
 stack/corepersistence/pom.xml                   |   4 +-
 .../usergrid/persistence/index/query/Query.java |   8 +
 .../scenarios/ConnectionScenarios.scala         |   2 +-
 stack/pom.xml                                   |   1 -
 43 files changed, 1419 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index d91208d..f12c0b8 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -473,18 +473,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-core</artifactId>
-      <version>${rx.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-math</artifactId>
-      <version>${rx.version}</version>
-    </dependency>
-
-    <dependency>
       <groupId>com.clearspring.analytics</groupId>
       <artifactId>stream</artifactId>
       <version>2.7.0</version>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 789e640..038be44 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -34,6 +34,8 @@ import java.util.TreeSet;
 import java.util.UUID;
 
 import com.codahale.metrics.Meter;
+
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.FieldSet;
 import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.slf4j.Logger;
@@ -86,6 +88,7 @@ import 
org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE
 import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import 
org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import 
org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
@@ -178,8 +181,6 @@ public class CpEntityManager implements EntityManager {
     private UUID applicationId;
     private Application application;
 
-    private CpEntityManagerFactory emf;
-
     private ManagerCache managerCache;
 
     private ApplicationScope applicationScope;
@@ -188,6 +189,10 @@ public class CpEntityManager implements EntityManager {
 
     private CounterUtils counterUtils;
 
+
+    private EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private GraphManagerFactory graphManagerFactory;
+
     private boolean skipAggregateCounters;
     private MetricsFactory metricsFactory;
     private Timer aggCounterTimer;
@@ -220,22 +225,24 @@ public class CpEntityManager implements EntityManager {
     }
 
     @Override
-    public void init( EntityManagerFactory emf, UUID applicationId ) {
+    public void init( final CassandraService cassandraService, final 
CounterUtils counterUtils, final MetricsFactory metricsFactory, final 
GraphManagerFactory graphManagerFactory, final EntityCollectionManagerFactory 
entityCollectionManagerFactory, final ManagerCache managerCache, UUID 
applicationId ) {
 
-        Preconditions.checkNotNull( emf, "emf must not be null" );
         Preconditions.checkNotNull( applicationId, "applicationId must not be 
null" );
 
-        this.emf = ( CpEntityManagerFactory ) emf;
-        this.managerCache = this.emf.getManagerCache();
+        this.managerCache = managerCache;
         this.applicationId = applicationId;
 
         applicationScope = CpNamingUtils.getApplicationScope( applicationId );
 
-        this.cass = this.emf.getCassandraService();
-        this.counterUtils = this.emf.getCounterUtils();
+        this.cass = cassandraService;
+        this.counterUtils =  counterUtils;
 
         //Timer Setup
-        this.metricsFactory = this.emf.getMetricsFactory();
+        this.metricsFactory = metricsFactory;
+
+        this.graphManagerFactory = graphManagerFactory;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+
         this.aggCounterTimer =this.metricsFactory.getTimer( 
CpEntityManager.class,
             "cp.entity.get.aggregate.counters.timer" );
         this.entCreateTimer =this.metricsFactory.getTimer( 
CpEntityManager.class, "cp.entity.create.timer" );
@@ -766,7 +773,7 @@ public class CpEntityManager implements EntityManager {
     public RelationManager getRelationManager( EntityRef entityRef ) {
         Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
         CpRelationManager rmi = new CpRelationManager();
-        rmi.init( this, emf, applicationId, entityRef, null, metricsFactory );
+        rmi.init( this,  applicationId, entityRef, null, metricsFactory, 
managerCache, entityCollectionManagerFactory, graphManagerFactory );
         return rmi;
     }
 
@@ -2885,7 +2892,7 @@ public class CpEntityManager implements EntityManager {
     public void refreshIndex() {
 
         // refresh factory indexes
-        emf.refreshIndex();
+
 
         // refresh this Entity Manager's application's index
         EntityIndex ei = managerCache.getEntityIndex( getApplicationScope() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index fe4d828..5b8e715 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -44,6 +44,7 @@ import org.apache.usergrid.persistence.cassandra.CounterUtils;
 import org.apache.usergrid.persistence.cassandra.Setup;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import 
org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
@@ -57,6 +58,7 @@ import 
org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import 
org.apache.usergrid.persistence.exceptions.OrganizationAlreadyExistsException;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.index.EntityIndex;
@@ -99,12 +101,14 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
     private final OrgApplicationCache orgApplicationCache;
 
 
-    private ManagerCache managerCache;
-    private DataMigrationManager dataMigrationManager;
+    private final ManagerCache managerCache;
+    private final DataMigrationManager dataMigrationManager;
+    private final GraphManagerFactory graphManagerFactory;
+    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
 
-    private CassandraService cassandraService;
-    private CounterUtils counterUtils;
-    private Injector injector;
+    private final CassandraService cassandraService;
+    private final CounterUtils counterUtils;
+    private final Injector injector;
     private final MetricsFactory metricsFactory;
 
     public CpEntityManagerFactory(
@@ -115,8 +119,11 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
         this.injector = injector;
         this.managerCache = injector.getInstance( ManagerCache.class );
         this.dataMigrationManager = injector.getInstance( 
DataMigrationManager.class );
+        this.graphManagerFactory = injector.getInstance( 
GraphManagerFactory.class );
+        this.entityCollectionManagerFactory = injector.getInstance( 
EntityCollectionManagerFactory.class );
         this.metricsFactory = injector.getInstance( MetricsFactory.class );
 
+
         this.orgApplicationCache = new OrgApplicationCacheImpl( this );
     }
 
@@ -172,7 +179,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
     private EntityManager _getEntityManager( UUID applicationId ) {
 
         EntityManager em = new CpEntityManager();
-        em.init( this, applicationId );
+        em.init( cassandraService, counterUtils, metricsFactory, 
graphManagerFactory, entityCollectionManagerFactory, managerCache, 
applicationId );
 
         return em;
     }
@@ -427,7 +434,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
 
         Observable<Edge> edges = gm.loadEdgesFromSource( new 
SimpleSearchByEdgeType(
                 fromEntityId, edgeType, Long.MAX_VALUE,
-                SearchByEdgeType.Order.DESCENDING, null ));
+                SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ));
 
         Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
         while ( iter.hasNext() ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index da39ea9..edd352e 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -33,6 +33,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
+import 
org.apache.usergrid.corepersistence.results.CollectionGraphQueryExecutor;
+import 
org.apache.usergrid.corepersistence.results.ConnectionGraphQueryExecutor;
 import 
org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl;
 import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor;
 import org.apache.usergrid.corepersistence.results.QueryExecutor;
@@ -61,6 +63,8 @@ import 
org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
 import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
 import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
 import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -71,6 +75,7 @@ import org.apache.usergrid.persistence.geo.EntityLocationRef;
 import org.apache.usergrid.persistence.geo.model.Point;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
@@ -106,6 +111,7 @@ import org.apache.usergrid.utils.MapUtils;
 import org.apache.usergrid.utils.UUIDUtils;
 
 import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
 import me.prettyprint.hector.api.Keyspace;
@@ -167,9 +173,9 @@ public class CpRelationManager implements RelationManager {
     private static final Logger logger = LoggerFactory.getLogger( 
CpRelationManager.class );
 
 
-    private CpEntityManagerFactory emf;
-
     private ManagerCache managerCache;
+    private EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private GraphManagerFactory graphManagerFactory;
 
     private EntityManager em;
 
@@ -196,15 +202,16 @@ public class CpRelationManager implements RelationManager 
{
 
 
     public CpRelationManager init(
-        EntityManager em,
-            CpEntityManagerFactory emf,
+            EntityManager em,
             UUID applicationId,
             EntityRef headEntity,
             IndexBucketLocator indexBucketLocator,
-            MetricsFactory metricsFactory) {
+            MetricsFactory metricsFactory,
+            ManagerCache managerCache,
+            EntityCollectionManagerFactory entityCollectionManagerFactory,
+            GraphManagerFactory graphManagerFactory ) {
 
         Assert.notNull( em, "Entity manager cannot be null" );
-        Assert.notNull( emf, "Entity manager factory cannot be null" );
         Assert.notNull( applicationId, "Application Id cannot be null" );
         Assert.notNull( headEntity, "Head entity cannot be null" );
         Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be 
null" );
@@ -213,10 +220,9 @@ public class CpRelationManager implements RelationManager {
         //Assert.notNull( indexBucketLocator, "indexBucketLocator cannot be 
null" );
 
         this.em = em;
-        this.emf = emf;
         this.applicationId = applicationId;
         this.headEntity = headEntity;
-        this.managerCache = emf.getManagerCache();
+        this.managerCache = managerCache;
         this.applicationScope = CpNamingUtils.getApplicationScope( 
applicationId );
 
         this.cass = em.getCass(); // TODO: eliminate need for this via Core 
Persistence
@@ -232,6 +238,8 @@ public class CpRelationManager implements RelationManager {
         // load the Core Persistence version of the head entity as well
         this.headEntityScope = getCollectionScopeNameFromEntityType(
                 applicationScope.getApplication(), headEntity.getType());
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.graphManagerFactory = graphManagerFactory;
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Loading head entity {}:{} from scope\n   app {}\n   
owner {}\n   name {}",
@@ -331,7 +339,7 @@ public class CpRelationManager implements RelationManager {
                   public Observable<Edge> call( final String edgeType ) {
                       return gm.loadEdgesToTarget(
                           new SimpleSearchByEdgeType( cpHeadEntity.getId(), 
edgeType, Long.MAX_VALUE,
-                              SearchByEdgeType.Order.DESCENDING, null ) );
+                              SearchByEdgeType.Order.DESCENDING,  
Optional.<Edge>absent() ) );
 
                   }
               } );
@@ -400,7 +408,7 @@ public class CpRelationManager implements RelationManager {
                     public Observable<Edge> call( final String etype ) {
                         return gm.loadEdgesToTarget( new 
SimpleSearchByEdgeType(
                             cpHeadEntity.getId(), etype, Long.MAX_VALUE,
-                            SearchByEdgeType.Order.DESCENDING, null ) );
+                            SearchByEdgeType.Order.DESCENDING,  
Optional.<Edge>absent() ) );
                     }
                 } )
 
@@ -516,7 +524,7 @@ public class CpRelationManager implements RelationManager {
             CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
             System.currentTimeMillis(),
             SearchByEdgeType.Order.DESCENDING,
-            null ) ); // last
+            Optional.<Edge>absent() ) ); // last
 
         Iterator<Edge> iterator = 
edgesToTarget.toBlockingObservable().getIterator();
         int count = 0;
@@ -541,7 +549,7 @@ public class CpRelationManager implements RelationManager {
             CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
             System.currentTimeMillis(),
             SearchByEdgeType.Order.DESCENDING,
-            null ) ); // last
+            Optional.<Edge>absent() ) ); // last
 
         int count = edgesFromSource.take( 2 ).count().toBlocking().last();
 
@@ -927,6 +935,21 @@ public class CpRelationManager implements RelationManager {
                     + "' of " + headEntity.getType() + ":" + headEntity 
.getUuid() );
         }
 
+        query.setEntityType( collection.getType() );
+        query = adjustQuery( query );
+
+        /**
+         * It's a graph query, execute the graph query executor.
+         *
+         * TODO refactor all of this away into commands
+         */
+        if(query.isGraphQuery()){
+            final QueryExecutor executor = new CollectionGraphQueryExecutor( 
this.entityCollectionManagerFactory,graphManagerFactory, applicationScope, 
headEntity, query.getCursor(), collName, query.getLimit() );
+            return executor.next();
+        }
+
+
+
         final IndexScope indexScope = new IndexScopeImpl(
             cpHeadEntity.getId(),
             CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) 
);
@@ -939,9 +962,6 @@ public class CpRelationManager implements RelationManager {
 
             indexScope.getOwner().toString(), indexScope.getName() );
 
-        query.setEntityType( collection.getType() );
-        query = adjustQuery( query );
-
 
         final CollectionResultsLoaderFactoryImpl resultsLoaderFactory = new 
CollectionResultsLoaderFactoryImpl( managerCache );
 
@@ -1362,6 +1382,21 @@ public class CpRelationManager implements 
RelationManager {
 
         headEntity = em.validate( headEntity );
 
+
+        query = adjustQuery( query );
+
+
+        /**
+         * It's a graph query, execute the graph query executor.
+         *
+         * TODO refactor all of this away into commands
+         */
+        if(query.isGraphQuery()){
+            final QueryExecutor executor = new ConnectionGraphQueryExecutor( 
this.entityCollectionManagerFactory,graphManagerFactory, applicationScope, 
headEntity, query.getCursor(), connection, query.getLimit() );
+            return executor.next();
+        }
+
+
         final IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
                 CpNamingUtils.getConnectionScopeName( connection ) );
 
@@ -1373,7 +1408,6 @@ public class CpRelationManager implements RelationManager 
{
                         indexScope.getOwner().toString(), 
indexScope.getName(), searchTypes
                 } );
 
-        query = adjustQuery( query );
 
         final ConnectionResultsLoaderFactoryImpl resultsLoaderFactory = new 
ConnectionResultsLoaderFactoryImpl( managerCache,
             headEntity, connection );
@@ -1381,9 +1415,6 @@ public class CpRelationManager implements RelationManager 
{
         final QueryExecutor executor = new 
ElasticSearchQueryExecutor(resultsLoaderFactory, ei, applicationScope, 
indexScope, searchTypes, query);
 
         return executor.next();
-//        CandidateResults crs = ei.search( indexScope, searchTypes, query );
-
-//        return buildConnectionResults( indexScope, query, crs, connection );
     }
 
 
@@ -1455,7 +1486,7 @@ public class CpRelationManager implements RelationManager 
{
 
     private CpRelationManager getRelationManager( EntityRef headEntity ) {
         CpRelationManager rmi = new CpRelationManager();
-        rmi.init( em, emf, applicationId, headEntity, null, metricsFactory);
+        rmi.init( em, applicationId, headEntity, null, metricsFactory, 
managerCache, entityCollectionManagerFactory, graphManagerFactory);
         return rmi;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index 4b902d8..b2354a6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -31,6 +31,8 @@ import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
+import com.google.common.base.Optional;
+
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
@@ -111,7 +113,7 @@ public class CpWalker {
                 logger.debug( "Loading edges of type {} from node {}", 
edgeType, applicationId );
 
                 return gm.loadEdgesFromSource(  new SimpleSearchByEdgeType(
-                    applicationId, edgeType, Long.MAX_VALUE, order , null ) );
+                    applicationId, edgeType, Long.MAX_VALUE, order ,  
Optional.<Edge>absent() ) );
 
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
new file mode 100644
index 0000000..de76f50
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command;
+
+
+import org.apache.usergrid.corepersistence.command.cursor.RequestCursor;
+import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor;
+import org.apache.usergrid.corepersistence.command.read.CollectCommand;
+import org.apache.usergrid.corepersistence.command.read.Command;
+import org.apache.usergrid.corepersistence.command.read.TraverseCommand;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * A builder that will allow us to build a traversal command for execution
+ */
+public class CommandBuilder {
+
+
+    private final ApplicationScope applicationScope;
+    private final RequestCursor requestCursor;
+    private final ResponseCursor responseCursor;
+    private final int limit;
+
+    private int count = 0;
+
+    private Observable<Id> currentObservable;
+
+
+       /**
+     * Our first pass, where we implement our start point as an Id until we 
can use this to perform our entire
+     * traversal.  Eventually as we untangle the existing Query service 
nightmare, the sourceId will be remove and should
+     * only be traversed from the root application
+     */
+    public CommandBuilder( final ApplicationScope applicationScope, final Id 
sourceId,
+                           final Optional<String> requestCursor, final int 
limit ) {
+
+        this.applicationScope = applicationScope;
+        this.limit = limit;
+
+        //set the request cursor
+        this.requestCursor = new RequestCursor( requestCursor );
+
+        //set the response cursor
+        this.responseCursor = new ResponseCursor();
+
+
+        this.currentObservable = Observable.just( sourceId );
+    }
+
+
+    /**
+     * Add a read command that will read Ids and produce Ids.  This is an 
intermediate traversal operations
+     */
+    public CommandBuilder withTraverseCommand( final TraverseCommand 
traverseCommand ) {
+
+        setState( traverseCommand );
+
+        this.currentObservable = currentObservable.compose( traverseCommand );
+
+        return this;
+    }
+
+
+    /**
+     * Build the final collection step, and
+     */
+    public <T> Observable<T> build( final CollectCommand<T> collectCommand ) {
+        setState( collectCommand );
+
+        collectCommand.setLimit( limit );
+
+        return currentObservable.compose( collectCommand );
+    }
+
+
+    /**
+     * Set the id of the state
+     * @param command
+     */
+    private void setState( Command<?> command ) {
+        command.setId( count );
+        //done for clarity
+        count++;
+
+        command.setCursorCaches( requestCursor, responseCursor );
+        command.setApplicationScope( applicationScope );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java
new file mode 100644
index 0000000..b45e7da
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.cursor;
+
+
+import java.io.Serializable;
+
+import com.fasterxml.jackson.core.Base64Variant;
+import com.fasterxml.jackson.core.Base64Variants;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+
+
+/**
+ * A utility to serialize objects to/from cursors
+ */
+public class CursorSerializer {
+
+
+    private static final SmileFactory SMILE_FACTORY = new SmileFactory();
+
+    private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY 
);
+
+    private static final Base64Variant VARIANT = 
Base64Variants.MODIFIED_FOR_URL;
+
+
+    /**
+     * Serialize the serializable object as a cursor
+     */
+    public static String asCursor( final Serializable cursor ) {
+
+        try {
+            return MAPPER.writer( VARIANT ).writeValueAsString( cursor );
+        }
+        catch ( JsonProcessingException e ) {
+            throw new CursorParseException( "Unable to serialize cursor", e );
+        }
+    }
+
+
+    /**
+     * Deserialize from the cursor
+     * @param cursor
+     * @return
+     * @throws CursorParseException
+     */
+    public <T extends Serializable> T fromCursor( final String cursor, final 
Class<T> cursorClass ) throws CursorParseException {
+        try {
+
+            final JsonParser parser = MAPPER.getFactory().createParser( cursor 
);
+            return MAPPER.reader( VARIANT ).readValue( parser, cursorClass);
+        }
+        catch ( Exception e ) {
+            throw new CursorParseException( "Unable to serialize cursor", e );
+        }
+    }
+
+
+    /**
+     * Thrown when we can't parse a cursor
+     */
+    public static class CursorParseException extends RuntimeException {
+        public CursorParseException( final String message, final Throwable 
cause ) {
+            super( message, cause );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java
new file mode 100644
index 0000000..60d54ff
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.cursor;
+
+
+import java.io.Serializable;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A cursor that has been passed in with our request.  Adds utils for parsing 
values
+ */
+public class RequestCursor {
+
+    public RequestCursor(final Optional<String> cursor){
+
+    }
+
+
+    /**
+     * Get the cursor with the specified id
+     */
+    public <T extends Serializable> T getCursor( final int id, final Class<T> 
cursorType ) {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java
new file mode 100644
index 0000000..02aae34
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.cursor;
+
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A cursor used in rendering a response
+ */
+public class ResponseCursor {
+
+
+
+    /**
+     * We use a map b/c some indexes might be skipped
+     */
+    private Map<Integer, ? super Serializable> cursors = new HashMap<>();
+
+    /**
+     * Set the possible cursor value into the index. DOES NOT parse the 
cursor.  This is intentional for performance
+     */
+    public <T extends Serializable> void setCursor( final int id, final T 
cursor ) {
+        cursors.put( id, cursor );
+    }
+
+
+    private void ensureCapacity() {
+
+    }
+
+
+    public String encodeAsString() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java
new file mode 100644
index 0000000..59e1848
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read;
+
+
+import java.io.Serializable;
+
+import javax.xml.ws.Response;
+
+import org.apache.usergrid.corepersistence.command.cursor.RequestCursor;
+import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Basic functionality for our commands to handle cursor IO
+ */
+public abstract class AbstractCommand<T, C extends Serializable> implements 
Command<T> {
+
+    private int id;
+    /**
+     * The cache of the cursor that was set when the read was started
+     */
+    private RequestCursor readCache;
+
+    /**
+     * The current state of the write cache.  Gets updated as we traverse the 
observables
+     */
+    private ResponseCursor writeCache;
+
+
+    /**
+     * The applicationScope
+     */
+    protected ApplicationScope applicationScope;
+
+
+    @Override
+    public void setId( final int id ) {
+        this.id = id;
+    }
+
+
+    @Override
+    public void setCursorCaches( final RequestCursor readCache, final 
ResponseCursor writeCache ) {
+        this.readCache = readCache;
+        this.writeCache = writeCache;
+    }
+
+
+    @Override
+    public void setApplicationScope( final ApplicationScope applicationScope ) 
{
+       this.applicationScope = applicationScope;
+    }
+
+
+    /**
+     * Return the parsed value of the cursor from the last request, if it 
exists
+     */
+    protected Optional<C> getCursor() {
+        final C cursor = readCache.getCursor( id, getCursorClass() );
+
+        return Optional.fromNullable( cursor );
+    }
+
+
+
+
+
+    /**
+     * Set the cursor value into the new cursor write cache
+     * @param newValue
+     */
+    protected void setCursor(final C newValue){
+        writeCache.setCursor( id, newValue );
+    }
+
+
+    /**
+     * Generate our state as a cursor
+     * @return
+     */
+    protected String generateCursor(){
+        return writeCache.encodeAsString();
+    }
+
+    /**
+     * Return the class to be used when parsing the cursor
+     */
+    protected abstract Class<C> getCursorClass();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java
new file mode 100644
index 0000000..c2ad931
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read;
+
+
+/**
+ * A command that is used to reduce our stream of results into a final output
+ * @param <T>
+ */
+public interface CollectCommand<T> extends Command<T>{
+
+    /**
+     * Set the prefered result size for the command
+     * @param resultSize
+     */
+    void setLimit( final int resultSize );
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java
new file mode 100644
index 0000000..ada47d2
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read;
+
+
+import org.apache.usergrid.corepersistence.command.cursor.RequestCursor;
+import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Interface for a read command.  This takes an input of Id, performs some 
operation, and emits Id for further
+ * processing
+ */
+public interface Command<T> extends Observable.Transformer<Id, T> {
+
+
+    /**
+     * Set the id of this command in it's execution environment
+     */
+    void setId( final int id );
+
+    /**
+     * Set the cursor cache into the command
+     *
+     * @param readCache Set the cache that was used in the request
+     * @param writeCache Set the cache to be used when writing the results
+     */
+    void setCursorCaches( final RequestCursor readCache, final ResponseCursor 
writeCache );
+
+    /**
+     * Set the application scope of the command
+     * @param applicationScope
+     */
+    void setApplicationScope(final ApplicationScope applicationScope);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/TraverseCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/TraverseCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/TraverseCommand.java
new file mode 100644
index 0000000..c8fe2f8
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/TraverseCommand.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read;
+
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Traverses edges in the graph.  Either by query or graph traversal.  Take an 
observable of ids, and emits
+ * an observable of ids
+ */
+public interface TraverseCommand extends Command<Id> {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java
new file mode 100644
index 0000000..f4de9e0
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.entity;
+
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.usergrid.corepersistence.command.read.AbstractCommand;
+import org.apache.usergrid.corepersistence.command.read.CollectCommand;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+import rx.functions.Func1;
+import rx.observables.GroupedObservable;
+
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
+
+
+/**
+ * Loads entities from a set of Ids.
+ *
+ * TODO refactor this into a common command that both ES search and 
graphSearch can use for repair and verification
+ */
+public class EntityLoadCommand extends AbstractCommand<Results, Serializable> 
implements CollectCommand<Results> {
+
+    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+
+    //TODO get rid of this when merged into 2.0 dev
+    private final ApplicationScope applicationScope;
+    private int resultSize;
+
+
+    public EntityLoadCommand( final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
+                              final ApplicationScope applicationScope ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.applicationScope = applicationScope;
+    }
+
+
+    @Override
+    protected Class<Serializable> getCursorClass() {
+        return null;
+    }
+
+
+    @Override
+    public Observable<Results> call( final Observable<? extends Id> observable 
) {
+
+
+        /**
+         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean 
up our lower levels and create new results
+         * objects
+         */
+
+
+        return observable.buffer( resultSize ).flatMap( new Func1<List<? 
extends Id>, Observable<Results>>() {
+            @Override
+            public Observable<Results> call( final List<? extends Id> ids ) {
+
+                return Observable.from( ids )
+                    //group them by type so we can load them, in 2.0 dev this 
step will be removed
+                    .groupBy( new Func1<Id, String>() {
+                        @Override
+                        public String call( final Id id ) {
+                            return id.getType();
+                        }
+                    } )
+
+                        //take all our groups and load them as id sets
+
+                    .flatMap( new Func1<GroupedObservable<String, Id>, 
Observable<EntitySet>>() {
+                        @Override
+                        public Observable<EntitySet> call(
+                            final GroupedObservable<String, Id> 
stringIdGroupedObservable ) {
+
+
+                            final String entityType = 
stringIdGroupedObservable.getKey();
+
+                            final CollectionScope collectionScope =
+                                getCollectionScopeNameFromEntityType( 
applicationScope.getApplication(), entityType );
+
+
+                            return stringIdGroupedObservable.toList()
+                                                            .flatMap( new 
Func1<List<Id>, Observable<EntitySet>>() {
+                                                                @Override
+                                                                public 
Observable<EntitySet> call(
+                                                                    final 
List<Id> ids ) {
+
+                                                                    final 
EntityCollectionManager ecm =
+                                                                        
entityCollectionManagerFactory
+                                                                            
.createCollectionManager( collectionScope );
+                                                                    return 
ecm.load( ids );
+                                                                }
+                                                            } );
+                        }
+                    } )
+                        //emit our groups of entities as a stream of entities
+                    .flatMap( new Func1<EntitySet, 
Observable<org.apache.usergrid.persistence.Entity>>() {
+                        @Override
+                        public 
Observable<org.apache.usergrid.persistence.Entity> call( final EntitySet 
entitySet ) {
+                            //emit our entities, and filter out deleted entites
+                            return Observable.from( entitySet.getEntities() 
).map(
+                                new Func1<MvccEntity, 
org.apache.usergrid.persistence.Entity>() {
+
+                                    @Override
+                                    public 
org.apache.usergrid.persistence.Entity call( final MvccEntity mvccEntity ) {
+                                        return mapEntity( mvccEntity );
+                                    }
+                                } )
+                                //filter null entities
+                                .filter( new 
Func1<org.apache.usergrid.persistence.Entity, Boolean>() {
+                                    @Override
+                                    public Boolean call( final 
org.apache.usergrid.persistence.Entity entity ) {
+                                        return entity == null;
+                                    }
+                                } );
+                        }
+                    } )
+
+                        //convert them to a list, then map them into results
+                    .toList().map( new 
Func1<List<org.apache.usergrid.persistence.Entity>, Results>() {
+                        @Override
+                        public Results call( final 
List<org.apache.usergrid.persistence.Entity> entities ) {
+                            final Results results = Results.fromEntities( 
entities );
+                            results.setCursor( generateCursor() );
+
+                            return results;
+                        }
+                    } );
+            }
+        } );
+    }
+
+
+
+    /**
+     * Map a new cp entity to an old entity.  May be null if not present
+     */
+    private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity 
mvccEntity ) {
+        if ( !mvccEntity.getEntity().isPresent() ) {
+            return null;
+        }
+
+
+        final Entity cpEntity = mvccEntity.getEntity().get();
+        final Id entityId = cpEntity.getId();
+
+        org.apache.usergrid.persistence.Entity entity =
+            EntityFactory.newEntity( entityId.getUuid(), entityId.getType() );
+
+        Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
+        entity.addProperties( entityMap );
+
+        return entity;
+    }
+
+
+    @Override
+    public void setLimit( final int resultSize ) {
+        this.resultSize = resultSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java
new file mode 100644
index 0000000..141fb06
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.graph;
+
+
+import org.apache.usergrid.corepersistence.command.read.AbstractCommand;
+import org.apache.usergrid.corepersistence.command.read.TraverseCommand;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName;
+
+
+/**
+ * Command for reading graph edges
+ */
+public abstract class AbstractReadGraphCommand extends AbstractCommand<Id, 
Edge> implements TraverseCommand {
+
+    private final GraphManagerFactory graphManagerFactory;
+
+
+    /**
+     * Create a new instance of our command
+     * @param graphManagerFactory
+     */
+    public AbstractReadGraphCommand( final GraphManagerFactory 
graphManagerFactory ) {
+        this.graphManagerFactory = graphManagerFactory;
+    }
+
+
+    @Override
+    public Observable<Id> call( final Observable<? extends Id> observable ) {
+
+        //get the graph manager
+        final GraphManager graphManager = 
graphManagerFactory.createEdgeManager( applicationScope );
+
+        //set our our constant state
+        final Optional<Edge> startFromCursor = getCursor();
+
+        final String edgeName = getEdgeTypeName();
+
+
+        //return all ids that are emitted from this edge
+        return observable.flatMap( new Func1<Id, Observable<Id>>() {
+
+            @Override
+            public Observable<Id> call( final Id id ) {
+
+                final SimpleSearchByEdgeType search = new 
SimpleSearchByEdgeType(id,edgeName, Long.MAX_VALUE,
+                    SearchByEdgeType.Order.DESCENDING, startFromCursor   );
+
+                /**
+                 * TODO, pass a message with pointers to our cursor values to 
be generated later
+                 */
+                return graphManager.loadEdgesFromSource( search ).doOnNext( 
new Action1<Edge>() {
+                    @Override
+                    public void call( final Edge edge ) {
+                        setCursor( edge );
+                    }
+                } ).map( new Func1<Edge, Id>() {
+                    @Override
+                    public Id call( final Edge edge ) {
+                        return edge.getTargetNode();
+                    }
+                } );
+            }
+        } );
+    }
+
+
+    @Override
+    protected Class<Edge> getCursorClass() {
+        return Edge.class;
+    }
+
+
+
+    /**
+     * Get the edge type name we should use when traversing
+     * @return
+     */
+    protected abstract String getEdgeTypeName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java
new file mode 100644
index 0000000..9ccb969
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.graph;
+
+
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName;
+
+
+/**
+ * Command for reading graph edges on a collection
+ */
+public class ReadGraphCollectionCommand extends AbstractReadGraphCommand {
+
+    private final String collectionName;
+
+
+    /**
+     * Create a new instance of our command
+     */
+    public ReadGraphCollectionCommand( final GraphManagerFactory 
graphManagerFactory, final String collectionName ) {
+        super( graphManagerFactory );
+        this.collectionName = collectionName;
+    }
+
+
+    @Override
+    protected String getEdgeTypeName() {
+        return getCollectionScopeNameFromCollectionName( collectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java
new file mode 100644
index 0000000..adebd45
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.graph;
+
+
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName;
+
+
+/**
+ * Command for reading graph edges on a connection
+ */
+public class ReadGraphConnectionCommand extends AbstractReadGraphCommand {
+
+    private final String connectionName;
+
+
+    /**
+     * Create a new instance of our command
+     */
+    public ReadGraphConnectionCommand( final GraphManagerFactory 
graphManagerFactory, final String connectionName ) {
+        super( graphManagerFactory );
+        this.connectionName = connectionName;
+    }
+
+
+    @Override
+    protected String getEdgeTypeName() {
+        return getConnectionScopeName( connectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
new file mode 100644
index 0000000..0bea013
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.corepersistence.command.CommandBuilder;
+import 
org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCommand;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * This class is a nasty hack to bridge 2.0 observables into 1.0 iterators DO 
NOT use this as a model for moving
+ * forward, pandas will die.
+ */
+public abstract class AbstractGraphQueryExecutor implements QueryExecutor {
+
+
+    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+    private final ApplicationScope applicationScope;
+    private final Id sourceId;
+    private final int limit;
+
+    private final Optional<String> requestCursor;
+
+    private Iterator<Results> observableIterator;
+
+
+    public AbstractGraphQueryExecutor( final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
+                                       final ApplicationScope 
applicationScope, final EntityRef source,
+                                       final String cursor, final int limit ) {
+
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.applicationScope = applicationScope;
+        this.limit = limit;
+        this.sourceId = new SimpleId( source.getUuid(), source.getType() );
+
+        this.requestCursor = Optional.fromNullable( cursor );
+    }
+
+
+    @Override
+    public Iterator<Results> iterator() {
+        return this;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+
+        //hasn't been set up yet, run through our first setup
+        if ( observableIterator == null ) {
+            //assign them to an interator.  this now uses an internal buffer 
with backpressure, so we won't load all
+            // results
+            //set up our command builder
+            final CommandBuilder commandBuilder = new CommandBuilder( 
applicationScope, sourceId, requestCursor, limit );
+
+
+            addTraverseCommand( commandBuilder );
+
+            //construct our results to be observed later. This is a cold 
observable
+            final Observable<Results> resultsObservable =
+                commandBuilder.build( new EntityLoadCommand( 
entityCollectionManagerFactory, applicationScope ) );
+
+            this.observableIterator = 
resultsObservable.toBlocking().getIterator();
+        }
+
+
+        //see if our current results are not null
+        return observableIterator.hasNext();
+    }
+
+
+    @Override
+    public Results next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more results present" );
+        }
+
+        final Results results = observableIterator.next();
+
+        //ugly and tight coupling, but we don't have a choice until we finish 
some refactoring
+        results.setQueryExecutor( this );
+
+        return results;
+    }
+
+
+    @Override
+    public void remove() {
+        throw new RuntimeException( "Remove not implemented!!" );
+    }
+
+
+    /**
+     * Add the traverse command to the graph
+     */
+    protected abstract void addTraverseCommand( final CommandBuilder 
commandBuilder );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java
new file mode 100644
index 0000000..651c27e
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import org.apache.usergrid.corepersistence.command.CommandBuilder;
+import 
org.apache.usergrid.corepersistence.command.read.graph.ReadGraphCollectionCommand;
+import org.apache.usergrid.persistence.EntityRef;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Create The collection graph query executor
+ */
+public class CollectionGraphQueryExecutor extends AbstractGraphQueryExecutor {
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final String collectionName;
+
+
+    public CollectionGraphQueryExecutor( final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
+                                         final GraphManagerFactory 
graphManagerFactory,
+                                         final ApplicationScope 
applicationScope, final EntityRef source,
+                                         final String cursor, final String 
connectionName, final int limit ) {
+
+        super( entityCollectionManagerFactory, applicationScope, source, 
cursor, limit );
+        this.graphManagerFactory = graphManagerFactory;
+
+        Preconditions.checkNotNull( connectionName, "connectionName is 
required on the query" );
+        this.collectionName = connectionName;
+    }
+
+
+    @Override
+    protected void addTraverseCommand( final CommandBuilder commandBuilder ) {
+        //set the traverse command from the source Id to the connect name
+        commandBuilder.withTraverseCommand( new ReadGraphCollectionCommand( 
graphManagerFactory, collectionName ) );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
index b79700b..4b43142 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
@@ -48,7 +48,6 @@ public class CollectionResultsLoaderFactoryImpl implements 
ResultsLoaderFactory
             verifier = new CollectionRefsVerifier();
         }
         else if ( resultsLevel == Query.Level.IDS ) {
-//            verifier = new RefsVerifier();
             verifier = new IdsVerifier();
         }
         else {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java
new file mode 100644
index 0000000..61726b7
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import org.apache.usergrid.corepersistence.command.CommandBuilder;
+import 
org.apache.usergrid.corepersistence.command.read.graph.ReadGraphConnectionCommand;
+import org.apache.usergrid.persistence.EntityRef;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import com.google.common.base.Preconditions;
+
+
+public class ConnectionGraphQueryExecutor extends AbstractGraphQueryExecutor {
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final String connectionName;
+
+
+    public ConnectionGraphQueryExecutor( final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
+                                         final GraphManagerFactory 
graphManagerFactory,
+                                         final ApplicationScope 
applicationScope, final EntityRef source,
+                                         final String cursor, final String 
connectionType, final int limit) {
+
+        super( entityCollectionManagerFactory, applicationScope, source, 
cursor, limit );
+        this.graphManagerFactory = graphManagerFactory;
+
+        Preconditions.checkNotNull(connectionType, "connectionType is required 
on the query" );
+        this.connectionName = connectionType;
+    }
+
+
+
+    @Override
+    protected void addTraverseCommand( final CommandBuilder commandBuilder ) {
+     //set the traverse command from the source Id to the connect name
+        commandBuilder.withTraverseCommand( new ReadGraphConnectionCommand( 
graphManagerFactory, connectionName ) );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
index 6019bca..988cd3b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
@@ -39,6 +39,8 @@ import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Optional;
+
 import rx.Observable;
 import rx.functions.Func1;
 
@@ -89,37 +91,37 @@ public class ApplicationObservable {
         //we have app infos.  For each of these app infos, we have to load the 
application itself
         Observable<Id> appIds = gm.loadEdgesFromSource(
                 new SimpleSearchByEdgeType( rootAppId, edgeType, 
Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                        null ) ).flatMap( new Func1<Edge, Observable<Id>>() {
+                        Optional.<Edge>absent() ) ).flatMap( new Func1<Edge, 
Observable<Id>>() {
             @Override
             public Observable<Id> call( final Edge edge ) {
                 //get the app info and load it
                 final Id appInfo = edge.getTargetNode();
 
                 return collectionManager.load( appInfo )
-                        //filter out null entities
-                        .filter( new Func1<Entity, Boolean>() {
-                            @Override
-                            public Boolean call( final Entity entity ) {
-                                if ( entity == null ) {
-                                    logger.warn( "Encountered a null 
application info for id {}", appInfo );
-                                    return false;
-                                }
-
-                                return true;
+                    //filter out null entities
+                    .filter( new Func1<Entity, Boolean>() {
+                        @Override
+                        public Boolean call( final Entity entity ) {
+                            if ( entity == null ) {
+                                logger.warn( "Encountered a null application 
info for id {}", appInfo );
+                                return false;
                             }
-                        } )
-                                //get the id from the entity
-                        .map( new 
Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
 
+                            return true;
+                        }
+                    } )
+                        //get the id from the entity
+                    .map( new 
Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
 
-                            @Override
-                            public Id call( final 
org.apache.usergrid.persistence.model.entity.Entity entity ) {
 
-                                final UUID uuid = ( UUID ) entity.getField( 
"applicationUuid" ).getValue();
+                        @Override
+                        public Id call( final 
org.apache.usergrid.persistence.model.entity.Entity entity ) {
 
-                                return CpNamingUtils.generateApplicationId( 
uuid );
-                            }
-                        } );
+                            final UUID uuid = ( UUID ) entity.getField( 
"applicationUuid" ).getValue();
+
+                            return CpNamingUtils.generateApplicationId( uuid );
+                        }
+                    } );
             }
         } );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
index d3e2ee5..b23886b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
@@ -31,6 +31,8 @@ import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Optional;
+
 import rx.Observable;
 import rx.functions.Func1;
 
@@ -56,7 +58,7 @@ public class EdgesFromSourceObservable {
                 logger.debug( "Loading edges of edgeType {} from {}", 
edgeType, sourceNode );
 
                 return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( 
sourceNode, edgeType, Long.MAX_VALUE,
-                        SearchByEdgeType.Order.DESCENDING, null ) );
+                        SearchByEdgeType.Order.DESCENDING,  
Optional.<Edge>absent()  ) );
             }
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
index c5dc54d..3130a72 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
@@ -31,6 +31,8 @@ import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Optional;
+
 import rx.Observable;
 import rx.functions.Func1;
 
@@ -56,7 +58,7 @@ public class EdgesToTargetObservable {
                 logger.debug( "Loading edges of edgeType {} to {}", edgeType, 
targetNode);
 
                 return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( 
targetNode, edgeType, Long.MAX_VALUE,
-                        SearchByEdgeType.Order.DESCENDING, null ) );
+                        SearchByEdgeType.Order.DESCENDING,  
Optional.<Edge>absent()  ) );
             }
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java 
b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index 65fac8d..645618c 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -17,6 +17,11 @@
 package org.apache.usergrid.persistence;
 
 
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.persistence.cassandra.CounterUtils;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.index.query.Query;
 import java.nio.ByteBuffer;
 import java.util.Collection;
@@ -698,7 +703,7 @@ public interface EntityManager {
     */
     void deleteIndex();
 
-    public void init( EntityManagerFactory emf, UUID applicationId);
+    public void init( final CassandraService cassandraService, final 
CounterUtils counterUtils, final MetricsFactory metricsFactory, final 
GraphManagerFactory graphManagerFactory, final EntityCollectionManagerFactory 
entityCollectionManagerFactory, final ManagerCache managerCache, UUID 
applicationId);
 
     /** For testing purposes */
     public void flushManagerCaches();

Reply via email to