http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
new file mode 100644
index 0000000..ccc5198
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.persistence.index.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Executes an index query page by page and exposes each page as a {@link Results} instance.
+ *
+ * <p>The executor is both the {@link Iterable} and its own {@link Iterator}: {@link #iterator()}
+ * returns {@code this}, so an instance supports a single traversal.  A page is fetched lazily the
+ * first time {@link #hasNext()} is called after the previous page was handed out by
+ * {@link #next()}.</p>
+ */
+public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<Results> {
+
+    private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class );
+
+    private final ResultsLoaderFactory resultsLoaderFactory;
+
+    private final ApplicationScope applicationScope;
+
+    private final ApplicationEntityIndex entityIndex;
+
+    private final SearchEdge indexScope;
+
+    private final SearchTypes types;
+
+    /** Private copy of the caller's query; its offset, limit and select subjects are mutated while paging. */
+    private final Query query;
+
+
+    /** Page loaded by {@link #hasNext()} and not yet returned by {@link #next()}; null otherwise. */
+    private Results currentResults;
+
+    /** Set to false once a loaded page comes back without a cursor. */
+    private boolean moreToLoad = true;
+
+
+    /**
+     * Creates an executor for a single query.
+     *
+     * @param resultsLoaderFactory factory used to obtain a loader for every page
+     * @param entityIndex index that is searched for candidates
+     * @param applicationScope application scope handed to the results loader
+     * @param indexScope search edge used both for the search and for loading results
+     * @param types search types passed to the index search
+     * @param query query to execute; it is copied, the instance passed in is not modified here
+     */
+    public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final ApplicationEntityIndex entityIndex,
+                                       final ApplicationScope applicationScope, final SearchEdge indexScope,
+                                       final SearchTypes types, final Query query ) {
+        this.resultsLoaderFactory = resultsLoaderFactory;
+        this.applicationScope = applicationScope;
+        this.entityIndex = entityIndex;
+        this.indexScope = indexScope;
+        this.types = types;
+
+        //we must deep copy the query passed.  Otherwise we will modify its state with cursors.  Won't fix, not relevant
+        //once we start subscribing to streams.
+        this.query = new Query(query);
+    }
+
+
+    /**
+     * Returns this executor itself, so the instance can only be iterated once.
+     */
+    @Override
+    public Iterator<Results> iterator() {
+        return this;
+    }
+
+
+    /**
+     * Loads the next page into {@link #currentResults} and updates {@link #moreToLoad}.
+     *
+     * <p>Stale candidates are filtered out by {@code buildResults()}, so a search can return
+     * candidates and still produce an empty page.  In that case the search is repeated from the
+     * returned cursor, at most {@code maxQueries} times in total.</p>
+     */
+    private void loadNextPage() {
+        final int maxQueries = 10; // upper bound on searches issued for a single page
+
+        final int originalLimit = query.getLimit();
+
+        Results results = null;
+        int queryCount = 0;
+
+
+        CandidateResults crs = null;
+
+        while ( queryCount++ < maxQueries ) {
+
+            crs = getCandidateResults( query );
+
+
+            logger.debug( "Calling build results with crs {}", crs );
+            results = buildResults( indexScope, query, crs );
+
+            // Stop when the index returned no candidates, when there is no offset to continue from,
+            // or when this pass produced at least one result.  Stopping on the first non-empty page
+            // keeps us from exceeding the requested limit during a read repair.
+            if ( crs.isEmpty() || !crs.hasOffset() || results.size() > 0 ) {
+                break;
+            }
+
+
+            // We didn't load anything, but there was a cursor: this means a read repair occurred.
+            // Query again from the cursor and ask only for what is still needed, so the limit is
+            // not exceeded.
+            query.setOffsetFromCursor(results.getCursor());
+            query.setLimit( originalLimit - results.size() );
+
+            logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
+                originalLimit, query.getLimit(), queryCount
+            } );
+        }
+
+        //now set our cursor if we have one for the next iteration
+        if ( results.hasCursor() ) {
+            query.setOffsetFromCursor(results.getCursor());
+            moreToLoad = true;
+        }
+
+        else {
+            moreToLoad = false;
+        }
+
+
+        //set our select subjects into our query if provided
+        if(crs != null){
+            query.setSelectSubjects( crs.getGetFieldMappings() );
+        }
+
+
+        //set our current results and the flag
+        this.currentResults = results;
+    }
+
+
+    /**
+     * Runs the index search for the given query, passing the query's offset along when one is set.
+     *
+     * @param query query supplying the query string (defaults to "select *"), limit and optional offset
+     * @return the candidates returned by the index
+     */
+    private CandidateResults getCandidateResults(final Query query){
+        final Optional<Integer> cursor = query.getOffset();
+        final String queryToExecute = query.getQl().or("select *");
+
+        CandidateResults results = cursor.isPresent()
+            ? entityIndex.search( indexScope, types, queryToExecute, query.getLimit() , cursor.get())
+            : entityIndex.search( indexScope, types, queryToExecute, query.getLimit());
+
+        return results;
+    }
+
+
+    /**
+     * Build results from a set of candidates, and discard those that represent stale indexes.
+     *
+     * <p>As a side effect the query's offset is set to the candidates' offset (or cleared when the
+     * candidates carry none), and the returned results get their cursor from that offset.</p>
+     *
+     * @param indexScope search edge used to obtain the results loader
+     * @param query Query that was executed
+     * @param crs Candidates to be considered for results
+     * @return the results produced by the loader for these candidates
+     */
+    private Results buildResults( final SearchEdge indexScope, final Query query, final CandidateResults crs ) {
+
+        logger.debug( "buildResults()  from {} candidates", crs.size() );
+
+        //get an instance of our results loader
+        final ResultsLoader resultsLoader =
+            this.resultsLoaderFactory.getLoader( applicationScope, indexScope, query.getResultsLevel() );
+
+        //load the results
+        final Results results = resultsLoader.loadResults(crs);
+
+        //signal for post processing
+        resultsLoader.postProcess();
+
+        //set offset into query
+        if(crs.getOffset().isPresent()) {
+            query.setOffset(crs.getOffset().get());
+        }else{
+            query.clearOffset();
+        }
+        results.setCursorFromOffset( query.getOffset() );
+
+        logger.debug( "Returning results size {}", results.size() );
+
+        return results;
+    }
+
+
+    /**
+     * Reports whether another page is available, loading it first when no page is pending.
+     *
+     * @return true when a page is ready to be returned by {@link #next()}
+     */
+    @Override
+    public boolean hasNext() {
+
+        //no page is pending: load the next one unless we already know there is nothing left
+        if ( currentResults == null ) {
+            //there's nothing left to load, nothing to do
+            if ( !moreToLoad ) {
+                return false;
+            }
+
+            //load the page
+
+            loadNextPage();
+        }
+
+
+        //see if our current results are not null
+        return currentResults != null;
+    }
+
+
+    /**
+     * Returns the pending page and clears it so the following call loads a new one.
+     *
+     * @return the next page of results
+     * @throws NoSuchElementException when no further page is available
+     */
+    @Override
+    public Results next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more results present" );
+        }
+
+        final Results toReturn = currentResults;
+
+        currentResults = null;
+
+        return toReturn;
+    }
+
+    /**
+     * Removal is not supported by this iterator.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("Remove not implemented!!");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
new file mode 100644
index 0000000..d73c731
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
@@ -0,0 +1,127 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Optional;
+
+
+/**
+ * A {@link ResultsVerifier} that loads the entities for a set of candidate ids, checks each
+ * candidate's version against the loaded entity, and builds {@link Results} containing full
+ * entities for the candidates that passed.
+ */
+public class EntityVerifier implements ResultsVerifier {
+
+    private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+
+    /** Entities loaded by {@link #loadResults}; consulted by {@link #isValid}. */
+    private EntitySet ids;
+
+    /** Entities of the candidates accepted by {@link #isValid}, keyed by entity id. */
+    private final Map<Id, org.apache.usergrid.persistence.model.entity.Entity> entityMapping;
+
+
+    /**
+     * @param maxSize initial capacity used for the map of accepted entities
+     */
+    public EntityVerifier( final int maxSize ) {
+        this.entityMapping = new HashMap<>( maxSize );
+    }
+
+
+    /**
+     * Loads the entities for the given ids through the collection manager and keeps the last
+     * emitted {@link EntitySet} for later validation.  The call blocks until that value is available.
+     *
+     * @param idsToLoad ids of the candidates to load
+     * @param ecm collection manager used to perform the load
+     */
+    @Override
+    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
+        ids = ecm.load( idsToLoad ).toBlocking().last();
+        logger.debug("loadResults() asked for {} ids and got {}", idsToLoad.size(), ids.size());
+    }
+
+
+    /**
+     * Checks a candidate against the entities loaded by {@link #loadResults}.
+     *
+     * <p>A candidate is rejected when no entity was loaded for its id, when the loaded version is
+     * newer than the candidate's version, or when the loaded entry carries no entity.  An accepted
+     * candidate's entity is remembered for {@link #getResults}.</p>
+     *
+     * @param candidateResult candidate returned by the index
+     * @return true when the candidate is valid and its entity has been recorded
+     */
+    @Override
+    public boolean isValid( final CandidateResult candidateResult ) {
+        final Id entityId = candidateResult.getId();
+
+        final MvccEntity savedEntity = ids.getEntity( entityId );
+
+        //version wasn't found deindex
+        if ( savedEntity == null ) {
+            logger.warn( "Version for Entity {}:{} not found", entityId.getType(), entityId.getUuid() );
+            return false;
+        }
+
+        final UUID candidateVersion = candidateResult.getVersion();
+        final UUID savedVersion = savedEntity.getVersion();
+
+        //the saved version is newer than the indexed one, so the candidate is stale
+        if ( UUIDComparator.staticCompare( savedVersion, candidateVersion ) > 0 ) {
+            //log the saved version itself; the placeholder is "latest v:{}", not the whole entity
+            logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
+                    entityId.getUuid(), entityId.getType(), candidateVersion, savedVersion
+            } );
+
+            return false;
+        }
+
+
+        final Optional<org.apache.usergrid.persistence.model.entity.Entity> entity = savedEntity.getEntity();
+
+        if ( !entity.isPresent() ) {
+            //log the version for "version v:{}"; the entity Optional is known to be absent here
+            logger.warn( "Entity uuid:{} version v:{} is deleted but indexed, this is a bug ",
+                    entityId.getUuid(), savedVersion );
+            return false;
+        }
+
+        entityMapping.put( entityId, entity.get() );
+
+        return true;
+    }
+
+
+    /**
+     * Builds the results for the given ids from the entities recorded by {@link #isValid}.
+     *
+     * @param ids ids to include, in the order they should appear in the results
+     * @return results holding one entity per id
+     */
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+
+        final List<Entity> ugEntities = new ArrayList<>( ids.size() );
+
+        for ( final Id id : ids ) {
+            // NOTE(review): this is null for an id that never passed isValid(); callers are
+            // presumably expected to pass validated ids only - confirm how toMap() treats null.
+            final org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityMapping.get( id );
+
+            Entity entity = EntityFactory.newEntity( id.getUuid(), id.getType() );
+
+            Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
+            entity.addProperties( entityMap );
+            ugEntities.add( entity );
+        }
+
+        return Results.fromEntities( ugEntities );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
new file mode 100644
index 0000000..ade64a2
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
+
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
+
+/**
+ * A {@link ResultsLoader} that removes duplicate and invalid candidates before building results,
+ * and queues every discarded candidate for de-indexing in a batch that is executed by
+ * {@link #postProcess()}.
+ */
+public class FilteringLoader implements ResultsLoader {
+
+    private static final Logger logger = LoggerFactory.getLogger( FilteringLoader.class );
+
+    private final EntityCollectionManager entityCollectionManager;
+    private final ResultsVerifier resultsVerifier;
+    private final ApplicationScope applicationScope;
+    private final SearchEdge indexScope;
+    private final EntityIndexBatch indexBatch;
+
+
+    /**
+     * Create an instance of a filter loader
+     *
+     * @param entityCollectionManager The collection manager handed to the verifier for loading
+     * @param applicationEntityIndex The index from which the de-index batch is created
+     * @param resultsVerifier The verifier to verify the candidate results
+     * @param applicationScope The application scope to perform the load
+     * @param indexScope The index scope used in the search
+     */
+    protected FilteringLoader( final  EntityCollectionManager entityCollectionManager, final ApplicationEntityIndex applicationEntityIndex,
+                               final ResultsVerifier resultsVerifier, final ApplicationScope applicationScope,
+                               final SearchEdge indexScope ) {
+
+        this.entityCollectionManager = entityCollectionManager;
+        this.resultsVerifier = resultsVerifier;
+        this.applicationScope = applicationScope;
+        this.indexScope = indexScope;
+
+        this.indexBatch = applicationEntityIndex.createBatch();
+    }
+
+
+    /**
+     * Collapses the candidates to one per entity id (keeping the newest version), has the verifier
+     * validate what remains, and returns the results for the valid ids ordered by candidate position.
+     * Every candidate dropped along the way is added to the de-index batch.
+     *
+     * @param crs The candidate result set
+     * @return the results built by the verifier, or empty results when there are no candidates
+     */
+    @Override
+    public Results loadResults( final CandidateResults crs ) {
+
+        final int candidateCount = crs.size();
+
+        if ( candidateCount == 0 ) {
+            return new Results();
+        }
+
+        // Newest candidate seen so far for every entity id
+        final Map<Id, CandidateResult> newestById = new HashMap<>( candidateCount );
+
+        // Position recorded for every entity id, used to order the final results
+        final Map<Id, Integer> positionById = new HashMap<>( candidateCount );
+
+        // TODO, in this case we're "optimizing" due to the limitations of collection scope.
+        // Perhaps  we should change the API to just be an application, then an "owner" scope?
+
+        // First pass: drop duplicates before any network call is made
+        final Iterator<CandidateResult> candidates = crs.iterator();
+        int position = 0;
+
+        while ( candidates.hasNext() ) {
+
+            final CandidateResult candidate = candidates.next();
+            final Id candidateId = candidate.getId();
+
+            final CandidateResult seen = newestById.get( candidateId );
+
+            if ( seen != null ) {
+
+                // the id was seen before: decide which of the two versions survives
+                final UUID seenVersion = seen.getVersion();
+                final UUID candidateVersion = candidate.getVersion();
+
+                final boolean candidateIsNewer = UUIDComparator.staticCompare( candidateVersion, seenVersion ) > 0;
+
+                final CandidateResult stale = candidateIsNewer ? seen : candidate;
+                final CandidateResult fresh = candidateIsNewer ? candidate : seen;
+
+                logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
+                        candidateId.getUuid(), candidateId.getType(), stale.getVersion(), fresh.getVersion()
+                    } );
+
+                // the stale document is de-indexed from the search edge that returned it
+                indexBatch.deindex( indexScope, stale );
+
+                //TODO, fire the entity repair cleanup task here instead of de-indexing
+
+                newestById.put( candidateId, fresh );
+            }
+            else {
+                // first time this id shows up
+                newestById.put( candidateId, candidate );
+            }
+
+            // NOTE(review): the position is overwritten with the latest occurrence even when the
+            // earlier candidate is the one kept - confirm this is the intended ordering.
+            positionById.put( candidateId, position );
+            position++;
+        }
+
+
+        // Second pass: let the verifier load the surviving ids and validate every candidate
+        final Collection<Id> idsToLoad =
+            Collections2.transform( newestById.values(), new Function<CandidateResult, Id>() {
+                @Nullable
+                @Override
+                public Id apply( @Nullable final CandidateResult input ) {
+                    //NOTE this is never null, we won't need to check
+                    return input.getId();
+                }
+            } );
+
+        final TreeMap<Integer, Id> validIdsByPosition = new TreeMap<>();
+
+        resultsVerifier.loadResults( idsToLoad, entityCollectionManager );
+
+        for ( final Id loadedId : idsToLoad ) {
+
+            final CandidateResult newest = newestById.get( loadedId );
+
+            // even the newest indexed candidate may be stale, so the verifier has the last word
+            if ( resultsVerifier.isValid( newest ) ) {
+                final int candidatePosition = positionById.get( loadedId );
+
+                validIdsByPosition.put( candidatePosition, loadedId );
+            }
+            else {
+                indexBatch.deindex( indexScope, newest );
+            }
+        }
+
+
+        // NOTE DO NOT execute the batch here.
+        // It changes the results and we need consistent paging until we aggregate all results
+        return resultsVerifier.getResults( validIdsByPosition.values() );
+    }
+
+
+    /**
+     * Executes the batch of de-index operations collected while loading results.
+     */
+    @Override
+    public void postProcess() {
+        indexBatch.execute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
new file mode 100644
index 0000000..4a3bfcd
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * A {@link VersionVerifier} whose results contain only the UUIDs of the given ids.
+ */
+public class IdsVerifier extends VersionVerifier {
+
+    /**
+     * Builds id-only results from the given ids, preserving their iteration order.
+     *
+     * @param ids ids to include in the results
+     * @return results created from the list of the ids' UUIDs
+     */
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+
+        final List<UUID> uuids = new ArrayList<>( ids.size() );
+
+        for ( final Id entityId : ids ) {
+            final UUID uuid = entityId.getUuid();
+            uuids.add( uuid );
+        }
+
+        return Results.fromIdList( uuids );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
new file mode 100644
index 0000000..c2a3e9a
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
+
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.index.CandidateResults;
+
+
+/**
+ * Contract for turning the candidates of an index search into {@link Results}.
+ */
+public interface ResultsLoader {
+
+    /**
+     * Loads the results for the supplied candidates.  Implementations should filter out stale
+     * results.
+     *
+     * @param crs The candidate result set
+     * @return The loaded results.  Null safe, but may be empty
+     */
+    Results loadResults( CandidateResults crs );
+
+    /**
+     * Hook for post processing once the load operation is done
+     */
+    void postProcess();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
new file mode 100644
index 0000000..3ccca1b
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
+
+
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.SearchEdge;
+
+
+/**
+ * Factory for creating {@link ResultsLoader} instances
+ */
+public interface ResultsLoaderFactory {
+
+    /**
+     * Get the loader for results
+     *
+     * @param applicationScope The application scope used to load results
+     * @param indexScope The index scope used in the search
+     * @param resultsLevel The results level of the query being executed
+     * @return The loader to use for the given scopes and results level
+     */
+    ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge indexScope,
+                             final Query.Level resultsLevel );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
new file mode 100644
index 0000000..fe72ca2
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
+
+
+import java.util.Collection;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Contract for verifying index candidates and building results from the ones that are valid.
+ */
+public interface ResultsVerifier {
+
+    /**
+     * Load all the candidate ids for verification
+     *
+     * @param ids The ids to load
+     * @param ecm The entity collection manager
+     */
+    void loadResults( Collection<Id> ids, EntityCollectionManager ecm );
+
+    /**
+     * Decide whether a candidate should be retained.  A candidate that should not be retained
+     * should also be removed from the list of possible return values in this loader.
+     *
+     * @param candidateResult The candidate to check
+     * @return true if the candidate result is a valid result that should be retained
+     */
+    boolean isValid( CandidateResult candidateResult );
+
+
+    /**
+     * Load the result set with the given ids
+     *
+     * @param ids The ids to include in the result set
+     * @return The results for the given ids
+     */
+    Results getResults( Collection<Id> ids );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
new file mode 100644
index 0000000..c49fb28
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
+
+
+import java.util.Collection;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+
+
+/**
+ * A loader that verifies versions are correct in Cassandra and match ElasticSearch
+ */
+public abstract class VersionVerifier implements ResultsVerifier {
+
+    private static final Logger logger = LoggerFactory.getLogger( VersionVerifier.class );
+
+    /** Versions fetched by {@link #loadResults}; not assigned until that method has run. */
+    private VersionSet ids;
+
+
+    /**
+     * Fetch the latest version for each id and keep it for later {@link #isValid} checks.
+     * Blocks until the lookup has completed.
+     *
+     * @param idsToLoad The ids whose latest versions should be fetched
+     * @param ecm The entity collection manager used for the lookup
+     */
+    @Override
+    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
+        ids = ecm.getLatestVersion( idsToLoad ).toBlocking().last();
+    }
+
+
+    /**
+     * Check a candidate against the versions fetched in {@link #loadResults}.
+     *
+     * @param candidateResult The candidate to check
+     * @return false when no version is known for the candidate's id, or when the stored version is
+     * newer than the candidate's version; true otherwise
+     */
+    @Override
+    public boolean isValid( final CandidateResult candidateResult ) {
+        final Id entityId = candidateResult.getId();
+
+        final MvccLogEntry version = ids.getMaxVersion( entityId );
+
+        //version wasn't found, reject the candidate
+        if ( version == null ) {
+            //identify the entity by type and uuid
+            logger.warn( "Version for Entity {}:{} not found",
+                    entityId.getType(), entityId.getUuid() );
+
+            return false;
+        }
+
+        final UUID savedVersion = version.getVersion();
+
+        //the stored version compares greater than the candidate's version, so the candidate is stale
+        if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) {
+            logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
+                new Object[] {
+                    entityId.getUuid(),
+                    entityId.getType(),
+                    candidateResult.getVersion(),
+                    savedVersion
+            } );
+
+            return false;
+        }
+
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
new file mode 100644
index 0000000..5734a5b
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.entity;
+
+
+import java.io.Serializable;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * This command is a stopgap to make migrating 1.0 code easier.  Once full traversal has been implemented, this should
+ * be removed
+ */
+public class EntityIdFilter extends AbstractFilter<Id, Serializable> implements TraverseFilter {
+
+    private final Id entityId;
+
+
+    /**
+     * @param entityId The single id this filter emits
+     */
+    @Inject
+    public EntityIdFilter( @Assisted final Id entityId ) {
+        this.entityId = entityId;
+    }
+
+
+    /**
+     * @return always null; this filter supplies no cursor class
+     */
+    @Override
+    protected Class<Serializable> getCursorClass() {
+        return null;
+    }
+
+
+    /**
+     * Emit the configured entity id. The incoming observable is not consulted.
+     *
+     * @param idObservable ignored
+     * @return an observable that emits only the configured id
+     */
+    @Override
+    public Observable<Id> call( final Observable<Id> idObservable ) {
+        final Observable<Id> singleId = Observable.just( entityId );
+
+        return singleId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
new file mode 100644
index 0000000..74f626d
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.entity;
+
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.CollectorFilter;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from a set of Ids.
+ *
+ * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
+ */
+public class EntityLoadCollectorFilter extends AbstractFilter<Results, Serializable>
+    implements CollectorFilter<Results> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final ApplicationScope applicationScope;
+
+    //number of ids buffered per load; only assigned through setLimit
+    private int resultSize;
+
+
+    @Inject
+    public EntityLoadCollectorFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                      final ApplicationScope applicationScope ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.applicationScope = applicationScope;
+    }
+
+
+    /**
+     * @return always null; this filter supplies no cursor class
+     */
+    @Override
+    protected Class<Serializable> getCursorClass() {
+        return null;
+    }
+
+
+    /**
+     * Buffer the incoming ids into batches of the configured limit, load every batch and emit one
+     * {@link Results} per loaded batch holding the entities that were present.
+     *
+     * @param observable The ids to load
+     * @return an observable of results, one per loaded batch
+     */
+    @Override
+    public Observable<Results> call( final Observable<Id> observable ) {
+
+        /**
+         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
+         * objects
+         */
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        //NOTE(review): resultSize is 0 until setLimit is invoked; presumably the pipeline always sets it first
+        final Observable<EntitySet> entitySetObservable =
+            observable.buffer( resultSize ).flatMap( bufferedIds -> entityCollectionManager.load( bufferedIds ) );
+
+
+        final Observable<Results> resultsObservable = entitySetObservable.flatMap( entitySet -> {
+
+            final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() );
+
+            //convert them to our old entity model
+            return mvccEntityObservable.map( mvccEntity -> mapEntity( mvccEntity ) )
+
+                //keep only the entities that were found; mapEntity yields null for missing ones
+                .filter( entity -> entity != null )
+
+                //convert them to a list, then map them into results
+                .toList().map( entities -> {
+                    final Results results = Results.fromEntities( entities );
+                    results.setCursor( generateCursor() );
+
+                    return results;
+                } )
+                //if no results are present, return an empty results
+                .singleOrDefault( new Results() );
+        } );
+
+
+        return resultsObservable;
+    }
+
+
+    /**
+     * Map a new cp entity to an old entity.
+     *
+     * @param mvccEntity The loaded entity wrapper
+     * @return the converted entity, or null if the wrapper holds no entity
+     */
+    private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity mvccEntity ) {
+        if ( !mvccEntity.getEntity().isPresent() ) {
+            return null;
+        }
+
+
+        final Entity cpEntity = mvccEntity.getEntity().get();
+        final Id entityId = cpEntity.getId();
+
+        org.apache.usergrid.persistence.Entity entity =
+            EntityFactory.newEntity( entityId.getUuid(), entityId.getType() );
+
+        Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
+        entity.addProperties( entityMap );
+
+        return entity;
+    }
+
+
+    /**
+     * @param limit The number of ids to buffer per load in {@link #call}
+     */
+    @Override
+    public void setLimit( final int limit ) {
+        this.resultSize = limit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
new file mode 100644
index 0000000..2e8f041
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * Filter should take an Id and a graph edge, and ensure the connection between the two exists
+ */
+public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, Id> implements TraverseFilter {
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final Id targetId;
+
+
+    /**
+     * @param graphManagerFactory Used to create the graph manager in {@link #call}
+     * @param targetId The id passed as the target of every edge search
+     */
+    @Inject
+    public AbstractReadGraphEdgeByIdFilter( final GraphManagerFactory graphManagerFactory,
+                                            @Assisted final Id targetId ) {
+        this.graphManagerFactory = graphManagerFactory;
+        this.targetId = targetId;
+    }
+
+
+    /**
+     * @return always null; this filter supplies no cursor class
+     */
+    @Override
+    protected Class<Id> getCursorClass() {
+        return null;
+    }
+
+
+    /**
+     * For every incoming id, search for edge versions using the edge name from {@link #getEdgeName()} and the
+     * configured target id, and emit the target node of the first edge returned. Ids with no edge emit nothing.
+     *
+     * @param idObservable The ids to check
+     * @return the target nodes of the edges that were found
+     */
+    @Override
+    public Observable<Id> call( final Observable<Id> idObservable ) {
+
+        final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
+
+        return idObservable.flatMap( sourceId -> {
+
+            //create our search; the edge name is resolved once per incoming id
+            final SearchByEdge edgeSearch =
+                new SimpleSearchByEdge( sourceId, getEdgeName(), targetId, Long.MAX_VALUE,
+                    SearchByEdgeType.Order.DESCENDING, Optional.absent() );
+
+            //a single edge version is enough to validate existence, then emit the target node
+            return graphManager.loadEdgeVersions( edgeSearch ).take( 1 ).map( edge -> edge.getTargetNode() );
+        } );
+    }
+
+
+    /**
+     * Get the name of the edge to be used in the seek
+     */
+    protected abstract String getEdgeName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
new file mode 100644
index 0000000..4bdcdad
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+
+
+/**
+ * Command for reading graph edges
+ */
+public abstract class AbstractReadGraphFilter extends AbstractFilter<Id, Edge>
+    implements TraverseFilter {
+
+    private final GraphManagerFactory graphManagerFactory;
+
+
+    /**
+     * Create a new instance of our command
+     *
+     * @param graphManagerFactory Used to create the graph manager in {@link #call}
+     */
+    public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) {
+        this.graphManagerFactory = graphManagerFactory;
+    }
+
+
+    /**
+     * For every incoming id, load the edges of the type named by {@link #getEdgeTypeName()} from that id,
+     * record each edge as the cursor and emit the edge's target node.
+     *
+     * @param observable The source ids
+     * @return the target nodes of the loaded edges
+     */
+    @Override
+    public Observable<Id> call( final Observable<Id> observable ) {
+
+        //state shared by every search issued below
+        final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
+        final Optional<Edge> startFromCursor = getCursor();
+        final String edgeName = getEdgeTypeName();
+
+
+        //return all ids that are emitted from this edge
+        return observable.flatMap( id -> {
+
+            final SimpleSearchByEdgeType search =
+                new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    startFromCursor );
+
+            /**
+             * TODO, pass a message with pointers to our cursor values to be generated later
+             */
+            return graphManager.loadEdgesFromSource( search )
+                               .doOnNext( edge -> setCursor( edge ) )
+                               .map( edge -> edge.getTargetNode() );
+        } );
+    }
+
+
+    /**
+     * @return the {@link Edge} class, the cursor type of this filter
+     */
+    @Override
+    protected Class<Edge> getCursorClass() {
+        return Edge.class;
+    }
+
+
+    /**
+     * Get the edge type name we should use when traversing
+     *
+     * @return the edge type name
+     */
+    protected abstract String getEdgeTypeName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
new file mode 100644
index 0000000..2ccad67
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.assistedinject.Assisted;
+
+
+/**
+ * Read an edge in the graph to verify it's existence by id
+ */
+public class ReadGraphCollectionByIdFilter extends 
AbstractReadGraphEdgeByIdFilter{
+
+    private final String collectionName;
+
+    public ReadGraphCollectionByIdFilter( final GraphManagerFactory 
graphManagerFactory, @Assisted final String collectionName, @Assisted final Id 
targetId ) {
+        super( graphManagerFactory, targetId );
+        this.collectionName = collectionName;
+    }
+
+
+    @Override
+    protected String getEdgeName() {
+        return CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
new file mode 100644
index 0000000..3f0a70a
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName;
+
+
+/**
+ * Command for reading graph edges on a collection
+ */
+public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
+
+    private final String collectionName;
+
+
+    /**
+     * Create a new instance of our command
+     */
+    @Inject
+    public ReadGraphCollectionFilter( final GraphManagerFactory 
graphManagerFactory, @Assisted final String collectionName ) {
+        super( graphManagerFactory );
+        this.collectionName = collectionName;
+    }
+
+
+    @Override
+    protected String getEdgeTypeName() {
+        return getCollectionScopeNameFromCollectionName( collectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
new file mode 100644
index 0000000..f16867d
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.assistedinject.Assisted;
+
+
+/**
+ * Read an edge in the graph to verify its existence by id
+ */
+public class ReadGraphConnectionByIdFilter extends AbstractReadGraphEdgeByIdFilter {
+
+    private final String connectionName;
+
+
+    /**
+     * Create a new instance of our command
+     *
+     * @param graphManagerFactory Handed to the parent filter
+     * @param connectionName The connection name the edge name is derived from
+     * @param targetId Handed to the parent filter as the target id
+     */
+    //fully qualified because this file does not import Inject; the constructor takes @Assisted parameters
+    @com.google.inject.Inject
+    public ReadGraphConnectionByIdFilter( final GraphManagerFactory graphManagerFactory,
+                                          @Assisted final String connectionName, @Assisted final Id targetId ) {
+        super( graphManagerFactory, targetId );
+        this.connectionName = connectionName;
+    }
+
+
+    /**
+     * @return the edge name obtained from the connection name via CpNamingUtils
+     */
+    @Override
+    protected String getEdgeName() {
+        return CpNamingUtils.getEdgeTypeFromConnectionType( connectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
new file mode 100644
index 0000000..3057111
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName;
+
+
+/**
+ * Command for reading graph edges on a connection
+ */
+public class ReadGraphConnectionByTypeFilter extends AbstractFilter<Id, Edge> implements TraverseFilter {
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final String connectionName;
+    private final String entityType;
+
+
+    /**
+     * Create a new instance of our command
+     *
+     * @param graphManagerFactory Used to create the graph manager in {@link #call}
+     * @param connectionName The connection name the edge name is derived from
+     * @param entityType The type passed to every search as the id type
+     */
+    //fully qualified because this file does not import Inject; the constructor takes @Assisted parameters
+    //NOTE(review): both @Assisted parameters are unnamed Strings; Guice assisted injection needs distinct names for
+    //same-typed parameters, matching the factory method -- confirm against the factory interface
+    @com.google.inject.Inject
+    public ReadGraphConnectionByTypeFilter( final GraphManagerFactory graphManagerFactory,
+                                            @Assisted final String connectionName,
+                                            @Assisted final String entityType ) {
+        this.graphManagerFactory = graphManagerFactory;
+        this.connectionName = connectionName;
+        this.entityType = entityType;
+    }
+
+
+    /**
+     * For every incoming id, load the edges of the connection from that id restricted to the configured
+     * entity type, record each edge as the cursor and emit the edge's target node.
+     *
+     * @param observable The source ids
+     * @return the target nodes of the loaded edges
+     */
+    @Override
+    public Observable<Id> call( final Observable<Id> observable ) {
+
+        //get the graph manager
+        final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
+
+        //set our constant state
+        final Optional<Edge> startFromCursor = getCursor();
+
+        final String edgeName = getConnectionScopeName( connectionName );
+
+
+        //return all ids that are emitted from this edge
+        return observable.flatMap( id -> {
+
+            final SimpleSearchByIdType search =
+                new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    entityType, startFromCursor );
+
+            /**
+             * TODO, pass a message with pointers to our cursor values to be generated later
+             */
+            return graphManager.loadEdgesFromSourceByType( search ).doOnNext( edge -> setCursor( edge ) ).map(
+                edge -> edge.getTargetNode() );
+        } );
+    }
+
+
+    /**
+     * @return the {@link Edge} class, the cursor type of this filter
+     */
+    @Override
+    protected Class<Edge> getCursorClass() {
+        return Edge.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
new file mode 100644
index 0000000..49360f6
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName;
+
+
+/**
+ * Filter for reading graph edges on a connection; the edge type name is derived from the connection name
+ */
+public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
+
+    private final String connectionName;
+
+
+    /**
+     * Create a new instance of our filter for the given connection name (supplied via assisted injection)
+     */
+    @Inject
+    public ReadGraphConnectionFilter( final GraphManagerFactory 
graphManagerFactory, @Assisted final String connectionName ) {
+        super( graphManagerFactory );
+        this.connectionName = connectionName;
+    }
+
+
+    @Override
+    protected String getEdgeTypeName() {
+        return getConnectionScopeName( connectionName ); // edge type is the connection scope name built from connectionName
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
index ec4271a..663c15c 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
@@ -24,8 +24,8 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-import org.apache.usergrid.corepersistence.command.CommandBuilder;
-import 
org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCollector;
+import org.apache.usergrid.corepersistence.pipeline.DataPipeline;
+import 
org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter;
 import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.Results;
 import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -82,14 +82,14 @@ public abstract class AbstractGraphQueryExecutor implements 
QueryExecutor {
             //assign them to an iterator.  this now uses an internal buffer 
with backpressure, so we won't load all
             // results
             //set up our command builder
-            final CommandBuilder commandBuilder = new CommandBuilder( 
applicationScope, sourceId, requestCursor, limit );
+            final DataPipeline dataPipeline = new DataPipeline( 
applicationScope, sourceId, requestCursor, limit );
 
 
-            addTraverseCommand( commandBuilder );
+            addTraverseCommand( dataPipeline );
 
             //construct our results to be observed later. This is a cold 
observable
             final Observable<Results> resultsObservable =
-                commandBuilder.build( new EntityLoadCollector( 
entityCollectionManagerFactory, applicationScope ) );
+                dataPipeline.build( new EntityLoadCollectorFilter( 
entityCollectionManagerFactory, applicationScope ) );
 
             this.observableIterator = 
resultsObservable.toBlocking().getIterator();
 
@@ -129,5 +129,5 @@ public abstract class AbstractGraphQueryExecutor implements 
QueryExecutor {
     /**
      * Add the traverse command to the graph
      */
-    protected abstract void addTraverseCommand( final CommandBuilder 
commandBuilder );
+    protected abstract void addTraverseCommand( final DataPipeline 
dataPipeline );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java
index 515fc2b..b506c71 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java
@@ -20,8 +20,8 @@
 package org.apache.usergrid.corepersistence.results;
 
 
-import org.apache.usergrid.corepersistence.command.CommandBuilder;
-import 
org.apache.usergrid.corepersistence.command.read.graph.ReadGraphCollectionCommand;
+import org.apache.usergrid.corepersistence.pipeline.DataPipeline;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
 import org.apache.usergrid.persistence.EntityRef;
 import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -53,8 +53,8 @@ public class CollectionGraphQueryExecutor extends 
AbstractGraphQueryExecutor {
 
 
     @Override
-    protected void addTraverseCommand( final CommandBuilder commandBuilder ) {
+    protected void addTraverseCommand( final DataPipeline dataPipeline ) {
         //set the traverse command from the source Id to the connect name
-        commandBuilder.withTraverseCommand( new ReadGraphCollectionCommand( 
graphManagerFactory, collectionName ) );
+        dataPipeline.withTraverseCommand( new ReadGraphCollectionFilter( 
graphManagerFactory, collectionName ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java
index 61726b7..3dc5cd6 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java
@@ -20,8 +20,8 @@
 package org.apache.usergrid.corepersistence.results;
 
 
-import org.apache.usergrid.corepersistence.command.CommandBuilder;
-import 
org.apache.usergrid.corepersistence.command.read.graph.ReadGraphConnectionCommand;
+import org.apache.usergrid.corepersistence.pipeline.DataPipeline;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter;
 import org.apache.usergrid.persistence.EntityRef;
 import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -51,8 +51,8 @@ public class ConnectionGraphQueryExecutor extends 
AbstractGraphQueryExecutor {
 
 
     @Override
-    protected void addTraverseCommand( final CommandBuilder commandBuilder ) {
+    protected void addTraverseCommand( final DataPipeline dataPipeline ) {
      //set the traverse command from the source Id to the connect name
-        commandBuilder.withTraverseCommand( new ReadGraphConnectionCommand( 
graphManagerFactory, connectionName ) );
+        dataPipeline.withTraverseCommand( new ReadGraphConnectionFilter( 
graphManagerFactory, connectionName ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
index d40efc0..99c0cdb 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
@@ -53,7 +53,7 @@ public class SimpleSearchByEdge implements SearchByEdge {
      * @param maxTimestamp The maximum timestamp to seek from
      * @param last The value to start seeking from.  Must be >= this value
      */
-    public SimpleSearchByEdge( final Id sourceNode, final String type, final 
Id targetNode, final long maxTimestamp, final SearchByEdgeType.Order order, 
final Edge last ) {
+    public SimpleSearchByEdge( final Id sourceNode, final String type, final 
Id targetNode, final long maxTimestamp, final SearchByEdgeType.Order order, 
final Optional<Edge> last ) {
 
         ValidationUtils.verifyIdentity(sourceNode);
         ValidationUtils.verifyIdentity(targetNode);
@@ -67,7 +67,7 @@ public class SimpleSearchByEdge implements SearchByEdge {
         this.type = type;
         this.maxTimestamp = maxTimestamp;
         this.order = order;
-        this.last = Optional.fromNullable(last);
+        this.last = last;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
index 4b73347..9d989a3 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
@@ -25,6 +25,8 @@ import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.SearchByIdType;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Optional;
+
 
 /**
  *
@@ -44,7 +46,7 @@ public class SimpleSearchByIdType extends 
SimpleSearchByEdgeType implements Sear
      * @param last The value to start seeking from.  Must be >= this value
 
      */
-    public SimpleSearchByIdType( final Id node, final String type, final long 
maxTimestamp, final Order order, final String idType, final Edge last  ) {
+    public SimpleSearchByIdType( final Id node, final String type, final long 
maxTimestamp, final Order order, final String idType, final Optional<Edge> last 
 ) {
         super( node, type, maxTimestamp, order, last );
 
         ValidationUtils.verifyString( idType, "idType" );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0523dc03/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
index cc0bb51..c81fa58 100644
--- 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
+++ 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
@@ -37,6 +37,8 @@ import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Optional;
+
 
 /**
  * Simple class for edge testing generation
@@ -190,7 +192,8 @@ public class EdgeTestUtils {
      */
     public static SearchByEdge createGetByEdge( final Id sourceId, final 
String type, final Id targetId,
                                                 final long maxVersion, final 
Edge last ) {
-        return new SimpleSearchByEdge( sourceId, type, targetId, maxVersion, 
SearchByEdgeType.Order.DESCENDING, last );
+        return new SimpleSearchByEdge( sourceId, type, targetId, maxVersion, 
SearchByEdgeType.Order.DESCENDING, Optional
+            .fromNullable( last ) ); // last may be null: the constructor used to wrap it with fromNullable, and Optional.of( null ) throws NullPointerException
     }
 
 //

Reply via email to