Upgrades RX to latest stable Adds deleted markers for the nodes on edges
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4e51d383 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4e51d383 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4e51d383 Branch: refs/heads/master Commit: 4e51d3839cecc9d55ea0bc4f65d5610d29b34fb9 Parents: f73ac4c Author: Todd Nine <[email protected]> Authored: Fri Oct 23 14:16:17 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Fri Oct 23 14:16:17 2015 -0600 ---------------------------------------------------------------------- .../read/traverse/AbstractReadGraphFilter.java | 2 +- .../usergrid/persistence/graph/MarkedEdge.java | 13 +++++ .../graph/impl/GraphManagerImpl.java | 54 ++++++++++---------- .../graph/impl/SimpleMarkedEdge.java | 33 ++++++++++-- stack/corepersistence/pom.xml | 2 +- 5 files changed, 71 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java index 88c912a..d3e0345 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java @@ -85,7 +85,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, /** * TODO, pass a message with pointers to our cursor values to be generated later */ - return graphManager.loadEdgesFromSource( search ).onBackpressureBlock() + return graphManager.loadEdgesFromSource( search ) //set the edge state for cursors .doOnNext( edge -> { logger.trace( "Seeking over edge {}", edge ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java index 4b5eeaa..da6fedb 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java @@ -38,4 +38,17 @@ public interface MarkedEdge extends Edge{ */ boolean isDeleted(); + /** + * Return true if the source node is deleted + * @return + */ + boolean isSourceNodeDelete(); + + /** + * Return true if the target node is deleted + * @return + */ + boolean isTargetNodeDeleted(); + + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java index c1e9cea..1bcb398 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java @@ -291,7 +291,7 @@ public class GraphManagerImpl implements GraphManager { return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge ); } } ).buffer( graphFig.getScanPageSize() ) - .compose( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) ); + .compose( new EdgeBufferFilter( searchByEdge.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesVersionsTimer ); } @@ -306,7 +306,7 @@ public class GraphManagerImpl implements GraphManager { return storageEdgeSerialization.getEdgesFromSource( scope, search ); } } ).buffer( graphFig.getScanPageSize() ) - .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + .compose( new EdgeBufferFilter( search.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesFromSourceTimer ); } @@ -321,7 +321,7 @@ public class GraphManagerImpl implements GraphManager { return storageEdgeSerialization.getEdgesToTarget( scope, search ); } } ).buffer( graphFig.getScanPageSize() ) - .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + .compose( new EdgeBufferFilter( search.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesToTargetTimer ); @@ -337,7 +337,7 @@ public class GraphManagerImpl implements GraphManager { return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search ); } } ).buffer( graphFig.getScanPageSize() ) - .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + .compose( new EdgeBufferFilter( search.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesFromSourceByTypeTimer ); } @@ -352,7 +352,7 @@ public class GraphManagerImpl implements GraphManager { return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search ); } } ).buffer( graphFig.getScanPageSize() ) - .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + .compose( new EdgeBufferFilter( search.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesToTargetByTypeTimer ); } @@ -420,12 +420,10 @@ public class GraphManagerImpl implements GraphManager { Observable.Transformer<List<MarkedEdge>, MarkedEdge> {//implements Func1<List<MarkedEdge>, // Observable<MarkedEdge>> { - private final long maxVersion; private final boolean filterMarked; - private EdgeBufferFilter( final long maxVersion, final boolean filterMarked ) { - this.maxVersion = maxVersion; + private EdgeBufferFilter( final boolean filterMarked ) { this.filterMarked = filterMarked; } @@ -444,23 +442,16 @@ public class GraphManagerImpl implements GraphManager { final Observable<MarkedEdge> markedEdgeObservable = Observable.from( markedEdges ); - /** - * We aren't going to filter anything, return exactly what we're passed - */ - if(!filterMarked){ - return markedEdgeObservable; - } - //We need to filter, perform that filter final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges ); - return markedEdgeObservable.filter( edge -> { - final long edgeTimestamp = edge.getTimestamp(); + return markedEdgeObservable.map( edge -> { - //our edge needs to not be deleted and have a version that's > max Version - if ( edge.isDeleted() ) { - return false; - } + /** + * Make sure we mark source and target deleted nodes as such + */ + + final long edgeTimestamp = edge.getTimestamp(); final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() ); @@ -468,22 +459,29 @@ public class GraphManagerImpl implements GraphManager { //the source Id has been marked for deletion. It's version is <= to the marked version for // deletion, // so we need to discard it - if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) { - return false; - } + final boolean isSourceDeleted = ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ); final Long targetTimestamp = markedVersions.get( edge.getTargetNode() ); //the target Id has been marked for deletion. It's version is <= to the marked version for // deletion, // so we need to discard it - if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) { - return false; + final boolean isTargetDeleted = ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ); + + //one has been marked for deletion, return it + if(isSourceDeleted || isTargetDeleted){ + return new SimpleMarkedEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), edge.isDeleted(), isSourceDeleted, isTargetDeleted ); } + return edge; + } ).filter( simpleMarkedEdge -> { + if(!filterMarked){ + return true; + } - return true; - } ); + //if any one of these is true, we filter it + return !(simpleMarkedEdge.isDeleted() || simpleMarkedEdge.isSourceNodeDelete() || simpleMarkedEdge.isTargetNodeDeleted()); + }); } ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java index 29d90eb..c6dc2e4 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java @@ -34,12 +34,23 @@ import com.fasterxml.jackson.annotation.JsonIgnore; public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge { private final boolean deleted; + private final boolean isSourceNodeDeleted; + private final boolean isTargetNodeDeleted; public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, final boolean deleted) { - super(sourceNode, type, targetNode, timestamp); + this( sourceNode, type, targetNode, timestamp, deleted, false, false ); + } + + + public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, + final boolean deleted, final boolean isSourceNodeDeleted, + final boolean isTargetNodeDeleted ) { + super( sourceNode, type, targetNode, timestamp ); this.deleted = deleted; + this.isSourceNodeDeleted = isSourceNodeDeleted; + this.isTargetNodeDeleted = isTargetNodeDeleted; } @@ -56,6 +67,18 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge { @Override + public boolean isSourceNodeDelete() { + return isSourceNodeDeleted; + } + + + @Override + public boolean isTargetNodeDeleted() { + return isTargetNodeDeleted; + } + + + @Override public boolean equals( final Object o ) { if ( this == o ) { return true; @@ -81,6 +104,8 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge { public int hashCode() { int result = super.hashCode(); result = 31 * result + ( deleted ? 1 : 0 ); + result = 31 * result + ( isSourceNodeDeleted ? 1 : 0 ); + result = 31 * result + ( isTargetNodeDeleted ? 1 : 0 ); return result; } @@ -88,8 +113,10 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge { @Override public String toString() { return "SimpleMarkedEdge{" + - "deleted=" + deleted + - "} " + super.toString(); + "deleted=" + deleted + + ", isSourceNodeDeleted=" + isSourceNodeDeleted + + ", isTargetNodeDeleted=" + isTargetNodeDeleted + + "} " + super.toString(); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml index 4b47bc0..4e4648e 100644 --- a/stack/corepersistence/pom.xml +++ b/stack/corepersistence/pom.xml @@ -70,7 +70,7 @@ limitations under the License. <junit.version>4.11</junit.version> <kryo-serializers.version>0.26</kryo-serializers.version> <log4j.version>1.2.17</log4j.version> - <rx.version>1.0.12</rx.version> + <rx.version>1.0.14</rx.version> <slf4j.version>1.7.2</slf4j.version> <surefire.version>2.16</surefire.version> <aws.version>1.10.6</aws.version>
