Completes refactor of compact/delete operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/93bacf51 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/93bacf51 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/93bacf51 Branch: refs/heads/two-dot-o-dev Commit: 93bacf513a532ec12b4603b5b14d2bca9cabaafa Parents: 9a89437 Author: Todd Nine <[email protected]> Authored: Sun May 10 04:08:16 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Sun May 10 04:59:22 2015 -0600 ---------------------------------------------------------------------- .../persistence/graph/GraphManager.java | 27 +- .../graph/impl/GraphManagerImpl.java | 451 ++++++++----------- .../graph/impl/stage/EdgeDeleteListener.java | 2 +- .../impl/stage/EdgeDeleteListenerImpl.java | 31 +- .../graph/impl/stage/EdgeMetaRepair.java | 6 +- .../graph/impl/stage/EdgeMetaRepairImpl.java | 196 ++++---- .../graph/impl/stage/NodeDeleteListener.java | 2 +- .../graph/CommittedGraphManagerIT.java | 135 ------ .../persistence/graph/GraphManagerIT.java | 67 ++- .../graph/StorageGraphManagerIT.java | 240 ---------- 10 files changed, 346 insertions(+), 811 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/93bacf51/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java index 987a36c..6100725 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java @@ -27,8 +27,7 @@ import rx.Observable; /** - * Represents operations that can be performed on edges within our graph. A graph should be within an - * ApplicationScope + * Represents operations that can be performed on edges within our graph. A graph should be within an ApplicationScope * * An Edge: is defined as the following. * @@ -67,25 +66,37 @@ public interface GraphManager extends CPManager { * @param edge Mark the edge as deleted in the graph * * - * EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge. Only deletes the specific version + * Implementation should also mark the incoming (reversed) edge. Only marks the specific version */ Observable<Edge> markEdge( Edge edge ); /** + * @param edge Remove the edge in the graph + * * + * EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge. Only deletes the specific version + * Will only delete if the edge has marked versions + */ + Observable<Edge> deleteEdge( Edge edge ); + + /** * Mark the node as removed from the graph. * * @param node The node to remove - * @param timestamp The timestamp to apply the delete operation. Any edges connected to this node with a timestmap - * <= the specified time will be removed from the graph - * @return + * @param timestamp The timestamp to apply the mark operation. */ Observable<Id> markNode( Id node, long timestamp ); /** + * Mark the node as removed from the graph. + * + * @param node The node to remove. This will apply a timestamp to apply the delete + compact operation. Any edges connected to this node with a timestamp + * <= the specified time on the mark will be removed from the graph + */ + Observable<Id> compactNode( final Id node ); + + /** * Get all versions of this edge where versions <= max version - * @param edge - * @return */ Observable<Edge> loadEdgeVersions( SearchByEdge edge ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/93bacf51/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java index 9a8a00f..06cb5a1 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.metrics.ObservableTimer; import org.apache.usergrid.persistence.core.rx.ObservableIterator; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.ValidationUtils; @@ -52,19 +53,14 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; -import rx.Notification; import rx.Observable; -import rx.Observer; -import rx.Subscriber; -import rx.functions.Action0; -import rx.functions.Action1; import rx.functions.Func1; -import rx.schedulers.Schedulers; /** @@ -86,32 +82,19 @@ public class GraphManagerImpl implements GraphManager { private final EdgeDeleteListener edgeDeleteListener; private final NodeDeleteListener nodeDeleteListener; private final Timer writeEdgeTimer; - private final Meter writeEdgeMeter; - private final Meter deleteEdgeMeter; - private final Timer deleteEdgeTimer; - private final Meter deleteNodeMeter; - private final Timer deleteNodeTimer; - private final Meter loadEdgesFromSourceMeter; + private final Timer markEdgeTimer; + private final Timer markNodeTimer; private final Timer loadEdgesFromSourceTimer; - private final Meter loadEdgesToTargetMeter; private final Timer loadEdgesToTargetTimer; - private final Meter loadEdgesVersionsMeter; private final Timer loadEdgesVersionsTimer; - private final Meter loadEdgesFromSourceByTypeMeter; private final Timer loadEdgesFromSourceByTypeTimer; - private final Meter loadEdgesToTargetByTypeMeter; private final Timer loadEdgesToTargetByTypeTimer; private final Timer getEdgeTypesFromSourceTimer; - private final Meter getEdgeTypesFromSourceMeter; private final Timer getIdTypesFromSourceTimer; - private final Meter getIdTypesFromSourceMeter; - private final Meter getEdgeTypesToTargetMeter; private final Timer getEdgeTypesToTargetTimer; private final Timer getIdTypesToTargetTimer; - private final Meter getIdTypesToTargetMeter; - - private Observer<Integer> edgeDeleteSubcriber; - private Observer<Integer> nodeDelete; + private final Timer deleteNodeTimer; + private final Timer deleteEdgeTimer; private final GraphFig graphFig; @@ -141,36 +124,24 @@ public class GraphManagerImpl implements GraphManager { this.edgeDeleteListener = edgeDeleteListener; this.nodeDeleteListener = nodeDeleteListener; - this.edgeDeleteSubcriber = MetricSubscriber.INSTANCE; - this.nodeDelete = MetricSubscriber.INSTANCE; - this.writeEdgeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "write.edge.meter" ); - this.writeEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "write.edge.timer" ); - this.deleteEdgeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "delete.edge.meter" ); - this.deleteEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "delete.edge.timer" ); - this.deleteNodeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "delete.node.meter" ); - this.deleteNodeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "delete.node.timer" ); - this.loadEdgesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.from.meter" ); - this.loadEdgesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.from.timer" ); - this.loadEdgesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.to.meter" ); - this.loadEdgesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.to.timer" ); - this.loadEdgesVersionsMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.versions.meter" ); - this.loadEdgesVersionsTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.versions.timer" ); - this.loadEdgesFromSourceByTypeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.from.type.meter" ); - this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.from.type.timer" ); - this.loadEdgesToTargetByTypeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.to.type.meter" ); - this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.to.type.timer" ); - - this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.edge.from.timer" ); - this.getEdgeTypesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.edge.from.meter" ); - - this.getIdTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.idtype.from.timer" ); - this.getIdTypesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.idtype.from.meter" ); - - this.getEdgeTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.edge.to.timer" ); - this.getEdgeTypesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.edge.to.meter" ); - - this.getIdTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.idtype.to.timer" ); - this.getIdTypesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.idtype.to.meter" ); + this.writeEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "write.edge" ); + this.markEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "mark.edge" ); + this.deleteEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "delete.edge" ); + this.markNodeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "mark.node" ); + this.deleteNodeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "delete.node" ); + this.loadEdgesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.from" ); + this.loadEdgesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.to" ); + this.loadEdgesVersionsTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.versions" ); + this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.from.type" ); + this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.to.type" ); + + this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.edge.from" ); + + this.getIdTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.idtype.from" ); + + this.getEdgeTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.edge.to" ); + + this.getIdTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.idtype.to" ); } @@ -179,42 +150,29 @@ public class GraphManagerImpl implements GraphManager { GraphValidation.validateEdge( edge ); final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, false ); - final Timer.Context timer = writeEdgeTimer.time(); - final Meter meter = writeEdgeMeter; - - return Observable.just( markedEdge ).map( new Func1<MarkedEdge, Edge>() { - @Override - public Edge call( final MarkedEdge edge ) { - final UUID timestamp = UUIDGenerator.newTimeUUID(); + final Observable<Edge> observable = Observable.just( markedEdge ).map( edge1 -> { + final UUID timestamp = UUIDGenerator.newTimeUUID(); - final MutationBatch mutation = edgeMetadataSerialization.writeEdge( scope, edge ); - final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp ); + final MutationBatch mutation = edgeMetadataSerialization.writeEdge( scope, edge1 ); - mutation.mergeShallow( edgeMutation ); + final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge1, timestamp ); - try { - mutation.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to execute mutation", e ); - } + mutation.mergeShallow( edgeMutation ); - return edge; - } - } ).doOnEach( new Action1<Notification<? super Edge>>() { - @Override - public void call( Notification<? super Edge> notification ) { - meter.mark(); + try { + mutation.execute(); } - } ).doOnCompleted( new Action0() { - @Override - public void call() { - timer.stop(); + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to execute mutation", e ); } + + return edge1; } ); + + return ObservableTimer.time( observable, writeEdgeTimer ); } @@ -224,52 +182,64 @@ public class GraphManagerImpl implements GraphManager { final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true ); - final Timer.Context timer = deleteEdgeTimer.time(); - final Meter meter = deleteEdgeMeter; - return Observable.just( markedEdge ).map( new Func1<MarkedEdge, Edge>() { - @Override - public Edge call( final MarkedEdge edge ) { + final Observable<Edge> observable = Observable.just( markedEdge ).map( edge1 -> { - final UUID timestamp = UUIDGenerator.newTimeUUID(); + final UUID timestamp = UUIDGenerator.newTimeUUID(); - final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp ); + final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge1, timestamp ); - LOG.debug( "Marking edge {} as deleted to commit log", edge ); - try { - edgeMutation.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to execute mutation", e ); - } + LOG.debug( "Marking edge {} as deleted to commit log", edge1 ); + try { + edgeMutation.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to execute mutation", e ); + } - //HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge, - // timestamp )).subscribeOn( Schedulers.io() ).subscribe( edgeDeleteSubcriber ); - edgeDeleteListener.receive( scope, markedEdge, timestamp ).subscribeOn( Schedulers.io() ) - .subscribe( edgeDeleteSubcriber ); + return edge1; + } ); + return ObservableTimer.time( observable, markEdgeTimer ); + } - return edge; - } - } ).doOnEach( notification -> { - meter.mark(); - } ).doOnCompleted( () -> timer.stop() ); + + @Override + public Observable<Edge> deleteEdge( final Edge edge ) { + + GraphValidation.validateEdge( edge ); + final UUID startTimestamp = UUIDGenerator.newTimeUUID(); + + + final Observable<Edge> observable = + Observable.create( new ObservableIterator<MarkedEdge>( "read edge versions" ) { + @Override + protected Iterator<MarkedEdge> getIterator() { + return storageEdgeSerialization.getEdgeVersions( scope, + new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), + Long.MAX_VALUE, SearchByEdgeType.Order.ASCENDING, Optional.absent() ) ); + } + } ).filter( markedEdge -> markedEdge.isDeleted() ).flatMap( marked -> + //fire our delete listener and wait for the results + edgeDeleteListener.receive( scope, marked, startTimestamp ).doOnNext( + //log them + count -> LOG.debug( "removed {} types for edge {} ", count, edge ) ) + //return the marked edge + .map( count -> marked ) ); + + + return ObservableTimer.time( observable, deleteEdgeTimer ); } @Override public Observable<Id> markNode( final Id node, final long timestamp ) { - final Timer.Context timer = deleteNodeTimer.time(); - final Meter meter = deleteNodeMeter; - return Observable.just( node ).map( id -> { + final Observable<Id> idObservable = Observable.just( node ).map( id -> { //mark the node as deleted - - final UUID eventTimestamp = UUIDGenerator.newTimeUUID(); - final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp ); @@ -282,161 +252,166 @@ public class GraphManagerImpl implements GraphManager { } - //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp )).subscribeOn( - // Schedulers.io() ).subscribe( nodeDelete ); - nodeDeleteListener.receive( scope, id, eventTimestamp ).subscribeOn( Schedulers.io() ) - .subscribe( nodeDelete ); - return id; - } ).doOnEach( notification -> { - meter.mark(); - } ).doOnCompleted( () -> timer.stop() ); + } ); + + return ObservableTimer.time( idObservable, markNodeTimer ); + } + + + @Override + public Observable<Id> compactNode( final Id inputNode ) { + + + final UUID startTime = UUIDGenerator.newTimeUUID(); + + + final Observable<Id> nodeObservable = + Observable.just( inputNode ).map( node -> nodeSerialization.getMaxVersion( scope, inputNode ) ).takeWhile( + maxTimestamp -> maxTimestamp.isPresent() ) + + //map our delete listener + .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) ) + //set to 0 if nothing is emitted + .lastOrDefault( 0 ) + //log for posterity + .doOnNext( count -> LOG.debug( "Removed {} edges from node {}", count, inputNode ) ) + //return our id + .map( count -> inputNode ); + + return ObservableTimer.time( nodeObservable, this.deleteNodeTimer ); } @Override public Observable<Edge> loadEdgeVersions( final SearchByEdge searchByEdge ) { - final Timer.Context timer = loadEdgesVersionsTimer.time(); - final Meter meter = loadEdgesVersionsMeter; - return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { - @Override - protected Iterator<MarkedEdge> getIterator() { - return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge ); - } - } ).buffer( graphFig.getScanPageSize() ) - .flatMap( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) ) - .cast( Edge.class ).doOnEach( notification -> { - meter.mark(); - } ).doOnCompleted( () -> timer.stop() ); + + final Observable<Edge> edges = + Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { + @Override + protected Iterator<MarkedEdge> getIterator() { + return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge ); + } + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) ); + + return ObservableTimer.time( edges, loadEdgesVersionsTimer ); } @Override public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) { - final Timer.Context timer = loadEdgesFromSourceTimer.time(); - final Meter meter = loadEdgesFromSourceMeter; - return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { - @Override - protected Iterator<MarkedEdge> getIterator() { - return storageEdgeSerialization.getEdgesFromSource( scope, search ); - } - } ).buffer( graphFig.getScanPageSize() ) - .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class ) - .doOnEach( notification -> { - meter.mark(); - } ).doOnCompleted( () -> timer.stop() ); + final Observable<Edge> edges = + Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { + @Override + protected Iterator<MarkedEdge> getIterator() { + return storageEdgeSerialization.getEdgesFromSource( scope, search ); + } + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + + return ObservableTimer.time( edges, loadEdgesFromSourceTimer ); } @Override public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) { - final Timer.Context timer = loadEdgesToTargetTimer.time(); - final Meter meter = loadEdgesToTargetMeter; - return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { - @Override - protected Iterator<MarkedEdge> getIterator() { - return storageEdgeSerialization.getEdgesToTarget( scope, search ); - } - } ).buffer( graphFig.getScanPageSize() ) - .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class ) - .doOnEach( notification -> { - meter.mark(); - } ).doOnCompleted( () -> timer.stop() ); + final Observable<Edge> edges = + Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { + @Override + protected Iterator<MarkedEdge> getIterator() { + return storageEdgeSerialization.getEdgesToTarget( scope, search ); + } + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + + + return ObservableTimer.time( edges, loadEdgesToTargetTimer ); } @Override public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) { - final Timer.Context timer = loadEdgesFromSourceByTypeTimer.time(); - final Meter meter = loadEdgesFromSourceByTypeMeter; - return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { - @Override - protected Iterator<MarkedEdge> getIterator() { - return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search ); - } - } ).buffer( graphFig.getScanPageSize() ) - .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ) - - .cast( Edge.class ).doOnEach( notification -> { - meter.mark(); - } ).doOnCompleted( () -> timer.stop() ); + final Observable<Edge> edges = + Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { + @Override + protected Iterator<MarkedEdge> getIterator() { + return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search ); + } + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + + return ObservableTimer.time( edges, loadEdgesFromSourceTimer ); } @Override public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) { - final Timer.Context timer = loadEdgesToTargetByTypeTimer.time(); - final Meter meter = loadEdgesToTargetByTypeMeter; - return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { - @Override - protected Iterator<MarkedEdge> getIterator() { - return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search ); - } - } ).buffer( graphFig.getScanPageSize() ) - .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class ) - .doOnEach( notification -> { - meter.mark(); - } ).doOnCompleted( () -> timer.stop() ); + final Observable<Edge> edges = + Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { + @Override + protected Iterator<MarkedEdge> getIterator() { + return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search ); + } + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + + return ObservableTimer.time( edges, loadEdgesToTargetByTypeTimer ); } @Override public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) { - final Timer.Context timer = getEdgeTypesFromSourceTimer.time(); - final Meter meter = getEdgeTypesFromSourceMeter; - return Observable.create( new ObservableIterator<String>( "getEdgeTypesFromSource" ) { - @Override - protected Iterator<String> getIterator() { - return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search ); - } - } ).doOnEach( notification -> { - meter.mark(); - } ).doOnCompleted( () -> timer.stop() ); + final Observable<String> edgeTypes = Observable.create( + new ObservableIterator<String>( "getEdgeTypesFromSource" ) { + @Override + protected Iterator<String> getIterator() { + return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search ); + } + } ); + + return ObservableTimer.time( edgeTypes, getEdgeTypesFromSourceTimer ); } @Override public Observable<String> getIdTypesFromSource( final SearchIdType search ) { - final Timer.Context timer = getIdTypesFromSourceTimer.time(); - final Meter meter = getIdTypesFromSourceMeter; - return Observable.create( new ObservableIterator<String>( "getIdTypesFromSource" ) { + final Observable<String> edgeTypes = Observable.create( new ObservableIterator<String>( "getIdTypesFromSource" ) { @Override protected Iterator<String> getIterator() { return edgeMetadataSerialization.getIdTypesFromSource( scope, search ); } - } ).doOnEach( notification -> { - meter.mark(); - } ).doOnCompleted( () -> timer.stop() ); + } ); + + return ObservableTimer.time( edgeTypes, getIdTypesFromSourceTimer ); } @Override public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) { - final Timer.Context timer = getEdgeTypesToTargetTimer.time(); - final Meter meter = getEdgeTypesToTargetMeter; - return Observable.create( new ObservableIterator<String>( "getEdgeTypesToTarget" ) { - @Override - protected Iterator<String> getIterator() { - return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search ); - } - } ).doOnEach( notification -> { - meter.mark(); - } ).doOnCompleted( () -> timer.stop() ); + final Observable<String> edgeTypes = Observable.create( + new ObservableIterator<String>( "getEdgeTypesToTarget" ) { + @Override + protected Iterator<String> getIterator() { + return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search ); + } + } ); + + return ObservableTimer.time( edgeTypes, getEdgeTypesFromSourceTimer ); } @Override public Observable<String> getIdTypesToTarget( final SearchIdType search ) { - final Timer.Context timer = getIdTypesToTargetTimer.time(); - final Meter meter = getIdTypesToTargetMeter; - return Observable.create( new ObservableIterator<String>( "getIdTypesToTarget" ) { - @Override - protected Iterator<String> getIterator() { - return edgeMetadataSerialization.getIdTypesToTarget( scope, search ); - } - } ).doOnEach( notification -> { - meter.mark(); - } ).doOnCompleted( () -> timer.stop() ); + final Observable<String> edgeTypes = Observable.create( + new ObservableIterator<String>( "getIdTypesToTarget" ) { + @Override + protected Iterator<String> getIterator() { + return edgeMetadataSerialization.getIdTypesToTarget( scope, search ); + } + } ); + + return ObservableTimer.time( edgeTypes, getIdTypesToTargetTimer ); } @@ -472,7 +447,7 @@ public class GraphManagerImpl implements GraphManager { final long edgeTimestamp = edge.getTimestamp(); //our edge needs to not be deleted and have a version that's > max Version - if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxTimestamp ) > 0 ) { + if (( edge.isDeleted() && filterMarked) || Long.compare( edgeTimestamp, maxTimestamp ) > 0 ) { return false; } @@ -498,52 +473,4 @@ public class GraphManagerImpl implements GraphManager { } ); } } - - - /** - * Set the subscription for the edge delete - */ - public void setEdgeDeleteSubcriber( final Observer<Integer> edgeDeleteSubcriber ) { - Preconditions.checkNotNull( edgeDeleteSubcriber, "Subscriber cannot be null" ); - this.edgeDeleteSubcriber = edgeDeleteSubcriber; - } - - - /** - * Set the subscription for the node delete - */ - public void setNodeDelete( final Observer<Integer> nodeDelete ) { - Preconditions.checkNotNull( nodeDelete, "Subscriber cannot be null" ); - this.nodeDelete = nodeDelete; - } - - - /** - * Simple subscriber that can be used to gather metrics. Needs to be refactored to use codehale metrics - */ - private static class MetricSubscriber extends Subscriber<Integer> { - - - private static final MetricSubscriber INSTANCE = new MetricSubscriber(); - - private final Logger logger = LoggerFactory.getLogger( MetricSubscriber.class ); - - - @Override - public void onCompleted() { - logger.debug( "Event completed" ); - } - - - @Override - public void onError( final Throwable e ) { - logger.error( "Failed to execute event", e ); - } - - - @Override - public void onNext( final Integer integer ) { - logger.debug( "Next received {}", integer ); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/93bacf51/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListener.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListener.java index 4b53940..d8a8896 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListener.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListener.java @@ -40,7 +40,7 @@ public interface EdgeDeleteListener { * @param eventTimestamp * @return */ - public Observable<Integer> receive( final ApplicationScope scope, final MarkedEdge edge, final UUID eventTimestamp ); + Observable<Integer> receive( final ApplicationScope scope, final MarkedEdge edge, final UUID eventTimestamp ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/93bacf51/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java index 7e42293..22a5ad0 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java @@ -61,27 +61,16 @@ public class EdgeDeleteListenerImpl implements EdgeDeleteListener { return edgeDeleteRepair.repair( scope, edge, eventTimestamp ) - .flatMap( new Func1<MarkedEdge, Observable<Integer>>() { - @Override - public Observable<Integer> call( final MarkedEdge markedEdge ) { - - Observable<Integer> sourceDelete = edgeMetaRepair - .repairSources( scope, edge.getSourceNode(), edge.getType(), - maxTimestamp ); - - Observable<Integer> targetDelete = edgeMetaRepair - .repairTargets( scope, edge.getTargetNode(), edge.getType(), - maxTimestamp ); - - return Observable.zip( sourceDelete, targetDelete, - new Func2<Integer, Integer, Integer>() { - @Override - public Integer call( final Integer sourceCount, - final Integer targetCount ) { - return sourceCount + targetCount; - } - } ); - } + .flatMap( markedEdge -> { + + Observable<Integer> sourceDelete = edgeMetaRepair + .repairSources( scope, edge.getSourceNode(), edge.getType(), maxTimestamp ); + + Observable<Integer> targetDelete = edgeMetaRepair + .repairTargets( scope, edge.getTargetNode(), edge.getType(), maxTimestamp ); + + return Observable.zip( sourceDelete, targetDelete, + ( sourceCount, targetCount ) -> sourceCount + targetCount ); } ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/93bacf51/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java index 5ba822b..efd88fc 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java @@ -27,7 +27,7 @@ import rx.Observable; /** - * Audits edge meta data and removes them if they're obscelete + * Audits edge meta data and removes them if they're obsolete */ public interface EdgeMetaRepair { @@ -42,7 +42,7 @@ public interface EdgeMetaRepair { * @return An observable that emits the total number of sub types still in use. 0 implies the type and subtypes * have been removed. Anything > 0 implies the edgeType and subTypes are still in use */ - public Observable<Integer> repairSources( ApplicationScope scope, Id sourceId, String edgeType, long maxTimestamp ); + Observable<Integer> repairSources( ApplicationScope scope, Id sourceId, String edgeType, long maxTimestamp ); /** @@ -56,5 +56,5 @@ public interface EdgeMetaRepair { * @return An observable that emits the total number of sub types still in use. 0 implies the type and subtypes * have been removed. Anything > 0 implies the edgeType and subTypes are still in use */ - public Observable<Integer> repairTargets( ApplicationScope scope, Id targetId, String edgeType, long maxTimestamp ); + Observable<Integer> repairTargets( ApplicationScope scope, Id targetId, String edgeType, long maxTimestamp ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/93bacf51/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java index 5e76f75..206145a 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java @@ -118,91 +118,79 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair { Observable<Integer> deleteCounts = serialization.loadEdgeSubTypes( scope, node, edgeType, maxTimestamp ).buffer( graphFig.getRepairConcurrentSize() ) //buffer them into concurrent groups based on the concurrent repair size - .flatMap( new Func1<List<String>, Observable<Integer>>() { - - @Override - public Observable<Integer> call( final List<String> types ) { - - - final MutationBatch batch = keyspace.prepareMutationBatch(); - - final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() ); - - //for each id type, check if the exist in parallel to increase processing speed - for ( final String subType : types ) { - - LOG.debug( "Checking for edges with nodeId {}, type {}, and subtype {}", node, edgeType, - subType ); - - Observable<Integer> search = - //load each edge in it's own thread - serialization.loadEdges( scope, node, edgeType, subType, maxTimestamp ) - .doOnNext( RX_LOG ).take( 1 ).count() - .doOnNext( new Action1<Integer>() { - - @Override - public void call( final Integer count ) { - /** - * we only want to delete if no edges are in this class. If - * there - * are - * still edges - * we must retain the information in order to keep our index - * structure - * correct for edge - * iteration - **/ - if ( count != 0 ) { - LOG.debug( "Found edge with nodeId {}, type {}, " - + "and subtype {}. Not removing subtype. ", - node, edgeType, subType ); - return; - } - - - LOG.debug( "No edges with nodeId {}, type {}, " - + "and subtype {}. Removing subtype.", node, - edgeType, subType ); - batch.mergeShallow( serialization - .removeEdgeSubType( scope, node, edgeType, subType, - maxTimestamp ) ); - } - } ); - - checks.add( search ); - } - - - /** - * Sum up the total number of edges we had, then execute the mutation if we have - * anything to do - */ - - - return MathObservable.sumInteger( Observable.merge( checks ) ) - .doOnNext( new Action1<Integer>() { - @Override - public void call( final Integer count ) { - - - LOG.debug( - "Executing batch for subtype deletion with " + - "type {}. " - + "Mutation has {} rows to mutate ", - edgeType, batch.getRowCount() ); - - try { - batch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to connect to casandra", e ); - } - } - } - - - ); + .flatMap( types -> { + + + final MutationBatch batch = keyspace.prepareMutationBatch(); + + final List<Observable<Integer>> checks = new ArrayList<Observable<Integer>>( types.size() ); + + //for each id type, check if the exist in parallel to increase processing speed + for ( final String subType : types ) { + + LOG.debug( "Checking for edges with nodeId {}, type {}, and subtype {}", node, edgeType, + subType ); + + Observable<Integer> search = + //load each edge in it's own thread + serialization.loadEdges( scope, node, edgeType, subType, maxTimestamp ) + .doOnNext( RX_LOG ).take( 1 ).count() + .doOnNext( count -> { + /** + * we only want to delete if no edges are in this class. If + * there + * are + * still edges + * we must retain the information in order to keep our index + * structure + * correct for edge + * iteration + **/ + if ( count != 0 ) { + LOG.debug( "Found edge with nodeId {}, type {}, " + + "and subtype {}. Not removing subtype. ", + node, edgeType, subType ); + return; + } + + + LOG.debug( "No edges with nodeId {}, type {}, " + + "and subtype {}. Removing subtype.", node, + edgeType, subType ); + batch.mergeShallow( serialization + .removeEdgeSubType( scope, node, edgeType, subType, + maxTimestamp ) ); + } ); + + checks.add( search ); } + + + /** + * Sum up the total number of edges we had, then execute the mutation if we have + * anything to do + */ + + + return MathObservable.sumInteger( Observable.merge( checks ) ) + .doOnNext( count -> { + + + LOG.debug( "Executing batch for subtype deletion with " + + "type {}. " + "Mutation has {} rows to mutate ", + edgeType, batch.getRowCount() ); + + try { + batch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( + "Unable to connect to casandra", e ); + } + } + + + ); } ) //if we get no edges, emit a 0 so the caller knows we can delete the type .defaultIfEmpty( 0 ); @@ -210,29 +198,25 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair { //sum up everything emitted by sub types. If there's no edges existing for all sub types, // then we can safely remove them - return MathObservable.sumInteger( deleteCounts ).lastOrDefault( 0 ).doOnNext( new Action1<Integer>() { - - @Override - public void call( final Integer subTypeUsedCount ) { - - /** - * We can only execute deleting this type if no sub types were deleted - */ - if ( subTypeUsedCount != 0 ) { - LOG.debug( "Type {} has {} subtypes in use as of maxTimestamp {}. Not deleting type.", edgeType, - subTypeUsedCount, maxTimestamp ); - return; - } + return MathObservable.sumInteger( deleteCounts ).lastOrDefault( 0 ).doOnNext( subTypeUsedCount -> { + + /** + * We can only execute deleting this type if no sub types were deleted + */ + if ( subTypeUsedCount != 0 ) { + LOG.debug( "Type {} has {} subtypes in use as of maxTimestamp {}. Not deleting type.", edgeType, + subTypeUsedCount, maxTimestamp ); + return; + } - LOG.debug( "Type {} has no subtypes in use as of maxTimestamp {}. Deleting type.", edgeType, - maxTimestamp ); - try { - serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ).execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to connect to casandra", e ); - } + LOG.debug( "Type {} has no subtypes in use as of maxTimestamp {}. Deleting type.", edgeType, + maxTimestamp ); + try { + serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ).execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to connect to casandra", e ); } } ); } @@ -241,7 +225,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair { /** * Simple edge serialization */ - private static interface CleanSerialization { + private interface CleanSerialization { /** * Load all subtypes for the edge with a maxTimestamp <= the provided maxTimestamp http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/93bacf51/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java index 89f280d..68569e5 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java @@ -42,5 +42,5 @@ public interface NodeDeleteListener { * @return An observable that emits the total number of edges that have been removed with this node both as the * target and source */ - public Observable<Integer> receive( final ApplicationScope scope, final Id node, final UUID timestamp ); + Observable<Integer> receive( final ApplicationScope scope, final Id node, final UUID timestamp ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/93bacf51/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java deleted file mode 100644 index 658f4bf..0000000 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. - * - */ -package org.apache.usergrid.persistence.graph; - - -import org.junit.runner.RunWith; - -import org.apache.usergrid.persistence.core.test.ITRunner; -import org.apache.usergrid.persistence.core.test.UseModules; -import org.apache.usergrid.persistence.graph.guice.TestGraphModule; -import org.apache.usergrid.persistence.model.entity.Id; - -import rx.Observable; - - -/** - * Integration test that performs all calls immediately after writes without blocking. Tests that our - * view is immediately consistent to our users, even if we have yet to perform background processing - */ -@RunWith(ITRunner.class) -@UseModules({ TestGraphModule.class }) -public class CommittedGraphManagerIT extends GraphManagerIT { - - - @Override - protected GraphManager getHelper(GraphManager gm) { - return new ComittedGraphTestHelper( gm ); - } - - - /** - * Doesn't wait for the async process to happen before returning. Simply executes and immediately returns. - */ - public static class ComittedGraphTestHelper implements GraphManager { - - private final GraphManager graphManager; - - - public ComittedGraphTestHelper( final GraphManager graphManager ) { - this.graphManager = graphManager; - } - - - @Override - public Observable<Edge> writeEdge( final Edge edge ) { - return graphManager.writeEdge( edge ); - } - - - @Override - public Observable<Edge> markEdge( final Edge edge ) { - return graphManager.markEdge( edge ); - } - - - @Override - public Observable<Id> markNode( final Id node, final long timestamp ) { - return graphManager.markNode( node, timestamp ); - } - - - @Override - public Observable<Edge> loadEdgeVersions( final SearchByEdge edge ) { - return graphManager.loadEdgeVersions( edge ); - } - - - @Override - public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) { - return graphManager.loadEdgesFromSource( search ); - } - - - @Override - public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) { - return graphManager.loadEdgesToTarget( search ); - } - - - @Override - public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) { - return graphManager.loadEdgesFromSourceByType(search); - } - - - @Override - public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) { - return graphManager.loadEdgesToTargetByType( search ); - } - - - @Override - public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) { - return graphManager.getEdgeTypesFromSource( search ); - } - - - @Override - public Observable<String> getIdTypesFromSource( final SearchIdType search ) { - return graphManager.getIdTypesFromSource( search ); - } - - - @Override - public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) { - return graphManager.getEdgeTypesToTarget( search ); - } - - - @Override - public Observable<String> getIdTypesToTarget( final SearchIdType search ) { - return graphManager.getIdTypesToTarget( search ); - } - - - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/93bacf51/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java index 340c712..75f61f0 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java @@ -25,11 +25,15 @@ import java.util.concurrent.TimeoutException; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.core.test.ITRunner; +import org.apache.usergrid.persistence.core.test.UseModules; import org.apache.usergrid.persistence.core.util.IdGenerator; +import org.apache.usergrid.persistence.graph.guice.TestGraphModule; import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType; import org.apache.usergrid.persistence.model.entity.Id; @@ -49,8 +53,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; - -public abstract class GraphManagerIT { +@RunWith(ITRunner.class) +@UseModules({ TestGraphModule.class }) +public class GraphManagerIT { @Inject @@ -64,12 +69,6 @@ public abstract class GraphManagerIT { protected ApplicationScope scope; - /** - * Get the helper for performing our tests - * - * @return The helper to use when writing/synchronizing tests - */ - protected abstract GraphManager getHelper( GraphManager gm ); @Before @@ -81,7 +80,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypeSource() throws TimeoutException, InterruptedException { - GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + GraphManager gm = emf.createEdgeManager( scope ); Edge edge = createEdge( "source", "test", "target" ); @@ -113,7 +112,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypeTarget() throws TimeoutException, InterruptedException { - GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + GraphManager gm = emf.createEdgeManager( scope ); Edge edge = createEdge( "source", "test", "target" ); @@ -145,7 +144,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypeVersionSource() throws TimeoutException, InterruptedException { - GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + GraphManager gm = emf.createEdgeManager( scope ); final long earlyVersion = 1000l; @@ -179,7 +178,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypeVersionTarget() throws TimeoutException, InterruptedException { - GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + GraphManager gm = emf.createEdgeManager( scope ); final long earlyVersion = 10000l; @@ -217,7 +216,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypeVersionSourceDistinct() throws TimeoutException, InterruptedException { - GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + GraphManager gm = emf.createEdgeManager( scope ); final long earlyVersion = 10000l; @@ -289,7 +288,7 @@ public abstract class GraphManagerIT { public void testWriteReadEdgeTypeVersionTargetDistinct() throws TimeoutException, InterruptedException { - GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + GraphManager gm = emf.createEdgeManager( scope ); final long earlyVersion = 10000l; @@ -361,7 +360,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypePagingSource() throws TimeoutException, InterruptedException { - GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + GraphManager gm = emf.createEdgeManager( scope ); final Id sourceId = IdGenerator.createId( "source" ); @@ -416,7 +415,7 @@ public abstract class GraphManagerIT { public void testWriteReadEdgeTypePagingTarget() { - GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + GraphManager gm = emf.createEdgeManager( scope ); final Id targetId = IdGenerator.createId( "target" ); @@ -471,7 +470,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypeTargetTypeSource() { - GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + GraphManager gm = emf.createEdgeManager( scope ); Edge edge = createEdge( "source", "test", "target" ); @@ -507,7 +506,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypeTargetTypeTarget() { - GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + GraphManager gm = emf.createEdgeManager( scope ); ; @@ -544,7 +543,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeDeleteSource() { - GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + GraphManager gm = emf.createEdgeManager( scope ); Edge edge = createEdge( "source", "test", "target" ); @@ -614,7 +613,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeDeleteTarget() { - GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + GraphManager gm = emf.createEdgeManager( scope ); Edge edge = createEdge( "source", "test", "target" ); @@ -670,7 +669,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypesSourceTypes() { - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId = new SimpleId( "source" ); Id targetId1 = new SimpleId( "target" ); @@ -733,7 +732,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypesTargetTypes() { - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId1 = new SimpleId( "source" ); Id sourceId2 = new SimpleId( "source2" ); @@ -798,7 +797,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypesSourceTypesPaging() { - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId1 = new SimpleId( "source" ); Id targetId1 = new SimpleId( "target" ); @@ -876,7 +875,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypesTargetTypesPaging() { - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId1 = new SimpleId( "source" ); Id sourceId2 = new SimpleId( "source2" ); @@ -957,7 +956,7 @@ public abstract class GraphManagerIT { @Test public void testMarkSourceEdges() throws InterruptedException { - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId = new SimpleId( "source" ); Id targetId1 = new SimpleId( "target" ); @@ -1046,7 +1045,7 @@ public abstract class GraphManagerIT { @Test public void testMarkTargetEdges() { - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId1 = new SimpleId( "source" ); Id sourceId2 = new SimpleId( "source2" ); @@ -1114,7 +1113,7 @@ public abstract class GraphManagerIT { @Test public void testMarkSourceEdgesType() { - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId = new SimpleId( "source" ); Id targetId1 = new SimpleId( "target" ); @@ -1192,7 +1191,7 @@ public abstract class GraphManagerIT { @Test public void testMarkTargetEdgesType() { - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId1 = new SimpleId( "source" ); Id sourceId2 = new SimpleId( "source2" ); @@ -1270,7 +1269,7 @@ public abstract class GraphManagerIT { @Test public void markSourceNode() { - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId = new SimpleId( "source" ); Id targetId1 = new SimpleId( "target" ); @@ -1352,7 +1351,7 @@ public abstract class GraphManagerIT { @Test public void markTargetNode() { - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId1 = new SimpleId( "source" ); Id sourceId2 = new SimpleId( "source2" ); @@ -1434,7 +1433,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypesSourceTypesPrefix() { - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId = new SimpleId( "source" ); Id targetId = new SimpleId( "target" ); @@ -1484,7 +1483,7 @@ public abstract class GraphManagerIT { public void testSourceSubTypes() { //now test sub edges - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id sourceId = new SimpleId( "source" ); Id targetId1target1 = new SimpleId( "type1target1" ); @@ -1533,7 +1532,7 @@ public abstract class GraphManagerIT { @Test public void testWriteReadEdgeTypesTargetTypesPrefix() { - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id targetId = new SimpleId( "target" ); Id sourceId = new SimpleId( "source" ); @@ -1583,7 +1582,7 @@ public abstract class GraphManagerIT { public void testTargetSubTypes() { //now test sub edges - final GraphManager gm = getHelper( emf.createEdgeManager( scope ) ); + final GraphManager gm = emf.createEdgeManager( scope ); Id targetId = new SimpleId( "target" ); Id sourceId1target1 = new SimpleId( "type1source1" ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/93bacf51/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java deleted file mode 100644 index 84825aa..0000000 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. - * - */ -package org.apache.usergrid.persistence.graph; - - -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Before; -import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.core.test.ITRunner; -import org.apache.usergrid.persistence.core.test.UseModules; -import org.apache.usergrid.persistence.graph.guice.TestGraphModule; -import org.apache.usergrid.persistence.graph.impl.GraphManagerImpl; -import org.apache.usergrid.persistence.model.entity.Id; - -import rx.Observable; -import rx.Observer; - - -/** - * Integration test that will block reads until all post processing has completed. This ensures once our data has made - * it to the permanent storage. Tests that our view is immediately consistent to our users, even if we have yet to - * perform background processing - */ -@RunWith( ITRunner.class ) -@UseModules( { TestGraphModule.class } ) -public class StorageGraphManagerIT extends GraphManagerIT { - - private static final Logger LOG = LoggerFactory.getLogger( StorageGraphManagerIT.class ); - - - @Before - public void setup(){ - } - - - - @Override - protected GraphManager getHelper( final GraphManager gm ) { - - - final StorageGraphTestHelper helper = new StorageGraphTestHelper( gm ); - - GraphManagerImpl gmi = ( GraphManagerImpl ) gm; - - - Observer<Integer> subscriber = new Observer<Integer>() { - @Override - public void onCompleted() { - helper.complete(); - } - - - @Override - public void onError( final Throwable e ) { - helper.complete(); - helper.error(); - } - - - @Override - public void onNext( final Integer integer ) { - //no op - } - }; - - gmi.setEdgeDeleteSubcriber(subscriber ); -// gmi.setEdgeWriteSubcriber( subscriber ); - gmi.setNodeDelete( subscriber ); - - return helper; - } - - - /** - * Doesn't wait for the async process to happen before returning. Simply executes and immediately returns. - */ - public static class StorageGraphTestHelper implements GraphManager { - - private final GraphManager graphManager; - private final AtomicInteger completeInvocations = new AtomicInteger( 0 ); - private final AtomicInteger errorInvocations = new AtomicInteger( 0 ); - private final Object mutex = new Object(); - - - public StorageGraphTestHelper( final GraphManager graphManager ) { - this.graphManager = graphManager; - } - - - @Override - public Observable<Edge> writeEdge( final Edge edge ) { - return graphManager.writeEdge( edge ); - } - - - @Override - public Observable<Edge> markEdge( final Edge edge ) { - waitForComplete(); - return graphManager.markEdge( edge ); - } - - - @Override - public Observable<Id> markNode( final Id node, final long timestamp ) { - waitForComplete(); - return graphManager.markNode( node, timestamp ); - } - - - @Override - public Observable<Edge> loadEdgeVersions( final SearchByEdge edge ) { - await(); - return graphManager.loadEdgeVersions( edge ); - } - - - @Override - public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) { - await(); - return graphManager.loadEdgesFromSource( search ); - } - - - @Override - public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) { - await(); - return graphManager.loadEdgesToTarget( search ); - } - - - @Override - public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) { - await(); - return graphManager.loadEdgesFromSourceByType( search ); - } - - - @Override - public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) { - await(); - return graphManager.loadEdgesToTargetByType( search ); - } - - - @Override - public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) { - await(); - return graphManager.getEdgeTypesFromSource( search ); - } - - - @Override - public Observable<String> getIdTypesFromSource( final SearchIdType search ) { - await(); - return graphManager.getIdTypesFromSource( search ); - } - - - @Override - public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) { - await(); - return graphManager.getEdgeTypesToTarget( search ); - } - - - @Override - public Observable<String> getIdTypesToTarget( final SearchIdType search ) { - await(); - return graphManager.getIdTypesToTarget( search ); - } - - - public void waitForComplete(){ - LOG.info( "Complete incremented" ); - completeInvocations.incrementAndGet(); - } - - public void complete() { - LOG.info( "Complete decremented" ); - completeInvocations.decrementAndGet(); - tryWake(); - } - - - public void error() { - LOG.info( "Error incremented" ); - errorInvocations.incrementAndGet(); - tryWake(); - } - - - public void tryWake() { - synchronized ( mutex ) { - mutex.notify(); - } - } - - - /** - * Away for our invocations to be 0 - */ - public void await() { - while ( completeInvocations.get() != 0 ) { - - LOG.info( "Waiting for more invocations, count is {} ", completeInvocations.get() ); - - synchronized ( mutex ) { - try { - mutex.wait(1000 ); - } - catch ( InterruptedException e ) { - //no op - } - } - } - } - } -}
