Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-593 0523dc038 -> 9dc65dc8c


Refactor complete. The changes still need to be tested.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9dc65dc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9dc65dc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9dc65dc8

Branch: refs/heads/USERGRID-593
Commit: 9dc65dc8c3176d28d61119c0c434b47b7fc08254
Parents: 0523dc0
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Apr 24 18:32:36 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Apr 24 18:32:36 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 163 +++++++++----------
 .../pipeline/read/FilterFactory.java            |   9 +-
 .../pipeline/read/ReadPipelineBuilder.java      |  29 ++--
 .../pipeline/read/ReadPipelineBuilderImpl.java  |  69 +++++---
 .../results/AbstractGraphQueryExecutor.java     | 133 ---------------
 .../results/CollectionGraphQueryExecutor.java   |  60 -------
 .../results/ConnectionGraphQueryExecutor.java   |  58 -------
 7 files changed, 139 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index d8f9446..af926bf 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -31,12 +31,9 @@ import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.index.AsyncIndexService;
-import 
org.apache.usergrid.corepersistence.results.CollectionGraphQueryExecutor;
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.CollectionResultsLoaderFactoryImpl;
-import 
org.apache.usergrid.corepersistence.results.ConnectionGraphQueryExecutor;
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl;
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ElasticSearchQueryExecutor;
-import org.apache.usergrid.corepersistence.results.QueryExecutor;
+import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder;
+import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.ConnectedEntityRef;
@@ -114,6 +111,7 @@ public class CpRelationManager implements RelationManager {
     private ManagerCache managerCache;
     private EntityCollectionManagerFactory entityCollectionManagerFactory;
     private GraphManagerFactory graphManagerFactory;
+    private PipelineBuilderFactory pipelineBuilderFactory; //NOTE(review): not assigned anywhere in this change - confirm it is set before use
 
     private EntityManager em;
 
@@ -131,8 +129,10 @@ public class CpRelationManager implements RelationManager {
     private Timer updateCollectionTimer;
 
 
-    public CpRelationManager( final MetricsFactory metricsFactory, final 
ManagerCache managerCache, final
-                              EntityCollectionManagerFactory 
entityCollectionManagerFactory, final GraphManagerFactory graphManagerFactory,  
final AsyncIndexService indexService, final EntityManager em, final UUID 
applicationId, final EntityRef headEntity) {
+    public CpRelationManager( final MetricsFactory metricsFactory, final 
ManagerCache managerCache,
+                              final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
+                              final GraphManagerFactory graphManagerFactory, 
final AsyncIndexService indexService,
+                              final EntityManager em, final UUID 
applicationId, final EntityRef headEntity ) {
 
 
         Assert.notNull( em, "Entity manager cannot be null" );
@@ -158,8 +158,8 @@ public class CpRelationManager implements RelationManager {
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Loading head entity {}:{} from app {}", new 
Object[] {
-                    headEntity.getType(), headEntity.getUuid(), 
applicationScope
-                } );
+                headEntity.getType(), headEntity.getUuid(), applicationScope
+            } );
         }
 
         Id entityId = new SimpleId( headEntity.getUuid(), headEntity.getType() 
);
@@ -171,24 +171,22 @@ public class CpRelationManager implements RelationManager 
{
             .format( "cpHeadEntity cannot be null for entity id %s, app id 
%s", entityId.getUuid(), applicationId ) );
 
         this.indexService = indexService;
-
-
     }
 
 
     @Override
     public Set<String> getCollectionIndexes( String collectionName ) throws 
Exception {
-       GraphManager gm = managerCache.getGraphManager( applicationScope );
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
 
         String edgeTypePrefix = CpNamingUtils.getEdgeTypeFromCollectionName( 
collectionName );
 
         logger.debug( "getCollectionIndexes(): Searching for edge type prefix 
{} to target {}:{}", new Object[] {
-                edgeTypePrefix, cpHeadEntity.getId().getType(), 
cpHeadEntity.getId().getUuid()
-            } );
+            edgeTypePrefix, cpHeadEntity.getId().getType(), 
cpHeadEntity.getId().getUuid()
+        } );
 
         Observable<Set<String>> types =
-            gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( 
cpHeadEntity.getId(), edgeTypePrefix, null ) ).collect(
-                () -> new HashSet<>(), ( set, type ) -> set.add( type ) );
+            gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( 
cpHeadEntity.getId(), edgeTypePrefix, null ) )
+              .collect( () -> new HashSet<>(), ( set, type ) -> set.add( type 
) );
 
 
         return types.toBlocking().last();
@@ -238,7 +236,7 @@ public class CpRelationManager implements RelationManager {
                   public Observable<Edge> call( final String edgeType ) {
                       return gm.loadEdgesToTarget(
                           new SimpleSearchByEdgeType( cpHeadEntity.getId(), 
edgeType, Long.MAX_VALUE,
-                              SearchByEdgeType.Order.DESCENDING,  
Optional.<Edge>absent() ) );
+                              SearchByEdgeType.Order.DESCENDING, 
Optional.<Edge>absent() ) );
                   }
               } );
 
@@ -249,22 +247,20 @@ public class CpRelationManager implements RelationManager 
{
 
 
         return edges.collect( () -> new LinkedHashMap<EntityRef, 
Set<String>>(), ( entityRefSetMap, edge ) -> {
-                if ( fromEntityType != null && !fromEntityType.equals( 
edge.getSourceNode().getType() ) ) {
-                    logger.debug( "Ignoring edge from entity type {}", 
edge.getSourceNode().getType() );
-                    return;
-                }
+            if ( fromEntityType != null && !fromEntityType.equals( 
edge.getSourceNode().getType() ) ) {
+                logger.debug( "Ignoring edge from entity type {}", 
edge.getSourceNode().getType() );
+                return;
+            }
 
-                final EntityRef eref =
-                    new SimpleEntityRef( edge.getSourceNode().getType(), 
edge.getSourceNode().getUuid() );
+            final EntityRef eref =
+                new SimpleEntityRef( edge.getSourceNode().getType(), 
edge.getSourceNode().getUuid() );
 
-                String name = getNameFromEdgeType( edge.getType() );
-                addMapSet( entityRefSetMap, eref, name );
-            } ).toBlocking().last();
+            String name = getNameFromEdgeType( edge.getType() );
+            addMapSet( entityRefSetMap, eref, name );
+        } ).toBlocking().last();
     }
 
 
-
-
     @Override
     public boolean isConnectionMember( String connectionType, EntityRef entity 
) throws Exception {
 
@@ -273,8 +269,8 @@ public class CpRelationManager implements RelationManager {
         String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( 
connectionType );
 
         logger.debug( "isConnectionMember(): Checking for edge type {} from 
{}:{} to {}:{}", new Object[] {
-                edgeType, headEntity.getType(), headEntity.getUuid(), 
entity.getType(), entity.getUuid()
-            } );
+            edgeType, headEntity.getType(), headEntity.getUuid(), 
entity.getType(), entity.getUuid()
+        } );
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
         Observable<Edge> edges = gm.loadEdgeVersions(
@@ -294,8 +290,8 @@ public class CpRelationManager implements RelationManager {
         String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( 
collName );
 
         logger.debug( "isCollectionMember(): Checking for edge type {} from 
{}:{} to {}:{}", new Object[] {
-                edgeType, headEntity.getType(), headEntity.getUuid(), 
entity.getType(), entity.getUuid()
-            } );
+            edgeType, headEntity.getType(), headEntity.getUuid(), 
entity.getType(), entity.getUuid()
+        } );
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
         Observable<Edge> edges = gm.loadEdgeVersions(
@@ -306,7 +302,6 @@ public class CpRelationManager implements RelationManager {
     }
 
 
-
     @Override
     public Set<String> getCollections() throws Exception {
 
@@ -408,8 +403,8 @@ public class CpRelationManager implements RelationManager {
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Loaded member entity {}:{} from   app {}\n   " + " 
data {}", new Object[] {
-                    itemRef.getType(), itemRef.getUuid(), applicationScope, 
CpEntityMapUtils.toMap( memberEntity )
-                } );
+                itemRef.getType(), itemRef.getUuid(), applicationScope, 
CpEntityMapUtils.toMap( memberEntity )
+            } );
         }
 
 
@@ -419,14 +414,13 @@ public class CpRelationManager implements RelationManager 
{
         gm.writeEdge( edge ).toBlocking().last();
 
 
-
         //perform indexing
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Wrote edge {}", edge );
         }
 
-        indexService.queueEntityIndexUpdate( applicationScope, memberEntity);
+        indexService.queueEntityIndexUpdate( applicationScope, memberEntity );
 
 
         if ( logger.isDebugEnabled() ) {
@@ -492,7 +486,7 @@ public class CpRelationManager implements RelationManager {
             addToCollection( collName, itemEntity );
 
             if ( collection != null && collection.getLinkedCollection() != 
null ) {
-                Id itemEntityId = new SimpleId( 
itemEntity.getUuid(),itemEntity.getType() );
+                Id itemEntityId = new SimpleId( itemEntity.getUuid(), 
itemEntity.getType() );
                 final Edge edge = createCollectionEdge( cpHeadEntity.getId(), 
collName, itemEntityId );
 
                 GraphManager gm = managerCache.getGraphManager( 
applicationScope );
@@ -528,8 +522,8 @@ public class CpRelationManager implements RelationManager {
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Loading entity to remove from collection " + "{}:{} 
from app {}\n", new Object[] {
-                    itemRef.getType(), itemRef.getUuid(), applicationScope
-                } );
+                itemRef.getType(), itemRef.getUuid(), applicationScope
+            } );
         }
 
         Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
@@ -636,38 +630,30 @@ public class CpRelationManager implements RelationManager 
{
         }
 
 
-
         query.setEntityType( collection.getType() );
         query = adjustQuery( query );
 
+        final ReadPipelineBuilder readPipelineBuilder =
+            pipelineBuilderFactory.createReadPipelineBuilder( applicationScope 
);
 
-        if ( query.isGraphSearch() ) {
-            QueryExecutor executor =
-                new CollectionGraphQueryExecutor( 
entityCollectionManagerFactory, graphManagerFactory, applicationScope,
-                    headEntity, query.getOffsetCursor(), collName, 
query.getLimit() );
-
-            return executor.next();
-        }
+        //set our fields applicable to both operations
+        readPipelineBuilder.withCursor( query.getOffsetCursor() );
+        readPipelineBuilder.withLimit( query.getLimit() );
 
+        //TODO, this should be removed when the CP relation manager is removed
+        readPipelineBuilder.setStartId( cpHeadEntity.getId() );
 
-
-        final SearchEdge searchEdge = createCollectionSearchEdge( 
cpHeadEntity.getId(), collName );
-
-        final ApplicationEntityIndex ei = managerCache.getEntityIndex( 
applicationScope );
-
-        final SearchTypes types = SearchTypes.fromTypes( collection.getType() 
);
-
-        logger.debug( "Searching scope {}", searchEdge );
-
-        final CollectionResultsLoaderFactoryImpl resultsLoaderFactory =
-            new CollectionResultsLoaderFactoryImpl( managerCache );
+        if ( query.isGraphSearch() ) {
+            readPipelineBuilder.getCollection( collName );
+        }
+        else {
+            readPipelineBuilder.getCollectionWithQuery( collName, 
query.getQl().get() );
+        }
 
 
-        //execute the query and return our next result
-        final QueryExecutor executor =
-            new ElasticSearchQueryExecutor( resultsLoaderFactory, ei, 
applicationScope, searchEdge, types, query );
+        final Observable<Results> resultsObservable = 
readPipelineBuilder.build();
 
-        return executor.next();
+        return new ObservableQueryExecutor( resultsObservable ).next();
     }
 
 
@@ -689,10 +675,10 @@ public class CpRelationManager implements RelationManager 
{
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "createConnection(): " + "Indexing connection type 
'{}'\n   from source {}:{}]\n"
-                    + "   to target {}:{}\n   app {}", new Object[] {
-                    connectionType, headEntity.getType(), 
headEntity.getUuid(), connectedEntityRef.getType(),
-                    connectedEntityRef.getUuid(), applicationScope
-                } );
+                + "   to target {}:{}\n   app {}", new Object[] {
+                connectionType, headEntity.getType(), headEntity.getUuid(), 
connectedEntityRef.getType(),
+                connectedEntityRef.getUuid(), applicationScope
+            } );
         }
 
         Id entityId = new SimpleId( connectedEntityRef.getUuid(), 
connectedEntityRef.getType() );
@@ -778,9 +764,9 @@ public class CpRelationManager implements RelationManager {
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Deleting connection '{}' from source {}:{} \n   to 
target {}:{}", new Object[] {
-                    connectionType, connectingEntityRef.getType(), 
connectingEntityRef.getUuid(),
-                    connectedEntityRef.getType(), connectedEntityRef.getUuid()
-                } );
+                connectionType, connectingEntityRef.getType(), 
connectingEntityRef.getUuid(),
+                connectedEntityRef.getType(), connectedEntityRef.getUuid()
+            } );
         }
 
         Id entityId = new SimpleId( connectedEntityRef.getUuid(), 
connectedEntityRef.getType() );
@@ -909,34 +895,36 @@ public class CpRelationManager implements RelationManager 
{
 
         headEntity = em.validate( headEntity );
 
-        final SearchEdge indexScope = createConnectionSearchEdge( 
cpHeadEntity.getId(), connection );
 
-        final SearchTypes searchTypes = SearchTypes.fromNullableTypes( 
query.getEntityType() );
+        query = adjustQuery( query );
 
-        ApplicationEntityIndex ei = managerCache.getEntityIndex( 
applicationScope );
+        final String entityType = query.getEntityType();
 
-        logger.debug( "Searching {}", indexScope );
 
-        query = adjustQuery( query );
+        final ReadPipelineBuilder readPipelineBuilder =
+            pipelineBuilderFactory.createReadPipelineBuilder( applicationScope 
);
 
+        //set our fields applicable to both operations
+        readPipelineBuilder.withCursor( query.getOffsetCursor() );
+        readPipelineBuilder.withLimit( query.getLimit() );
 
+        //TODO, this should be removed when the CP relation manager is removed
+        readPipelineBuilder.setStartId( cpHeadEntity.getId() );
 
         if ( query.isGraphSearch() ) {
-            QueryExecutor executor =
-                new ConnectionGraphQueryExecutor( 
entityCollectionManagerFactory, graphManagerFactory, applicationScope,
-                    headEntity, query.getOffsetCursor(), connection, 
query.getLimit() );
-
-            return executor.next();
+            readPipelineBuilder.getConnection( connection );
+        }
+        else if ( entityType != null ) { //parameter order is (connectionName, entityType, query)
+            readPipelineBuilder.connectionWithQuery( connection, entityType, query.getQl().get() );
+        }
+        else {
+            readPipelineBuilder.connectionWithQuery( connection, 
query.getQl().get() );
         }
 
-        final ConnectionResultsLoaderFactoryImpl resultsLoaderFactory =
-            new ConnectionResultsLoaderFactoryImpl( managerCache, headEntity, 
connection );
 
-        final QueryExecutor executor =
-            new ElasticSearchQueryExecutor( resultsLoaderFactory, ei, 
applicationScope, indexScope, searchTypes,
-                query );
+        final Observable<Results> resultsObservable = 
readPipelineBuilder.build();
 
-        return executor.next();
+        return new ObservableQueryExecutor( resultsObservable ).next();
     }
 
 
@@ -1007,7 +995,6 @@ public class CpRelationManager implements RelationManager {
     }
 
 
-
     /** side effect: converts headEntity into an Entity if it is an EntityRef! 
*/
     private Entity getHeadEntity() throws Exception {
         Entity entity = null;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index 525678c..8f9776e 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -80,22 +80,19 @@ public interface FilterFactory {
     /**
      * Generate a new instance of the command with the specified parameters
      */
-    QueryCollectionElasticSearchCollectorFilter 
queryCollectionElasticSearchCollector( final String collectionName, final 
String query,
-                                                                               
        final Optional<String> cursor, final int limit );
+    QueryCollectionElasticSearchCollectorFilter 
queryCollectionElasticSearchCollector( final String collectionName, final 
String query);
 
 
     /**
      * Generate a new instance of the command with the specified parameters
      */
-    QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector( final String connectionName,final String 
query,
-                                                                               
        final Optional<String> cursor, final int limit );
+    QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector( final String connectionName,final String 
query);
 
 
     /**
      * Generate a new instance of the command with the specified parameters
      */
-    QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector( final String connectionName, final 
String connectionEntityType, final String query,
-                                                                               
        final Optional<String> cursor, final int limit );
+    QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector( final String connectionName, final 
String connectionEntityType, final String query );
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
index 13aa3eb..d40cb12 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
@@ -20,13 +20,9 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import java.util.UUID;
-
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.model.entity.Id;
 
-import com.google.common.base.Optional;
-
 import rx.Observable;
 
 
@@ -37,6 +33,20 @@ import rx.Observable;
  */
 public interface ReadPipelineBuilder {
 
+
+    /**
+     * Set the cursor to use for this read
+     * @param cursor The cursor value.  The implementation added in this change treats null as "no cursor"
+     */
+    ReadPipelineBuilder withCursor(final String cursor);
+
+    /**
+     * Set the limit of our page sizes
+     * @param limit The page size.  The implementation added in this change rejects values that are not > 0
+     * @return This builder, as returned by the implementation added in this change
+     */
+    ReadPipelineBuilder withLimit(final int limit);
+
     /**
      * An operation to bridge 2.0-> 1.0.  Should be removed when everyone uses 
the pipeline
      * @param id
@@ -58,8 +68,7 @@ public interface ReadPipelineBuilder {
     /**
      * Get all entities with a query
      */
-    ReadPipelineBuilder getCollectionWithQuery( final String collectionName, 
final String query, final int limit,
-                                 final Optional<String> cursor );
+    ReadPipelineBuilder getCollectionWithQuery( final String collectionName, 
final String query);
 
     /**
      * Get an entity via the connection name and entity Id
@@ -79,15 +88,13 @@ public interface ReadPipelineBuilder {
     /**
      * Get all entities in a connection with a query
      */
-    ReadPipelineBuilder connectionWithQuery( final String connectionName, 
final String query, final int limit,
-                              final Optional<String> cursor );
+    ReadPipelineBuilder connectionWithQuery( final String connectionName, 
final String query );
 
 
     /**
      * Get all entities in a connection with a query
      */
-    ReadPipelineBuilder connectionWithQuery( final String connectionName, 
final String entityType, final String query, final int limit,
-                              final Optional<String> cursor );
+    ReadPipelineBuilder connectionWithQuery( final String connectionName, 
final String entityType, final String query);
 
 
 
@@ -98,5 +105,5 @@ public interface ReadPipelineBuilder {
      * Execute final construction of the pipeline and return the results
      * @return
      */
-    Observable<Results> execute();
+    Observable<Results> build();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
index a44f4b8..7319f21 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
@@ -20,11 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import java.util.Collections;
-import java.util.UUID;
-
 import org.apache.usergrid.corepersistence.pipeline.DataPipeline;
-import 
org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter;
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -48,17 +44,41 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
     private final DataPipeline pipeline;
 
     /**
-     * Our pointer to our collect filter. Set or cleared with each operation 
that's performed so the correct
-     * results are rendered
+     * Our pointer to our collect filter. Set or cleared with each operation 
that's performed so the correct results are
+     * rendered
      */
     private CollectorFilter<Results> collectorFilter;
 
+    private Optional<String> cursor;
+    private Optional<Integer> limit;
+
 
     @Inject
     public ReadPipelineBuilderImpl( final FilterFactory filterFactory,
                                     @Assisted final ApplicationScope 
applicationScope ) {
         this.filterFactory = filterFactory;
         this.pipeline = new DataPipeline( applicationScope );
+        this.cursor = Optional.absent();
+        //set the default limit
+        this.limit = Optional.absent();
+    }
+
+
+    @Override
+    public ReadPipelineBuilder withCursor( final String cursor ) {
+        this.cursor = Optional.fromNullable( cursor );
+        pipeline.setCursor( this.cursor );
+        return this;
+    }
+
+
+    @Override
+    public ReadPipelineBuilder withLimit( final int limit ) {
+        Preconditions.checkArgument( limit > 0, "You must set the limit > 0" );
+        this.limit = Optional.of( limit );
+        //the limit was validated above and is always present, so no fallback value is needed
+        pipeline.setLimit( limit );
+        return this;
+    }
 
 
@@ -97,11 +117,10 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
 
 
     @Override
-    public ReadPipelineBuilder getCollectionWithQuery( final String 
collectionName, final String query, final int limit,
-                                                       final Optional<String> 
cursor ) {
+    public ReadPipelineBuilder getCollectionWithQuery( final String 
collectionName, final String query ) {
 
         //TODO, this should really be 2 a TraverseFilter with an entityLoad 
collector
-        collectorFilter = filterFactory.queryCollectionElasticSearchCollector( 
collectionName, query, cursor, limit );
+        collectorFilter = filterFactory.queryCollectionElasticSearchCollector( 
collectionName, query );
         return this;
     }
 
@@ -129,25 +148,23 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
     @Override
     public ReadPipelineBuilder getConnection( final String connectionName, 
final String entityType ) {
         pipeline.withTraverseCommand( 
filterFactory.readGraphConnectionCommand( connectionName, entityType ) );
-              setEntityLoaderFilter();
+        setEntityLoaderFilter();
 
         return this;
     }
 
+
     /**
      *
      * @param connectionName
      * @param query
-     * @param limit
-     * @param cursor
      * @return
      */
     @Override
-    public ReadPipelineBuilder connectionWithQuery( final String 
connectionName, final String query, final int limit,
-                                                    final Optional<String> 
cursor ) {
+    public ReadPipelineBuilder connectionWithQuery( final String 
connectionName, final String query ) {
 
-         //TODO, this should really be 2 a TraverseFilter with an entityLoad 
collector
-        collectorFilter = filterFactory.queryConnectionElasticSearchCollector( 
connectionName, query, cursor, limit );
+        //TODO, this should really be 2 a TraverseFilter with an entityLoad 
collector
+        collectorFilter = filterFactory.queryConnectionElasticSearchCollector( 
connectionName, query );
 
         return this;
     }
@@ -155,26 +172,26 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
 
     @Override
     public ReadPipelineBuilder connectionWithQuery( final String 
connectionName, final String entityType,
-                                                    final String query, final 
int limit,
-                                                    final Optional<String> 
cursor ) {
+                                                    final String query ) {
 
-          //TODO, this should really be 2 a TraverseFilter with an entityLoad 
collector
-        collectorFilter = filterFactory.queryConnectionElasticSearchCollector( 
connectionName, entityType, query, cursor, limit );
+        //TODO, this should really be 2 a TraverseFilter with an entityLoad 
collector
+        collectorFilter =
+            filterFactory.queryConnectionElasticSearchCollector( 
connectionName, entityType, query);
         return this;
     }
 
 
-
-
     @Override
-    public Observable<Results> execute() {
-        Preconditions.checkNotNull(collectorFilter, "You have not specified an 
operation that creates a collection filter.  This is required for loading 
results");
+    public Observable<Results> build() {
+        Preconditions.checkNotNull( collectorFilter,
+            "You have not specified an operation that creates a collection 
filter.  This is required for loading "
+                + "results" );
 
         return pipeline.build( collectorFilter );
     }
 
-    private void setEntityLoaderFilter(){
+
+    private void setEntityLoaderFilter() {
         collectorFilter = filterFactory.entityLoadCollector();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
deleted file mode 100644
index 663c15c..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.results;
-
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.usergrid.corepersistence.pipeline.DataPipeline;
-import 
org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Results;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-
-import com.google.common.base.Optional;
-
-import rx.Observable;
-
-
-/**
- * This class is a nasty hack to bridge 2.0 observables into 1.0 iterators DO 
NOT use this as a model for moving
- * forward, pandas will die.
- */
-public abstract class AbstractGraphQueryExecutor implements QueryExecutor {
-
-
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
-    private final ApplicationScope applicationScope;
-    private final Id sourceId;
-    private final int limit;
-
-    private final Optional<String> requestCursor;
-
-    private Iterator<Results> observableIterator;
-
-
-    public AbstractGraphQueryExecutor( final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
-                                       final ApplicationScope 
applicationScope, final EntityRef source,
-                                       final String cursor, final int limit ) {
-
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.applicationScope = applicationScope;
-        this.limit = limit;
-        this.sourceId = new SimpleId( source.getUuid(), source.getType() );
-
-        this.requestCursor = Optional.fromNullable( cursor );
-    }
-
-
-    @Override
-    public Iterator<Results> iterator() {
-        return this;
-    }
-
-
-    @Override
-    public boolean hasNext() {
-
-        //hasn't been set up yet, run through our first setup
-        if ( observableIterator == null ) {
-            //assign them to an iterator.  this now uses an internal buffer 
with backpressure, so we won't load all
-            // results
-            //set up our command builder
-            final DataPipeline dataPipeline = new DataPipeline( 
applicationScope, sourceId, requestCursor, limit );
-
-
-            addTraverseCommand( dataPipeline );
-
-            //construct our results to be observed later. This is a cold 
observable
-            final Observable<Results> resultsObservable =
-                dataPipeline.build( new EntityLoadCollectorFilter( 
entityCollectionManagerFactory, applicationScope ) );
-
-            this.observableIterator = 
resultsObservable.toBlocking().getIterator();
-
-            if(!observableIterator.hasNext()){
-                //no results, generate an empty one
-                this.observableIterator = Collections.singleton(new 
Results()).iterator();
-            }
-        }
-
-
-        //see if our current results are not null
-        return observableIterator.hasNext();
-    }
-
-
-    @Override
-    public Results next() {
-        if ( !hasNext() ) {
-            throw new NoSuchElementException( "No more results present" );
-        }
-
-        final Results results = observableIterator.next();
-
-        //ugly and tight coupling, but we don't have a choice until we finish 
some refactoring
-        results.setQueryExecutor( this );
-
-        return results;
-    }
-
-
-    @Override
-    public void remove() {
-        throw new RuntimeException( "Remove not implemented!!" );
-    }
-
-
-    /**
-     * Add the traverse command to the graph
-     */
-    protected abstract void addTraverseCommand( final DataPipeline 
dataPipeline );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java
deleted file mode 100644
index b506c71..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.results;
-
-
-import org.apache.usergrid.corepersistence.pipeline.DataPipeline;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
-import org.apache.usergrid.persistence.EntityRef;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- * Create The collection graph query executor
- */
-public class CollectionGraphQueryExecutor extends AbstractGraphQueryExecutor {
-
-    private final GraphManagerFactory graphManagerFactory;
-    private final String collectionName;
-
-
-    public CollectionGraphQueryExecutor( final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
-                                         final GraphManagerFactory 
graphManagerFactory,
-                                         final ApplicationScope 
applicationScope, final EntityRef source,
-                                         final String cursor, final String 
collectionName, final int limit ) {
-
-        super( entityCollectionManagerFactory, applicationScope, source, 
cursor, limit );
-        this.graphManagerFactory = graphManagerFactory;
-
-        Preconditions.checkNotNull( collectionName, "collectionName is 
required on the query" );
-        this.collectionName = collectionName;
-    }
-
-
-    @Override
-    protected void addTraverseCommand( final DataPipeline dataPipeline ) {
-        //set the traverse command from the source Id to the connect name
-        dataPipeline.withTraverseCommand( new ReadGraphCollectionFilter( 
graphManagerFactory, collectionName ) );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java
deleted file mode 100644
index 3dc5cd6..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.results;
-
-
-import org.apache.usergrid.corepersistence.pipeline.DataPipeline;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter;
-import org.apache.usergrid.persistence.EntityRef;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-
-import com.google.common.base.Preconditions;
-
-
-public class ConnectionGraphQueryExecutor extends AbstractGraphQueryExecutor {
-
-    private final GraphManagerFactory graphManagerFactory;
-    private final String connectionName;
-
-
-    public ConnectionGraphQueryExecutor( final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
-                                         final GraphManagerFactory 
graphManagerFactory,
-                                         final ApplicationScope 
applicationScope, final EntityRef source,
-                                         final String cursor, final String 
connectionType, final int limit) {
-
-        super( entityCollectionManagerFactory, applicationScope, source, 
cursor, limit );
-        this.graphManagerFactory = graphManagerFactory;
-
-        Preconditions.checkNotNull(connectionType, "connectionType is required 
on the query" );
-        this.connectionName = connectionType;
-    }
-
-
-
-    @Override
-    protected void addTraverseCommand( final DataPipeline dataPipeline ) {
-     //set the traverse command from the source Id to the connect name
-        dataPipeline.withTraverseCommand( new ReadGraphConnectionFilter( 
graphManagerFactory, connectionName ) );
-    }
-}

Reply via email to