Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-614 b54d86824 -> 99cb70052


Completes refactor of compact/delete operations


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/99cb7005
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/99cb7005
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/99cb7005

Branch: refs/heads/USERGRID-614
Commit: 99cb70052c83e866f2872a894cac12e84f20109e
Parents: b54d868
Author: Todd Nine <[email protected]>
Authored: Sun May 10 04:08:16 2015 -0600
Committer: Todd Nine <[email protected]>
Committed: Sun May 10 04:08:16 2015 -0600

----------------------------------------------------------------------
 .../persistence/graph/GraphManager.java         |  27 +-
 .../graph/impl/GraphManagerImpl.java            | 451 ++++++++-----------
 .../graph/impl/stage/EdgeDeleteListener.java    |   2 +-
 .../impl/stage/EdgeDeleteListenerImpl.java      |  31 +-
 .../graph/impl/stage/EdgeMetaRepair.java        |   6 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    | 196 ++++----
 .../graph/impl/stage/NodeDeleteListener.java    |   2 +-
 .../graph/CommittedGraphManagerIT.java          | 135 ------
 .../persistence/graph/GraphManagerIT.java       |  67 ++-
 .../graph/StorageGraphManagerIT.java            | 240 ----------
 10 files changed, 346 insertions(+), 811 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/99cb7005/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index 987a36c..6100725 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -27,8 +27,7 @@ import rx.Observable;
 
 
 /**
- * Represents operations that can be performed on edges within our graph.  A 
graph should be within an
- * ApplicationScope
+ * Represents operations that can be performed on edges within our graph.  A 
graph should be within an ApplicationScope
  *
  * An Edge: is defined as the following.
  *
@@ -67,25 +66,37 @@ public interface GraphManager extends CPManager {
      * @param edge Mark the edge as deleted in the graph
      *
      *
-     * EdgeDelete the edge. Implementation should also delete the incoming 
(reversed) edge. Only deletes the specific version
+     * Implementation should also mark the incoming (reversed) edge. Only 
marks the specific version
      */
     Observable<Edge> markEdge( Edge edge );
 
     /**
+     * @param edge Remove the edge in the graph
+     *
      *
+     * EdgeDelete the edge. Implementation should also delete the incoming 
(reversed) edge. Only deletes the specific version
+     * Will only delete if the edge has marked versions
+     */
+    Observable<Edge> deleteEdge( Edge edge );
+
+    /**
      * Mark the node as removed from the graph.
      *
      * @param node The node to remove
-     * @param timestamp The timestamp to apply the delete operation.  Any 
edges connected to this node with a timestmap
-     * <= the specified time will be removed from the graph
-     * @return
+     * @param timestamp The timestamp to apply the mark operation.
      */
     Observable<Id> markNode( Id node, long timestamp );
 
     /**
+     * Mark the node as removed from the graph.
+     *
+     * @param node The node to remove.  This will apply a timestamp to apply 
the delete + compact operation.  Any edges connected to this node with a 
timestamp
+     * <= the specified time on the mark will be removed from the graph
+     */
+    Observable<Id> compactNode( final Id node );
+
+    /**
      * Get all versions of this edge where versions <= max version
-     * @param edge
-     * @return
      */
     Observable<Edge> loadEdgeVersions( SearchByEdge edge );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/99cb7005/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 9a8a00f..06cb5a1 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -52,19 +53,14 @@ import 
org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
-import rx.Notification;
 import rx.Observable;
-import rx.Observer;
-import rx.Subscriber;
-import rx.functions.Action0;
-import rx.functions.Action1;
 import rx.functions.Func1;
-import rx.schedulers.Schedulers;
 
 
 /**
@@ -86,32 +82,19 @@ public class GraphManagerImpl implements GraphManager {
     private final EdgeDeleteListener edgeDeleteListener;
     private final NodeDeleteListener nodeDeleteListener;
     private final Timer writeEdgeTimer;
-    private final Meter writeEdgeMeter;
-    private final Meter deleteEdgeMeter;
-    private final Timer deleteEdgeTimer;
-    private final Meter deleteNodeMeter;
-    private final Timer deleteNodeTimer;
-    private final Meter loadEdgesFromSourceMeter;
+    private final Timer markEdgeTimer;
+    private final Timer markNodeTimer;
     private final Timer loadEdgesFromSourceTimer;
-    private final Meter loadEdgesToTargetMeter;
     private final Timer loadEdgesToTargetTimer;
-    private final Meter loadEdgesVersionsMeter;
     private final Timer loadEdgesVersionsTimer;
-    private final Meter loadEdgesFromSourceByTypeMeter;
     private final Timer loadEdgesFromSourceByTypeTimer;
-    private final Meter loadEdgesToTargetByTypeMeter;
     private final Timer loadEdgesToTargetByTypeTimer;
     private final Timer getEdgeTypesFromSourceTimer;
-    private final Meter getEdgeTypesFromSourceMeter;
     private final Timer getIdTypesFromSourceTimer;
-    private final Meter getIdTypesFromSourceMeter;
-    private final Meter getEdgeTypesToTargetMeter;
     private final Timer getEdgeTypesToTargetTimer;
     private final Timer getIdTypesToTargetTimer;
-    private final Meter getIdTypesToTargetMeter;
-
-    private Observer<Integer> edgeDeleteSubcriber;
-    private Observer<Integer> nodeDelete;
+    private final Timer deleteNodeTimer;
+    private final Timer deleteEdgeTimer;
 
 
     private final GraphFig graphFig;
@@ -141,36 +124,24 @@ public class GraphManagerImpl implements GraphManager {
         this.edgeDeleteListener = edgeDeleteListener;
         this.nodeDeleteListener = nodeDeleteListener;
 
-        this.edgeDeleteSubcriber = MetricSubscriber.INSTANCE;
-        this.nodeDelete = MetricSubscriber.INSTANCE;
-        this.writeEdgeMeter = metricsFactory.getMeter( GraphManagerImpl.class, 
"write.edge.meter" );
-        this.writeEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, 
"write.edge.timer" );
-        this.deleteEdgeMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "delete.edge.meter" );
-        this.deleteEdgeTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "delete.edge.timer" );
-        this.deleteNodeMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "delete.node.meter" );
-        this.deleteNodeTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "delete.node.timer" );
-        this.loadEdgesFromSourceMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "load.from.meter" );
-        this.loadEdgesFromSourceTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.from.timer" );
-        this.loadEdgesToTargetMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "load.to.meter" );
-        this.loadEdgesToTargetTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.to.timer" );
-        this.loadEdgesVersionsMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "load.versions.meter" );
-        this.loadEdgesVersionsTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.versions.timer" );
-        this.loadEdgesFromSourceByTypeMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "load.from.type.meter" );
-        this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.from.type.timer" );
-        this.loadEdgesToTargetByTypeMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "load.to.type.meter" );
-        this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.to.type.timer" );
-
-        this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "get.edge.from.timer" );
-        this.getEdgeTypesFromSourceMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "get.edge.from.meter" );
-
-        this.getIdTypesFromSourceTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "get.idtype.from.timer" );
-        this.getIdTypesFromSourceMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "get.idtype.from.meter" );
-
-        this.getEdgeTypesToTargetTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "get.edge.to.timer" );
-        this.getEdgeTypesToTargetMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "get.edge.to.meter" );
-
-        this.getIdTypesToTargetTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "get.idtype.to.timer" );
-        this.getIdTypesToTargetMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "get.idtype.to.meter" );
+        this.writeEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, 
"write.edge" );
+        this.markEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, 
"mark.edge" );
+        this.deleteEdgeTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "delete.edge" );
+        this.markNodeTimer = metricsFactory.getTimer( GraphManagerImpl.class, 
"mark.node" );
+        this.deleteNodeTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "delete.node" );
+        this.loadEdgesFromSourceTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.from" );
+        this.loadEdgesToTargetTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.to" );
+        this.loadEdgesVersionsTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.versions" );
+        this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.from.type" );
+        this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.to.type" );
+
+        this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "get.edge.from" );
+
+        this.getIdTypesFromSourceTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "get.idtype.from" );
+
+        this.getEdgeTypesToTargetTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "get.edge.to" );
+
+        this.getIdTypesToTargetTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "get.idtype.to" );
     }
 
 
@@ -179,42 +150,29 @@ public class GraphManagerImpl implements GraphManager {
         GraphValidation.validateEdge( edge );
 
         final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, false );
-        final Timer.Context timer = writeEdgeTimer.time();
-        final Meter meter = writeEdgeMeter;
-
-        return Observable.just( markedEdge ).map( new Func1<MarkedEdge, 
Edge>() {
-            @Override
-            public Edge call( final MarkedEdge edge ) {
 
-                final UUID timestamp = UUIDGenerator.newTimeUUID();
+        final Observable<Edge> observable = Observable.just( markedEdge ).map( 
edge1 -> {
 
+            final UUID timestamp = UUIDGenerator.newTimeUUID();
 
-                final MutationBatch mutation = 
edgeMetadataSerialization.writeEdge( scope, edge );
 
-                final MutationBatch edgeMutation = 
storageEdgeSerialization.writeEdge( scope, edge, timestamp );
+            final MutationBatch mutation = 
edgeMetadataSerialization.writeEdge( scope, edge1 );
 
-                mutation.mergeShallow( edgeMutation );
+            final MutationBatch edgeMutation = 
storageEdgeSerialization.writeEdge( scope, edge1, timestamp );
 
-                try {
-                    mutation.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to execute mutation", 
e );
-                }
+            mutation.mergeShallow( edgeMutation );
 
-                return edge;
-            }
-        } ).doOnEach( new Action1<Notification<? super Edge>>() {
-            @Override
-            public void call( Notification<? super Edge> notification ) {
-                meter.mark();
+            try {
+                mutation.execute();
             }
-        } ).doOnCompleted( new Action0() {
-            @Override
-            public void call() {
-                timer.stop();
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to execute mutation", e );
             }
+
+            return edge1;
         } );
+
+        return ObservableTimer.time( observable, writeEdgeTimer );
     }
 
 
@@ -224,52 +182,64 @@ public class GraphManagerImpl implements GraphManager {
 
         final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true );
 
-        final Timer.Context timer = deleteEdgeTimer.time();
-        final Meter meter = deleteEdgeMeter;
-        return Observable.just( markedEdge ).map( new Func1<MarkedEdge, 
Edge>() {
-            @Override
-            public Edge call( final MarkedEdge edge ) {
+        final Observable<Edge> observable = Observable.just( markedEdge ).map( 
edge1 -> {
 
-                final UUID timestamp = UUIDGenerator.newTimeUUID();
+            final UUID timestamp = UUIDGenerator.newTimeUUID();
 
 
-                final MutationBatch edgeMutation = 
storageEdgeSerialization.writeEdge( scope, edge, timestamp );
+            final MutationBatch edgeMutation = 
storageEdgeSerialization.writeEdge( scope, edge1, timestamp );
 
 
-                LOG.debug( "Marking edge {} as deleted to commit log", edge );
-                try {
-                    edgeMutation.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to execute mutation", 
e );
-                }
+            LOG.debug( "Marking edge {} as deleted to commit log", edge1 );
+            try {
+                edgeMutation.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to execute mutation", e );
+            }
 
 
-                //HystrixCassandra.async( edgeDeleteListener.receive( scope, 
markedEdge,
-                // timestamp )).subscribeOn( Schedulers.io() ).subscribe( 
edgeDeleteSubcriber );
-                edgeDeleteListener.receive( scope, markedEdge, timestamp 
).subscribeOn( Schedulers.io() )
-                                  .subscribe( edgeDeleteSubcriber );
+            return edge1;
+        } );
 
+        return ObservableTimer.time( observable, markEdgeTimer );
+    }
 
-                return edge;
-            }
-        } ).doOnEach( notification -> {
-            meter.mark();
-        } ).doOnCompleted( () -> timer.stop() );
+
+    @Override
+    public Observable<Edge> deleteEdge( final Edge edge ) {
+
+        GraphValidation.validateEdge( edge );
+        final UUID startTimestamp = UUIDGenerator.newTimeUUID();
+
+
+        final Observable<Edge> observable =
+            Observable.create( new ObservableIterator<MarkedEdge>( "read edge 
versions" ) {
+                @Override
+                protected Iterator<MarkedEdge> getIterator() {
+                    return storageEdgeSerialization.getEdgeVersions( scope,
+                        new SimpleSearchByEdge( edge.getSourceNode(), 
edge.getType(), edge.getTargetNode(),
+                            Long.MAX_VALUE, SearchByEdgeType.Order.ASCENDING, 
Optional.absent() ) );
+                }
+            } ).filter( markedEdge -> markedEdge.isDeleted() ).flatMap( marked 
->
+                //fire our delete listener and wait for the results
+                edgeDeleteListener.receive( scope, marked, startTimestamp 
).doOnNext(
+                    //log them
+                    count -> LOG.debug( "removed {} types for edge {} ", 
count, edge ) )
+                    //return the marked edge
+                    .map( count -> marked ) );
+
+
+        return ObservableTimer.time( observable, deleteEdgeTimer );
     }
 
 
     @Override
     public Observable<Id> markNode( final Id node, final long timestamp ) {
-        final Timer.Context timer = deleteNodeTimer.time();
-        final Meter meter = deleteNodeMeter;
-        return Observable.just( node ).map( id -> {
+        final Observable<Id> idObservable = Observable.just( node ).map( id -> 
{
 
             //mark the node as deleted
 
-
-            final UUID eventTimestamp = UUIDGenerator.newTimeUUID();
-
             final MutationBatch nodeMutation = nodeSerialization.mark( scope, 
id, timestamp );
 
 
@@ -282,161 +252,166 @@ public class GraphManagerImpl implements GraphManager {
             }
 
 
-            //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, 
eventTimestamp  )).subscribeOn(
-            // Schedulers.io() ).subscribe( nodeDelete );
-            nodeDeleteListener.receive( scope, id, eventTimestamp 
).subscribeOn( Schedulers.io() )
-                              .subscribe( nodeDelete );
-
             return id;
-        } ).doOnEach( notification -> {
-            meter.mark();
-        } ).doOnCompleted( () -> timer.stop() );
+        } );
+
+        return ObservableTimer.time( idObservable, markNodeTimer );
+    }
+
+
+    @Override
+    public Observable<Id> compactNode( final Id inputNode ) {
+
+
+        final UUID startTime = UUIDGenerator.newTimeUUID();
+
+
+        final Observable<Id> nodeObservable =
+            Observable.just( inputNode ).map( node -> 
nodeSerialization.getMaxVersion( scope, inputNode ) ).takeWhile(
+                maxTimestamp -> maxTimestamp.isPresent() )
+
+                //map our delete listener
+                .flatMap( timestamp -> nodeDeleteListener.receive( scope, 
inputNode, startTime ) )
+                    //set to 0 if nothing is emitted
+                .lastOrDefault( 0 )
+                    //log for posterity
+                .doOnNext( count -> LOG.debug( "Removed {} edges from node 
{}", count, inputNode ) )
+                    //return our id
+                .map( count -> inputNode );
+
+        return ObservableTimer.time( nodeObservable, this.deleteNodeTimer );
     }
 
 
     @Override
     public Observable<Edge> loadEdgeVersions( final SearchByEdge searchByEdge 
) {
-        final Timer.Context timer = loadEdgesVersionsTimer.time();
-        final Meter meter = loadEdgesVersionsMeter;
-        return Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-                return storageEdgeSerialization.getEdgeVersions( scope, 
searchByEdge );
-            }
-        } ).buffer( graphFig.getScanPageSize() )
-                         .flatMap( new EdgeBufferFilter( 
searchByEdge.getMaxTimestamp(),  searchByEdge.filterMarked() ) )
-                         .cast( Edge.class ).doOnEach( notification -> {
-                meter.mark();
-            } ).doOnCompleted( () -> timer.stop() );
+
+        final Observable<Edge> edges =
+            Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
+                @Override
+                protected Iterator<MarkedEdge> getIterator() {
+                    return storageEdgeSerialization.getEdgeVersions( scope, 
searchByEdge );
+                }
+            } ).buffer( graphFig.getScanPageSize() )
+                      .flatMap( new EdgeBufferFilter( 
searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) );
+
+        return ObservableTimer.time( edges, loadEdgesVersionsTimer );
     }
 
 
     @Override
     public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search 
) {
-        final Timer.Context timer = loadEdgesFromSourceTimer.time();
-        final Meter meter = loadEdgesFromSourceMeter;
-        return Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-                return storageEdgeSerialization.getEdgesFromSource( scope, 
search );
-            }
-        } ).buffer( graphFig.getScanPageSize() )
-                         .flatMap( new EdgeBufferFilter( 
search.getMaxTimestamp(),  search.filterMarked() ) ).cast( Edge.class )
-                         .doOnEach( notification -> {
-                             meter.mark();
-                         } ).doOnCompleted( () -> timer.stop() );
+        final Observable<Edge> edges =
+            Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
+                @Override
+                protected Iterator<MarkedEdge> getIterator() {
+                    return storageEdgeSerialization.getEdgesFromSource( scope, 
search );
+                }
+            } ).buffer( graphFig.getScanPageSize() )
+                      .flatMap( new EdgeBufferFilter( 
search.getMaxTimestamp(), search.filterMarked() ) );
+
+        return ObservableTimer.time( edges, loadEdgesFromSourceTimer );
     }
 
 
     @Override
     public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) 
{
-        final Timer.Context timer = loadEdgesToTargetTimer.time();
-        final Meter meter = loadEdgesToTargetMeter;
-        return Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-                return storageEdgeSerialization.getEdgesToTarget( scope, 
search );
-            }
-        } ).buffer( graphFig.getScanPageSize() )
-                         .flatMap( new EdgeBufferFilter( 
search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class )
-                         .doOnEach( notification -> {
-                             meter.mark();
-                         } ).doOnCompleted( () -> timer.stop() );
+        final Observable<Edge> edges =
+            Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
+                @Override
+                protected Iterator<MarkedEdge> getIterator() {
+                    return storageEdgeSerialization.getEdgesToTarget( scope, 
search );
+                }
+            } ).buffer( graphFig.getScanPageSize() )
+                      .flatMap( new EdgeBufferFilter( 
search.getMaxTimestamp(), search.filterMarked() ) );
+
+
+        return ObservableTimer.time( edges, loadEdgesToTargetTimer );
     }
 
 
     @Override
     public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType 
search ) {
-        final Timer.Context timer = loadEdgesFromSourceByTypeTimer.time();
-        final Meter meter = loadEdgesFromSourceByTypeMeter;
-        return Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-                return 
storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
-            }
-        } ).buffer( graphFig.getScanPageSize() )
-                         .flatMap( new EdgeBufferFilter( 
search.getMaxTimestamp(),  search.filterMarked() ) )
-
-                         .cast( Edge.class ).doOnEach( notification -> {
-                meter.mark();
-            } ).doOnCompleted( () -> timer.stop() );
+        final Observable<Edge> edges =
+            Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
+                    @Override
+                    protected Iterator<MarkedEdge> getIterator() {
+                        return 
storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
+                    }
+                } ).buffer( graphFig.getScanPageSize() )
+                      .flatMap( new EdgeBufferFilter( 
search.getMaxTimestamp(), search.filterMarked() ) );
+
+        return ObservableTimer.time( edges, loadEdgesFromSourceTimer );
     }
 
 
     @Override
     public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType 
search ) {
-        final Timer.Context timer = loadEdgesToTargetByTypeTimer.time();
-        final Meter meter = loadEdgesToTargetByTypeMeter;
-        return Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-                return storageEdgeSerialization.getEdgesToTargetBySourceType( 
scope, search );
-            }
-        } ).buffer( graphFig.getScanPageSize() )
-                         .flatMap( new EdgeBufferFilter( 
search.getMaxTimestamp(),  search.filterMarked() ) ).cast( Edge.class )
-                         .doOnEach( notification -> {
-                             meter.mark();
-                         } ).doOnCompleted( () -> timer.stop() );
+        final Observable<Edge> edges =
+            Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
+                    @Override
+                    protected Iterator<MarkedEdge> getIterator() {
+                        return 
storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
+                    }
+                } ).buffer( graphFig.getScanPageSize() )
+                      .flatMap( new EdgeBufferFilter( 
search.getMaxTimestamp(), search.filterMarked() ) );
+
+        return ObservableTimer.time( edges, loadEdgesToTargetByTypeTimer );
     }
 
 
     @Override
     public Observable<String> getEdgeTypesFromSource( final SearchEdgeType 
search ) {
-        final Timer.Context timer = getEdgeTypesFromSourceTimer.time();
-        final Meter meter = getEdgeTypesFromSourceMeter;
-        return Observable.create( new ObservableIterator<String>( 
"getEdgeTypesFromSource" ) {
-            @Override
-            protected Iterator<String> getIterator() {
-                return edgeMetadataSerialization.getEdgeTypesFromSource( 
scope, search );
-            }
-        } ).doOnEach( notification -> {
-            meter.mark();
-        } ).doOnCompleted( () -> timer.stop() );
+       final Observable<String> edgeTypes = Observable.create(
+            new ObservableIterator<String>( "getEdgeTypesFromSource" ) {
+                @Override
+                protected Iterator<String> getIterator() {
+                    return edgeMetadataSerialization.getEdgeTypesFromSource( 
scope, search );
+                }
+            } );
+
+        return ObservableTimer.time( edgeTypes, getEdgeTypesFromSourceTimer );
     }
 
 
     @Override
     public Observable<String> getIdTypesFromSource( final SearchIdType search 
) {
-        final Timer.Context timer = getIdTypesFromSourceTimer.time();
-        final Meter meter = getIdTypesFromSourceMeter;
-        return Observable.create( new ObservableIterator<String>( 
"getIdTypesFromSource" ) {
+        final Observable<String> edgeTypes =  Observable.create( new 
ObservableIterator<String>( "getIdTypesFromSource" ) {
             @Override
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesFromSource( scope, 
search );
             }
-        } ).doOnEach( notification -> {
-            meter.mark();
-        } ).doOnCompleted( () -> timer.stop() );
+        } );
+
+        return ObservableTimer.time( edgeTypes, getIdTypesFromSourceTimer );
     }
 
 
     @Override
     public Observable<String> getEdgeTypesToTarget( final SearchEdgeType 
search ) {
-        final Timer.Context timer = getEdgeTypesToTargetTimer.time();
-        final Meter meter = getEdgeTypesToTargetMeter;
-        return Observable.create( new ObservableIterator<String>( 
"getEdgeTypesToTarget" ) {
-            @Override
-            protected Iterator<String> getIterator() {
-                return edgeMetadataSerialization.getEdgeTypesToTarget( scope, 
search );
-            }
-        } ).doOnEach( notification -> {
-            meter.mark();
-        } ).doOnCompleted( () -> timer.stop() );
+        final Observable<String> edgeTypes =   Observable.create(
+            new ObservableIterator<String>( "getEdgeTypesToTarget" ) {
+                @Override
+                protected Iterator<String> getIterator() {
+                    return edgeMetadataSerialization.getEdgeTypesToTarget( 
scope, search );
+                }
+            } );
+
+        return ObservableTimer.time( edgeTypes, getEdgeTypesFromSourceTimer );
     }
 
 
     @Override
     public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
-        final Timer.Context timer = getIdTypesToTargetTimer.time();
-        final Meter meter = getIdTypesToTargetMeter;
-        return Observable.create( new ObservableIterator<String>( 
"getIdTypesToTarget" ) {
-            @Override
-            protected Iterator<String> getIterator() {
-                return edgeMetadataSerialization.getIdTypesToTarget( scope, 
search );
-            }
-        } ).doOnEach( notification -> {
-            meter.mark();
-        } ).doOnCompleted( () -> timer.stop() );
+        final Observable<String> edgeTypes =  Observable.create(
+            new ObservableIterator<String>( "getIdTypesToTarget" ) {
+                @Override
+                protected Iterator<String> getIterator() {
+                    return edgeMetadataSerialization.getIdTypesToTarget( 
scope, search );
+                }
+            } );
+
+        return ObservableTimer.time( edgeTypes, getIdTypesToTargetTimer );
     }
 
 
@@ -472,7 +447,7 @@ public class GraphManagerImpl implements GraphManager {
                 final long edgeTimestamp = edge.getTimestamp();
 
                 //our edge needs to not be deleted and have a version that's > 
max Version
-                if ( edge.isDeleted() || Long.compare( edgeTimestamp, 
maxTimestamp ) > 0 ) {
+                if (( edge.isDeleted() && filterMarked) || Long.compare( 
edgeTimestamp, maxTimestamp ) > 0 ) {
                     return false;
                 }
 
@@ -498,52 +473,4 @@ public class GraphManagerImpl implements GraphManager {
             } );
         }
     }
-
-
-    /**
-     * Set the subscription for the edge delete
-     */
-    public void setEdgeDeleteSubcriber( final Observer<Integer> 
edgeDeleteSubcriber ) {
-        Preconditions.checkNotNull( edgeDeleteSubcriber, "Subscriber cannot be 
null" );
-        this.edgeDeleteSubcriber = edgeDeleteSubcriber;
-    }
-
-
-    /**
-     * Set the subscription for the node delete
-     */
-    public void setNodeDelete( final Observer<Integer> nodeDelete ) {
-        Preconditions.checkNotNull( nodeDelete, "Subscriber cannot be null" );
-        this.nodeDelete = nodeDelete;
-    }
-
-
-    /**
-     * Simple subscriber that can be used to gather metrics.  Needs to be 
refactored to use codehale metrics
-     */
-    private static class MetricSubscriber extends Subscriber<Integer> {
-
-
-        private static final MetricSubscriber INSTANCE = new 
MetricSubscriber();
-
-        private final Logger logger = LoggerFactory.getLogger( 
MetricSubscriber.class );
-
-
-        @Override
-        public void onCompleted() {
-            logger.debug( "Event completed" );
-        }
-
-
-        @Override
-        public void onError( final Throwable e ) {
-            logger.error( "Failed to execute event", e );
-        }
-
-
-        @Override
-        public void onNext( final Integer integer ) {
-            logger.debug( "Next received {}", integer );
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/99cb7005/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListener.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListener.java
index 4b53940..d8a8896 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListener.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListener.java
@@ -40,7 +40,7 @@ public interface EdgeDeleteListener {
      * @param eventTimestamp
      * @return
      */
-    public Observable<Integer> receive( final ApplicationScope scope, final 
MarkedEdge edge, final UUID eventTimestamp );
+    Observable<Integer> receive( final ApplicationScope scope, final 
MarkedEdge edge, final UUID eventTimestamp );
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/99cb7005/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
index 7e42293..22a5ad0 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
@@ -61,27 +61,16 @@ public class EdgeDeleteListenerImpl implements 
EdgeDeleteListener {
 
 
         return edgeDeleteRepair.repair( scope, edge, eventTimestamp )
-                               .flatMap( new Func1<MarkedEdge, 
Observable<Integer>>() {
-                                   @Override
-                                   public Observable<Integer> call( final 
MarkedEdge markedEdge ) {
-
-                                       Observable<Integer> sourceDelete = 
edgeMetaRepair
-                                               .repairSources( scope, 
edge.getSourceNode(), edge.getType(),
-                                                       maxTimestamp );
-
-                                       Observable<Integer> targetDelete = 
edgeMetaRepair
-                                               .repairTargets( scope, 
edge.getTargetNode(), edge.getType(),
-                                                       maxTimestamp );
-
-                                       return Observable.zip( sourceDelete, 
targetDelete,
-                                               new Func2<Integer, Integer, 
Integer>() {
-                                                   @Override
-                                                   public Integer call( final 
Integer sourceCount,
-                                                                        final 
Integer targetCount ) {
-                                                       return sourceCount + 
targetCount;
-                                                   }
-                                               } );
-                                   }
+                               .flatMap( markedEdge -> {
+
+                                   Observable<Integer> sourceDelete = 
edgeMetaRepair
+                                           .repairSources( scope, 
edge.getSourceNode(), edge.getType(), maxTimestamp );
+
+                                   Observable<Integer> targetDelete = 
edgeMetaRepair
+                                           .repairTargets( scope, 
edge.getTargetNode(), edge.getType(), maxTimestamp );
+
+                                   return Observable.zip( sourceDelete, 
targetDelete,
+                                       ( sourceCount, targetCount ) -> 
sourceCount + targetCount );
                                } );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/99cb7005/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java
index 5ba822b..efd88fc 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepair.java
@@ -27,7 +27,7 @@ import rx.Observable;
 
 
 /**
- * Audits edge meta data and removes them if they're obscelete
+ * Audits edge meta data and removes them if they're obsolete
  */
 public interface EdgeMetaRepair {
 
@@ -42,7 +42,7 @@ public interface EdgeMetaRepair {
      * @return An observable that emits the total number of sub types still in 
use.  0 implies the type and subtypes
      *         have been removed.  Anything > 0 implies the edgeType and 
subTypes are still in use
      */
-    public Observable<Integer> repairSources( ApplicationScope scope, Id 
sourceId, String edgeType, long maxTimestamp );
+    Observable<Integer> repairSources( ApplicationScope scope, Id sourceId, 
String edgeType, long maxTimestamp );
 
 
     /**
@@ -56,5 +56,5 @@ public interface EdgeMetaRepair {
      * @return An observable that emits the total number of sub types still in 
use.  0 implies the type and subtypes
      *         have been removed.  Anything > 0 implies the edgeType and 
subTypes are still in use
      */
-    public Observable<Integer> repairTargets( ApplicationScope scope, Id 
targetId, String edgeType, long maxTimestamp );
+    Observable<Integer> repairTargets( ApplicationScope scope, Id targetId, 
String edgeType, long maxTimestamp );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/99cb7005/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index 5e76f75..206145a 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -118,91 +118,79 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair 
{
         Observable<Integer> deleteCounts = serialization.loadEdgeSubTypes( 
scope, node, edgeType, maxTimestamp ).buffer(
                 graphFig.getRepairConcurrentSize() )
                 //buffer them into concurrent groups based on the concurrent 
repair size
-                .flatMap( new Func1<List<String>, Observable<Integer>>() {
-
-                    @Override
-                    public Observable<Integer> call( final List<String> types 
) {
-
-
-                        final MutationBatch batch = 
keyspace.prepareMutationBatch();
-
-                        final List<Observable<Integer>> checks = new 
ArrayList<Observable<Integer>>( types.size() );
-
-                        //for each id type, check if the exist in parallel to 
increase processing speed
-                        for ( final String subType : types ) {
-
-                            LOG.debug( "Checking for edges with nodeId {}, 
type {}, and subtype {}", node, edgeType,
-                                    subType );
-
-                            Observable<Integer> search =
-                                    //load each edge in it's own thread
-                                    serialization.loadEdges( scope, node, 
edgeType, subType, maxTimestamp )
-                                                 .doOnNext( RX_LOG ).take( 1 
).count()
-                                                 .doOnNext( new 
Action1<Integer>() {
-
-                                                     @Override
-                                                     public void call( final 
Integer count ) {
-                                                         /**
-                                                          * we only want to 
delete if no edges are in this class. If
-                                                          * there
-                                                          * are
-                                                          * still edges
-                                                          * we must retain the 
information in order to keep our index
-                                                          * structure
-                                                          * correct for edge
-                                                          * iteration
-                                                          **/
-                                                         if ( count != 0 ) {
-                                                             LOG.debug( "Found 
edge with nodeId {}, type {}, "
-                                                                             + 
"and subtype {}. Not removing subtype. ",
-                                                                     node, 
edgeType, subType );
-                                                             return;
-                                                         }
-
-
-                                                         LOG.debug( "No edges 
with nodeId {}, type {}, "
-                                                                         + 
"and subtype {}. Removing subtype.", node,
-                                                                 edgeType, 
subType );
-                                                         batch.mergeShallow( 
serialization
-                                                                 
.removeEdgeSubType( scope, node, edgeType, subType,
-                                                                         
maxTimestamp ) );
-                                                     }
-                                                 } );
-
-                            checks.add( search );
-                        }
-
-
-                        /**
-                         * Sum up the total number of edges we had, then 
execute the mutation if we have
-                         * anything to do
-                         */
-
-
-                        return MathObservable.sumInteger( Observable.merge( 
checks ) )
-                                             .doOnNext( new Action1<Integer>() 
{
-                                                            @Override
-                                                            public void call( 
final Integer count ) {
-
-
-                                                                LOG.debug(
-                                                                        
"Executing batch for subtype deletion with " +
-                                                                               
 "type {}.  "
-                                                                               
 + "Mutation has {} rows to mutate ",
-                                                                        
edgeType, batch.getRowCount() );
-
-                                                                try {
-                                                                    
batch.execute();
-                                                                }
-                                                                catch ( 
ConnectionException e ) {
-                                                                    throw new 
RuntimeException( "Unable to connect to casandra", e );
-                                                                }
-                                                            }
-                                                        }
-
-
-                                                      );
+                .flatMap( types -> {
+
+
+                    final MutationBatch batch = 
keyspace.prepareMutationBatch();
+
+                    final List<Observable<Integer>> checks = new 
ArrayList<Observable<Integer>>( types.size() );
+
+                    //for each id type, check if the exist in parallel to 
increase processing speed
+                    for ( final String subType : types ) {
+
+                        LOG.debug( "Checking for edges with nodeId {}, type 
{}, and subtype {}", node, edgeType,
+                                subType );
+
+                        Observable<Integer> search =
+                                //load each edge in it's own thread
+                                serialization.loadEdges( scope, node, 
edgeType, subType, maxTimestamp )
+                                             .doOnNext( RX_LOG ).take( 1 
).count()
+                                             .doOnNext( count -> {
+                                                 /**
+                                                  * we only want to delete if 
no edges are in this class. If
+                                                  * there
+                                                  * are
+                                                  * still edges
+                                                  * we must retain the 
information in order to keep our index
+                                                  * structure
+                                                  * correct for edge
+                                                  * iteration
+                                                  **/
+                                                 if ( count != 0 ) {
+                                                     LOG.debug( "Found edge 
with nodeId {}, type {}, "
+                                                                     + "and 
subtype {}. Not removing subtype. ",
+                                                             node, edgeType, 
subType );
+                                                     return;
+                                                 }
+
+
+                                                 LOG.debug( "No edges with 
nodeId {}, type {}, "
+                                                                 + "and 
subtype {}. Removing subtype.", node,
+                                                         edgeType, subType );
+                                                 batch.mergeShallow( 
serialization
+                                                         .removeEdgeSubType( 
scope, node, edgeType, subType,
+                                                             maxTimestamp ) );
+                                             } );
+
+                        checks.add( search );
                     }
+
+
+                    /**
+                     * Sum up the total number of edges we had, then execute 
the mutation if we have
+                     * anything to do
+                     */
+
+
+                    return MathObservable.sumInteger( Observable.merge( checks 
) )
+                                         .doOnNext( count -> {
+
+
+                                             LOG.debug( "Executing batch for 
subtype deletion with " +
+                                                     "type {}.  " + "Mutation 
has {} rows to mutate ",
+                                                 edgeType, batch.getRowCount() 
);
+
+                                             try {
+                                                 batch.execute();
+                                             }
+                                             catch ( ConnectionException e ) {
+                                                 throw new RuntimeException(
+                                                     "Unable to connect to 
casandra", e );
+                                             }
+                                         }
+
+
+                                                  );
                 } )
                         //if we get no edges, emit a 0 so the caller knows we 
can delete the type
                 .defaultIfEmpty( 0 );
@@ -210,29 +198,25 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair 
{
 
         //sum up everything emitted by sub types.  If there's no edges 
existing for all sub types,
         // then we can safely remove them
-        return MathObservable.sumInteger( deleteCounts ).lastOrDefault( 0 
).doOnNext( new Action1<Integer>() {
-
-            @Override
-            public void call( final Integer subTypeUsedCount ) {
-
-                /**
-                 * We can only execute deleting this type if no sub types were 
deleted
-                 */
-                if ( subTypeUsedCount != 0 ) {
-                    LOG.debug( "Type {} has {} subtypes in use as of 
maxTimestamp {}.  Not deleting type.", edgeType,
-                            subTypeUsedCount, maxTimestamp );
-                    return;
-                }
+        return MathObservable.sumInteger( deleteCounts ).lastOrDefault( 0 
).doOnNext( subTypeUsedCount -> {
+
+            /**
+             * We can only execute deleting this type if no sub types were 
deleted
+             */
+            if ( subTypeUsedCount != 0 ) {
+                LOG.debug( "Type {} has {} subtypes in use as of maxTimestamp 
{}.  Not deleting type.", edgeType,
+                        subTypeUsedCount, maxTimestamp );
+                return;
+            }
 
 
-                LOG.debug( "Type {} has no subtypes in use as of maxTimestamp 
{}.  Deleting type.", edgeType,
-                        maxTimestamp );
-                try {
-                    serialization.removeEdgeType( scope, node, edgeType, 
maxTimestamp ).execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to connect to 
casandra", e );
-                }
+            LOG.debug( "Type {} has no subtypes in use as of maxTimestamp {}.  
Deleting type.", edgeType,
+                    maxTimestamp );
+            try {
+                serialization.removeEdgeType( scope, node, edgeType, 
maxTimestamp ).execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to connect to casandra", e 
);
             }
         } );
     }
@@ -241,7 +225,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
     /**
      * Simple edge serialization
      */
-    private static interface CleanSerialization {
+    private interface CleanSerialization {
 
         /**
          * Load all subtypes for the edge with a maxTimestamp <= the provided 
maxTimestamp

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/99cb7005/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
index 89f280d..68569e5 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
@@ -42,5 +42,5 @@ public interface NodeDeleteListener {
        * @return An observable that emits the total number of edges that have 
been removed with this node both as the
        *         target and source
        */
-      public Observable<Integer> receive( final ApplicationScope scope, final 
Id node, final UUID timestamp );
+    Observable<Integer> receive( final ApplicationScope scope, final Id node, 
final UUID timestamp );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/99cb7005/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
deleted file mode 100644
index 658f4bf..0000000
--- 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied.  See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
- *
- */
-package org.apache.usergrid.persistence.graph;
-
-
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.persistence.core.test.ITRunner;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-
-
-/**
- * Integration test that performs all calls immediately after writes without 
blocking.  Tests that our
- * view is immediately consistent to our users, even if we have yet to perform 
background processing
- */
-@RunWith(ITRunner.class)
-@UseModules({ TestGraphModule.class })
-public class CommittedGraphManagerIT extends GraphManagerIT {
-
-
-    @Override
-    protected GraphManager getHelper(GraphManager gm) {
-        return new ComittedGraphTestHelper( gm );
-    }
-
-
-    /**
-     * Doesn't wait for the async process to happen before returning.  Simply 
executes and immediately returns.
-     */
-    public static class ComittedGraphTestHelper implements GraphManager {
-
-        private final GraphManager graphManager;
-
-
-        public ComittedGraphTestHelper( final GraphManager graphManager ) {
-            this.graphManager = graphManager;
-        }
-
-
-        @Override
-        public Observable<Edge> writeEdge( final Edge edge ) {
-            return graphManager.writeEdge( edge );
-        }
-
-
-        @Override
-        public Observable<Edge> markEdge( final Edge edge ) {
-            return graphManager.markEdge( edge );
-        }
-
-
-        @Override
-        public Observable<Id> markNode( final Id node, final long timestamp ) {
-            return graphManager.markNode( node, timestamp );
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgeVersions( final SearchByEdge edge ) {
-            return graphManager.loadEdgeVersions( edge );
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType 
search ) {
-            return graphManager.loadEdgesFromSource( search );
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType 
search ) {
-            return graphManager.loadEdgesToTarget( search );
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgesFromSourceByType( final 
SearchByIdType search ) {
-            return  graphManager.loadEdgesFromSourceByType(search);
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType 
search ) {
-            return graphManager.loadEdgesToTargetByType( search );
-        }
-
-
-        @Override
-        public Observable<String> getEdgeTypesFromSource( final SearchEdgeType 
search ) {
-            return graphManager.getEdgeTypesFromSource( search );
-        }
-
-
-        @Override
-        public Observable<String> getIdTypesFromSource( final SearchIdType 
search ) {
-            return graphManager.getIdTypesFromSource( search );
-        }
-
-
-        @Override
-        public Observable<String> getEdgeTypesToTarget( final SearchEdgeType 
search ) {
-            return graphManager.getEdgeTypesToTarget( search );
-        }
-
-
-        @Override
-        public Observable<String> getIdTypesToTarget( final SearchIdType 
search ) {
-            return graphManager.getIdTypesToTarget( search );
-        }
-
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/99cb7005/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index 340c712..75f61f0 100644
--- 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -25,11 +25,15 @@ import java.util.concurrent.TimeoutException;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.core.util.IdGenerator;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -49,8 +53,9 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-
-public abstract class GraphManagerIT {
+@RunWith(ITRunner.class)
+@UseModules({ TestGraphModule.class })
+public class GraphManagerIT {
 
 
     @Inject
@@ -64,12 +69,6 @@ public abstract class GraphManagerIT {
     protected ApplicationScope scope;
 
 
-    /**
-     * Get the helper for performing our tests
-     *
-     * @return The helper to use when writing/synchronizing tests
-     */
-    protected abstract GraphManager getHelper( GraphManager gm );
 
 
     @Before
@@ -81,7 +80,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypeSource() throws TimeoutException, 
InterruptedException {
 
-        GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        GraphManager gm =  emf.createEdgeManager( scope );
 
         Edge edge = createEdge( "source", "test", "target" );
 
@@ -113,7 +112,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypeTarget() throws TimeoutException, 
InterruptedException {
 
-        GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        GraphManager gm =  emf.createEdgeManager( scope );
 
         Edge edge = createEdge( "source", "test", "target" );
 
@@ -145,7 +144,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypeVersionSource() throws TimeoutException, 
InterruptedException {
 
-        GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        GraphManager gm =  emf.createEdgeManager( scope );
 
         final long earlyVersion = 1000l;
 
@@ -179,7 +178,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypeVersionTarget() throws TimeoutException, 
InterruptedException {
 
-        GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        GraphManager gm =  emf.createEdgeManager( scope );
 
         final long earlyVersion = 10000l;
 
@@ -217,7 +216,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypeVersionSourceDistinct() throws 
TimeoutException, InterruptedException {
 
-        GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        GraphManager gm =  emf.createEdgeManager( scope );
 
         final long earlyVersion = 10000l;
 
@@ -289,7 +288,7 @@ public abstract class GraphManagerIT {
     public void testWriteReadEdgeTypeVersionTargetDistinct() throws 
TimeoutException, InterruptedException {
 
 
-        GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        GraphManager gm =  emf.createEdgeManager( scope );
 
 
         final long earlyVersion = 10000l;
@@ -361,7 +360,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypePagingSource() throws TimeoutException, 
InterruptedException {
 
-        GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        GraphManager gm =  emf.createEdgeManager( scope );
         final Id sourceId = IdGenerator.createId( "source" );
 
 
@@ -416,7 +415,7 @@ public abstract class GraphManagerIT {
     public void testWriteReadEdgeTypePagingTarget() {
 
 
-        GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        GraphManager gm =  emf.createEdgeManager( scope );
 
 
         final Id targetId = IdGenerator.createId( "target" );
@@ -471,7 +470,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypeTargetTypeSource() {
 
-        GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        GraphManager gm =  emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -507,7 +506,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypeTargetTypeTarget() {
 
-        GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        GraphManager gm =  emf.createEdgeManager( scope );
         ;
 
 
@@ -544,7 +543,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeDeleteSource() {
 
-        GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        GraphManager gm =  emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -614,7 +613,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeDeleteTarget() {
 
-        GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        GraphManager gm =  emf.createEdgeManager( scope );
 
 
         Edge edge = createEdge( "source", "test", "target" );
@@ -670,7 +669,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypesSourceTypes() {
 
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -733,7 +732,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypesTargetTypes() {
 
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -798,7 +797,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypesSourceTypesPaging() {
 
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -876,7 +875,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypesTargetTypesPaging() {
 
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -957,7 +956,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testMarkSourceEdges() throws InterruptedException {
 
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -1046,7 +1045,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testMarkTargetEdges() {
 
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -1114,7 +1113,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testMarkSourceEdgesType() {
 
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -1192,7 +1191,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testMarkTargetEdgesType() {
 
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -1270,7 +1269,7 @@ public abstract class GraphManagerIT {
     @Test
     public void markSourceNode() {
 
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1 = new SimpleId( "target" );
@@ -1352,7 +1351,7 @@ public abstract class GraphManagerIT {
     @Test
     public void markTargetNode() {
 
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id sourceId1 = new SimpleId( "source" );
         Id sourceId2 = new SimpleId( "source2" );
@@ -1434,7 +1433,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypesSourceTypesPrefix() {
 
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId = new SimpleId( "target" );
@@ -1484,7 +1483,7 @@ public abstract class GraphManagerIT {
     public void testSourceSubTypes() {
 
         //now test sub edges
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id sourceId = new SimpleId( "source" );
         Id targetId1target1 = new SimpleId( "type1target1" );
@@ -1533,7 +1532,7 @@ public abstract class GraphManagerIT {
     @Test
     public void testWriteReadEdgeTypesTargetTypesPrefix() {
 
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id targetId = new SimpleId( "target" );
         Id sourceId = new SimpleId( "source" );
@@ -1583,7 +1582,7 @@ public abstract class GraphManagerIT {
     public void testTargetSubTypes() {
 
         //now test sub edges
-        final GraphManager gm = getHelper( emf.createEdgeManager( scope ) );
+        final GraphManager gm =  emf.createEdgeManager( scope );
 
         Id targetId = new SimpleId( "target" );
         Id sourceId1target1 = new SimpleId( "type1source1" );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/99cb7005/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
deleted file mode 100644
index 84825aa..0000000
--- 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied.  See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
- *
- */
-package org.apache.usergrid.persistence.graph;
-
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Before;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.test.ITRunner;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
-import org.apache.usergrid.persistence.graph.impl.GraphManagerImpl;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.Observer;
-
-
-/**
- * Integration test that will block reads until all post processing has 
completed.  This ensures once our data has made
- * it to the permanent storage.  Tests that our view is immediately consistent 
to our users, even if we have yet to
- * perform background processing
- */
-@RunWith( ITRunner.class )
-@UseModules( { TestGraphModule.class } )
-public class StorageGraphManagerIT extends GraphManagerIT {
-
-    private static final Logger LOG = LoggerFactory.getLogger( 
StorageGraphManagerIT.class );
-
-
-    @Before
-    public void setup(){
-    }
-
-
-
-    @Override
-    protected GraphManager getHelper( final GraphManager gm ) {
-
-
-        final StorageGraphTestHelper helper = new StorageGraphTestHelper( gm );
-
-        GraphManagerImpl gmi = ( GraphManagerImpl ) gm;
-
-
-        Observer<Integer> subscriber = new Observer<Integer>() {
-                    @Override
-                    public void onCompleted() {
-                        helper.complete();
-                    }
-
-
-                    @Override
-                    public void onError( final Throwable e ) {
-                        helper.complete();
-                        helper.error();
-                    }
-
-
-                    @Override
-                    public void onNext( final Integer integer ) {
-                        //no op
-                    }
-                };
-
-        gmi.setEdgeDeleteSubcriber(subscriber );
-//        gmi.setEdgeWriteSubcriber( subscriber );
-        gmi.setNodeDelete( subscriber );
-
-        return helper;
-    }
-
-
-    /**
-     * Doesn't wait for the async process to happen before returning.  Simply 
executes and immediately returns.
-     */
-    public static class StorageGraphTestHelper implements GraphManager {
-
-        private final GraphManager graphManager;
-        private final AtomicInteger completeInvocations = new AtomicInteger( 0 
);
-        private final AtomicInteger errorInvocations = new AtomicInteger( 0 );
-        private final Object mutex = new Object();
-
-
-        public StorageGraphTestHelper( final GraphManager graphManager ) {
-            this.graphManager = graphManager;
-        }
-
-
-        @Override
-        public Observable<Edge> writeEdge( final Edge edge ) {
-            return graphManager.writeEdge( edge );
-        }
-
-
-        @Override
-        public Observable<Edge> markEdge( final Edge edge ) {
-            waitForComplete();
-            return graphManager.markEdge( edge );
-        }
-
-
-        @Override
-        public Observable<Id> markNode( final Id node, final long timestamp ) {
-            waitForComplete();
-            return graphManager.markNode( node, timestamp );
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgeVersions( final SearchByEdge edge ) {
-            await();
-            return graphManager.loadEdgeVersions( edge );
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType 
search ) {
-            await();
-            return graphManager.loadEdgesFromSource( search );
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType 
search ) {
-            await();
-            return graphManager.loadEdgesToTarget( search );
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgesFromSourceByType( final 
SearchByIdType search ) {
-            await();
-            return graphManager.loadEdgesFromSourceByType( search );
-        }
-
-
-        @Override
-        public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType 
search ) {
-            await();
-            return graphManager.loadEdgesToTargetByType( search );
-        }
-
-
-        @Override
-        public Observable<String> getEdgeTypesFromSource( final SearchEdgeType 
search ) {
-            await();
-            return graphManager.getEdgeTypesFromSource( search );
-        }
-
-
-        @Override
-        public Observable<String> getIdTypesFromSource( final SearchIdType 
search ) {
-            await();
-            return graphManager.getIdTypesFromSource( search );
-        }
-
-
-        @Override
-        public Observable<String> getEdgeTypesToTarget( final SearchEdgeType 
search ) {
-            await();
-            return graphManager.getEdgeTypesToTarget( search );
-        }
-
-
-        @Override
-        public Observable<String> getIdTypesToTarget( final SearchIdType 
search ) {
-            await();
-            return graphManager.getIdTypesToTarget( search );
-        }
-
-
-        public void waitForComplete(){
-            LOG.info( "Complete incremented" );
-            completeInvocations.incrementAndGet();
-        }
-
-        public void complete() {
-            LOG.info( "Complete decremented" );
-            completeInvocations.decrementAndGet();
-            tryWake();
-        }
-
-
-        public void error() {
-            LOG.info( "Error incremented" );
-            errorInvocations.incrementAndGet();
-            tryWake();
-        }
-
-
-        public void tryWake() {
-            synchronized ( mutex ) {
-                mutex.notify();
-            }
-        }
-
-
-        /**
-         * Away for our invocations to be 0
-         */
-        public void await() {
-            while (  completeInvocations.get() != 0 ) {
-
-                LOG.info( "Waiting for more invocations, count is {} ", 
completeInvocations.get() );
-
-                synchronized ( mutex ) {
-                    try {
-                        mutex.wait(1000 );
-                    }
-                    catch ( InterruptedException e ) {
-                        //no op
-                    }
-                }
-            }
-        }
-    }
-}

Reply via email to