Fixes graph cursor resume state.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/118711ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/118711ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/118711ae Branch: refs/heads/two-dot-o-dev Commit: 118711aee83875f72a0b058b784218b211748d4e Parents: 413f023 Author: Todd Nine <[email protected]> Authored: Mon May 4 09:59:00 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Mon May 4 09:59:00 2015 -0600 ---------------------------------------------------------------------- .../read/graph/AbstractReadGraphFilter.java | 52 ++++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/118711ae/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java index 503fcf9..303bc5b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java @@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph; import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; +import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; import org.apache.usergrid.corepersistence.pipeline.read.Filter; import 
org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.persistence.graph.Edge; @@ -62,6 +63,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, final String edgeName = getEdgeTypeName(); + final EdgeState edgeCursorState = new EdgeState(); //return all ids that are emitted from this edge @@ -80,15 +82,30 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, * TODO, pass a message with pointers to our cursor values to be generated later */ return graphManager.loadEdgesFromSource( search ) - //set our cursor every edge we traverse + //set the edge state for cursors + .doOnNext( edge -> edgeCursorState.update( edge ) ) - //map our id from the target edge - .map( edge -> createFilterResult( edge.getTargetNode(), edge, previousFilterValue.getPath() ) ); + //map our id from the target edge and set our cursor every edge we traverse + .map( edge -> createFilterResult( edge.getTargetNode(), edgeCursorState.getCursorEdge(), + previousFilterValue.getPath() ) ); } ); } @Override + protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue, + final Optional<EdgePath> parent ) { + + //if it's our first pass, there's no cursor to generate + if(cursorValue == null){ + return new FilterResult<>( emit, parent ); + } + + return super.createFilterResult( emit, cursorValue, parent ); + } + + + @Override protected CursorSerializer<Edge> getCursorSerializer() { return EdgeCursorSerializer.INSTANCE; } @@ -98,4 +115,33 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, * Get the edge type name we should use when traversing */ protected abstract String getEdgeTypeName(); + + + /** + * Wrapper class. Because edges seek > the last returned, we need to keep our n-1 value. This will be our cursor We + * always try to seek to the same position as we ended. 
Since we don't deal with a persistent read result, if we + * seek to a value equal to our last, we may skip data. + */ + private final class EdgeState { + + private Edge cursorEdge = null; + private Edge currentEdge = null; + + + /** + * Update the pointers + */ + private void update( final Edge newEdge ) { + cursorEdge = currentEdge; + currentEdge = newEdge; + } + + + /** + * Get the edge to use in cursors for resume + */ + private Edge getCursorEdge() { + return cursorEdge; + } + } }
