Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-614 99cb70052 -> 59ad6a989
Migrated filter to a transform operation Updated tests Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/59ad6a98 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/59ad6a98 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/59ad6a98 Branch: refs/heads/USERGRID-614 Commit: 59ad6a9897fcdaf31c048a46426873b42915e4b7 Parents: 99cb700 Author: Todd Nine <[email protected]> Authored: Sun May 10 04:58:24 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Sun May 10 04:58:24 2015 -0600 ---------------------------------------------------------------------- .../graph/impl/GraphManagerImpl.java | 144 ++-- .../graph/impl/SimpleSearchByIdType.java | 8 + .../persistence/graph/GraphManagerIT.java | 664 ++++++++++++++++--- .../graph/test/util/EdgeTestUtils.java | 27 + 4 files changed, 672 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ad6a98/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java index 06cb5a1..e4ce4fd 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java @@ -51,7 +51,6 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -60,7 +59,6 @@ import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import rx.Observable; -import rx.functions.Func1; /** @@ -293,7 +291,7 @@ public class GraphManagerImpl implements GraphManager { return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge ); } } ).buffer( graphFig.getScanPageSize() ) - .flatMap( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) ); + .compose( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesVersionsTimer ); } @@ -308,7 +306,7 @@ public class GraphManagerImpl implements GraphManager { return storageEdgeSerialization.getEdgesFromSource( scope, search ); } } ).buffer( graphFig.getScanPageSize() ) - .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesFromSourceTimer ); } @@ -323,7 +321,7 @@ public class GraphManagerImpl implements GraphManager { return storageEdgeSerialization.getEdgesToTarget( scope, search ); } } ).buffer( graphFig.getScanPageSize() ) - .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesToTargetTimer ); @@ -334,14 +332,14 @@ public class GraphManagerImpl implements GraphManager { public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) { final Observable<Edge> edges = Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { - @Override - protected Iterator<MarkedEdge> getIterator() { - return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search ); - } - } ).buffer( graphFig.getScanPageSize() ) - .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + @Override + protected Iterator<MarkedEdge> getIterator() { + return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search ); + } + } ).buffer( graphFig.getScanPageSize() ) + .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); - return ObservableTimer.time( edges, loadEdgesFromSourceTimer ); + return ObservableTimer.time( edges, loadEdgesFromSourceByTypeTimer ); } @@ -349,12 +347,12 @@ public class GraphManagerImpl implements GraphManager { public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) { final Observable<Edge> edges = Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { - @Override - protected Iterator<MarkedEdge> getIterator() { - return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search ); - } - } ).buffer( graphFig.getScanPageSize() ) - .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + @Override + protected Iterator<MarkedEdge> getIterator() { + return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search ); + } + } ).buffer( graphFig.getScanPageSize() ) + .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesToTargetByTypeTimer ); } @@ -362,13 +360,13 @@ public class GraphManagerImpl implements GraphManager { @Override public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) { - final Observable<String> edgeTypes = Observable.create( - new ObservableIterator<String>( "getEdgeTypesFromSource" ) { - @Override - protected Iterator<String> getIterator() { - return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search ); - } - } ); + final Observable<String> edgeTypes = + Observable.create( new ObservableIterator<String>( "getEdgeTypesFromSource" ) { + @Override + protected Iterator<String> getIterator() { + return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search ); + } + } ); return ObservableTimer.time( edgeTypes, getEdgeTypesFromSourceTimer ); } @@ -376,12 +374,13 @@ public class GraphManagerImpl implements GraphManager { @Override public Observable<String> getIdTypesFromSource( final SearchIdType search ) { - final Observable<String> edgeTypes = Observable.create( new ObservableIterator<String>( "getIdTypesFromSource" ) { - @Override - protected Iterator<String> getIterator() { - return edgeMetadataSerialization.getIdTypesFromSource( scope, search ); - } - } ); + final Observable<String> edgeTypes = + Observable.create( new ObservableIterator<String>( "getIdTypesFromSource" ) { + @Override + protected Iterator<String> getIterator() { + return edgeMetadataSerialization.getIdTypesFromSource( scope, search ); + } + } ); return ObservableTimer.time( edgeTypes, getIdTypesFromSourceTimer ); } @@ -389,22 +388,21 @@ public class GraphManagerImpl implements GraphManager { @Override public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) { - final Observable<String> edgeTypes = Observable.create( - new ObservableIterator<String>( "getEdgeTypesToTarget" ) { - @Override - protected Iterator<String> getIterator() { - return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search ); - } - } ); + final Observable<String> edgeTypes = + Observable.create( new ObservableIterator<String>( "getEdgeTypesToTarget" ) { + @Override + protected Iterator<String> getIterator() { + return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search ); + } + } ); - return ObservableTimer.time( edgeTypes, getEdgeTypesFromSourceTimer ); + return ObservableTimer.time( edgeTypes, getEdgeTypesToTargetTimer ); } @Override public Observable<String> getIdTypesToTarget( final SearchIdType search ) { - final Observable<String> edgeTypes = Observable.create( - new ObservableIterator<String>( "getIdTypesToTarget" ) { + final Observable<String> edgeTypes = Observable.create( new ObservableIterator<String>( "getIdTypesToTarget" ) { @Override protected Iterator<String> getIterator() { return edgeMetadataSerialization.getIdTypesToTarget( scope, search ); @@ -418,7 +416,9 @@ public class GraphManagerImpl implements GraphManager { /** * Helper filter to perform mapping and return an observable of pre-filtered edges */ - private class EdgeBufferFilter implements Func1<List<MarkedEdge>, Observable<MarkedEdge>> { + private class EdgeBufferFilter implements + Observable.Transformer<List<MarkedEdge>, MarkedEdge> {//implements Func1<List<MarkedEdge>, + // Observable<MarkedEdge>> { private final long maxVersion; private final boolean filterMarked; @@ -430,46 +430,60 @@ public class GraphManagerImpl implements GraphManager { } + @Override + /** * Takes a buffered list of marked edges. It then does a single round trip to fetch marked ids These are then * used in conjunction with the max version filter to filter any edges that should not be returned * * @return An observable that emits only edges that can be consumed. There could be multiple versions of the * same edge so those need de-duped. - */ - @Override - public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) { + */ public Observable<MarkedEdge> call( final Observable<List<MarkedEdge>> markedEdgesObservable ) { - final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges ); - final long maxTimestamp = maxVersion; + return markedEdgesObservable.flatMap( markedEdges -> { - return Observable.from( markedEdges ).filter( edge -> { - final long edgeTimestamp = edge.getTimestamp(); + final Observable<MarkedEdge> markedEdgeObservable = Observable.from( markedEdges ); - //our edge needs to not be deleted and have a version that's > max Version - if (( edge.isDeleted() && filterMarked) || Long.compare( edgeTimestamp, maxTimestamp ) > 0 ) { - return false; + /** + * We aren't going to filter anything, return exactly what we're passed + */ + if(!filterMarked){ + return markedEdgeObservable; } + //We need to filter, perform that filter + final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges ); - final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() ); + return markedEdgeObservable.filter( edge -> { + final long edgeTimestamp = edge.getTimestamp(); - //the source Id has been marked for deletion. It's version is <= to the marked version for deletion, - // so we need to discard it - if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) { - return false; - } + //our edge needs to not be deleted and have a version that's > max Version + if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxVersion ) > 0 ) { + return false; + } - final Long targetTimestamp = markedVersions.get( edge.getTargetNode() ); - //the target Id has been marked for deletion. It's version is <= to the marked version for deletion, - // so we need to discard it - if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) { - return false; - } + final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() ); + + //the source Id has been marked for deletion. It's version is <= to the marked version for + // deletion, + // so we need to discard it + if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) { + return false; + } + + final Long targetTimestamp = markedVersions.get( edge.getTargetNode() ); + + //the target Id has been marked for deletion. It's version is <= to the marked version for + // deletion, + // so we need to discard it + if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) { + return false; + } - return true; + return true; + } ); } ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ad6a98/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java index 9d989a3..bcbff0a 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java @@ -54,6 +54,14 @@ public class SimpleSearchByIdType extends SimpleSearchByEdgeType implements Sear } + public SimpleSearchByIdType( final Id node, final String type, final long maxTimestamp, final Order order, final String idType, + final Optional<Edge> last, final boolean filterMarked ) { + super( node, type, maxTimestamp, order, last, filterMarked ); + ValidationUtils.verifyString( idType, "idType" ); + this.idType = idType; + } + + @Override public String getIdType() { return idType; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ad6a98/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java index 75f61f0..e512608 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java @@ -43,18 +43,21 @@ import com.google.inject.Inject; import rx.Observable; +import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge; import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createGetByEdge; -import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdge; import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdgeAndId; +import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdgeAndIdUnfiltered; +import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdgeUnfiltered; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -@RunWith(ITRunner.class) -@UseModules({ TestGraphModule.class }) + +@RunWith( ITRunner.class ) +@UseModules( { TestGraphModule.class } ) public class GraphManagerIT { @@ -69,18 +72,16 @@ public class GraphManagerIT { protected ApplicationScope scope; - - @Before public void mockApp() { - this.scope = new ApplicationScopeImpl(createId("application") ); + this.scope = new ApplicationScopeImpl( createId( "application" ) ); } @Test public void testWriteReadEdgeTypeSource() throws TimeoutException, InterruptedException { - GraphManager gm = emf.createEdgeManager( scope ); + GraphManager gm = emf.createEdgeManager( scope ); Edge edge = createEdge( "source", "test", "target" ); @@ -112,7 +113,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeTypeTarget() throws TimeoutException, InterruptedException { - GraphManager gm = emf.createEdgeManager( scope ); + GraphManager gm = emf.createEdgeManager( scope ); Edge edge = createEdge( "source", "test", "target" ); @@ -144,7 +145,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeTypeVersionSource() throws TimeoutException, InterruptedException { - GraphManager gm = emf.createEdgeManager( scope ); + GraphManager gm = emf.createEdgeManager( scope ); final long earlyVersion = 1000l; @@ -178,7 +179,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeTypeVersionTarget() throws TimeoutException, InterruptedException { - GraphManager gm = emf.createEdgeManager( scope ); + GraphManager gm = emf.createEdgeManager( scope ); final long earlyVersion = 10000l; @@ -216,7 +217,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeTypeVersionSourceDistinct() throws TimeoutException, InterruptedException { - GraphManager gm = emf.createEdgeManager( scope ); + GraphManager gm = emf.createEdgeManager( scope ); final long earlyVersion = 10000l; @@ -241,7 +242,7 @@ public class GraphManagerIT { //now test retrieving it, we should only get edge3, since it's the latest SearchByEdgeType search = - createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null ); + createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null ); Observable<Edge> edges = gm.loadEdgesFromSource( search ); @@ -288,7 +289,7 @@ public class GraphManagerIT { public void testWriteReadEdgeTypeVersionTargetDistinct() throws TimeoutException, InterruptedException { - GraphManager gm = emf.createEdgeManager( scope ); + GraphManager gm = emf.createEdgeManager( scope ); final long earlyVersion = 10000l; @@ -314,7 +315,7 @@ public class GraphManagerIT { //now test retrieving it, we should only get edge3, since it's the latest SearchByEdgeType search = - createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null ); + createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null ); Observable<Edge> edges = gm.loadEdgesToTarget( search ); @@ -360,7 +361,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeTypePagingSource() throws TimeoutException, InterruptedException { - GraphManager gm = emf.createEdgeManager( scope ); + GraphManager gm = emf.createEdgeManager( scope ); final Id sourceId = IdGenerator.createId( "source" ); @@ -380,7 +381,7 @@ public class GraphManagerIT { //now test retrieving it SearchByEdgeType search = - createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null ); + createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null ); Observable<Edge> edges = gm.loadEdgesFromSource( search ); @@ -415,7 +416,7 @@ public class GraphManagerIT { public void testWriteReadEdgeTypePagingTarget() { - GraphManager gm = emf.createEdgeManager( scope ); + GraphManager gm = emf.createEdgeManager( scope ); final Id targetId = IdGenerator.createId( "target" ); @@ -436,7 +437,7 @@ public class GraphManagerIT { //now test retrieving it SearchByEdgeType search = - createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null ); + createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null ); Observable<Edge> edges = gm.loadEdgesToTarget( search ); @@ -470,7 +471,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeTypeTargetTypeSource() { - GraphManager gm = emf.createEdgeManager( scope ); + GraphManager gm = emf.createEdgeManager( scope ); Edge edge = createEdge( "source", "test", "target" ); @@ -480,7 +481,7 @@ public class GraphManagerIT { //now test retrieving it SearchByIdType search = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), - edge.getTargetNode().getType(), null ); + edge.getTargetNode().getType(), null ); Observable<Edge> edges = gm.loadEdgesFromSourceByType( search ); @@ -492,7 +493,7 @@ public class GraphManagerIT { //change edge type to be invalid, shouldn't get a result search = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), - edge.getTargetNode().getType() + "invalid", null ); + edge.getTargetNode().getType() + "invalid", null ); edges = gm.loadEdgesFromSourceByType( search ); @@ -506,7 +507,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeTypeTargetTypeTarget() { - GraphManager gm = emf.createEdgeManager( scope ); + GraphManager gm = emf.createEdgeManager( scope ); ; @@ -517,7 +518,7 @@ public class GraphManagerIT { //now test retrieving it SearchByIdType search = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), - edge.getSourceNode().getType(), null ); + edge.getSourceNode().getType(), null ); Observable<Edge> edges = gm.loadEdgesToTargetByType( search ); @@ -529,7 +530,7 @@ public class GraphManagerIT { //change edge type to be invalid, shouldn't get a result search = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), - edge.getSourceNode().getType() + "invalid", null ); + edge.getSourceNode().getType() + "invalid", null ); edges = gm.loadEdgesToTargetByType( search ); @@ -543,7 +544,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeDeleteSource() { - GraphManager gm = emf.createEdgeManager( scope ); + GraphManager gm = emf.createEdgeManager( scope ); Edge edge = createEdge( "source", "test", "target" ); @@ -563,7 +564,7 @@ public class GraphManagerIT { assertEquals( "Correct edge returned", edge, returned ); SearchByIdType searchById = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), - edge.getTargetNode().getType(), null ); + edge.getTargetNode().getType(), null ); edges = gm.loadEdgesFromSourceByType( searchById ); @@ -573,8 +574,7 @@ public class GraphManagerIT { assertEquals( "Correct edge returned", edge, returned ); final SearchByEdge searchByEdge = - createGetByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), - null ); + createGetByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), null ); returned = gm.loadEdgeVersions( searchByEdge ).toBlocking().single(); @@ -613,7 +613,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeDeleteTarget() { - GraphManager gm = emf.createEdgeManager( scope ); + GraphManager gm = emf.createEdgeManager( scope ); Edge edge = createEdge( "source", "test", "target" ); @@ -633,7 +633,7 @@ public class GraphManagerIT { assertEquals( "Correct edge returned", edge, returned ); SearchByIdType searchById = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), - edge.getSourceNode().getType(), null ); + edge.getSourceNode().getType(), null ); edges = gm.loadEdgesToTargetByType( searchById ); @@ -669,7 +669,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeTypesSourceTypes() { - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId = new SimpleId( "source" ); Id targetId1 = new SimpleId( "target" ); @@ -691,7 +691,7 @@ public class GraphManagerIT { //get our 2 edge types Observable<String> edges = - gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null, null ) ); + gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null, null ) ); Iterator<String> results = edges.toBlocking().getIterator(); @@ -718,8 +718,8 @@ public class GraphManagerIT { assertFalse( "No results", results.hasNext() ); //now get types for test2 - edges = gm.getIdTypesFromSource( - new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test2", null, null ) ); + edges = + gm.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test2", null, null ) ); results = edges.toBlocking().getIterator(); @@ -732,7 +732,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeTypesTargetTypes() { - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId1 = new SimpleId( "source" ); Id sourceId2 = new SimpleId( "source2" ); @@ -797,7 +797,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeTypesSourceTypesPaging() { - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId1 = new SimpleId( "source" ); Id targetId1 = new SimpleId( "target" ); @@ -861,7 +861,7 @@ public class GraphManagerIT { //now get the next page edges = gm.getIdTypesFromSource( - new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", null, targetId1.getType() ) ); + new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", null, targetId1.getType() ) ); results = edges.toBlocking().getIterator(); @@ -875,7 +875,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeTypesTargetTypesPaging() { - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId1 = new SimpleId( "source" ); Id sourceId2 = new SimpleId( "source2" ); @@ -942,7 +942,7 @@ public class GraphManagerIT { //now get the next page edges = gm.getIdTypesToTarget( - new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", null, sourceId1.getType() ) ); + new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", null, sourceId1.getType() ) ); results = edges.toBlocking().getIterator(); @@ -956,7 +956,7 @@ public class GraphManagerIT { @Test public void testMarkSourceEdges() throws InterruptedException { - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId = new SimpleId( "source" ); Id targetId1 = new SimpleId( "target" ); @@ -966,11 +966,11 @@ public class GraphManagerIT { long startTime = System.currentTimeMillis(); long edge1Time = startTime; - long edge2Time = edge1Time+1; + long edge2Time = edge1Time + 1; - final long maxVersion= edge2Time; + final long maxVersion = edge2Time; - Edge edge1 = createEdge( sourceId, "test", targetId1, edge1Time); + Edge edge1 = createEdge( sourceId, "test", targetId1, edge1Time ); gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null ); @@ -979,21 +979,18 @@ public class GraphManagerIT { gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null ); - assertTrue( Long.compare( maxVersion, edge2.getTimestamp() ) >= 0 ); assertTrue( Long.compare( maxVersion, edge1.getTimestamp() ) >= 0 ); //get our 2 edges - Observable<Edge> edges = gm.loadEdgesFromSource( - createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) ); + Observable<Edge> edges = + gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) ); Iterator<Edge> results = edges.toBlocking().getIterator(); - System.out.println( "\n\n\n\n\n\n\n\n\n\n" ); - assertEquals( "Edges correct", edge2, results.next() ); assertEquals( "Edges correct", edge1, results.next() ); @@ -1002,15 +999,12 @@ public class GraphManagerIT { //now delete one of the edges - System.out.println( "\n\n\n\n\n\n\n\n\n\n" ); gm.markEdge( edge1 ).toBlocking().last(); - System.out.println( "\n\n\n\n\n\n\n\n\n\n" ); - - edges = gm.loadEdgesFromSource( - createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) ); + edges = + gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) ); results = edges.toBlocking().getIterator(); @@ -1020,16 +1014,14 @@ public class GraphManagerIT { assertFalse( "No more edges", results.hasNext() ); - System.out.println( "\n\n\n\n\n\n\n\n\n\n" ); //now delete one of the edges gm.markEdge( edge2 ).toBlocking().last(); - System.out.println( "\n\n\n\n\n\n\n\n\n\n" ); - edges = gm.loadEdgesFromSource( - createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) ); + edges = + gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) ); results = edges.toBlocking().getIterator(); @@ -1045,7 +1037,7 @@ public class GraphManagerIT { @Test public void testMarkTargetEdges() { - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId1 = new SimpleId( "source" ); Id sourceId2 = new SimpleId( "source2" ); @@ -1065,7 +1057,7 @@ public class GraphManagerIT { //get our 2 edges Observable<Edge> edges = - gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) ); + gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) ); Iterator<Edge> results = edges.toBlocking().getIterator(); @@ -1111,9 +1103,163 @@ public class GraphManagerIT { @Test + public void testMarkCompactSourceEdges() throws InterruptedException { + + final GraphManager gm = emf.createEdgeManager( scope ); + + Id sourceId = new SimpleId( "source" ); + Id targetId1 = new SimpleId( "target" ); + Id targetId2 = new SimpleId( "target2" ); + + + long startTime = System.currentTimeMillis(); + + long edge1Time = startTime; + long edge2Time = edge1Time + 1; + + final long maxVersion = edge2Time; + + Edge edge1 = createEdge( sourceId, "test", targetId1, edge1Time ); + + gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null ); + + Edge edge2 = createEdge( sourceId, "test", targetId2, edge2Time ); + + gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null ); + + + assertTrue( Long.compare( maxVersion, edge2.getTimestamp() ) >= 0 ); + assertTrue( Long.compare( maxVersion, edge1.getTimestamp() ) >= 0 ); + + + gm.markEdge( edge1 ).toBlocking().last(); + + + //get our 2 edges + Observable<Edge> edges = gm.loadEdgesFromSource( + createSearchByEdgeUnfiltered( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) ); + + + Iterator<Edge> results = edges.toBlocking().getIterator(); + + + assertEquals( "Edges correct", edge2.getTargetNode(), results.next().getTargetNode() ); + + assertEquals( "Edges correct", edge1.getTargetNode(), results.next().getTargetNode() ); + + assertFalse( "No more edges", results.hasNext() ); + + + //now delete one of the edges + + gm.deleteEdge( edge1 ).toBlocking().last(); + + + edges = gm.loadEdgesFromSource( + createSearchByEdgeUnfiltered( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) ); + + + results = edges.toBlocking().getIterator(); + + assertEquals( "Edges correct", edge2.getTargetNode(), results.next().getTargetNode() ); + + assertFalse( "No more edges", results.hasNext() ); + + //delete an edge we didn't mark, should be a no-op + gm.deleteEdge( edge2 ).toBlocking().lastOrDefault( null ); + + edges = gm.loadEdgesFromSource( + createSearchByEdgeUnfiltered( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) ); + + + results = edges.toBlocking().getIterator(); + + + assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() ); + + assertFalse( "No more edges", results.hasNext() ); + } + + + @Test + public void testMarkCompactTargetEdges() { + + final GraphManager gm = emf.createEdgeManager( scope ); + + Id sourceId1 = new SimpleId( "source" ); + Id sourceId2 = new SimpleId( "source2" ); + Id targetId = new SimpleId( "target" ); + + Edge edge1 = createEdge( sourceId1, "test", targetId, System.currentTimeMillis() ); + + gm.writeEdge( edge1 ).toBlocking().last(); + + Edge edge2 = createEdge( sourceId2, "test", targetId, System.currentTimeMillis() ); + + gm.writeEdge( edge2 ).toBlocking().last(); + + + final long maxVersion = System.currentTimeMillis(); + + + //now delete one of the edges + + gm.markEdge( edge1 ).toBlocking().last(); + + //get our 2 edges + Observable<Edge> edges = gm.loadEdgesToTarget( + createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) ); + + + Iterator<Edge> results = edges.toBlocking().getIterator(); + + + assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() ); + + assertEquals( "Edges correct", edge1.getSourceNode(), results.next().getSourceNode() ); + + + assertFalse( "No more edges", results.hasNext() ); + + + //now delete one of the edges + + gm.deleteEdge( edge1 ).toBlocking().last(); + + edges = gm.loadEdgesToTarget( + createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) ); + + + results = edges.toBlocking().getIterator(); + + + assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() ); + + assertFalse( "No more edges", results.hasNext() ); + + //now delete one of the edges + + + //delete an edge we didn't mark, should be a no-op + gm.deleteEdge( edge2 ).toBlocking().lastOrDefault( null ); + + edges = gm.loadEdgesToTarget( + createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) ); + + + results = edges.toBlocking().getIterator(); + + + assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() ); + + assertFalse( "No more edges", results.hasNext() ); + } + + + @Test public void testMarkSourceEdgesType() { - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId = new SimpleId( "source" ); Id targetId1 = new SimpleId( "target" ); @@ -1133,7 +1279,7 @@ public class GraphManagerIT { //get our 2 edges Observable<Edge> edges = gm.loadEdgesFromSourceByType( - createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ); + createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ); Iterator<Edge> results = edges.toBlocking().getIterator(); @@ -1149,7 +1295,7 @@ public class GraphManagerIT { edges = gm.loadEdgesFromSourceByType( - createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ); + createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ); results = edges.toBlocking().getIterator(); @@ -1158,7 +1304,7 @@ public class GraphManagerIT { edges = gm.loadEdgesFromSourceByType( - createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId2.getType(), null ) ); + createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId2.getType(), null ) ); results = edges.toBlocking().getIterator(); @@ -1174,7 +1320,7 @@ public class GraphManagerIT { edges = gm.loadEdgesFromSourceByType( - createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId2.getType(), null ) ); + createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId2.getType(), null ) ); results = edges.toBlocking().getIterator(); @@ -1191,7 +1337,7 @@ public class GraphManagerIT { @Test public void testMarkTargetEdgesType() { - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId1 = new SimpleId( "source" ); Id sourceId2 = new SimpleId( "source2" ); @@ -1210,7 +1356,7 @@ public class GraphManagerIT { //get our 2 edges Observable<Edge> edges = gm.loadEdgesToTargetByType( - createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ); + createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ); Iterator<Edge> results = edges.toBlocking().getIterator(); @@ -1226,8 +1372,7 @@ public class GraphManagerIT { edges = gm.loadEdgesToTargetByType( - createSearchByEdgeAndId( edge1.getSourceNode(), edge1.getType(), maxVersion, sourceId1.getType(), - null ) ); + createSearchByEdgeAndId( edge1.getSourceNode(), edge1.getType(), maxVersion, sourceId1.getType(), null ) ); results = edges.toBlocking().getIterator(); @@ -1236,7 +1381,7 @@ public class GraphManagerIT { edges = gm.loadEdgesToTargetByType( - createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) ); + createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) ); results = edges.toBlocking().getIterator(); @@ -1252,7 +1397,7 @@ public class GraphManagerIT { edges = gm.loadEdgesToTargetByType( - createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) ); + createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId2.getType(), null ) ); results = edges.toBlocking().getIterator(); @@ -1269,7 +1414,7 @@ public class GraphManagerIT { @Test public void markSourceNode() { - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId = new SimpleId( "source" ); Id targetId1 = new SimpleId( "target" ); @@ -1287,8 +1432,8 @@ public class GraphManagerIT { final long maxVersion = System.currentTimeMillis(); Iterator<Edge> results = - gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ) - .toBlocking().getIterator(); + gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking() + .getIterator(); assertEquals( "Edge found", edge2, results.next() ); @@ -1300,8 +1445,8 @@ public class GraphManagerIT { //get our 2 edges results = gm.loadEdgesFromSourceByType( - createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ) - .toBlocking().getIterator(); + createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ).toBlocking() + .getIterator(); assertEquals( "Edges correct", edge1, results.next() ); @@ -1310,8 +1455,8 @@ public class GraphManagerIT { //now delete one of the edges results = gm.loadEdgesFromSourceByType( - createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ) - .toBlocking().getIterator(); + createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ).toBlocking() + .getIterator(); assertEquals( "Edges correct", edge2, results.next() ); @@ -1324,26 +1469,61 @@ public class GraphManagerIT { //now re-read, nothing should be there since they're marked - results = gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ) - .toBlocking().getIterator(); + results = + gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking() + .getIterator(); assertFalse( "No more edges", results.hasNext() ); //get our 2 edges results = gm.loadEdgesFromSourceByType( - createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ) - .toBlocking().getIterator(); + createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ).toBlocking() + .getIterator(); assertFalse( "No more edges", results.hasNext() ); + //now delete one of the edges results = gm.loadEdgesFromSourceByType( - createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ) + createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ).toBlocking() + .getIterator(); + + + assertFalse( "No more edges", results.hasNext() ); + + + //test they come back unfiltered + + //read with filter marked off to ensure they're still persisted + results = gm.loadEdgesFromSource( createSearchByEdgeUnfiltered( sourceId, edge1.getType(), maxVersion, null ) ) + .toBlocking().getIterator(); + + + assertEquals( "Edge found", edge2, results.next() ); + + assertEquals( "Edge found", edge1, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + + results = gm.loadEdgesFromSourceByType( + createSearchByEdgeAndIdUnfiltered( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ) .toBlocking().getIterator(); + assertEquals( "Edges correct", edge1, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + results = gm.loadEdgesFromSourceByType( + createSearchByEdgeAndIdUnfiltered( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ) + .toBlocking().getIterator(); + + + assertEquals( "Edges correct", edge2, results.next() ); + assertFalse( "No more edges", results.hasNext() ); } @@ -1351,7 +1531,7 @@ public class GraphManagerIT { @Test public void markTargetNode() { - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId1 = new SimpleId( "source" ); Id sourceId2 = new SimpleId( "source2" ); @@ -1369,8 +1549,8 @@ public class GraphManagerIT { final long maxVersion = System.currentTimeMillis(); Iterator<Edge> results = - gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ) - .toBlocking().getIterator(); + gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking() + .getIterator(); assertEquals( "Edge found", edge2, results.next() ); @@ -1382,7 +1562,69 @@ public class GraphManagerIT { //get our 2 edges results = gm.loadEdgesToTargetByType( - createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ) + createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ).toBlocking() + .getIterator(); + + + assertEquals( "Edges correct", edge1, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + //now delete one of the edges + results = gm.loadEdgesToTargetByType( + createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ).toBlocking() + .getIterator(); + + + assertEquals( "Edges correct", edge2, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + //mark the source node + gm.markNode( targetId, edge2.getTimestamp() ).toBlocking().last(); + + + //now re-read, nothing should be there since they're marked + + results = gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking() + .getIterator(); + + assertFalse( "No more edges", results.hasNext() ); + + + //get our 2 edges + results = gm.loadEdgesToTargetByType( + createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ).toBlocking() + .getIterator(); + + + assertFalse( "No more edges", results.hasNext() ); + + //now delete one of the edges + results = gm.loadEdgesToTargetByType( + createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ).toBlocking() + .getIterator(); + + + assertFalse( "No more edges", results.hasNext() ); + + + //now test they come back when unfiltered + + results = gm.loadEdgesToTarget( createSearchByEdgeUnfiltered( targetId, edge1.getType(), maxVersion, null ) ) + .toBlocking().getIterator(); + + + assertEquals( "Edge found", edge2, results.next() ); + + assertEquals( "Edge found", edge1, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + + //get our 2 edges + results = gm.loadEdgesToTargetByType( + createSearchByEdgeAndIdUnfiltered( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ) .toBlocking().getIterator(); @@ -1392,10 +1634,180 @@ public class GraphManagerIT { //now delete one of the edges results = gm.loadEdgesToTargetByType( - createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ) + createSearchByEdgeAndIdUnfiltered( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ) + .toBlocking().getIterator(); + + + assertEquals( "Edges correct", edge2, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + } + + + @Test + public void markDeleteSourceNode() { + + final GraphManager gm = emf.createEdgeManager( scope ); + + Id sourceId = new SimpleId( "source" ); + Id targetId1 = new SimpleId( "target" ); + Id targetId2 = new SimpleId( "target2" ); + + Edge edge1 = createEdge( sourceId, "test", targetId1, System.currentTimeMillis() ); + + gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null ); + + Edge edge2 = createEdge( sourceId, "test", targetId2, System.currentTimeMillis() ); + + gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null ); + + + final long maxVersion = System.currentTimeMillis(); + + Iterator<Edge> results = + gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking() + .getIterator(); + + + assertEquals( "Edge found", edge2, results.next() ); + + assertEquals( "Edge found", edge1, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + + //get our 2 edges + results = gm.loadEdgesFromSourceByType( + createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ).toBlocking() + .getIterator(); + + + assertEquals( "Edges correct", edge1, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + //now delete one of the edges + results = gm.loadEdgesFromSourceByType( + createSearchByEdgeAndId( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ).toBlocking() + .getIterator(); + + + assertEquals( "Edges correct", edge2, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + //mark the source node + gm.markNode( sourceId, edge2.getTimestamp() ).toBlocking().last(); + + + //test they come back unfiltered + + //read with filter marked off to ensure they're still persisted + results = gm.loadEdgesFromSource( createSearchByEdgeUnfiltered( sourceId, edge1.getType(), maxVersion, null ) ) + .toBlocking().getIterator(); + + + assertEquals( "Edge found", edge2, results.next() ); + + assertEquals( "Edge found", edge1, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + + results = gm.loadEdgesFromSourceByType( + createSearchByEdgeAndIdUnfiltered( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ) + .toBlocking().getIterator(); + + + assertEquals( "Edges correct", edge1, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + results = gm.loadEdgesFromSourceByType( + createSearchByEdgeAndIdUnfiltered( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ) + .toBlocking().getIterator(); + + + assertEquals( "Edges correct", edge2, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + //now delete them + + + gm.compactNode( sourceId ).toBlocking().last(); + + results = gm.loadEdgesFromSource( createSearchByEdgeUnfiltered( sourceId, edge1.getType(), maxVersion, null ) ) .toBlocking().getIterator(); + assertFalse( "No more edges", results.hasNext() ); + + + results = gm.loadEdgesFromSourceByType( + createSearchByEdgeAndIdUnfiltered( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ) + .toBlocking().getIterator(); + + + assertFalse( "No more edges", results.hasNext() ); + + results = gm.loadEdgesFromSourceByType( + createSearchByEdgeAndIdUnfiltered( sourceId, edge2.getType(), maxVersion, targetId2.getType(), null ) ) + .toBlocking().getIterator(); + + + assertFalse( "No more edges", results.hasNext() ); + } + + + @Test + public void markDeleteTargetNode() { + + final GraphManager gm = emf.createEdgeManager( scope ); + + Id sourceId1 = new SimpleId( "source" ); + Id sourceId2 = new SimpleId( "source2" ); + Id targetId = new SimpleId( "target" ); + + Edge edge1 = createEdge( sourceId1, "test", targetId, System.currentTimeMillis() ); + + gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null ); + + Edge edge2 = createEdge( sourceId2, "test", targetId, System.currentTimeMillis() ); + + gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null ); + + + final long maxVersion = System.currentTimeMillis(); + + Iterator<Edge> results = + gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking() + .getIterator(); + + + assertEquals( "Edge found", edge2, results.next() ); + + assertEquals( "Edge found", edge1, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + + //get our 2 edges + results = gm.loadEdgesToTargetByType( + createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ).toBlocking() + .getIterator(); + + + assertEquals( "Edges correct", edge1, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + //now delete one of the edges + results = gm.loadEdgesToTargetByType( + createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ).toBlocking() + .getIterator(); + + assertEquals( "Edges correct", edge2, results.next() ); assertFalse( "No more edges", results.hasNext() ); @@ -1406,15 +1818,55 @@ public class GraphManagerIT { //now re-read, nothing should be there since they're marked - results = gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ) + + //now test they come back when unfiltered + + results = gm.loadEdgesToTarget( createSearchByEdgeUnfiltered( targetId, edge1.getType(), maxVersion, null ) ) + .toBlocking().getIterator(); + + + assertEquals( "Edge found", edge2, results.next() ); + + assertEquals( "Edge found", edge1, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + + //get our 2 edges + results = gm.loadEdgesToTargetByType( + createSearchByEdgeAndIdUnfiltered( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ) + .toBlocking().getIterator(); + + + assertEquals( "Edges correct", edge1, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + //now delete one of the edges + results = gm.loadEdgesToTargetByType( + createSearchByEdgeAndIdUnfiltered( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ) .toBlocking().getIterator(); + + assertEquals( "Edges correct", edge2, results.next() ); + + assertFalse( "No more edges", results.hasNext() ); + + //now compact, everything should be removed + + gm.compactNode( targetId ).toBlocking().last(); + + + results = gm.loadEdgesToTarget( createSearchByEdgeUnfiltered( targetId, edge1.getType(), maxVersion, null ) ) + .toBlocking().getIterator(); + + assertFalse( "No more edges", results.hasNext() ); //get our 2 edges results = gm.loadEdgesToTargetByType( - createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ) + createSearchByEdgeAndIdUnfiltered( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ) .toBlocking().getIterator(); @@ -1422,7 +1874,7 @@ public class GraphManagerIT { //now delete one of the edges results = gm.loadEdgesToTargetByType( - createSearchByEdgeAndId( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ) + createSearchByEdgeAndIdUnfiltered( targetId, edge2.getType(), maxVersion, sourceId2.getType(), null ) ) .toBlocking().getIterator(); @@ -1433,7 +1885,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeTypesSourceTypesPrefix() { - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId = new SimpleId( "source" ); Id targetId = new SimpleId( "target" ); @@ -1454,7 +1906,7 @@ public class GraphManagerIT { //get our 2 edge types Observable<String> edges = - gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), "test1", null ) ); + gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), "test1", null ) ); Iterator<String> results = edges.toBlocking().getIterator(); @@ -1483,7 +1935,7 @@ public class GraphManagerIT { public void testSourceSubTypes() { //now test sub edges - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId = new SimpleId( "source" ); Id targetId1target1 = new SimpleId( "type1target1" ); @@ -1504,8 +1956,8 @@ public class GraphManagerIT { gm.writeEdge( test2TargetEdge ).toBlocking().singleOrDefault( null ); - Observable<String> edges = gm.getIdTypesFromSource( - new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", "type1", null ) ); + Observable<String> edges = + gm.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", "type1", null ) ); Iterator<String> results = edges.toBlocking().getIterator(); @@ -1517,8 +1969,8 @@ public class GraphManagerIT { assertFalse( "No results", results.hasNext() ); //now get types for test2 - edges = gm.getIdTypesFromSource( - new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", "type2", null ) ); + edges = + gm.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", "type2", null ) ); results = edges.toBlocking().getIterator(); @@ -1532,7 +1984,7 @@ public class GraphManagerIT { @Test public void testWriteReadEdgeTypesTargetTypesPrefix() { - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id targetId = new SimpleId( "target" ); Id sourceId = new SimpleId( "source" ); @@ -1553,7 +2005,7 @@ public class GraphManagerIT { //get our 2 edge types Observable<String> edges = - gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( testTargetEdge.getTargetNode(), "test1", null ) ); + gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( testTargetEdge.getTargetNode(), "test1", null ) ); Iterator<String> results = edges.toBlocking().getIterator(); @@ -1582,7 +2034,7 @@ public class GraphManagerIT { public void testTargetSubTypes() { //now test sub edges - final GraphManager gm = emf.createEdgeManager( scope ); + final GraphManager gm = emf.createEdgeManager( scope ); Id targetId = new SimpleId( "target" ); Id sourceId1target1 = new SimpleId( "type1source1" ); @@ -1603,8 +2055,8 @@ public class GraphManagerIT { gm.writeEdge( test2TargetEdge ).toBlocking().singleOrDefault( null ); - Observable<String> edges = gm.getIdTypesToTarget( - new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", "type1", null ) ); + Observable<String> edges = + gm.getIdTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", "type1", null ) ); Iterator<String> results = edges.toBlocking().getIterator(); @@ -1616,8 +2068,8 @@ public class GraphManagerIT { assertFalse( "No results", results.hasNext() ); //now get types for test2 - edges = gm.getIdTypesToTarget( - new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", "type2", null ) ); + edges = + gm.getIdTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", "type2", null ) ); results = edges.toBlocking().getIterator(); @@ -1629,7 +2081,7 @@ public class GraphManagerIT { @Test( expected = NullPointerException.class ) - public void invalidEdgeTypesWrite( ) { + public void invalidEdgeTypesWrite() { final GraphManager em = emf.createEdgeManager( scope ); em.writeEdge( null ); @@ -1637,7 +2089,7 @@ public class GraphManagerIT { @Test( expected = NullPointerException.class ) - public void invalidEdgeTypesDelete( ) { + public void invalidEdgeTypesDelete() { final GraphManager em = emf.createEdgeManager( scope ); em.markEdge( null ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ad6a98/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java index b2142e8..718fc70 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java @@ -152,6 +152,19 @@ public class EdgeTestUtils { return new SimpleSearchByEdgeType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, last ); } + /** + * + * @param sourceId + * @param type + * @param maxVersion + * @param last + * @return + */ + public static SearchByEdgeType createSearchByEdgeUnfiltered( final Id sourceId, final String type, final long maxVersion, + final Edge last ) { + return new SimpleSearchByEdgeType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.fromNullable( last ) , false ); + } + /** * @@ -167,6 +180,20 @@ public class EdgeTestUtils { return new SimpleSearchByIdType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, idType, Optional.fromNullable(last) ); } + /** + * + * @param sourceId + * @param type + * @param maxVersion + * @param idType + * @param last + * @return + */ + public static SearchByIdType createSearchByEdgeAndIdUnfiltered( final Id sourceId, final String type, final long maxVersion, + final String idType, final Edge last ) { + return new SimpleSearchByIdType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, idType, Optional.fromNullable(last), false ); + } + /** *
