http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
index 7ffe957..2ec0082 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
@@ -20,9 +20,15 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.corepersistence.pipeline.DataPipeline;
-import org.apache.usergrid.persistence.Results;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.usergrid.corepersistence.pipeline.Pipeline;
+import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
+import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector;
+import 
org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
@@ -38,60 +44,69 @@ import rx.Observable;
  */
 public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
+    private static final int DEFAULT_LIMIT = 10;
+
+    private final FilterFactory filterFactory;
+
+    private final CollectorState collectorState;
 
-    private final ReadFilterFactory readFilterFactory;
+    private final ApplicationScope applicationScope;
 
-    private final DataPipeline pipeline;
 
     /**
      * Our pointer to our collect filter. Set or cleared with each operation 
that's performed so the correct results are
      * rendered
      */
-    private CollectorFilter<Results> collectorFilter;
+    private List<Filter> filters;
+
 
     private Optional<String> cursor;
-    private Optional<Integer> limit;
+    private int limit;
 
 
     @Inject
-    public ReadPipelineBuilderImpl( final ReadFilterFactory readFilterFactory,
+    public ReadPipelineBuilderImpl( final FilterFactory filterFactory, final 
CollectorFactory collectorFactory,
                                     @Assisted final ApplicationScope 
applicationScope ) {
-        this.readFilterFactory = readFilterFactory;
+        this.filterFactory = filterFactory;
 
-        //set up our pipeline with our application scope
-        this.pipeline = new DataPipeline( applicationScope );
+        this.applicationScope = applicationScope;
 
         //init our cursor to empty
         this.cursor = Optional.absent();
 
         //set the default limit
-        this.limit = Optional.absent();
+        this.limit = DEFAULT_LIMIT;
+
+
+        this.collectorState = new CollectorState( collectorFactory );
+
+        this.filters = new ArrayList<>();
     }
 
 
     @Override
-    public ReadPipelineBuilder withCursor( final String cursor ) {
-        this.cursor = Optional.fromNullable( cursor );
-        pipeline.setCursor( this.cursor );
+    public ReadPipelineBuilder withCursor( final Optional<String> cursor ) {
+        Preconditions.checkNotNull( cursor, "cursor must not be null" );
+        this.cursor = cursor;
         return this;
     }
 
 
     @Override
-    public ReadPipelineBuilder withLimit( final int limit ) {
-        Preconditions.checkArgument( limit > 0, "You must set the limit > 0" );
-        this. limit = Optional.of( limit );
-        //set the default value
-        pipeline.setLimit( this.limit.or( 10 ) );
+    public ReadPipelineBuilder withLimit( final Optional<Integer> limit ) {
+        Preconditions.checkNotNull( limit, "limit must not be null" );
+        this.limit = limit.or( DEFAULT_LIMIT );
         return this;
     }
 
 
     @Override
     public ReadPipelineBuilder setStartId( final Id id ) {
-        pipeline.withTraverseCommand( readFilterFactory.getEntityIdFilter( id 
) );
+        ValidationUtils.verifyIdentity( id );
+
+        filters.add( filterFactory.getEntityIdFilter( id ) );
 
-        this.collectorFilter = null;
+        this.collectorState.clear();
 
 
         return this;
@@ -100,10 +115,12 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
 
     @Override
     public ReadPipelineBuilder getEntityViaCollection( final String 
collectionName, final Id entityId ) {
+        Preconditions.checkNotNull( collectionName, "collectionName must not 
be null" );
+        ValidationUtils.verifyIdentity( entityId );
 
-        pipeline.withTraverseCommand( 
readFilterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) );
+        filters.add( filterFactory.readGraphCollectionByIdFilter( 
collectionName, entityId ) );
 
-        setEntityLoaderFilter();
+        this.collectorState.setEntityLoaderCollector();
 
         return this;
     }
@@ -111,11 +128,11 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
 
     @Override
     public ReadPipelineBuilder getCollection( final String collectionName ) {
+        Preconditions.checkNotNull( collectionName, "collectionName must not 
be null" );
 
+        filters.add( filterFactory.readGraphCollectionFilter( collectionName ) 
);
 
-        pipeline.withTraverseCommand( 
readFilterFactory.readGraphCollectionCommand( collectionName ) );
-
-        setEntityLoaderFilter();
+        this.collectorState.setEntityLoaderCollector();
 
         return this;
     }
@@ -123,18 +140,26 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
 
     @Override
     public ReadPipelineBuilder getCollectionWithQuery( final String 
collectionName, final String query ) {
+        Preconditions.checkNotNull( collectionName, "collectionName must not 
be null" );
+        Preconditions.checkNotNull( query, "query must not be null" );
 
         //TODO, this should really be 2 a TraverseFilter with an entityLoad 
collector
-        collectorFilter = 
readFilterFactory.queryCollectionElasticSearchCollector( collectionName, query 
);
+
+        filters.add( filterFactory.collectionElasticSearchFilter( query, 
collectionName ) );
+
+        this.collectorState.setCandidateResultsEntityResultsCollector();
+
         return this;
     }
 
 
     @Override
     public ReadPipelineBuilder getEntityViaConnection( final String 
connectionName, final Id entityId ) {
+        Preconditions.checkNotNull( connectionName, "connectionName must not 
be null" );
+        ValidationUtils.verifyIdentity( entityId );
 
-        pipeline.withTraverseCommand( 
readFilterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) );
-        setEntityLoaderFilter();
+        filters.add( filterFactory.readGraphConnectionByIdFilter( 
connectionName, entityId ) );
+        collectorState.setEntityLoaderCollector();
 
         return this;
     }
@@ -142,9 +167,9 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
 
     @Override
     public ReadPipelineBuilder getConnection( final String connectionName ) {
-
-        pipeline.withTraverseCommand( 
readFilterFactory.readGraphConnectionCommand( connectionName ) );
-        setEntityLoaderFilter();
+        Preconditions.checkNotNull( connectionName, "connectionName must not 
be null" );
+        filters.add( filterFactory.readGraphConnectionFilter( connectionName ) 
);
+        collectorState.setEntityLoaderCollector();
 
         return this;
     }
@@ -152,51 +177,98 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
 
     @Override
     public ReadPipelineBuilder getConnection( final String connectionName, 
final String entityType ) {
-        pipeline.withTraverseCommand( 
readFilterFactory.readGraphConnectionCommand( connectionName, entityType ) );
-        setEntityLoaderFilter();
+        Preconditions.checkNotNull( connectionName, "connectionName must not 
be null" );
+        Preconditions.checkNotNull( entityType, "entityType must not be 
null" );
+
+        filters.add( filterFactory.readGraphConnectionByTypeFilter( 
connectionName, entityType ) );
 
+        collectorState.setEntityLoaderCollector();
         return this;
     }
 
 
-    /**
-     *
-     * @param connectionName
-     * @param query
-     * @return
-     */
     @Override
-    public ReadPipelineBuilder connectionWithQuery( final String 
connectionName, final String query ) {
+    public ReadPipelineBuilder connectionWithQuery( final String 
connectionName, final Optional<String> entityType,
+                                                    final String query ) {
 
-        //TODO, this should really be 2 a TraverseFilter with an entityLoad 
collector
-        collectorFilter = 
readFilterFactory.queryConnectionElasticSearchCollector( connectionName, query 
);
+        Preconditions.checkNotNull( connectionName, "connectionName must not 
be null" );
+        Preconditions.checkNotNull( entityType, "entityType must not be 
null" );
+        Preconditions.checkNotNull( query, "query must not be null" );
 
+        filters.add( filterFactory.connectionElasticSearchFilter( query, 
connectionName, entityType ) );
+        collectorState.setCandidateResultsEntityResultsCollector();
         return this;
     }
 
 
     @Override
-    public ReadPipelineBuilder connectionWithQuery( final String 
connectionName, final String entityType,
-                                                    final String query ) {
+    public Observable<PipelineResult<ResultsPage>> execute() {
 
-        //TODO, this should really be 2 a TraverseFilter with an entityLoad 
collector
-        collectorFilter =
-            readFilterFactory.queryConnectionElasticSearchCollector( 
connectionName, entityType, query);
-        return this;
-    }
+        ValidationUtils.validateApplicationScope( applicationScope );
 
+        final Collector<?, ResultsPage> collector = 
collectorState.getCollector();
 
-    @Override
-    public Observable<Results> build() {
-        Preconditions.checkNotNull( collectorFilter,
+        Preconditions.checkNotNull( collector,
             "You have not specified an operation that creates a collection 
filter.  This is required for loading "
                 + "results" );
 
-        return pipeline.build( collectorFilter );
+
+        Preconditions.checkNotNull( cursor, "A cursor should be initialized 
even if absent" );
+
+        Preconditions.checkArgument( limit > 0, "limit must be > than 0" );
+
+
+        Pipeline pipeline = new Pipeline( applicationScope, filters, 
collector, cursor, limit );
+
+
+        return pipeline.execute();
     }
 
 
-    private void setEntityLoaderFilter() {
-        collectorFilter = readFilterFactory.entityLoadCollector();
+    /**
+     * A mutable state for our collectors.  Rather than create a new instance 
each time, we create a singleton
+     * collector
+     */
+    private static final class CollectorState {
+        private final CollectorFactory collectorFactory;
+
+        private EntityLoadCollector entityLoadCollector;
+
+        private CandidateResultsEntityResultsCollector 
candidateResultsEntityResultsCollector;
+
+
+        private Collector<?, ResultsPage> collector = null;
+
+
+        private CollectorState( final CollectorFactory collectorFactory ) 
{this.collectorFactory = collectorFactory;}
+
+
+        public void setEntityLoaderCollector() {
+            if ( entityLoadCollector == null ) {
+                entityLoadCollector = collectorFactory.entityLoadCollector();
+            }
+
+
+            collector = entityLoadCollector;
+        }
+
+
+        public void setCandidateResultsEntityResultsCollector() {
+            if ( candidateResultsEntityResultsCollector == null ) {
+                candidateResultsEntityResultsCollector = 
collectorFactory.candidateResultsEntityResultsCollector();
+            }
+
+            collector = candidateResultsEntityResultsCollector;
+        }
+
+
+        public void clear() {
+            collector = null;
+        }
+
+
+        public Collector<?, ResultsPage> getCollector() {
+            return collector;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
new file mode 100644
index 0000000..e428e7a
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import java.util.List;
+
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ * An encapsulation of entities as a group of responses.  Ordered by the 
requesting filters.  Each set should be considered a "page" of results.
+ */
+public class ResultsPage {
+
+    private final List<Entity> entityList;
+
+
+    public ResultsPage( final List<Entity> entityList ) {this.entityList = 
entityList;}
+
+
+    public List<Entity> getEntityList() {
+        return entityList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/TraverseFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/TraverseFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/TraverseFilter.java
deleted file mode 100644
index ba7c802..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/TraverseFilter.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * Traverses edges in the graph.  Either by query or graph traversal.  Take an 
observable of ids, and emits
- * an observable of ids
- */
-public interface TraverseFilter extends Filter<Id> {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
new file mode 100644
index 0000000..eac8a65
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.CandidateResultsFilter;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * Filter that searches the entity index once per incoming id and emits each page of candidate results
+ */
+public abstract class AbstractElasticSearchFilter extends 
AbstractSeekingFilter<Id, CandidateResults, Integer>
+    implements CandidateResultsFilter {
+
+    private static final Logger log = LoggerFactory.getLogger( 
AbstractElasticSearchFilter.class );
+
+    private final EntityIndexFactory entityIndexFactory;
+    private final String query;
+    private final Timer searchTimer;
+
+
+    /**
+     * Create a new instance of our filter
+     */
+    public AbstractElasticSearchFilter( final EntityIndexFactory 
entityIndexFactory,
+                                        final MetricsFactory metricsFactory, 
final String query ) {
+        this.entityIndexFactory = entityIndexFactory;
+        this.query = query;
+        this.searchTimer = metricsFactory.getTimer( 
AbstractElasticSearchFilter.class, "query" );
+    }
+
+
+    @Override
+    public Observable<CandidateResults> call( final Observable<Id> observable 
) {
+
+        //get the entity index for our application scope
+        final ApplicationEntityIndex applicationEntityIndex =
+            entityIndexFactory.createApplicationEntityIndex( 
pipelineContext.getApplicationScope() );
+
+
+        final int limit = pipelineContext.getLimit();
+
+
+        final SearchTypes searchTypes = getSearchTypes();
+
+
+        //search the index for every id we receive and emit each page of candidates
+        return observable.flatMap( id -> {
+
+            final SearchEdge searchEdge = getSearchEdge( id );
+
+
+            final Observable<CandidateResults> candidates = Observable.create( 
subscriber -> {
+
+                //our starting offset: the seek value if one is present, otherwise 0
+                //currentOffSet advances by the size of every page returned
+                //and is stored via setCursor before that page is emitted
+                final Optional<Integer> startFromCursor = getSeekValue();
+
+                final int startOffset = startFromCursor.or( 0 );
+
+                int currentOffSet = startOffset;
+
+                subscriber.onStart();
+
+                //emit while we have values from ES
+                while ( true ) {
+
+                    try {
+                        final CandidateResults candidateResults =
+                            applicationEntityIndex.search( searchEdge, 
searchTypes, query, limit, currentOffSet );
+
+                        currentOffSet += candidateResults.size();
+
+                        //set the cursor for the next value
+                        setCursor( currentOffSet );
+
+                        /**
+                         * No candidates, we're done
+                         */
+                        if ( candidateResults.size() == 0 ) {
+                            subscriber.onCompleted();
+                            return;
+                        }
+
+                        subscriber.onNext( candidateResults );
+                    }
+                    catch ( Throwable t ) {
+                        log.error( "Unable to search candidates", t );
+                        subscriber.onError( t );
+                        //onError is terminal, stop paging instead of searching again
+                        return;
+                    }
+                }
+            } );
+
+
+            //add a timer around our observable. NOTE(review): any value returned by time() is ignored here, confirm the timer is attached to what we return
+            ObservableTimer.time( candidates, searchTimer );
+
+            return candidates;
+        } );
+    }
+
+
+    @Override
+    protected CursorSerializer<Integer> getCursorSerializer() {
+        return ElasticsearchCursorSerializer.INSTANCE;
+    }
+
+
+    /**
+     * Get the search edge from the id
+     */
+    protected abstract SearchEdge getSearchEdge( final Id id );
+
+    /**
+     * Get the search types
+     */
+    protected abstract SearchTypes getSearchTypes();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
deleted file mode 100644
index f46a78a..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.CollectorFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ElasticSearchQueryExecutor;
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ResultsLoaderFactory;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.SimpleEntityRef;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * A command that will query and load elasticsearch
- *
- * On future iteration, this needs to be split into 2 commands 1 that loads 
the candidate results and validates the
- * versions then another that will search and load
- *
- * TODO, split this into 3 seperate observables
- *
- * 1) An observable that emits candidate results 2) An observable that 
validates versions by uuid (for Traverse
- * commands) 3) An observbale that emits Results as a collector (for final 
commands)
- */
-public abstract class AbstractQueryElasticSearchCollectorFilter extends 
AbstractFilter<Results, Integer>
-    implements CollectorFilter<Results> {
-
-
-    protected final EntityIndexFactory applicationEntityIndex;
-    protected final Query query;
-    private int limit;
-
-
-    @Inject
-    protected AbstractQueryElasticSearchCollectorFilter( final 
EntityIndexFactory applicationEntityIndex, final Query query ) {
-        this.applicationEntityIndex = applicationEntityIndex;
-        this.query = query;
-    }
-
-
-    @Override
-    public Observable<Results> call( final Observable<Id> idObservable ) {
-
-
-        final ApplicationEntityIndex
-            entityIndex = applicationEntityIndex.createApplicationEntityIndex( 
applicationScope );
-
-        return idObservable.flatMap( id -> {
-
-            //TODO, refactor this logic to use Observables.  make this a 
TraverseFilter and load entities with the entity loader collector
-            final ResultsLoaderFactory resultsLoaderFactory = 
getResultsLoaderFactory( id );
-            final SearchEdge searchEdge = getSearchEdge( id );
-            final SearchTypes searchTypes = getSearchTypes();
-
-
-
-            final Iterable<Results> executor =
-                new ElasticSearchQueryExecutor( resultsLoaderFactory, 
entityIndex, applicationScope,
-                    searchEdge, searchTypes, query.withLimit( limit ) );
-
-            return Observable.from( executor );
-        } );
-    }
-
-
-    /**
-     * Get the search types
-     */
-    protected abstract SearchTypes getSearchTypes();
-
-    /**
-     * Get the search edge
-     */
-    protected abstract SearchEdge getSearchEdge(final Id incomingId);
-
-
-    /**
-     * Get the results loader factor
-     */
-    protected abstract ResultsLoaderFactory getResultsLoaderFactory( final Id 
incomingId );
-
-
-    @Override
-    protected CursorSerializer<Integer> getCursorSerializer() {
-        return ElasticsearchCursorSerializer.INSTANCE;
-    }
-
-
-    @Override
-    public void setLimit( final int limit ) {
-        this.limit = limit;
-    }
-
-
-    /**
-     * Get an entiity ref from the Id.  TODO refactor this away
-     * @param id
-     * @return
-     */
-    protected EntityRef getRef(final Id id){
-        return new SimpleEntityRef( id.getType(), id.getUuid() );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
new file mode 100644
index 0000000..83a4b8c
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from an incoming CandidateResults object and returns them as results
+ */
+public class CandidateResultsEntityResultsCollector extends 
AbstractPipelineOperation<CandidateResults, ResultsPage>
+    implements Collector<CandidateResults, ResultsPage> {
+
+    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+
+
+    /**
+     * Creates the collector with the factories it needs to build per-application managers.
+     *
+     * @param entityCollectionManagerFactory factory for the collection manager used to load entities
+     * @param entityIndexFactory factory for the application entity index used to create index batches
+     */
+    @Inject
+    public CandidateResultsEntityResultsCollector(
+        final EntityCollectionManagerFactory entityCollectionManagerFactory,
+        final EntityIndexFactory entityIndexFactory ) {
+
+        this.entityIndexFactory = entityIndexFactory;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
+
+
+    /**
+     * Loads the entities referenced by each incoming batch of candidates and emits a {@link ResultsPage} holding
+     * the entities that passed version validation.
+     *
+     * @param observable the stream of candidate result batches to resolve
+     * @return a stream of results pages, one per entity set loaded
+     */
+    @Override
+    public Observable<ResultsPage> call( final Observable<CandidateResults> observable ) {
+
+        /**
+         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new
+         * results objects
+         */
+
+        final ApplicationScope scope = pipelineContext.getApplicationScope();
+
+        final EntityCollectionManager collectionManager =
+            entityCollectionManagerFactory.createCollectionManager( scope );
+
+        final ApplicationEntityIndex entityIndex = entityIndexFactory.createApplicationEntityIndex( scope );
+
+        return observable.flatMap( candidateResults ->
+            //flatten the candidates to a single list of ids to load
+            Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList()
+                //load the ids
+                .flatMap( ids -> collectionManager.load( ids ) )
+                //now that we have the loaded set, validate that our candidate set is correct
+                .map( entitySet -> new EntityCollector( entityIndex.createBatch(), entitySet, candidateResults ) )
+                .doOnNext( entityCollector -> entityCollector.merge() )
+                .map( entityCollector -> entityCollector.getResults() ) );
+    }
+
+
+    /**
+     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as
+     * this state is mutable and difficult to represent functionally.
+     *
+     * Compares every candidate returned by the index with the entity loaded for it, keeps the entities whose
+     * versions match, and queues de-index operations for candidates that are stale.
+     */
+    private static final class EntityCollector {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
+
+        private final List<Entity> results;
+        private final EntityIndexBatch batch;
+        private final CandidateResults candidateResults;
+        private final EntitySet entitySet;
+
+
+        /**
+         * @param batch the index batch that receives de-index operations for stale candidates
+         * @param entitySet the entities loaded for the candidate ids
+         * @param candidateResults the candidates returned by the search
+         */
+        public EntityCollector( final EntityIndexBatch batch, final EntitySet entitySet,
+                                final CandidateResults candidateResults ) {
+            this.batch = batch;
+            this.entitySet = entitySet;
+            this.candidateResults = candidateResults;
+            //presize once here; the field is no longer allocated twice
+            this.results = new ArrayList<>( entitySet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results, then execute the batch of queued de-index
+         * operations.
+         */
+        public void merge() {
+
+            for ( final CandidateResult candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+
+            batch.execute();
+        }
+
+
+        /**
+         * @return a page wrapping the entities that passed validation
+         */
+        public ResultsPage getResults() {
+            return new ResultsPage( results );
+        }
+
+
+        public EntityIndexBatch getBatch() {
+            return batch;
+        }
+
+
+        /**
+         * Validate a single candidate against the entity loaded for it.  Only a candidate whose version equals
+         * the loaded entity's version, and whose entity data is present, is added to the results.
+         *
+         * @param candidateResult the candidate returned from the index
+         */
+        private void validate( final CandidateResult candidateResult ) {
+
+            final Id candidateId = candidateResult.getId();
+            final UUID candidateVersion = candidateResult.getVersion();
+
+            final MvccEntity entity = entitySet.getEntity( candidateId );
+
+            //doesn't exist warn and drop
+            if ( entity == null ) {
+                logger.warn(
+                    "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
+                        + "  Ignoring since this could be a region sync issue",
+                    candidateId, candidateVersion );
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+            }
+
+            final UUID entityVersion = entity.getVersion();
+            final Id entityId = entity.getId();
+
+            //entity is newer than ES version, could be an update or the entity is marked as deleted
+            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
+
+                final SearchEdge searchEdge = candidateResults.getSearchEdge();
+
+                //the stale document is the one the index returned, so remove the candidate's version.  Passing
+                //the newer entity version here would leave the stale document in place.
+                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                    new Object[] { searchEdge, entityId, candidateVersion } );
+                batch.deindex( searchEdge, entityId, candidateVersion );
+                return;
+            }
+
+            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+            //remove the ES record, since the read in cass should cause a read repair, just ignore
+            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+                final SearchEdge searchEdge = candidateResults.getSearchEdge();
+
+                logger.warn(
+                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
+                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+            }
+
+            //versions match, but the entity data itself may be absent (e.g. marked as deleted).  Calling get()
+            //on an absent value would throw and fail the whole page, so drop the candidate instead.
+            //NOTE(review): this candidate is probably safe to deindex as well -- confirm before adding that.
+            if ( !entity.getEntity().isPresent() ) {
+                logger.warn( "Candidate with entityId {} and version {} has no entity data.  Ignoring", candidateId,
+                    candidateVersion );
+                return;
+            }
+
+            //they're the same add it
+            results.add( entity.getEntity().get() );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
new file mode 100644
index 0000000..bb9ab76
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Responsible for verifying candidate result versions, then emitting the Ids 
of these versions
+ * Input is a batch of candidate results, output is a stream of validated Ids
+ */
+public class CandidateResultsIdVerifyFilter extends 
AbstractPipelineOperation<CandidateResults, Id>
+    implements PipelineOperation<CandidateResults, Id> {
+
+    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+
+
+    /**
+     * Creates the filter with the factories it needs to build per-application managers.
+     *
+     * @param entityCollectionManagerFactory factory for the collection manager used to look up latest versions
+     * @param entityIndexFactory factory for the application entity index used to create index batches
+     */
+    @Inject
+    public CandidateResultsIdVerifyFilter(
+        final EntityCollectionManagerFactory entityCollectionManagerFactory,
+        final EntityIndexFactory entityIndexFactory ) {
+
+        this.entityIndexFactory = entityIndexFactory;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
+
+
+
+    /**
+     * Verifies each incoming batch of candidates against the latest versions returned by the collection manager
+     * and emits the ids that passed verification.
+     *
+     * @param observable the stream of candidate result batches to verify
+     * @return a stream of the verified entity ids
+     */
+    @Override
+    public Observable<Id> call( final Observable<CandidateResults> observable ) {
+
+        /**
+         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new
+         * results objects
+         */
+
+        final ApplicationScope scope = pipelineContext.getApplicationScope();
+
+        final EntityCollectionManager collectionManager =
+            entityCollectionManagerFactory.createCollectionManager( scope );
+
+        final ApplicationEntityIndex entityIndex = entityIndexFactory.createApplicationEntityIndex( scope );
+
+        return observable.flatMap( candidateResults ->
+            //flatten the candidates to a single list of ids to look up
+            Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList()
+                //load the latest version of each id
+                .flatMap( ids -> collectionManager.getLatestVersion( ids ) )
+                //now that we have the versions, validate that our candidate set is correct
+                .map( versionSet -> new EntityCollector( entityIndex.createBatch(), versionSet, candidateResults ) )
+                .doOnNext( entityCollector -> entityCollector.merge() )
+                .flatMap( entityCollector -> Observable.from( entityCollector.collectResults() ) ) );
+    }
+
+
+    /**
+     * Collects the ids of the candidates that pass version verification.  Compares every candidate returned by
+     * the index with the max version loaded for it and queues de-index operations for candidates that are stale.
+     */
+    private static final class EntityCollector {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
+
+        private final List<Id> results;
+        private final EntityIndexBatch batch;
+        private final CandidateResults candidateResults;
+        private final VersionSet versionSet;
+
+
+        /**
+         * @param batch the index batch that receives de-index operations for stale candidates
+         * @param versionSet the latest versions loaded for the candidate ids
+         * @param candidateResults the candidates returned by the search
+         */
+        public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
+                                final CandidateResults candidateResults ) {
+            this.batch = batch;
+            this.versionSet = versionSet;
+            this.candidateResults = candidateResults;
+            //presize once here; the field is no longer allocated twice
+            this.results = new ArrayList<>( versionSet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our version set into results, then execute the batch of queued de-index
+         * operations.
+         */
+        public void merge() {
+
+            for ( final CandidateResult candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+
+            batch.execute();
+        }
+
+
+        /**
+         * @return the ids that passed verification
+         */
+        public List<Id> collectResults() {
+            return results;
+        }
+
+
+        /**
+         * Validate each candidate results vs the data loaded from cass
+         *
+         * @param candidateResult the candidate returned from the index
+         */
+        private void validate( final CandidateResult candidateResult ) {
+
+            final Id candidateId = candidateResult.getId();
+            final UUID candidateVersion = candidateResult.getVersion();
+
+            final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateId );
+
+            //doesn't exist warn and drop, consistent with the entity-loading collector.
+            //NOTE(review): assumes getMaxVersion returns null for an id with no log entry -- confirm
+            if ( logEntry == null ) {
+                logger.warn(
+                    "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
+                        + "  Ignoring since this could be a region sync issue",
+                    candidateId, candidateVersion );
+                return;
+            }
+
+            final UUID entityVersion = logEntry.getVersion();
+
+            final Id entityId = logEntry.getEntityId();
+
+            //entity is newer than ES version
+            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
+
+                final SearchEdge searchEdge = candidateResults.getSearchEdge();
+
+                //the stale document is the one the index returned, so remove the candidate's version.  Passing
+                //the newer entity version here would leave the stale document in place.
+                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                    new Object[] { searchEdge, entityId, candidateVersion } );
+                batch.deindex( searchEdge, entityId, candidateVersion );
+                return;
+            }
+
+            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+            //remove the ES record, since the read in cass should cause a read repair
+            //NOTE(review): unlike the entity-loading collector, this branch does not return, so the id is still
+            //emitted below.  Behaviour is kept as-is -- confirm whether that is intended.
+            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+                final SearchEdge searchEdge = candidateResults.getSearchEdge();
+
+                logger.warn(
+                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
+                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+            }
+
+            //versions match (or ES is ahead, see note above): add it
+            results.add( entityId );
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CollectionElasticSearchFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CollectionElasticSearchFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CollectionElasticSearchFilter.java
new file mode 100644
index 0000000..4280f6a
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CollectionElasticSearchFilter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge;
+
+
+/**
+ * Elasticsearch filter scoped to a single named collection.  The collection name determines both the search
+ * types and the search edge this filter supplies through its overrides.
+ */
+public class CollectionElasticSearchFilter extends AbstractElasticSearchFilter {
+
+    private final String collectionName;
+
+
+    /**
+     * Create a new filter for the given collection.
+     *
+     * @param entityIndexFactory passed through to the parent filter
+     * @param metricsFactory passed through to the parent filter
+     * @param query the query string, passed through to the parent filter
+     * @param collectionName the name of the collection to search
+     */
+    @Inject
+    public CollectionElasticSearchFilter( final EntityIndexFactory entityIndexFactory,
+                                          final MetricsFactory metricsFactory,
+                                          @Assisted( "query" ) final String query,
+                                          @Assisted( "collectionName" ) final String collectionName ) {
+        super( entityIndexFactory, metricsFactory, query );
+        this.collectionName = collectionName;
+    }
+
+
+    /**
+     * @return the search types built from the collection name
+     */
+    @Override
+    protected SearchTypes getSearchTypes() {
+        return SearchTypes.fromTypes( collectionName );
+    }
+
+
+    /**
+     * @param incomingId the id of the entity that owns the collection
+     * @return the collection search edge for the incoming id and the collection name
+     */
+    @Override
+    protected SearchEdge getSearchEdge( final Id incomingId ) {
+        return createCollectionSearchEdge( incomingId, collectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ConnectionElasticSearchFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ConnectionElasticSearchFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ConnectionElasticSearchFilter.java
new file mode 100644
index 0000000..ab5d233
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ConnectionElasticSearchFilter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchEdge;
+
+
+/**
+ * Elasticsearch filter scoped to a single named connection.  The connection name determines the search edge, and
+ * the optional connected entity type determines the search types.
+ */
+public class ConnectionElasticSearchFilter extends AbstractElasticSearchFilter {
+
+    private final String connectionName;
+    private final Optional<String> connectedEntityType;
+
+
+    /**
+     * Create a new filter for the given connection.
+     *
+     * @param entityIndexFactory passed through to the parent filter
+     * @param metricsFactory passed through to the parent filter
+     * @param query the query string, passed through to the parent filter
+     * @param connectionName the name of the connection to search
+     * @param connectedEntityType the entity type to restrict the search to, if any
+     */
+    @Inject
+    public ConnectionElasticSearchFilter( final EntityIndexFactory entityIndexFactory,
+                                          final MetricsFactory metricsFactory,
+                                          @Assisted( "query" ) final String query,
+                                          @Assisted( "connectionName" ) final String connectionName,
+                                          @Assisted( "connectedEntityType" )
+                                          final Optional<String> connectedEntityType ) {
+        super( entityIndexFactory, metricsFactory, query );
+
+        this.connectedEntityType = connectedEntityType;
+        this.connectionName = connectionName;
+    }
+
+
+    /**
+     * @return the search types built from the connected entity type, which may be absent
+     */
+    @Override
+    protected SearchTypes getSearchTypes() {
+        return SearchTypes.fromNullableTypes( connectedEntityType.orNull() );
+    }
+
+
+    /**
+     * @param id the id of the source entity of the connection
+     * @return the connection search edge for the id and the connection name
+     */
+    @Override
+    protected SearchEdge getSearchEdge( final Id id ) {
+        return createConnectionSearchEdge( id, connectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java
deleted file mode 100644
index 4813978..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl;
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ResultsLoaderFactory;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge;
-
-
-/**
- * Command for querying collections
- */
-public class QueryCollectionElasticSearchCollectorFilter extends 
AbstractQueryElasticSearchCollectorFilter {
-
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
-    private final EntityIndexFactory entityIndexFactory;
-    private final String collectionName;
-
-
-    @Inject
-    public QueryCollectionElasticSearchCollectorFilter( final 
EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                                           final 
EntityIndexFactory entityIndexFactory,
-                                                           @Assisted final 
String collectionName , @Assisted final Query query ) {
-        super( entityIndexFactory, query );
-        this.entityIndexFactory = entityIndexFactory;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.collectionName = collectionName;
-    }
-
-
-    @Override
-    protected SearchTypes getSearchTypes() {
-        final SearchTypes types = SearchTypes.fromTypes( collectionName );
-
-        return types;
-    }
-
-
-    @Override
-    protected SearchEdge getSearchEdge( final Id incomingId ) {
-        final SearchEdge searchEdge = createCollectionSearchEdge( incomingId, 
collectionName );
-
-        return searchEdge;
-    }
-
-
-    @Override
-    protected ResultsLoaderFactory getResultsLoaderFactory( final Id id ) {
-        final EntityCollectionManager entityCollectionManager = 
entityCollectionManagerFactory.createCollectionManager( applicationScope );
-        final ApplicationEntityIndex entityIndex = 
entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
-        final EntityRef entityRef = getRef( id );
-        return new ConnectionResultsLoaderFactoryImpl( 
entityCollectionManager, entityIndex, entityRef,
-            collectionName );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java
deleted file mode 100644
index 2f7a6b3..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl;
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ResultsLoaderFactory;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchEdge;
-
-
-/**
- * Command for querying connections
- */
-public class QueryConnectionElasticSearchCollectorFilter extends 
AbstractQueryElasticSearchCollectorFilter {
-
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
-    private final EntityIndexFactory entityIndexFactory;
-    private final String connectionName;
-
-
-    @Inject
-    public QueryConnectionElasticSearchCollectorFilter(
-        final EntityCollectionManagerFactory entityCollectionManagerFactory,
-        final EntityIndexFactory entityIndexFactory, @Assisted final String 
connectionName,
-        @Assisted final Query query ) {
-        super( entityIndexFactory, query );
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.entityIndexFactory = entityIndexFactory;
-        this.connectionName = connectionName;
-    }
-
-
-    @Override
-    protected SearchTypes getSearchTypes() {
-
-        final SearchTypes searchTypes = SearchTypes.fromNullableTypes( 
query.getEntityType() );
-
-        return searchTypes;
-    }
-
-
-    @Override
-    protected SearchEdge getSearchEdge( final Id id ) {
-        final SearchEdge searchEdge = createConnectionSearchEdge( id, 
connectionName );
-
-        return searchEdge;
-    }
-
-
-    @Override
-    protected ResultsLoaderFactory getResultsLoaderFactory( final Id id ) {
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
-        final ApplicationEntityIndex entityIndex = 
entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
-        final EntityRef entityRef = getRef( id );
-        return new ConnectionResultsLoaderFactoryImpl( 
entityCollectionManager, entityIndex, entityRef,
-            connectionName );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
index ccc5198..6e170f8 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
@@ -48,7 +48,13 @@ public class ElasticSearchQueryExecutor implements 
Iterable<Results>, Iterator<R
 
     private final SearchTypes types;
 
-    private final Query query;
+    private final String query;
+
+    private final Optional<Integer> setOffsetFromCursor;
+
+    private final int limit;
+
+    private int offset;
 
 
     private Results currentResults;
@@ -56,18 +62,22 @@ public class ElasticSearchQueryExecutor implements 
Iterable<Results>, Iterator<R
     private boolean moreToLoad = true;
 
 
+
+
     public ElasticSearchQueryExecutor( final ResultsLoaderFactory 
resultsLoaderFactory, final ApplicationEntityIndex entityIndex,
                                        final ApplicationScope 
applicationScope, final SearchEdge indexScope,
-                                       final SearchTypes types, final Query 
query ) {
+                                       final SearchTypes types, final String 
query, final Optional<Integer> setOffsetFromCursor, final int limit ) {
         this.resultsLoaderFactory = resultsLoaderFactory;
         this.applicationScope = applicationScope;
         this.entityIndex = entityIndex;
         this.indexScope = indexScope;
         this.types = types;
+        this.setOffsetFromCursor = setOffsetFromCursor;
 
         //we must deep copy the query passed.  Otherwise we will modify it's 
state with cursors.  Won't fix, not relevant
         //once we start subscribing to streams.
-        this.query = new Query(query);
+        this.query = query;
+        this.limit = limit;
     }
 
 
@@ -83,7 +93,6 @@ public class ElasticSearchQueryExecutor implements 
Iterable<Results>, Iterator<R
 
         final int maxQueries = 10; // max re-queries to satisfy query limit
 
-        final int originalLimit = query.getLimit();
 
         Results results = null;
         int queryCount = 0;
@@ -91,13 +100,15 @@ public class ElasticSearchQueryExecutor implements 
Iterable<Results>, Iterator<R
 
         CandidateResults crs = null;
 
+        int newLimit = limit;
+
         while ( queryCount++ < maxQueries ) {
 
-            crs = getCandidateResults( query );
+            crs = entityIndex.search( indexScope, types, query, newLimit , 
offset);
 
 
             logger.debug( "Calling build results with crs {}", crs );
-            results = buildResults( indexScope, query, crs );
+            results = buildResults( indexScope, crs );
 
             /**
              * In an edge case where we delete stale entities, we could 
potentially get less results than expected.
@@ -115,17 +126,15 @@ public class ElasticSearchQueryExecutor implements 
Iterable<Results>, Iterator<R
 
             // need to query for more
             // ask for just what we need to satisfy, don't want to exceed limit
-            query.setOffsetFromCursor(results.getCursor());
-            query.setLimit( originalLimit - results.size() );
+            newLimit =  newLimit - results.size();
 
             logger.warn( "Satisfy query limit {}, new limit {} query count 
{}", new Object[] {
-                originalLimit, query.getLimit(), queryCount
+                limit, newLimit, queryCount
             } );
         }
 
         //now set our cursor if we have one for the next iteration
         if ( results.hasCursor() ) {
-            query.setOffsetFromCursor(results.getCursor());
             moreToLoad = true;
         }
 
@@ -133,48 +142,32 @@ public class ElasticSearchQueryExecutor implements 
Iterable<Results>, Iterator<R
             moreToLoad = false;
         }
 
-
-        //set our select subjects into our query if provided
-        if(crs != null){
-            query.setSelectSubjects( crs.getGetFieldMappings() );
-        }
-
+//
+//        //set our select subjects into our query if provided
+//        if(crs != null){
+//            query.setSelectSubjects( crs.getGetFieldMappings() );
+//        }
+//
 
         //set our current results and the flag
         this.currentResults = results;
     }
 
 
-    /**
-     * Get the candidates or load the cursor, whichever we require
-     * @param query
-     * @return
-     */
-    private CandidateResults getCandidateResults(final Query query){
-        final Optional<Integer> cursor = query.getOffset();
-        final String queryToExecute = query.getQl().or("select *");
-
-        CandidateResults results = cursor.isPresent()
-            ? entityIndex.search( indexScope, types, queryToExecute, 
query.getLimit() , cursor.get())
-            : entityIndex.search( indexScope, types, queryToExecute, 
query.getLimit());
-
-        return results;
-    }
-
 
     /**
      * Build results from a set of candidates, and discard those that 
represent stale indexes.
      *
-     * @param query Query that was executed
+     * @param indexScope The index scope to execute the search on
      * @param crs Candidates to be considered for results
      */
-    private Results buildResults( final SearchEdge indexScope, final Query 
query, final CandidateResults crs ) {
+    private Results buildResults( final SearchEdge indexScope, final 
CandidateResults crs ) {
 
         logger.debug( "buildResults()  from {} candidates", crs.size() );
 
         //get an instance of our results loader
         final ResultsLoader resultsLoader =
-            this.resultsLoaderFactory.getLoader( applicationScope, indexScope, 
query.getResultsLevel() );
+            this.resultsLoaderFactory.getLoader( applicationScope, indexScope, 
Query.Level.ALL_PROPERTIES );
 
         //load the results
         final Results results = resultsLoader.loadResults(crs);
@@ -183,12 +176,6 @@ public class ElasticSearchQueryExecutor implements 
Iterable<Results>, Iterator<R
         resultsLoader.postProcess();
 
         //set offset into query
-        if(crs.getOffset().isPresent()) {
-            query.setOffset(crs.getOffset().get());
-        }else{
-            query.clearOffset();
-        }
-        results.setCursorFromOffset( query.getOffset() );
 
         logger.debug( "Returning results size {}", results.size() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
index 8ba5238..6230147 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
@@ -20,12 +20,8 @@
 package org.apache.usergrid.corepersistence.pipeline.read.entity;
 
 
-import java.io.Serializable;
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.cursor.NoCursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
@@ -38,7 +34,7 @@ import rx.Observable;
  * This command is a stopgap to make migrating 1.0 code easier.  Once full 
traversal has been implemented, this should
  * be removed
  */
-public class EntityIdFilter extends AbstractFilter<Id, Serializable> 
implements TraverseFilter {
+public class EntityIdFilter extends AbstractPipelineOperation<Id, Id> 
implements Filter<Id, Id> {
 
     private final Id entityId;
 
@@ -54,9 +50,4 @@ public class EntityIdFilter extends AbstractFilter<Id, 
Serializable> implements
         return Observable.just( entityId );
     }
 
-
-    @Override
-    protected CursorSerializer<Serializable> getCursorSerializer() {
-        return NoCursorSerializer.create();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
new file mode 100644
index 0000000..dd6b9b8
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.entity;
+
+
+import java.util.List;
+
+import 
org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from a set of Ids.
+ *
+ * TODO refactor this into a common command that both ES search and 
graphSearch can use for repair and verification
+ */
+public class EntityLoadCollector extends AbstractPipelineOperation<Id, 
ResultsPage>
+    implements Collector<Id, ResultsPage> {
+
+    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+
+
+    @Inject
+    public EntityLoadCollector( final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
+
+
+    @Override
+    public Observable<ResultsPage> call( final Observable<Id> observable ) {
+
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( 
pipelineContext.getApplicationScope() );
+
+        final Observable<EntitySet> entitySetObservable = observable.buffer( 
pipelineContext.getLimit() ).flatMap(
+            bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> 
entityCollectionManager.load( ids ) ) );
+
+
+        final Observable<ResultsPage> resultsObservable = entitySetObservable
+
+            .flatMap( entitySet -> {
+
+                //get our entities and filter missing ones, then collect them 
into a results object
+                final Observable<MvccEntity> mvccEntityObservable = 
Observable.from( entitySet.getEntities() );
+
+
+                //convert them to our old entity model, then filter absent, 
meaning they weren't found
+                final Observable<List<Entity>> entitiesPageObservable =
+                    mvccEntityObservable.filter( mvccEntity -> 
mvccEntity.getEntity().isPresent() )
+                                        .map( mvccEntity -> 
mvccEntity.getEntity().get() ).toList();
+
+                //convert them to a list, then map them into results
+                return entitiesPageObservable.map( entities -> new 
ResultsPage( entities ) );
+            } );
+
+
+        return resultsObservable;
+    }
+
+    /**
+     * Map a new cp entity to an old entity.  May be null if not present
+     */
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
deleted file mode 100644
index 78a9835..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.entity;
-
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.cursor.NoCursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.CollectorFilter;
-import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
-import org.apache.usergrid.persistence.EntityFactory;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from a set of Ids.
- *
- * TODO refactor this into a common command that both ES search and 
graphSearch can use for repair and verification
- */
-public class EntityLoadCollectorFilter extends AbstractFilter<Results, 
Serializable>
-    implements CollectorFilter<Results> {
-
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
-    private int resultSize;
-
-
-    @Inject
-    public EntityLoadCollectorFilter( final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-    }
-
-
-    @Override
-      protected CursorSerializer<Serializable> getCursorSerializer() {
-          return NoCursorSerializer.create();
-      }
-
-
-    @Override
-    public Observable<Results> call( final Observable<Id> observable ) {
-
-
-        /**
-         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean 
up our lower levels and create new results
-         * objects
-         */
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
-
-        final Observable<EntitySet> entitySetObservable = observable.buffer( 
resultSize ).flatMap(
-            bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> 
entityCollectionManager.load( ids ) ) );
-
-
-        final Observable<Results> resultsObservable =  entitySetObservable
-
-            .flatMap( entitySet -> {
-
-            //get our entites and filter missing ones, then collect them into 
a results object
-            final Observable<MvccEntity> mvccEntityObservable = 
Observable.from( entitySet.getEntities() );
-
-            //convert them to our old entity model, then filter nulls, meaning 
they weren't found
-            return mvccEntityObservable.map( mvccEntity -> mapEntity( 
mvccEntity ) ).filter( entity -> entity != null )
-
-                //convert them to a list, then map them into results
-                .toList().map( entities -> {
-                    final Results results = Results.fromEntities( entities );
-                    results.setCursor( generateCursor() );
-
-                    return results;
-                } );
-        } );
-
-
-        return resultsObservable;
-    }
-
-                /**
-                 * Map a new cp entity to an old entity.  May be null if not 
present
-                 */
-
-
-    private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity 
mvccEntity ) {
-        if ( !mvccEntity.getEntity().isPresent() ) {
-            return null;
-        }
-
-
-        final Entity cpEntity = mvccEntity.getEntity().get();
-        final Id entityId = cpEntity.getId();
-
-        org.apache.usergrid.persistence.Entity entity =
-            EntityFactory.newEntity( entityId.getUuid(), entityId.getType() );
-
-        Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
-        entity.addProperties( entityMap );
-
-        return entity;
-    }
-
-
-    @Override
-    public void setLimit( final int limit ) {
-        this.resultSize = limit;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
index 9f63bd8..e0f69cf 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
@@ -20,10 +20,8 @@
 package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.cursor.NoCursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
@@ -41,7 +39,8 @@ import rx.Observable;
 /**
  * Filter should take and Id and a graph edge, and ensure the connection 
between the two exists
  */
-public abstract class AbstractReadGraphEdgeByIdFilter extends 
AbstractFilter<Id, Id> implements TraverseFilter {
+public abstract class AbstractReadGraphEdgeByIdFilter extends 
AbstractPipelineOperation<Id, Id> implements
+    Filter<Id, Id> {
 
     private final GraphManagerFactory graphManagerFactory;
     private final Id targetId;
@@ -56,15 +55,9 @@ public abstract class AbstractReadGraphEdgeByIdFilter 
extends AbstractFilter<Id,
 
 
     @Override
-    protected CursorSerializer<Id> getCursorSerializer() {
-        return NoCursorSerializer.create();
-    }
-
-
-    @Override
     public Observable<Id> call( final Observable<Id> idObservable ) {
 
-        final GraphManager gm = graphManagerFactory.createEdgeManager( 
applicationScope );
+        final GraphManager gm = graphManagerFactory.createEdgeManager( 
pipelineContext.getApplicationScope() );
 
         return idObservable.flatMap( id -> {
             final String edgeTypeName = getEdgeName();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
index e6da9c2..4021952 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -21,8 +21,8 @@ package 
org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -38,7 +38,7 @@ import rx.Observable;
 /**
  * Command for reading graph edges
  */
-public abstract class AbstractReadGraphFilter extends AbstractFilter<Id, Edge> 
implements TraverseFilter {
+public abstract class AbstractReadGraphFilter extends 
AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> {
 
     private final GraphManagerFactory graphManagerFactory;
 
@@ -55,10 +55,8 @@ public abstract class AbstractReadGraphFilter extends 
AbstractFilter<Id, Edge> i
     public Observable<Id> call( final Observable<Id> observable ) {
 
         //get the graph manager
-        final GraphManager graphManager = 
graphManagerFactory.createEdgeManager( applicationScope );
+        final GraphManager graphManager = 
graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
 
-        //set our our constant state
-        final Optional<Edge> startFromCursor = getCursor();
 
         final String edgeName = getEdgeTypeName();
 
@@ -66,6 +64,10 @@ public abstract class AbstractReadGraphFilter extends 
AbstractFilter<Id, Edge> i
         //return all ids that are emitted from this edge
         return observable.flatMap( id -> {
 
+                 //set our constant state
+        final Optional<Edge> startFromCursor = getSeekValue();
+
+
             final SimpleSearchByEdgeType search =
                 new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, 
SearchByEdgeType.Order.DESCENDING,
                     startFromCursor );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
index 65b02b6..12306fd 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
@@ -21,8 +21,8 @@ package 
org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -42,7 +42,7 @@ import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeType
 /**
  * Command for reading graph edges on a connection
  */
-public class ReadGraphConnectionByTypeFilter extends AbstractFilter<Id, Edge> 
implements TraverseFilter {
+public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, 
Id, Edge> implements Filter<Id, Id> {
 
     private final GraphManagerFactory graphManagerFactory;
     private final String connectionName;
@@ -54,7 +54,7 @@ public class ReadGraphConnectionByTypeFilter extends 
AbstractFilter<Id, Edge> im
      */
     @Inject
     public ReadGraphConnectionByTypeFilter( final GraphManagerFactory 
graphManagerFactory,
-                                            @Assisted final String 
connectionName, @Assisted final String entityType ) {
+                                            @Assisted("connectionName") final 
String connectionName, @Assisted("entityType") final String entityType ) {
         this.graphManagerFactory = graphManagerFactory;
         this.connectionName = connectionName;
         this.entityType = entityType;
@@ -65,10 +65,9 @@ public class ReadGraphConnectionByTypeFilter extends 
AbstractFilter<Id, Edge> im
     public Observable<Id> call( final Observable<Id> observable ) {
 
         //get the graph manager
-        final GraphManager graphManager = 
graphManagerFactory.createEdgeManager( applicationScope );
+        final GraphManager graphManager = 
graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
+
 
-        //set our our constant state
-        final Optional<Edge> startFromCursor = getCursor();
 
         final String edgeName = getEdgeTypeFromConnectionType( connectionName 
);
 
@@ -76,6 +75,9 @@ public class ReadGraphConnectionByTypeFilter extends 
AbstractFilter<Id, Edge> im
         //return all ids that are emitted from this edge
         return observable.flatMap( id -> {
 
+              //set our constant state
+            final Optional<Edge> startFromCursor = getSeekValue();
+
             final SimpleSearchByIdType search =
                 new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, 
SearchByEdgeType.Order.DESCENDING,
                     entityType, startFromCursor );

Reply via email to