Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-614 36b5bad2d -> 70e0e75aa
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java index 9c0c62d..9a8a00f 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java @@ -120,12 +120,9 @@ public class GraphManagerImpl implements GraphManager { @Inject public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization, final EdgeSerialization storageEdgeSerialization, - final NodeSerialization nodeSerialization, - final GraphFig graphFig, - final EdgeDeleteListener edgeDeleteListener, - final NodeDeleteListener nodeDeleteListener, - final ApplicationScope scope, - MetricsFactory metricsFactory) { + final NodeSerialization nodeSerialization, final GraphFig graphFig, + final EdgeDeleteListener edgeDeleteListener, final NodeDeleteListener nodeDeleteListener, + final ApplicationScope scope, MetricsFactory metricsFactory ) { ValidationUtils.validateApplicationScope( scope ); @@ -146,36 +143,34 @@ public class GraphManagerImpl implements GraphManager { this.edgeDeleteSubcriber = MetricSubscriber.INSTANCE; this.nodeDelete = MetricSubscriber.INSTANCE; - this.writeEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "write.edge.meter"); - this.writeEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "write.edge.timer"); - this.deleteEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "delete.edge.meter"); - this.deleteEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "delete.edge.timer"); - this.deleteNodeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "delete.node.meter"); - this.deleteNodeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "delete.node.timer"); - this.loadEdgesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.from.meter"); - this.loadEdgesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.from.timer"); - this.loadEdgesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.to.meter"); - this.loadEdgesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.to.timer"); - this.loadEdgesVersionsMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.versions.meter"); - this.loadEdgesVersionsTimer = metricsFactory.getTimer(GraphManagerImpl.class,"load.versions.timer"); - this.loadEdgesFromSourceByTypeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.from.type.meter"); - this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.from.type.timer"); - this.loadEdgesToTargetByTypeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.to.type.meter"); - this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.to.type.timer"); - - this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.from.timer"); - this.getEdgeTypesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.edge.from.meter"); - - this.getIdTypesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.idtype.from.timer"); - this.getIdTypesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.from.meter"); - - this.getEdgeTypesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.to.timer"); - this.getEdgeTypesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.edge.to.meter"); - - this.getIdTypesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class, "get.idtype.to.timer"); - this.getIdTypesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.to.meter"); - - + this.writeEdgeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "write.edge.meter" ); + this.writeEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "write.edge.timer" ); + this.deleteEdgeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "delete.edge.meter" ); + this.deleteEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "delete.edge.timer" ); + this.deleteNodeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "delete.node.meter" ); + this.deleteNodeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "delete.node.timer" ); + this.loadEdgesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.from.meter" ); + this.loadEdgesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.from.timer" ); + this.loadEdgesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.to.meter" ); + this.loadEdgesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.to.timer" ); + this.loadEdgesVersionsMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.versions.meter" ); + this.loadEdgesVersionsTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.versions.timer" ); + this.loadEdgesFromSourceByTypeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.from.type.meter" ); + this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.from.type.timer" ); + this.loadEdgesToTargetByTypeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.to.type.meter" ); + this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.to.type.timer" ); + + this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.edge.from.timer" ); + this.getEdgeTypesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.edge.from.meter" ); + + this.getIdTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.idtype.from.timer" ); + this.getIdTypesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.idtype.from.meter" ); + + this.getEdgeTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.edge.to.timer" ); + this.getEdgeTypesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.edge.to.meter" ); + + this.getIdTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.idtype.to.timer" ); + this.getIdTypesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.idtype.to.meter" ); } @@ -209,41 +204,39 @@ public class GraphManagerImpl implements GraphManager { return edge; } - } ) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).doOnEach( new Action1<Notification<? super Edge>>() { + @Override + public void call( Notification<? super Edge> notification ) { + meter.mark(); + } + } ).doOnCompleted( new Action0() { + @Override + public void call() { + timer.stop(); + } + } ); } @Override - public Observable<Edge> deleteEdge( final Edge edge ) { - GraphValidation.validateEdge(edge); + public Observable<Edge> markEdge( final Edge edge ) { + GraphValidation.validateEdge( edge ); - final MarkedEdge markedEdge = new SimpleMarkedEdge(edge, true); + final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true ); final Timer.Context timer = deleteEdgeTimer.time(); final Meter meter = deleteEdgeMeter; - return Observable.just(markedEdge).map(new Func1<MarkedEdge, Edge>() { + return Observable.just( markedEdge ).map( new Func1<MarkedEdge, Edge>() { @Override - public Edge call(final MarkedEdge edge) { + public Edge call( final MarkedEdge edge ) { final UUID timestamp = UUIDGenerator.newTimeUUID(); - final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge(scope, edge, timestamp); + final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp ); - LOG.debug("Marking edge {} as deleted to commit log", edge); + LOG.debug( "Marking edge {} as deleted to commit log", edge ); try { edgeMutation.execute(); } @@ -254,73 +247,50 @@ public class GraphManagerImpl implements GraphManager { //HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge, // timestamp )).subscribeOn( Schedulers.io() ).subscribe( edgeDeleteSubcriber ); - edgeDeleteListener.receive(scope, markedEdge, timestamp).subscribeOn(Schedulers.io()) - .subscribe(edgeDeleteSubcriber); + edgeDeleteListener.receive( scope, markedEdge, timestamp ).subscribeOn( Schedulers.io() ) + .subscribe( edgeDeleteSubcriber ); return edge; } - }) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @Override - public Observable<Id> deleteNode( final Id node, final long timestamp ) { + public Observable<Id> markNode( final Id node, final long timestamp ) { final Timer.Context timer = deleteNodeTimer.time(); final Meter meter = deleteNodeMeter; - return Observable.just( node ).map( new Func1<Id, Id>() { - @Override - public Id call( final Id id ) { + return Observable.just( node ).map( id -> { - //mark the node as deleted + //mark the node as deleted - final UUID eventTimestamp = UUIDGenerator.newTimeUUID(); + final UUID eventTimestamp = UUIDGenerator.newTimeUUID(); - final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp ); + final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp ); - LOG.debug( "Marking node {} as deleted to node mark", node ); - try { - nodeMutation.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to execute mutation", e ); - } + LOG.debug( "Marking node {} as deleted to node mark", node ); + try { + nodeMutation.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to execute mutation", e ); + } - //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp )).subscribeOn( - // Schedulers.io() ).subscribe( nodeDelete ); - nodeDeleteListener.receive( scope, id, eventTimestamp ).subscribeOn( Schedulers.io() ) - .subscribe( nodeDelete ); + //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp )).subscribeOn( + // Schedulers.io() ).subscribe( nodeDelete ); + nodeDeleteListener.receive( scope, id, eventTimestamp ).subscribeOn( Schedulers.io() ) + .subscribe( nodeDelete ); - return id; - } - } ) - .doOnEach(new Action1<Notification<? super Id>>() { - @Override - public void call(Notification<? super Id> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + return id; + } ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -333,20 +303,11 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge ); } - } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(searchByEdge.getMaxTimestamp())) - .cast(Edge.class) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) ) + .cast( Edge.class ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -359,20 +320,11 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgesFromSource( scope, search ); } - } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp())) - .cast(Edge.class) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class ) + .doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -385,20 +337,11 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgesToTarget( scope, search ); } - } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp())) - .cast(Edge.class) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class ) + .doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -411,21 +354,12 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search ); } - } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp())) + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ) - .cast(Edge.class) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + .cast( Edge.class ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -438,20 +372,11 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search ); } - } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp())) - .cast(Edge.class) - .doOnEach(new Action1<Notification<? super Edge>>() { - @Override - public void call(Notification<? super Edge> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).buffer( graphFig.getScanPageSize() ) + .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class ) + .doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -464,19 +389,9 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<String> getIterator() { return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search ); } - } ) - .doOnEach(new Action1<Notification<? super String>>() { - @Override - public void call(Notification<? super String> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -489,19 +404,9 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<String> getIterator() { return edgeMetadataSerialization.getIdTypesFromSource( scope, search ); } - } ) - .doOnEach(new Action1<Notification<? super String>>() { - @Override - public void call(Notification<? super String> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -514,19 +419,9 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<String> getIterator() { return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search ); } - } ) - .doOnEach(new Action1<Notification<? super String>>() { - @Override - public void call(Notification<? super String> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -539,19 +434,9 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<String> getIterator() { return edgeMetadataSerialization.getIdTypesToTarget( scope, search ); } - } ) - .doOnEach(new Action1<Notification<? super String>>() { - @Override - public void call(Notification<? super String> notification) { - meter.mark(); - } - }) - .doOnCompleted(new Action0() { - @Override - public void call() { - timer.stop(); - } - }); + } ).doOnEach( notification -> { + meter.mark(); + } ).doOnCompleted( () -> timer.stop() ); } @@ -561,10 +446,12 @@ public class GraphManagerImpl implements GraphManager { private class EdgeBufferFilter implements Func1<List<MarkedEdge>, Observable<MarkedEdge>> { private final long maxVersion; + private final boolean filterMarked; - private EdgeBufferFilter( final long maxVersion ) { + private EdgeBufferFilter( final long maxVersion, final boolean filterMarked ) { this.maxVersion = maxVersion; + this.filterMarked = filterMarked; } @@ -579,57 +466,36 @@ public class GraphManagerImpl implements GraphManager { public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) { final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges ); - return Observable.from( markedEdges ).filter( new EdgeFilter( this.maxVersion, markedVersions ) ); - } - } - - - /** - * Filter the returned values based on the max uuid and if it's been marked for deletion or not - */ - private static class EdgeFilter implements Func1<MarkedEdge, Boolean> { - - private final long maxTimestamp; - - private final Map<Id, Long> markCache; - - - private EdgeFilter( final long maxTimestamp, Map<Id, Long> markCache ) { - this.maxTimestamp = maxTimestamp; - this.markCache = markCache; - } - - - @Override - public Boolean call( final MarkedEdge edge ) { - + final long maxTimestamp = maxVersion; - final long edgeTimestamp = edge.getTimestamp(); + return Observable.from( markedEdges ).filter( edge -> { + final long edgeTimestamp = edge.getTimestamp(); - //our edge needs to not be deleted and have a version that's > max Version - if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxTimestamp ) > 0 ) { - return false; - } + //our edge needs to not be deleted and have a version that's > max Version + if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxTimestamp ) > 0 ) { + return false; + } - final Long sourceTimestamp = markCache.get( edge.getSourceNode() ); + final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() ); - //the source Id has been marked for deletion. It's version is <= to the marked version for deletion, - // so we need to discard it - if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) { - return false; - } + //the source Id has been marked for deletion. It's version is <= to the marked version for deletion, + // so we need to discard it + if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) { + return false; + } - final Long targetTimestamp = markCache.get( edge.getTargetNode() ); + final Long targetTimestamp = markedVersions.get( edge.getTargetNode() ); - //the target Id has been marked for deletion. It's version is <= to the marked version for deletion, - // so we need to discard it - if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) { - return false; - } + //the target Id has been marked for deletion. It's version is <= to the marked version for deletion, + // so we need to discard it + if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) { + return false; + } - return true; + return true; + } ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java index a28a0bb..9a23caf 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java @@ -33,7 +33,6 @@ import com.google.common.base.Preconditions; /** * Simple bean implementation of search by edge - * */ public class SimpleSearchByEdge implements SearchByEdge { @@ -43,24 +42,44 @@ public class SimpleSearchByEdge implements SearchByEdge { private final long maxTimestamp; private final Optional<Edge> last; private final SearchByEdgeType.Order order; + private final boolean filterMarked; /** * Create the search modules + * * @param sourceNode The source node of the edge * @param targetNode The target node of the edge * @param type The edge type * @param maxTimestamp The maximum timestamp to seek from * @param last The value to start seeking from. Must be >= this value */ - public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, final SearchByEdgeType.Order order, final Optional<Edge> last ) { + public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, + final SearchByEdgeType.Order order, final Optional<Edge> last ) { + this( sourceNode, type, targetNode, maxTimestamp, order, last, true ); + } + + + /** + * Create the search modules + * + * @param sourceNode The source node of the edge + * @param type The edge type + * @param targetNode The target node of the edge + * @param maxTimestamp The maximum timestamp to seek from + * @param last The value to start seeking from. Must be >= this value + */ + public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, + final SearchByEdgeType.Order order, final Optional<Edge> last, + final boolean filterMarked ) { - ValidationUtils.verifyIdentity(sourceNode); - ValidationUtils.verifyIdentity(targetNode); - ValidationUtils.verifyString(type, "type"); - GraphValidation.validateTimestamp(maxTimestamp, "maxTimestamp"); - Preconditions.checkNotNull(order, "order must not be null"); - Preconditions.checkNotNull(last, "last can never be null"); + + ValidationUtils.verifyIdentity( sourceNode ); + ValidationUtils.verifyIdentity( targetNode ); + ValidationUtils.verifyString( type, "type" ); + GraphValidation.validateTimestamp( maxTimestamp, "maxTimestamp" ); + Preconditions.checkNotNull( order, "order must not be null" ); + Preconditions.checkNotNull( last, "last can never be null" ); this.sourceNode = sourceNode; @@ -69,6 +88,7 @@ public class SimpleSearchByEdge implements SearchByEdge { this.maxTimestamp = maxTimestamp; this.order = order; this.last = last; + this.filterMarked = filterMarked; } @@ -97,6 +117,10 @@ public class SimpleSearchByEdge implements SearchByEdge { @Override + public boolean filterMarked() { return filterMarked; } + + + @Override public Optional<Edge> last() { return last; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java index 1687162..9392dbc 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java @@ -41,6 +41,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{ private final long maxTimestamp; private final Optional<Edge> last; private final Order order; + private final boolean filterMarked; /** @@ -55,7 +56,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{ * //TODO, make last an optional */ public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, final Edge last ) { - this(node, type, maxTimestamp, order, Optional.fromNullable(last)); + this(node, type, maxTimestamp, order, Optional.fromNullable(last), true); } @@ -70,7 +71,24 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{ * * //TODO, make last an optional */ - public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, final Optional<Edge> last ) { + public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, + final Optional<Edge> last ) { + this( node, type, maxTimestamp, order, last, true ); + } + + + /** + * Create the search modules + * @param node The node to search from + * @param type The edge type + * @param maxTimestamp The maximum timestamp to return + * @param order The order order. Descending is most efficient + * @param last The value to start seeking from. Must be >= this value + * @param filterMarked + */ + public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, + final Optional<Edge> last, final boolean filterMarked ) { + Preconditions.checkNotNull( order, "order is required"); ValidationUtils.verifyIdentity( node ); @@ -84,6 +102,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{ this.maxTimestamp = maxTimestamp; this.order = order; this.last = last; + this.filterMarked = filterMarked; } @@ -118,6 +137,12 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{ @Override + public boolean filterMarked() { + return filterMarked; + } + + + @Override public boolean equals( final Object o ) { if ( this == o ) { return true; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java index 6135121..619e65d 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java @@ -29,7 +29,9 @@ import java.util.Iterator; import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; import javax.annotation.Nullable; @@ -37,13 +39,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.core.consistency.TimeService; +import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.task.Task; -import org.apache.usergrid.persistence.core.task.TaskExecutor; import org.apache.usergrid.persistence.graph.GraphFig; import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; -import org.apache.usergrid.persistence.graph.guice.GraphTaskExecutor; import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies; import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization; @@ -63,6 +63,8 @@ import com.google.common.hash.PrimitiveSink; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.google.inject.Singleton; import com.netflix.astyanax.Keyspace; @@ -85,7 +87,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { private static final HashFunction MURMUR_128 = Hashing.murmur3_128(); - private final TaskExecutor taskExecutor; + private final ListeningExecutorService taskExecutor; private final TimeService timeService; private final GraphFig graphFig; private final NodeShardAllocation nodeShardAllocation; @@ -104,8 +106,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { final NodeShardAllocation nodeShardAllocation, final ShardedEdgeSerialization shardedEdgeSerialization, final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace, - final EdgeShardSerialization edgeShardSerialization, - @GraphTaskExecutor final TaskExecutor taskExecutor ) { + final EdgeShardSerialization edgeShardSerialization) { this.timeService = timeService; this.graphFig = graphFig; @@ -119,7 +120,10 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { this.shardCompactionTaskTracker = new ShardCompactionTaskTracker(); this.shardAuditTaskTracker = new ShardAuditTaskTracker(); - this.taskExecutor = taskExecutor; + + this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory + .createTaskExecutor( "ShardCompaction", graphFig.getShardAuditWorkerCount(), + graphFig.getShardAuditWorkerQueueSize() ) ); } @@ -139,8 +143,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { Preconditions.checkNotNull( group, "group cannot be null" ); Preconditions.checkArgument( group.isCompactionPending(), "Compaction is pending" ); - Preconditions.checkArgument( group.shouldCompact( startTime ), - "Compaction cannot be run yet. Ignoring compaction." ); + Preconditions + .checkArgument( group.shouldCompact( startTime ), "Compaction cannot be run yet. Ignoring compaction." ); final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder(); @@ -170,8 +174,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { for ( Shard sourceShard : sourceShards ) { - Iterator<MarkedEdge> edges = edgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, - Collections.singleton( sourceShard ), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING ); + Iterator<MarkedEdge> edges = edgeMeta + .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singleton( sourceShard ), + Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING ); while ( edges.hasNext() ) { final MarkedEdge edge = edges.next(); @@ -186,13 +191,13 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { } - newRowBatch.mergeShallow( - edgeMeta.writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge, - timestamp ) ); + newRowBatch.mergeShallow( edgeMeta + .writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge, + timestamp ) ); - deleteRowBatch.mergeShallow( - edgeMeta.deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge, - timestamp ) ); + deleteRowBatch.mergeShallow( edgeMeta + .deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge, + timestamp ) ); edgeCount++; @@ -298,7 +303,18 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { * Try and submit. During back pressure, we may not be able to submit, that's ok. Better to drop than to * hose the system */ - ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) ); + final ListenableFuture<AuditResult> future; + + try { + future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) ); + } + catch ( RejectedExecutionException ree ) { + + //ignore, if this happens we don't care, we're saturated, we can check later + LOG.error( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group ); + + return Futures.immediateFuture( AuditResult.NOT_CHECKED ); + } /** * Log our success or failures for debugging purposes @@ -320,7 +336,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { } - private final class ShardAuditTask implements Task<AuditResult> { + private final class ShardAuditTask implements Callable<AuditResult> { private final ApplicationScope scope; private final DirectedEdgeMeta edgeMeta; @@ -334,20 +350,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { this.group = group; } - @Override - public void exceptionThrown( final Throwable throwable ) { - LOG.error( "Unable to execute audit for shard of {}", throwable ); - } - - - @Override - public AuditResult rejected() { - //ignore, if this happens we don't care, we're saturated, we can check later - LOG.error( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group ); - - return AuditResult.NOT_CHECKED; - } - @Override public AuditResult call() throws Exception { @@ -401,10 +403,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { */ try { CompactionResult result = compact( scope, edgeMeta, group ); - LOG.info( - "Compaction result for compaction of scope {} with edge meta data of {} and shard group " + - "{} is {}", - new Object[] { scope, edgeMeta, group, result } ); + LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group " + + "{} is {}", new Object[] { scope, edgeMeta, group, result } ); } finally { shardCompactionTaskTracker.complete( scope, edgeMeta, group ); @@ -418,8 +418,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { } - - /** * Inner class used to track running tasks per instance */ @@ -534,8 +532,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { } - - public static final class CompactionResult { public final long copiedEdges; @@ -566,12 +562,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { @Override public String toString() { return "CompactionResult{" + - "copiedEdges=" + copiedEdges + - ", targetShard=" + targetShard + - ", sourceShards=" + sourceShards + - ", removedShards=" + removedShards + - ", compactedShard=" + compactedShard + - '}'; + "copiedEdges=" + copiedEdges + + ", targetShard=" + targetShard + + ", sourceShards=" + sourceShards + + ", removedShards=" + removedShards + + ", compactedShard=" + compactedShard + + '}'; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java index b471119..658f4bf 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java @@ -66,14 +66,14 @@ public class CommittedGraphManagerIT extends GraphManagerIT { @Override - public Observable<Edge> deleteEdge( final Edge edge ) { - return graphManager.deleteEdge( edge ); + public Observable<Edge> markEdge( final Edge edge ) { + return graphManager.markEdge( edge ); } @Override - public Observable<Id> deleteNode( final Id node, final long timestamp) { - return graphManager.deleteNode( node, timestamp ); + public Observable<Id> markNode( final Id node, final long timestamp ) { + return graphManager.markNode( node, timestamp ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java index 2ed4f13..340c712 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java @@ -584,7 +584,7 @@ public abstract class GraphManagerIT { //now delete it - returned = gm.deleteEdge( edge ).toBlocking().last(); + returned = gm.markEdge( edge ).toBlocking().last(); //now test retrieval, should be null @@ -645,7 +645,7 @@ public abstract class GraphManagerIT { //now delete it - gm.deleteEdge( edge ).toBlocking().last(); + gm.markEdge( edge ).toBlocking().last(); //now test retrieval, should be null edges = gm.loadEdgesToTarget( search ); @@ -1005,7 +1005,7 @@ public abstract class GraphManagerIT { System.out.println( "\n\n\n\n\n\n\n\n\n\n" ); - gm.deleteEdge( edge1 ).toBlocking().last(); + gm.markEdge( edge1 ).toBlocking().last(); System.out.println( "\n\n\n\n\n\n\n\n\n\n" ); @@ -1025,7 +1025,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge2 ).toBlocking().last(); + gm.markEdge( edge2 ).toBlocking().last(); System.out.println( "\n\n\n\n\n\n\n\n\n\n" ); @@ -1081,7 +1081,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge1 ).toBlocking().last(); + gm.markEdge( edge1 ).toBlocking().last(); edges = gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) ); @@ -1096,7 +1096,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge2 ).toBlocking().last(); + gm.markEdge( edge2 ).toBlocking().last(); edges = gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) ); @@ -1146,7 +1146,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge1 ).toBlocking().last(); + gm.markEdge( edge1 ).toBlocking().last(); edges = gm.loadEdgesFromSourceByType( @@ -1171,7 +1171,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge2 ).toBlocking().last(); + gm.markEdge( edge2 ).toBlocking().last(); edges = gm.loadEdgesFromSourceByType( @@ -1223,7 +1223,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge1 ).toBlocking().last(); + gm.markEdge( edge1 ).toBlocking().last(); edges = gm.loadEdgesToTargetByType( @@ -1249,7 +1249,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge2 ).toBlocking().last(); + gm.markEdge( edge2 ).toBlocking().last(); edges = gm.loadEdgesToTargetByType( @@ -1320,7 +1320,7 @@ public abstract class GraphManagerIT { assertFalse( "No more edges", results.hasNext() ); //mark the source node - gm.deleteNode( sourceId, edge2.getTimestamp() ).toBlocking().last(); + gm.markNode( sourceId, edge2.getTimestamp() ).toBlocking().last(); //now re-read, nothing should be there since they're marked @@ -1402,7 +1402,7 @@ public abstract class GraphManagerIT { assertFalse( "No more edges", results.hasNext() ); //mark the source node - gm.deleteNode( targetId, edge2.getTimestamp() ).toBlocking().last(); + gm.markNode( targetId, edge2.getTimestamp() ).toBlocking().last(); //now re-read, nothing should be there since they're marked @@ -1641,7 +1641,7 @@ public abstract class GraphManagerIT { public void invalidEdgeTypesDelete( ) { final GraphManager em = emf.createEdgeManager( scope ); - em.deleteEdge( null ); + em.markEdge( null ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java index 6f3d388..84825aa 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java @@ -116,16 +116,16 @@ public class StorageGraphManagerIT extends GraphManagerIT { @Override - public Observable<Edge> deleteEdge( final Edge edge ) { + public Observable<Edge> markEdge( final Edge edge ) { waitForComplete(); - return graphManager.deleteEdge( edge ); + return graphManager.markEdge( edge ); } @Override - public Observable<Id> deleteNode( final Id node, final long timestamp ) { + public Observable<Id> markNode( final Id node, final long timestamp ) { waitForComplete(); - return graphManager.deleteNode( node, timestamp ); + return graphManager.markNode( node, timestamp ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java index 68b6f8f..7d4b7f6 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java @@ -32,14 +32,12 @@ import org.junit.Test; import org.apache.usergrid.persistence.core.consistency.TimeService; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.core.task.TaskExecutor; import org.apache.usergrid.persistence.core.util.IdGenerator; import org.apache.usergrid.persistence.graph.GraphFig; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl; import com.netflix.astyanax.Keyspace; -import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -79,8 +77,6 @@ public class ShardGroupCompactionTest { final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class ); - final TaskExecutor taskExecutor = mock( TaskExecutor.class ); - final long delta = 10000; final long createTime = 20000; @@ -97,7 +93,7 @@ public class ShardGroupCompactionTest { ShardGroupCompactionImpl compaction = new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization, - edgeColumnFamilies, keyspace, edgeShardSerialization, taskExecutor ); + edgeColumnFamilies, keyspace, edgeShardSerialization ); DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( IdGenerator.createId( "source" ), "test" );
