Refactor into pipline and filter pattern for higher level operations

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b5e60e04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b5e60e04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b5e60e04

Branch: refs/heads/two-dot-o-dev
Commit: b5e60e04fe891ff271448b611aebaa6455669f5b
Parents: 7f2a4bb
Author: Todd Nine <[email protected]>
Authored: Tue Apr 28 14:29:25 2015 -0600
Committer: Todd Nine <[email protected]>
Committed: Wed Apr 29 11:48:41 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  17 +-
 .../corepersistence/index/IndexServiceImpl.java |  45 +++-
 .../corepersistence/pipeline/DataPipeline.java  | 136 -----------
 .../corepersistence/pipeline/Pipeline.java      | 123 ++++++++++
 .../pipeline/PipelineContext.java               |  94 ++++++++
 .../pipeline/PipelineModule.java                |   9 +-
 .../pipeline/PipelineResult.java                |  57 +++++
 .../pipeline/cursor/CursorSerializerUtil.java   |   6 -
 .../pipeline/cursor/NoCursorSerializer.java     |  55 -----
 .../pipeline/cursor/RequestCursor.java          |   7 +-
 .../pipeline/cursor/ResponseCursor.java         |  15 +-
 .../pipeline/read/AbstractFilter.java           | 110 ---------
 .../read/AbstractPipelineOperation.java         |  44 ++++
 .../pipeline/read/AbstractSeekingFilter.java    | 103 ++++++++
 .../pipeline/read/CandidateResultsFilter.java   |  31 +++
 .../pipeline/read/Collector.java                |  31 +++
 .../pipeline/read/CollectorFactory.java         |  44 ++++
 .../pipeline/read/CollectorFilter.java          |  36 ---
 .../corepersistence/pipeline/read/Filter.java   |  33 +--
 .../pipeline/read/FilterFactory.java            | 112 +++++++++
 .../pipeline/read/PipelineOperation.java        |  38 +++
 .../pipeline/read/ReadFilterFactory.java        | 102 --------
 .../pipeline/read/ReadFilterFactoryImpl.java    | 234 +++++++++----------
 .../pipeline/read/ReadPipelineBuilder.java      |  30 ++-
 .../pipeline/read/ReadPipelineBuilderImpl.java  | 184 ++++++++++-----
 .../pipeline/read/ResultsPage.java              |  42 ++++
 .../pipeline/read/TraverseFilter.java           |  30 ---
 .../AbstractElasticSearchFilter.java            | 156 +++++++++++++
 ...stractQueryElasticSearchCollectorFilter.java | 132 -----------
 .../CandidateResultsEntityResultsCollector.java | 216 +++++++++++++++++
 .../CandidateResultsIdVerifyFilter.java         | 193 +++++++++++++++
 .../CollectionElasticSearchFilter.java          |  71 ++++++
 .../ConnectionElasticSearchFilter.java          |  72 ++++++
 ...yCollectionElasticSearchCollectorFilter.java |  87 -------
 ...yConnectionElasticSearchCollectorFilter.java |  91 --------
 .../impl/ElasticSearchQueryExecutor.java        |  69 +++---
 .../pipeline/read/entity/EntityIdFilter.java    |  15 +-
 .../read/entity/EntityLoadCollector.java        |  94 ++++++++
 .../read/entity/EntityLoadCollectorFilter.java  | 137 -----------
 .../graph/AbstractReadGraphEdgeByIdFilter.java  |  17 +-
 .../read/graph/AbstractReadGraphFilter.java     |  14 +-
 .../graph/ReadGraphConnectionByTypeFilter.java  |  16 +-
 .../results/ObservableQueryExecutor.java        |  62 ++++-
 .../org/apache/usergrid/persistence/Query.java  | 100 ++++----
 .../corepersistence/StaleIndexCleanupTest.java  |   2 +-
 .../index/AsyncIndexServiceTest.java            |   2 +-
 .../corepersistence/index/IndexServiceTest.java |   2 +-
 .../pipeline/cursor/CursorTest.java             |   4 +-
 .../usergrid/persistence/CollectionIT.java      |  12 +-
 .../org/apache/usergrid/persistence/GeoIT.java  |   8 +-
 .../apache/usergrid/persistence/IndexIT.java    |   8 +-
 .../PerformanceEntityRebuildIndexTest.java      |   2 +-
 .../query/IntersectionTransitivePagingIT.java   |   2 +-
 .../query/IntersectionUnionPagingIT.java        |   2 +-
 .../persistence/query/IteratingQueryIT.java     |  30 +--
 .../persistence/query/NotSubPropertyIT.java     |   2 +-
 .../index/ApplicationEntityIndex.java           |   4 +-
 .../persistence/index/CandidateResults.java     |  12 +-
 .../impl/EsApplicationEntityIndexImpl.java      |  22 +-
 .../persistence/index/impl/EntityIndexTest.java |  74 +++---
 .../persistence/index/impl/GeoPagingTest.java   |   2 +-
 .../index/impl/IndexLoadTestsIT.java            |   2 +-
 62 files changed, 2087 insertions(+), 1415 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index aa0056c..8c3865e 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -32,7 +32,9 @@ import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
+import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
 import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -411,7 +413,7 @@ public class CpRelationManager implements RelationManager {
             logger.debug( "Wrote edge {}", edge );
         }
 
-        indexService.queueEntityIndexUpdate( applicationScope, memberEntity );
+        indexService.queueNewEdge( applicationScope, memberEntity, edge );
 
 
         if ( logger.isDebugEnabled() ) {
@@ -628,7 +630,7 @@ public class CpRelationManager implements RelationManager {
             pipelineBuilderFactory.createReadPipelineBuilder( applicationScope 
);
 
         //set our fields applicable to both operations
-        readPipelineBuilder.withCursor( query.getOffsetCursor() );
+        readPipelineBuilder.withCursor( query.getCursor() );
         readPipelineBuilder.withLimit( query.getLimit() );
 
         //TODO, this should be removed when the CP relation manager is removed
@@ -642,7 +644,7 @@ public class CpRelationManager implements RelationManager {
         }
 
 
-        final Observable<Results> resultsObservable = 
readPipelineBuilder.build();
+        final Observable<PipelineResult<ResultsPage>> resultsObservable = 
readPipelineBuilder.execute();
 
         return new ObservableQueryExecutor( resultsObservable ).next();
     }
@@ -896,7 +898,7 @@ public class CpRelationManager implements RelationManager {
             pipelineBuilderFactory.createReadPipelineBuilder( applicationScope 
);
 
         //set our fields applicable to both operations
-        readPipelineBuilder.withCursor( query.getOffsetCursor() );
+        readPipelineBuilder.withCursor( query.getCursor() );
         readPipelineBuilder.withLimit( query.getLimit() );
 
         //TODO, this should be removed when the CP relation manager is removed
@@ -905,15 +907,12 @@ public class CpRelationManager implements RelationManager 
{
         if ( query.isGraphSearch() ) {
             readPipelineBuilder.getConnection( connection );
         }
-        else if ( entityType != null ) {
-            readPipelineBuilder.connectionWithQuery( connection, 
query.getQl().get(), entityType );
-        }
         else {
-            readPipelineBuilder.connectionWithQuery( connection, 
query.getQl().get() );
+            readPipelineBuilder.connectionWithQuery( connection, 
Optional.fromNullable( entityType ), query.getQl().get() );
         }
 
 
-        final Observable<Results> resultsObservable = 
readPipelineBuilder.build();
+        final Observable<PipelineResult<ResultsPage>> resultsObservable = 
readPipelineBuilder.execute();
 
         return new ObservableQueryExecutor( resultsObservable ).next();
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 71f02a8..c46542c 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -33,9 +33,11 @@ import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.impl.IndexOperation;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -47,6 +49,7 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 import rx.Observable;
+import rx.functions.Func1;
 import rx.observables.ConnectableObservable;
 
 import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
@@ -68,6 +71,7 @@ public class IndexServiceImpl implements IndexService {
     private final EdgesObservable edgesObservable;
     private final IndexFig indexFig;
     private final Timer indexTimer;
+    private final Timer addTimer;
 
 
     @Inject
@@ -77,7 +81,8 @@ public class IndexServiceImpl implements IndexService {
         this.entityIndexFactory = entityIndexFactory;
         this.edgesObservable = edgesObservable;
         this.indexFig = indexFig;
-        this.indexTimer = metricsFactory.getTimer( IndexServiceImpl.class, 
"index.process");
+        this.indexTimer = metricsFactory.getTimer( IndexServiceImpl.class, 
"index.update_all");
+        this.addTimer = metricsFactory.getTimer( IndexServiceImpl.class, 
"index.add" );
     }
 
 
@@ -128,15 +133,43 @@ public class IndexServiceImpl implements IndexService {
 
 
     @Override
-    public Observable<IndexOperationMessage> indexEdge( final ApplicationScope 
applicationScope, final Entity entity,
-                                                        final Edge edge ) {
-        throw new NotImplementedException( "Implement me" );
+    public Observable<IndexOperationMessage> indexEdge( final ApplicationScope 
applicationScope, final Entity entity, final Edge edge ) {
+
+
+
+        final Observable<IndexOperationMessage> batches =  Observable.just( 
edge ).map( observableEdge -> {
+
+            //if the node is the
+            if ( edge.getTargetNode().equals( entity.getId() ) ) {
+                return generateScopeFromSource( edge );
+            }
+
+            return generateScopeToTarget( edge );
+        } ).flatMap( indexEdge -> {
+
+            final ApplicationEntityIndex ei = 
entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+
+            final EntityIndexBatch batch = ei.createBatch();
+
+            batch.index( indexEdge, entity );
+
+            return batch.execute();
+        } );
+
+        return ObservableTimer.time( batches, addTimer  );
+
+
     }
 
 
     @Override
     public Observable<IndexOperationMessage> deleteIndexEdge( final 
ApplicationScope applicationScope,
                                                               final Edge edge 
) {
+
+
+        //TODO, query ES and remove this edge
+
         throw new NotImplementedException( "Implement me" );
     }
 
@@ -144,6 +177,8 @@ public class IndexServiceImpl implements IndexService {
     @Override
     public Observable<IndexOperationMessage> deleteEntityIndexes( final 
ApplicationScope applicationScope,
                                                                   final Id 
entityId ) {
+
+        //TODO query ES and remove this entityId
         throw new NotImplementedException( "Implement me" );
     }
 
@@ -189,4 +224,6 @@ public class IndexServiceImpl implements IndexService {
 
 
 
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/DataPipeline.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/DataPipeline.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/DataPipeline.java
deleted file mode 100644
index 8463df9..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/DataPipeline.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
-import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
-import org.apache.usergrid.corepersistence.pipeline.read.CollectorFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-
-import rx.Observable;
-
-
-/**
- * A pipeline that will allow us to build a traversal command for execution
- *
- * See http://martinfowler.com/articles/collection-pipeline/ for some examples
- *
- * TODO: Re work the cursor and limit phases.  They need to be lazily 
evaluated, not added on build time
- */
-public class DataPipeline {
-
-
-    private final ApplicationScope applicationScope;
-    private final List<TraverseFilter> traverseFilterList;
-
-    private Optional<String> cursor;
-    private int limit;
-
-
-    private int count = 0;
-
-
-    /**
-     * Our first pass, where we implement our start point as an Id until we 
can use this to perform our entire
-     * traversal.  Eventually as we untangle the existing Query service 
nightmare, the sourceId will be remove and
-     * should only be traversed from the root application
-     */
-    public DataPipeline( final ApplicationScope applicationScope ) {
-
-        this.applicationScope = applicationScope;
-
-
-        traverseFilterList = new ArrayList<>();
-    }
-
-
-    /**
-     * Add a read command that will read Ids and produce Ids.  This is an 
intermediate traversal operations
-     */
-    public DataPipeline withTraverseCommand( final TraverseFilter 
traverseCommand ) {
-
-        this.traverseFilterList.add( traverseCommand );
-
-        return this;
-    }
-
-
-    /**
-     * Build the final collection step, and process our filters
-     */
-    public <T> Observable<T> build( final CollectorFilter<T> pipeCollector ) {
-
-        RequestCursor requestCursor = new RequestCursor( this.cursor );
-        ResponseCursor responseCursor = new ResponseCursor();
-
-        Observable<Id> traverseObservable = Observable.just( 
applicationScope.getApplication() );
-
-        //build our traversal commands
-        for ( TraverseFilter filter : traverseFilterList ) {
-            setState( filter, requestCursor, responseCursor );
-
-            traverseObservable = traverseObservable.compose( filter );
-        }
-
-
-        setState( pipeCollector, requestCursor, responseCursor );
-
-        pipeCollector.setLimit( limit );
-
-        return traverseObservable.compose( pipeCollector );
-    }
-
-
-    public void setCursor( Optional<String> cursor ) {
-        this.cursor = cursor;
-    }
-
-
-    public void setLimit( final int limit ) {
-        this.limit = limit;
-    }
-
-
-    /**
-     * Set the id of the state
-     */
-    private void setState( final Filter<?> filter, final RequestCursor 
requestCursor,
-                           final ResponseCursor responseCursor ) {
-
-        //TODO, see if we can wrap this observable in our ObservableTimer so 
we can see how long each filter takes
-
-
-        filter.setId( count );
-        //done for clarity
-        count++;
-
-        filter.setCursorCaches( requestCursor, responseCursor );
-        filter.setApplicationScope( applicationScope );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
new file mode 100644
index 0000000..bc93b6c
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline;
+
+
+import java.util.List;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
+import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
+import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * A pipeline that will allow us to build a traversal command for execution
+ *
+ * See http://martinfowler.com/articles/collection-pipeline/ for some examples
+ *
+ * TODO: Re work the cursor and limit phases.  They need to be lazily 
evaluated, not added on build time
+ */
+public class Pipeline<R> {
+
+
+    private final ApplicationScope applicationScope;
+    private final List<PipelineOperation> idPipelineOperationList;
+    private final Collector<?, R> collector;
+    private final RequestCursor requestCursor;
+    private final ResponseCursor responseCursor;
+
+    private final int limit;
+
+
+    private int idCount = 0;
+
+
+    /**
+     * Our first pass, where we implement our start point as an Id until we 
can use this to perform our entire
+     * traversal.  Eventually as we untangle the existing Query service 
nightmare, the sourceId will be remove and
+     * should only be traversed from the root application
+     */
+    public Pipeline( final ApplicationScope applicationScope, final 
List<PipelineOperation> pipelineOperations,
+                     final Collector<?, R> collector, final Optional<String> 
cursor, final int limit ) {
+
+        this.applicationScope = applicationScope;
+        this.idPipelineOperationList = pipelineOperations;
+        this.collector = collector;
+        this.limit = limit;
+
+        this.requestCursor = new RequestCursor( cursor );
+        this.responseCursor = new ResponseCursor();
+    }
+
+
+    /**
+     * Execute the pipline construction, returning an observable of results
+     * @return
+     */
+    public Observable<PipelineResult<R>> execute(){
+
+
+        Observable traverseObservable = Observable.just( 
applicationScope.getApplication() );
+
+        //build our traversal commands
+        for ( PipelineOperation pipelineOperation : idPipelineOperationList ) {
+            setState( pipelineOperation );
+
+            //TODO, see if we can wrap this observable in our ObservableTimer 
so we can see how long each filter takes
+
+
+            traverseObservable = traverseObservable.compose( pipelineOperation 
);
+        }
+
+
+        setState( collector );
+
+        final Observable<R> response =  traverseObservable.compose( collector 
);
+
+
+        //append the optional cursor into the response for the caller to use
+        return response.map( result -> new PipelineResult<>( result, 
responseCursor ) );
+    }
+
+
+
+
+    /**
+     * Set the id of the state
+     */
+    private void setState( final PipelineOperation pipelineOperation ) {
+
+
+        final PipelineContext context = new PipelineContext( applicationScope, 
requestCursor, responseCursor,
+            limit, idCount );
+
+        pipelineOperation.setContext( context );
+
+        //done for clarity
+        idCount++;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
new file mode 100644
index 0000000..325f876
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline;
+
+
+import java.io.Serializable;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
+import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Encapsulates the context of the pipeline for the scope of the filter.
+ */
+public class PipelineContext {
+
+    private final int id;
+    private final ApplicationScope applicationScope;
+    private final RequestCursor requestCursor;
+    private final ResponseCursor responseCursor;
+    private final int limit;
+
+
+    public PipelineContext( final ApplicationScope applicationScope, final 
RequestCursor requestCursor,
+                            final ResponseCursor responseCursor, final int 
limit, final int id ) {
+
+        this.applicationScope = applicationScope;
+        this.requestCursor = requestCursor;
+        this.responseCursor = responseCursor;
+        this.limit = limit;
+        this.id = id;
+    }
+
+
+    public ApplicationScope getApplicationScope() {
+        return applicationScope;
+    }
+
+
+    public int getId() {
+        return id;
+    }
+
+
+    /**
+     * Get our cursor value if present
+     * @param serializer
+     */
+    public <T extends Serializable> Optional<T> getCursor( final 
CursorSerializer<T> serializer ) {
+        final T value = requestCursor.getCursor( id, serializer );
+
+        return Optional.fromNullable( value );
+    }
+
+
+    /**
+     * Set the cursor value into our resposne
+     */
+    public <T extends Serializable> void setCursorValue( final T value, final 
CursorSerializer<T> serializer ) {
+        responseCursor.setCursor( id, value, serializer );
+    }
+
+
+    /**
+     * Get the limit for this execution
+     * @return
+     */
+    public int getLimit() {
+        return limit;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
index 55b84af..3018718 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
@@ -20,7 +20,8 @@
 package org.apache.usergrid.corepersistence.pipeline;
 
 
-import org.apache.usergrid.corepersistence.pipeline.read.ReadFilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
 import org.apache.usergrid.corepersistence.pipeline.read.ReadFilterFactoryImpl;
 import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder;
 import 
org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilderImpl;
@@ -38,7 +39,7 @@ public class PipelineModule extends AbstractModule {
     protected void configure() {
         //Use Guice to create the builder since we don't really need to do 
anything
         //other than DI when creating the filters
-       bind( ReadFilterFactory.class ).to( ReadFilterFactoryImpl.class );
+//       bind( FilterFactory.class ).to( ReadFilterFactoryImpl.class );
 
 
           //Use Guice to create the builder since we don't really need to do 
anything
@@ -47,9 +48,11 @@ public class PipelineModule extends AbstractModule {
                                           .build( PipelineBuilderFactory.class 
) );
 
 
+//        install( new Factory)
 
             //Use Guice to create the builder since we don't really need to do 
anything
         //other than DI when creating the filters
-//       install( new FactoryModuleBuilder().build( ReadFilterFactory.class ) 
);
+       install( new FactoryModuleBuilder().build( FilterFactory.class ) );
+        install( new FactoryModuleBuilder().build( CollectorFactory.class ));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
new file mode 100644
index 0000000..fe8604e
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Intermediate observable that will return results, as well as an optional 
cursor
+ * @param <R>
+ */
+public class PipelineResult<R> {
+
+
+    private final R result;
+
+    private final ResponseCursor responseCursor;
+
+
+    public PipelineResult( final R result, final ResponseCursor responseCursor 
) {
+        this.result = result;
+        this.responseCursor = responseCursor;
+    }
+
+
+    /**
+     * If the user requests our cursor, return the cursor
+     * @return
+     */
+    public Optional<String> getCursor(){
+        return this.responseCursor.encodeAsString();
+    }
+
+    public R getResult(){
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
index 05c1018..fea0364 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
@@ -35,15 +35,9 @@ public class CursorSerializerUtil {
 
     private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY 
);
 
-    private static final Base64Variant VARIANT = 
Base64Variants.MODIFIED_FOR_URL;
-
 
     public static ObjectMapper getMapper() {
         return MAPPER;
     }
 
-
-    public static Base64Variant getBase64() {
-        return VARIANT;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java
deleted file mode 100644
index 1d42df4..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.cursor;
-
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-
-/**
- * Interface for cursor serialization
- *
- * TODO, the need for this seems to indicate an issue with our object 
composition.  Refactor this away
- */
-public class NoCursorSerializer<T> implements CursorSerializer<T> {
-
-    private static final NoCursorSerializer<Object> INSTANCE = new 
NoCursorSerializer<>();
-
-
-    @Override
-    public T fromJsonNode( final JsonNode node, final ObjectMapper 
objectMapper ) {
-        return null;
-    }
-
-
-    @Override
-    public JsonNode toNode( final ObjectMapper objectMapper, final T value ) {
-        return objectMapper.createObjectNode();
-    }
-
-
-    /**
-     * convenience for type casting
-     */
-    public static <T> NoCursorSerializer<T> create() {
-        return ( NoCursorSerializer<T> ) INSTANCE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
index b117c21..870edbb 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
@@ -45,7 +45,6 @@ public class RequestCursor {
     private static final int MAX_CURSOR_COUNT = 100;
 
     private static final ObjectMapper MAPPER = 
CursorSerializerUtil.getMapper();
-    private static final Base64Variant VARIANT = 
CursorSerializerUtil.getBase64();
 
     private final Map<Integer, JsonNode> parsedCursor;
 
@@ -62,11 +61,17 @@ public class RequestCursor {
 
     /**
      * Get the cursor with the specified id
+     *
+     * May return null if not found
      */
     public <T> T getCursor( final int id, final CursorSerializer<T> serializer 
) {
 
         final JsonNode node = parsedCursor.get( id );
 
+        if(node == null){
+            return null;
+        }
+
         return serializer.fromJsonNode( node, MAPPER );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
index e379a34..f1c8c24 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
@@ -30,6 +30,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
 
 
 /**
@@ -39,7 +40,6 @@ public class ResponseCursor {
 
 
     private static final ObjectMapper MAPPER = 
CursorSerializerUtil.getMapper();
-    private static final Base64Variant VARIANT = 
CursorSerializerUtil.getBase64();
 
     /**
      * We use a map b/c some indexes might be skipped
@@ -61,8 +61,14 @@ public class ResponseCursor {
     /**
      * now we're done, encode as a string
      */
-    public String encodeAsString() {
+    public Optional<String> encodeAsString() {
         try {
+
+            if(cursors.isEmpty()){
+                return Optional.absent();
+            }
+
+
             final ObjectNode map = MAPPER.createObjectNode();
 
             for ( Map.Entry<Integer, CursorEntry<?>> entry : 
cursors.entrySet() ) {
@@ -78,8 +84,9 @@ public class ResponseCursor {
             final byte[] output = MAPPER.writeValueAsBytes(map);
 
             //generate a base64 url save string
-            return Base64.getUrlEncoder().encodeToString( output );
-//            return MAPPER.writer( VARIANT ).writeValueAsString( map );
+            final String value = Base64.getUrlEncoder().encodeToString( output 
);
+
+            return Optional.of( value );
 
         }
         catch ( JsonProcessingException e ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
deleted file mode 100644
index 3564a79..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import java.io.Serializable;
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
-import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Basic functionality for our commands to handle cursor IO
- */
-public abstract class AbstractFilter<T, C extends Serializable> implements 
Filter<T> {
-
-    private int id;
-    /**
-     * The cache of the cursor that was set when the read was started
-     */
-    private RequestCursor readCache;
-
-    /**
-     * The current state of the write cache.  Gets updated as we traverse the 
observables
-     */
-    private ResponseCursor writeCache;
-
-
-    /**
-     * The applicationScope
-     */
-    protected ApplicationScope applicationScope;
-
-
-    @Override
-    public void setId( final int id ) {
-        this.id = id;
-    }
-
-
-    @Override
-    public void setCursorCaches( final RequestCursor readCache, final 
ResponseCursor writeCache ) {
-        this.readCache = readCache;
-        this.writeCache = writeCache;
-    }
-
-
-    @Override
-    public void setApplicationScope( final ApplicationScope applicationScope ) 
{
-       this.applicationScope = applicationScope;
-    }
-
-
-    /**
-     * Return the parsed value of the cursor from the last request, if it 
exists
-     */
-    protected Optional<C> getCursor() {
-        final C cursor = readCache.getCursor( id, getCursorSerializer() );
-
-        return Optional.fromNullable( cursor );
-    }
-
-
-
-
-
-    /**
-     * Set the cursor value into the new cursor write cache
-     * @param newValue
-     */
-    protected void setCursor(final C newValue){
-        writeCache.setCursor( id, newValue,  getCursorSerializer() );
-    }
-
-
-    /**
-     * Generate our state as a cursor
-     * @return
-     */
-    protected String generateCursor(){
-        return writeCache.encodeAsString();
-    }
-
-    /**
-     * Return the class to be used when parsing the cursor
-     */
-    protected abstract CursorSerializer<C> getCursorSerializer();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
new file mode 100644
index 0000000..8d7f106
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+
+
+/**
+ * Basic functionality for our commands to handle cursor IO
+ * @param <T> the input type
+ * @param <R> The output Type
+ */
+public abstract class AbstractPipelineOperation<T, R> implements 
PipelineOperation<T, R> {
+
+
+    protected PipelineContext pipelineContext;
+
+
+    @Override
+    public void setContext( final PipelineContext pipelineContext ) {
+        this.pipelineContext = pipelineContext;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
new file mode 100644
index 0000000..9509678
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import java.io.Serializable;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Abstract class for filters to extend that require a cursor
+ * @param <T> The input type
+ * @param <R> The response type
+ * @param <C> The cursor type
+ */
+public abstract class AbstractSeekingFilter<T, R, C extends Serializable> 
extends AbstractPipelineOperation<T, R> implements Filter<T, R> {
+
+
+
+    //TODO not a big fan of this, but not sure how to build resume otherwise
+    private CursorSeek<C> cursorSeek;
+
+
+    /**
+     * Return the parsed value of the cursor from the last request, if it 
exists
+     */
+    protected Optional<C> getSeekValue() {
+
+        if(cursorSeek == null) {
+
+            final Optional<C> cursor = pipelineContext.getCursor( 
getCursorSerializer() );
+            cursorSeek = new CursorSeek<>( cursor );
+        }
+
+        return cursorSeek.getSeekValue();
+
+    }
+
+
+    /**
+     * Sets the cursor into our pipeline context
+     * @param newValue
+     */
+    protected void setCursor(final C newValue){
+        pipelineContext.setCursorValue( newValue, getCursorSerializer() );
+    }
+
+
+    /**
+     * Return the class to be used when parsing the cursor
+     */
+    protected abstract CursorSerializer<C> getCursorSerializer();
+
+
+    /**
+     * An internal class that holds a mutable state.  When resuming, we only 
ever honor the seek value on the first call.  Afterwards, we will seek from the 
beginning on newly emitted values.
+     * Calling get will return the first value to seek, or absent if not 
specified.  Subsequent calls will return absent.  Callers should treat the 
results as seek values for each operation
+     */
+    protected static class CursorSeek<C> {
+
+        private Optional<C> seek;
+
+        private CursorSeek(final Optional<C> cursorValue){
+            seek = cursorValue;
+        }
+
+
+        /**
+         * Get the seek value to use when searching
+         * @return
+         */
+        public Optional<C> getSeekValue(){
+            final Optional<C> toReturn = seek;
+
+            seek = Optional.absent();
+
+            return toReturn;
+        }
+
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
new file mode 100644
index 0000000..4e6d06e
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Traverses edges in the graph.  Either by query or graph traversal.  Take an 
observable of ids, and emits
+ * an observable of ids
+ */
+public interface CandidateResultsFilter extends PipelineOperation<Id, 
CandidateResults> {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
new file mode 100644
index 0000000..69d929c
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+/**
+ * A command that is used to reduce our stream of results into a final output
+ * @param <T>
+ */
+public interface Collector<T, R> extends PipelineOperation<T, R> {
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
new file mode 100644
index 0000000..6893b34
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector;
+import 
org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector;
+
+
+/**
+ * A factory for generating collectors
+ */
+public interface CollectorFactory {
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     */
+    EntityLoadCollector entityLoadCollector();
+
+    /**
+     * Get the collector for collection candidate results to entities
+     * @return
+     */
+    CandidateResultsEntityResultsCollector 
candidateResultsEntityResultsCollector();
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFilter.java
deleted file mode 100644
index 883e910..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFilter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-/**
- * A command that is used to reduce our stream of results into a final output
- * @param <T>
- */
-public interface CollectorFilter<T> extends Filter<T> {
-
-    /**
-     * Set the prefered result size for the command
-     * @param limit
-     */
-    void setLimit( final int limit );
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
index f50a2f4..ace62db 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
@@ -20,38 +20,11 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
-import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
 
-import rx.Observable;
-
 
 /**
- * Interface for filtering commands.  All filters must take an observable of 
Id's as an input.  Output is then determined by subclasses.
- * This takes an input of Id, performs some operation, and emits values for 
further processing in the Observable
- * pipeline
+ * Traverses edges in the graph.  Either by query or graph traversal.  Take an 
observable of ids, and emits
+ * an observable of ids
  */
-public interface Filter<T> extends Observable.Transformer<Id, T> {
-
-
-    /**
-     * Set the id of this filter in it's execution environment
-     */
-    void setId( final int id );
-
-    /**
-     * Set the cursor cache into the filter
-     *
-     * @param readCache Set the cache that was used in the request
-     * @param writeCache Set the cache to be used when writing the results
-     */
-    void setCursorCaches( final RequestCursor readCache, final ResponseCursor 
writeCache );
-
-    /**
-     * Set the application scope of the filter
-     * @param applicationScope
-     */
-    void setApplicationScope(final ApplicationScope applicationScope);
-}
+public interface Filter<T, R> extends PipelineOperation<T, R> {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
new file mode 100644
index 0000000..7a61961
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsIdVerifyFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CollectionElasticSearchFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ConnectionElasticSearchFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.assistedinject.Assisted;
+
+
+/**
+ * A factory for generating read commands
+ */
+public interface FilterFactory {
+
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     * @param collectionName The collection name to use when reading the graph
+     */
+    ReadGraphCollectionFilter readGraphCollectionFilter( final String 
collectionName );
+
+    /**
+     * Read a connection between two entities, the incoming and the target 
entity
+     *
+     * @param collectionName The collection name to use when reading the edge
+     * @param targetId The target id to use when traversing the graph
+     */
+    ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter( final String 
collectionName, final Id targetId );
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     * @param connectionName The connection name to use when traversing the 
graph
+     */
+    ReadGraphConnectionFilter readGraphConnectionFilter( final String 
connectionName );
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     * @param connectionName The connection name to use when traversing the 
graph
+     * @param entityType The entity type to use when traversing the graph
+     */
+    ReadGraphConnectionByTypeFilter readGraphConnectionByTypeFilter(
+        @Assisted( "connectionName" ) final String connectionName, @Assisted( 
"entityType" ) final String entityType );
+
+
+    /**
+     * Read a connection directly between two identifiers
+     * @param connectionName The connection name to use when traversing the 
graph
+     * @param targetId  The target Id to use when traversing the graph
+     */
+    ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String 
connectionName, final Id targetId );
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     * @param query The query to use when querying the entities in the 
collection
+     * @param collectionName The collection name to use when querying
+     */
+    CollectionElasticSearchFilter collectionElasticSearchFilter( @Assisted( 
"query" ) final String query,
+                                                                 @Assisted( 
"collectionName" )
+                                                                 final String 
collectionName );
+
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     * @param query The query to use when querying the entities in the 
connection
+     * @param connectionName The type of connection to query
+     * @param connectedEntityType The type of entity in the connection.  Leave 
absent to query all entity types
+     */
+    ConnectionElasticSearchFilter connectionElasticSearchFilter( @Assisted( 
"query" ) final String query,
+                                                                 @Assisted( 
"connectionName" ) final String connectionName,
+                                                                 
@Assisted("connectedEntityType") final Optional<String> connectedEntityType);
+
+
+    /**
+     * Get a candidate ids verifier for collection results.  Should be 
inserted into pipelines where a query filter is an intermediate step,
+     * not a final filter before collectors
+     */
+    CandidateResultsIdVerifyFilter candidateResultsIdVerifyFilter();
+
+    /**
+     * Get an entity id filter.  Used as a 1.0->2.0 bridge since we're not 
doing full traversals
+     * @param entityId The entity id to emit
+     */
+    EntityIdFilter getEntityIdFilter( final Id entityId );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
new file mode 100644
index 0000000..28bba36
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+
+import rx.Observable;
+
+
+/**
+ * Interface for filtering commands.  All filters must take an observable of 
Id's as an input.  Output is then determined by subclasses.
+  * This takes an input of Id, performs some operation, and emits values for 
further processing in the Observable
+  * pipeline
+ * @param <T> The input type
+ * @param <R>
+ */
+public interface PipelineOperation< T, R> extends Observable.Transformer<T, R> 
{
+
+    void setContext(final PipelineContext pipelineContext);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java
deleted file mode 100644
index 92bdacb..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryCollectionElasticSearchCollectorFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryConnectionElasticSearchCollectorFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-
-
-/**
- * A factory for generating read commands
- */
-public interface ReadFilterFactory {
-
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    ReadGraphCollectionFilter readGraphCollectionCommand( final String 
collectionName );
-
-    /**
-     * Read a connection between two entities, the incoming and the target 
entity
-     * @param collectionName
-     * @param targetId
-     * @return
-     */
-    ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter(final String 
collectionName, final Id targetId);
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    ReadGraphConnectionFilter readGraphConnectionCommand( final String 
connectionName );
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final String 
connectionName, final String entityType );
-
-
-    /**
-     * Read a connection directly between two identifiers
-     * @param connectionName
-     * @param targetId
-     * @return
-     */
-    ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter(final String 
connectionName, final Id targetId);
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    EntityLoadCollectorFilter entityLoadCollector();
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    QueryCollectionElasticSearchCollectorFilter 
queryCollectionElasticSearchCollector( final String collectionName, final 
String query);
-
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector( final String connectionName,final String 
query);
-
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector( final String connectionName, final 
String connectionEntityType, final String query );
-
-
-    /**
-     * Get an entity id filter.  Used as a 1.0->2.0 bridge since we're not 
doing full traversals
-     */
-    EntityIdFilter getEntityIdFilter( final Id entityId );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
index 19162bb..0f73fb9 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
@@ -20,133 +20,117 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryCollectionElasticSearchCollectorFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryConnectionElasticSearchCollectorFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter;
-import org.apache.usergrid.persistence.Query;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 
 @Singleton
-public class ReadFilterFactoryImpl implements ReadFilterFactory {
-
-
-    private final GraphManagerFactory graphManagerFactory;
-    private final EntityIndexFactory entityIndexFactory;
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
-
-
-    @Inject
-    public ReadFilterFactoryImpl( final GraphManagerFactory 
graphManagerFactory,
-                                  final EntityIndexFactory entityIndexFactory,
-                                  final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
-
-
-        this.graphManagerFactory = graphManagerFactory;
-        this.entityIndexFactory = entityIndexFactory;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-    }
-
-
-    @Override
-    public ReadGraphCollectionFilter readGraphCollectionCommand( final String 
collectionName ) {
-        return new ReadGraphCollectionFilter( graphManagerFactory, 
collectionName );
-    }
-
-
-    @Override
-    public ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter( final 
String collectionName,
-                                                                        final 
Id targetId ) {
-        return new ReadGraphCollectionByIdFilter( graphManagerFactory, 
collectionName, targetId );
-    }
-
-
-    @Override
-    public ReadGraphConnectionFilter readGraphConnectionCommand( final String 
connectionName ) {
-        return new ReadGraphConnectionFilter( graphManagerFactory, 
connectionName );
-    }
-
-
-    @Override
-    public ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final 
String connectionName,
-                                                                       final 
String entityType ) {
-        return new ReadGraphConnectionByTypeFilter( graphManagerFactory, 
connectionName, entityType );
-    }
-
-
-    @Override
-    public ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final 
String connectionName,
-                                                                        final 
Id targetId ) {
-        return new ReadGraphConnectionByIdFilter( graphManagerFactory, 
connectionName, targetId );
-    }
-
-
-    @Override
-    public EntityLoadCollectorFilter entityLoadCollector() {
-        return new EntityLoadCollectorFilter( entityCollectionManagerFactory );
-    }
-
-
-    /**
-     * TODO refactor these impls to use RX internally, as well as remove the 
query object
-     */
-    @Override
-    public QueryCollectionElasticSearchCollectorFilter 
queryCollectionElasticSearchCollector(
-        final String collectionName, final String query ) {
-
-        final Query queryObject = Query.fromQL( query );
-
-        final QueryCollectionElasticSearchCollectorFilter filter =
-            new QueryCollectionElasticSearchCollectorFilter( 
entityCollectionManagerFactory, entityIndexFactory,
-                collectionName, queryObject );
-
-        return filter;
-    }
-
-
-    @Override
-    public QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector(
-        final String connectionName, final String query ) {
-
-        final Query queryObject = Query.fromQL( query );
-
-        final QueryConnectionElasticSearchCollectorFilter filter =
-            new QueryConnectionElasticSearchCollectorFilter( 
entityCollectionManagerFactory, entityIndexFactory,
-                connectionName, queryObject );
-
-        return filter;
-    }
-
-
-    @Override
-    public QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector(
-        final String connectionName, final String connectionEntityType, final 
String query ) {
-
-        final Query queryObject = Query.fromQL( query );
-        queryObject.setConnectionType( connectionEntityType );
-
-        final QueryConnectionElasticSearchCollectorFilter filter =
-            new QueryConnectionElasticSearchCollectorFilter( 
entityCollectionManagerFactory, entityIndexFactory,
-                connectionName, queryObject );
-
-        return filter;
-    }
-
-
-    @Override
-    public EntityIdFilter getEntityIdFilter( final Id entityId ) {
-        return new EntityIdFilter( entityId );
-    }
+public class ReadFilterFactoryImpl { //implements ReadFilterFactory {
+
+//
+//    private final GraphManagerFactory graphManagerFactory;
+//    private final EntityIndexFactory entityIndexFactory;
+//    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+//
+//
+//    @Inject
+//    public ReadFilterFactoryImpl( final GraphManagerFactory 
graphManagerFactory,
+//                                  final EntityIndexFactory 
entityIndexFactory,
+//                                  final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
+//
+//
+//        this.graphManagerFactory = graphManagerFactory;
+//        this.entityIndexFactory = entityIndexFactory;
+//        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+//    }
+//
+//
+//    @Override
+//    public ReadGraphCollectionFilter readGraphCollectionCommand( final 
String collectionName ) {
+//        return new ReadGraphCollectionFilter( graphManagerFactory, 
collectionName );
+//    }
+//
+//
+//    @Override
+//    public ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter( 
final String collectionName,
+//                                                                        
final Id targetId ) {
+//        return new ReadGraphCollectionByIdFilter( graphManagerFactory, 
collectionName, targetId );
+//    }
+//
+//
+//    @Override
+//    public ReadGraphConnectionFilter readGraphConnectionCommand( final 
String connectionName ) {
+//        return new ReadGraphConnectionFilter( graphManagerFactory, 
connectionName );
+//    }
+//
+//
+//    @Override
+//    public ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final 
String connectionName,
+//                                                                       final 
String entityType ) {
+//        return new ReadGraphConnectionByTypeFilter( graphManagerFactory, 
connectionName, entityType );
+//    }
+//
+//
+//    @Override
+//    public ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( 
final String connectionName,
+//                                                                        
final Id targetId ) {
+//        return new ReadGraphConnectionByIdFilter( graphManagerFactory, 
connectionName, targetId );
+//    }
+//
+//
+//    @Override
+//    public EntityLoadCollector entityLoadCollector() {
+//        return new EntityLoadCollector( entityCollectionManagerFactory );
+//    }
+//
+//
+//    /**
+//     * TODO refactor these impls to use RX internally, as well as remove the 
query object
+//     */
+//    @Override
+//    public QueryCollectionElasticSearchCollectorFilter 
queryCollectionElasticSearchCollector(
+//        final String collectionName, final String query ) {
+//
+//        final Query queryObject = Query.fromQL( query );
+//
+//        final QueryCollectionElasticSearchCollectorFilter filter =
+//            new QueryCollectionElasticSearchCollectorFilter( 
entityCollectionManagerFactory, entityIndexFactory,
+//                collectionName, queryObject );
+//
+//        return filter;
+//    }
+//
+//
+//    @Override
+//    public QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector(
+//        final String connectionName, final String query ) {
+//
+//        final Query queryObject = Query.fromQL( query );
+//
+//        final QueryConnectionElasticSearchCollectorFilter filter =
+//            new QueryConnectionElasticSearchCollectorFilter( 
entityCollectionManagerFactory, entityIndexFactory,
+//                connectionName, queryObject );
+//
+//        return filter;
+//    }
+//
+//
+//    @Override
+//    public QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector(
+//        final String connectionName, final String connectionEntityType, 
final String query ) {
+//
+//        final Query queryObject = Query.fromQL( query );
+//        queryObject.setConnectionType( connectionEntityType );
+//
+//        final QueryConnectionElasticSearchCollectorFilter filter =
+//            new QueryConnectionElasticSearchCollectorFilter( 
entityCollectionManagerFactory, entityIndexFactory,
+//                connectionName, queryObject );
+//
+//        return filter;
+//    }
+//
+//
+//    @Override
+//    public EntityIdFilter getEntityIdFilter( final Id entityId ) {
+//        return new EntityIdFilter( entityId );
+//    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
index 5d83dac..9da2b03 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
@@ -20,15 +20,21 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
+import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
+import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Optional;
+
 import rx.Observable;
 
 
 /**
- * An instance of a pipeline builder for building commands.
- * Each invocation of the method will assemple the underlying pipe and 
updating it's state
+ * An instance of a pipeline builder for building commands on our read pipline
+ *
+ * Each invocation of the method will assemble the underlying pipe and 
updating it's state
+ *
  * Results are added by invoking execute.
  */
 public interface ReadPipelineBuilder {
@@ -38,14 +44,14 @@ public interface ReadPipelineBuilder {
      * Set the cursor
      * @param cursor
      */
-    ReadPipelineBuilder withCursor(final String cursor);
+    ReadPipelineBuilder withCursor(final Optional<String> cursor);
 
     /**
      * Set the limit of our page sizes
      * @param limit
      * @return
      */
-    ReadPipelineBuilder withLimit(final int limit);
+    ReadPipelineBuilder withLimit(final Optional<Integer> limit);
 
     /**
      * An operation to bridge 2.0-> 1.0.  Should be removed when everyone uses 
the pipeline
@@ -87,24 +93,14 @@ public interface ReadPipelineBuilder {
     ReadPipelineBuilder getConnection( final String connectionName, final 
String entityType );
 
     /**
-     * Get all entities in a connection with a query
-     */
-    ReadPipelineBuilder connectionWithQuery( final String connectionName, 
final String query );
-
-
-    /**
      * Get all entities in a connection with a query and a target entity type
      */
-    ReadPipelineBuilder connectionWithQuery( final String connectionName, 
final String entityType, final String query);
-
-
-
-
+    ReadPipelineBuilder connectionWithQuery( final String connectionName, 
final Optional<String> entityType, final String query);
 
 
     /**
-     * Execute final construction of the pipeline and return the results
+     * Load our entity results when our previous filter calls graph
      * @return
      */
-    Observable<Results> build();
+    Observable<PipelineResult<ResultsPage>> execute();
 }

Reply via email to