Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-614 36b5bad2d -> 70e0e75aa


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 9c0c62d..9a8a00f 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -120,12 +120,9 @@ public class GraphManagerImpl implements GraphManager {
     @Inject
     public GraphManagerImpl( final EdgeMetadataSerialization 
edgeMetadataSerialization,
                              final EdgeSerialization storageEdgeSerialization,
-                             final NodeSerialization nodeSerialization,
-                             final GraphFig graphFig,
-                             final EdgeDeleteListener edgeDeleteListener,
-                             final NodeDeleteListener nodeDeleteListener,
-                             final ApplicationScope scope,
-                             MetricsFactory metricsFactory) {
+                             final NodeSerialization nodeSerialization, final 
GraphFig graphFig,
+                             final EdgeDeleteListener edgeDeleteListener, 
final NodeDeleteListener nodeDeleteListener,
+                             final ApplicationScope scope, MetricsFactory 
metricsFactory ) {
 
 
         ValidationUtils.validateApplicationScope( scope );
@@ -146,36 +143,34 @@ public class GraphManagerImpl implements GraphManager {
 
         this.edgeDeleteSubcriber = MetricSubscriber.INSTANCE;
         this.nodeDelete = MetricSubscriber.INSTANCE;
-        this.writeEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, 
"write.edge.meter");
-        this.writeEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, 
"write.edge.timer");
-        this.deleteEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, 
"delete.edge.meter");
-        this.deleteEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, 
"delete.edge.timer");
-        this.deleteNodeMeter = metricsFactory.getMeter(GraphManagerImpl.class, 
"delete.node.meter");
-        this.deleteNodeTimer = metricsFactory.getTimer(GraphManagerImpl.class, 
"delete.node.timer");
-        this.loadEdgesFromSourceMeter = 
metricsFactory.getMeter(GraphManagerImpl.class, "load.from.meter");
-        this.loadEdgesFromSourceTimer = 
metricsFactory.getTimer(GraphManagerImpl.class, "load.from.timer");
-        this.loadEdgesToTargetMeter = 
metricsFactory.getMeter(GraphManagerImpl.class, "load.to.meter");
-        this.loadEdgesToTargetTimer = 
metricsFactory.getTimer(GraphManagerImpl.class, "load.to.timer");
-        this.loadEdgesVersionsMeter = 
metricsFactory.getMeter(GraphManagerImpl.class, "load.versions.meter");
-        this.loadEdgesVersionsTimer = 
metricsFactory.getTimer(GraphManagerImpl.class,"load.versions.timer");
-        this.loadEdgesFromSourceByTypeMeter = 
metricsFactory.getMeter(GraphManagerImpl.class, "load.from.type.meter");
-        this.loadEdgesFromSourceByTypeTimer = 
metricsFactory.getTimer(GraphManagerImpl.class, "load.from.type.timer");
-        this.loadEdgesToTargetByTypeMeter = 
metricsFactory.getMeter(GraphManagerImpl.class, "load.to.type.meter");
-        this.loadEdgesToTargetByTypeTimer = 
metricsFactory.getTimer(GraphManagerImpl.class, "load.to.type.timer");
-
-        this.getEdgeTypesFromSourceTimer = 
metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.from.timer");
-        this.getEdgeTypesFromSourceMeter = 
metricsFactory.getMeter(GraphManagerImpl.class, "get.edge.from.meter");
-
-        this.getIdTypesFromSourceTimer = 
metricsFactory.getTimer(GraphManagerImpl.class,"get.idtype.from.timer");
-        this.getIdTypesFromSourceMeter = 
metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.from.meter");
-
-        this.getEdgeTypesToTargetTimer = 
metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.to.timer");
-        this.getEdgeTypesToTargetMeter = 
metricsFactory.getMeter(GraphManagerImpl.class, "get.edge.to.meter");
-
-        this.getIdTypesToTargetTimer = 
metricsFactory.getTimer(GraphManagerImpl.class, "get.idtype.to.timer");
-        this.getIdTypesToTargetMeter = 
metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.to.meter");
-
-
+        this.writeEdgeMeter = metricsFactory.getMeter( GraphManagerImpl.class, 
"write.edge.meter" );
+        this.writeEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, 
"write.edge.timer" );
+        this.deleteEdgeMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "delete.edge.meter" );
+        this.deleteEdgeTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "delete.edge.timer" );
+        this.deleteNodeMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "delete.node.meter" );
+        this.deleteNodeTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "delete.node.timer" );
+        this.loadEdgesFromSourceMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "load.from.meter" );
+        this.loadEdgesFromSourceTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.from.timer" );
+        this.loadEdgesToTargetMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "load.to.meter" );
+        this.loadEdgesToTargetTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.to.timer" );
+        this.loadEdgesVersionsMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "load.versions.meter" );
+        this.loadEdgesVersionsTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.versions.timer" );
+        this.loadEdgesFromSourceByTypeMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "load.from.type.meter" );
+        this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.from.type.timer" );
+        this.loadEdgesToTargetByTypeMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "load.to.type.meter" );
+        this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "load.to.type.timer" );
+
+        this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "get.edge.from.timer" );
+        this.getEdgeTypesFromSourceMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "get.edge.from.meter" );
+
+        this.getIdTypesFromSourceTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "get.idtype.from.timer" );
+        this.getIdTypesFromSourceMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "get.idtype.from.meter" );
+
+        this.getEdgeTypesToTargetTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "get.edge.to.timer" );
+        this.getEdgeTypesToTargetMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "get.edge.to.meter" );
+
+        this.getIdTypesToTargetTimer = metricsFactory.getTimer( 
GraphManagerImpl.class, "get.idtype.to.timer" );
+        this.getIdTypesToTargetMeter = metricsFactory.getMeter( 
GraphManagerImpl.class, "get.idtype.to.meter" );
     }
 
 
@@ -209,41 +204,39 @@ public class GraphManagerImpl implements GraphManager {
 
                 return edge;
             }
-        } )
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).doOnEach( new Action1<Notification<? super Edge>>() {
+            @Override
+            public void call( Notification<? super Edge> notification ) {
+                meter.mark();
+            }
+        } ).doOnCompleted( new Action0() {
+            @Override
+            public void call() {
+                timer.stop();
+            }
+        } );
     }
 
 
     @Override
-    public Observable<Edge> deleteEdge( final Edge edge ) {
-        GraphValidation.validateEdge(edge);
+    public Observable<Edge> markEdge( final Edge edge ) {
+        GraphValidation.validateEdge( edge );
 
-        final MarkedEdge markedEdge = new SimpleMarkedEdge(edge, true);
+        final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true );
 
         final Timer.Context timer = deleteEdgeTimer.time();
         final Meter meter = deleteEdgeMeter;
-        return Observable.just(markedEdge).map(new Func1<MarkedEdge, Edge>() {
+        return Observable.just( markedEdge ).map( new Func1<MarkedEdge, 
Edge>() {
             @Override
-            public Edge call(final MarkedEdge edge) {
+            public Edge call( final MarkedEdge edge ) {
 
                 final UUID timestamp = UUIDGenerator.newTimeUUID();
 
 
-                final MutationBatch edgeMutation = 
storageEdgeSerialization.writeEdge(scope, edge, timestamp);
+                final MutationBatch edgeMutation = 
storageEdgeSerialization.writeEdge( scope, edge, timestamp );
 
 
-                LOG.debug("Marking edge {} as deleted to commit log", edge);
+                LOG.debug( "Marking edge {} as deleted to commit log", edge );
                 try {
                     edgeMutation.execute();
                 }
@@ -254,73 +247,50 @@ public class GraphManagerImpl implements GraphManager {
 
                 //HystrixCassandra.async( edgeDeleteListener.receive( scope, 
markedEdge,
                 // timestamp )).subscribeOn( Schedulers.io() ).subscribe( 
edgeDeleteSubcriber );
-                edgeDeleteListener.receive(scope, markedEdge, 
timestamp).subscribeOn(Schedulers.io())
-                    .subscribe(edgeDeleteSubcriber);
+                edgeDeleteListener.receive( scope, markedEdge, timestamp 
).subscribeOn( Schedulers.io() )
+                                  .subscribe( edgeDeleteSubcriber );
 
 
                 return edge;
             }
-        })
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).doOnEach( notification -> {
+            meter.mark();
+        } ).doOnCompleted( () -> timer.stop() );
     }
 
 
     @Override
-    public Observable<Id> deleteNode( final Id node, final long timestamp ) {
+    public Observable<Id> markNode( final Id node, final long timestamp ) {
         final Timer.Context timer = deleteNodeTimer.time();
         final Meter meter = deleteNodeMeter;
-        return Observable.just( node ).map( new Func1<Id, Id>() {
-            @Override
-            public Id call( final Id id ) {
+        return Observable.just( node ).map( id -> {
 
-                //mark the node as deleted
+            //mark the node as deleted
 
 
-                final UUID eventTimestamp = UUIDGenerator.newTimeUUID();
+            final UUID eventTimestamp = UUIDGenerator.newTimeUUID();
 
-                final MutationBatch nodeMutation = nodeSerialization.mark( 
scope, id, timestamp );
+            final MutationBatch nodeMutation = nodeSerialization.mark( scope, 
id, timestamp );
 
 
-                LOG.debug( "Marking node {} as deleted to node mark", node );
-                try {
-                    nodeMutation.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to execute mutation", 
e );
-                }
+            LOG.debug( "Marking node {} as deleted to node mark", node );
+            try {
+                nodeMutation.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to execute mutation", e );
+            }
 
 
-                //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, 
eventTimestamp  )).subscribeOn(
-                // Schedulers.io() ).subscribe( nodeDelete );
-                nodeDeleteListener.receive( scope, id, eventTimestamp 
).subscribeOn( Schedulers.io() )
-                                  .subscribe( nodeDelete );
+            //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, 
eventTimestamp  )).subscribeOn(
+            // Schedulers.io() ).subscribe( nodeDelete );
+            nodeDeleteListener.receive( scope, id, eventTimestamp 
).subscribeOn( Schedulers.io() )
+                              .subscribe( nodeDelete );
 
-                return id;
-            }
-        } )
-            .doOnEach(new Action1<Notification<? super Id>>() {
-                @Override
-                public void call(Notification<? super Id> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+            return id;
+        } ).doOnEach( notification -> {
+            meter.mark();
+        } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -333,20 +303,11 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgeVersions( scope, 
searchByEdge );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap(new 
EdgeBufferFilter(searchByEdge.getMaxTimestamp()))
-                         .cast(Edge.class)
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).buffer( graphFig.getScanPageSize() )
+                         .flatMap( new EdgeBufferFilter( 
searchByEdge.getMaxTimestamp(),  searchByEdge.filterMarked() ) )
+                         .cast( Edge.class ).doOnEach( notification -> {
+                meter.mark();
+            } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -359,20 +320,11 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgesFromSource( scope, 
search );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap(new 
EdgeBufferFilter(search.getMaxTimestamp()))
-                         .cast(Edge.class)
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).buffer( graphFig.getScanPageSize() )
+                         .flatMap( new EdgeBufferFilter( 
search.getMaxTimestamp(),  search.filterMarked() ) ).cast( Edge.class )
+                         .doOnEach( notification -> {
+                             meter.mark();
+                         } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -385,20 +337,11 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgesToTarget( scope, 
search );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap(new 
EdgeBufferFilter(search.getMaxTimestamp()))
-                         .cast(Edge.class)
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).buffer( graphFig.getScanPageSize() )
+                         .flatMap( new EdgeBufferFilter( 
search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class )
+                         .doOnEach( notification -> {
+                             meter.mark();
+                         } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -411,21 +354,12 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<MarkedEdge> getIterator() {
                 return 
storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap(new 
EdgeBufferFilter(search.getMaxTimestamp()))
+        } ).buffer( graphFig.getScanPageSize() )
+                         .flatMap( new EdgeBufferFilter( 
search.getMaxTimestamp(),  search.filterMarked() ) )
 
-                         .cast(Edge.class)
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+                         .cast( Edge.class ).doOnEach( notification -> {
+                meter.mark();
+            } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -438,20 +372,11 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<MarkedEdge> getIterator() {
                 return storageEdgeSerialization.getEdgesToTargetBySourceType( 
scope, search );
             }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap(new 
EdgeBufferFilter(search.getMaxTimestamp()))
-                         .cast(Edge.class)
-            .doOnEach(new Action1<Notification<? super Edge>>() {
-                @Override
-                public void call(Notification<? super Edge> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).buffer( graphFig.getScanPageSize() )
+                         .flatMap( new EdgeBufferFilter( 
search.getMaxTimestamp(),  search.filterMarked() ) ).cast( Edge.class )
+                         .doOnEach( notification -> {
+                             meter.mark();
+                         } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -464,19 +389,9 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesFromSource( 
scope, search );
             }
-        } )
-            .doOnEach(new Action1<Notification<? super String>>() {
-                @Override
-                public void call(Notification<? super String> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).doOnEach( notification -> {
+            meter.mark();
+        } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -489,19 +404,9 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesFromSource( scope, 
search );
             }
-        } )
-            .doOnEach(new Action1<Notification<? super String>>() {
-                @Override
-                public void call(Notification<? super String> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).doOnEach( notification -> {
+            meter.mark();
+        } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -514,19 +419,9 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getEdgeTypesToTarget( scope, 
search );
             }
-        } )
-            .doOnEach(new Action1<Notification<? super String>>() {
-                @Override
-                public void call(Notification<? super String> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).doOnEach( notification -> {
+            meter.mark();
+        } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -539,19 +434,9 @@ public class GraphManagerImpl implements GraphManager {
             protected Iterator<String> getIterator() {
                 return edgeMetadataSerialization.getIdTypesToTarget( scope, 
search );
             }
-        } )
-            .doOnEach(new Action1<Notification<? super String>>() {
-                @Override
-                public void call(Notification<? super String> notification) {
-                    meter.mark();
-                }
-            })
-            .doOnCompleted(new Action0() {
-                @Override
-                public void call() {
-                    timer.stop();
-                }
-            });
+        } ).doOnEach( notification -> {
+            meter.mark();
+        } ).doOnCompleted( () -> timer.stop() );
     }
 
 
@@ -561,10 +446,12 @@ public class GraphManagerImpl implements GraphManager {
     private class EdgeBufferFilter implements Func1<List<MarkedEdge>, 
Observable<MarkedEdge>> {
 
         private final long maxVersion;
+        private final boolean filterMarked;
 
 
-        private EdgeBufferFilter( final long maxVersion ) {
+        private EdgeBufferFilter( final long maxVersion, final boolean 
filterMarked ) {
             this.maxVersion = maxVersion;
+            this.filterMarked = filterMarked;
         }
 
 
@@ -579,57 +466,36 @@ public class GraphManagerImpl implements GraphManager {
         public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges 
) {
 
             final Map<Id, Long> markedVersions = 
nodeSerialization.getMaxVersions( scope, markedEdges );
-            return Observable.from( markedEdges ).filter( new EdgeFilter( 
this.maxVersion, markedVersions ) );
-        }
-    }
-
-
-    /**
-     * Filter the returned values based on the max uuid and if it's been 
marked for deletion or not
-     */
-    private static class EdgeFilter implements Func1<MarkedEdge, Boolean> {
-
-        private final long maxTimestamp;
-
-        private final Map<Id, Long> markCache;
-
-
-        private EdgeFilter( final long maxTimestamp, Map<Id, Long> markCache ) 
{
-            this.maxTimestamp = maxTimestamp;
-            this.markCache = markCache;
-        }
-
-
-        @Override
-        public Boolean call( final MarkedEdge edge ) {
-
+            final long maxTimestamp = maxVersion;
 
-            final long edgeTimestamp = edge.getTimestamp();
+            return Observable.from( markedEdges ).filter( edge -> {
+                final long edgeTimestamp = edge.getTimestamp();
 
-            //our edge needs to not be deleted and have a version that's > max 
Version
-            if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxTimestamp 
) > 0 ) {
-                return false;
-            }
+                //our edge needs to not be deleted and have a version that's > 
max Version
+                if ( edge.isDeleted() || Long.compare( edgeTimestamp, 
maxTimestamp ) > 0 ) {
+                    return false;
+                }
 
 
-            final Long sourceTimestamp = markCache.get( edge.getSourceNode() );
+                final Long sourceTimestamp = markedVersions.get( 
edge.getSourceNode() );
 
-            //the source Id has been marked for deletion.  It's version is <= 
to the marked version for deletion,
-            // so we need to discard it
-            if ( sourceTimestamp != null && Long.compare( edgeTimestamp, 
sourceTimestamp ) < 1 ) {
-                return false;
-            }
+                //the source Id has been marked for deletion.  It's version is 
<= to the marked version for deletion,
+                // so we need to discard it
+                if ( sourceTimestamp != null && Long.compare( edgeTimestamp, 
sourceTimestamp ) < 1 ) {
+                    return false;
+                }
 
-            final Long targetTimestamp = markCache.get( edge.getTargetNode() );
+                final Long targetTimestamp = markedVersions.get( 
edge.getTargetNode() );
 
-            //the target Id has been marked for deletion.  It's version is <= 
to the marked version for deletion,
-            // so we need to discard it
-            if ( targetTimestamp != null && Long.compare( edgeTimestamp, 
targetTimestamp ) < 1 ) {
-                return false;
-            }
+                //the target Id has been marked for deletion.  It's version is 
<= to the marked version for deletion,
+                // so we need to discard it
+                if ( targetTimestamp != null && Long.compare( edgeTimestamp, 
targetTimestamp ) < 1 ) {
+                    return false;
+                }
 
 
-            return true;
+                return true;
+            } );
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
index a28a0bb..9a23caf 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
@@ -33,7 +33,6 @@ import com.google.common.base.Preconditions;
 
 /**
  * Simple bean implementation of search by edge
- *
  */
 public class SimpleSearchByEdge implements SearchByEdge {
 
@@ -43,24 +42,44 @@ public class SimpleSearchByEdge implements SearchByEdge {
     private final long maxTimestamp;
     private final Optional<Edge> last;
     private final SearchByEdgeType.Order order;
+    private final boolean filterMarked;
 
 
     /**
      * Create the search modules
+     *
      * @param sourceNode The source node of the edge
      * @param targetNode The target node of the edge
      * @param type The edge type
      * @param maxTimestamp The maximum timestamp to seek from
      * @param last The value to start seeking from.  Must be >= this value
      */
-    public SimpleSearchByEdge( final Id sourceNode, final String type, final 
Id targetNode, final long maxTimestamp, final SearchByEdgeType.Order order, 
final Optional<Edge> last ) {
+    public SimpleSearchByEdge( final Id sourceNode, final String type, final 
Id targetNode, final long maxTimestamp,
+                               final SearchByEdgeType.Order order, final 
Optional<Edge> last ) {
+        this( sourceNode, type, targetNode, maxTimestamp, order, last, true );
+    }
+
+
+    /**
+     * Create the search modules
+     *
+     * @param sourceNode The source node of the edge
+     * @param type The edge type
+     * @param targetNode The target node of the edge
+     * @param maxTimestamp The maximum timestamp to seek from
+     * @param last The value to start seeking from.  Must be >= this value
+     */
+    public SimpleSearchByEdge( final Id sourceNode, final String type, final 
Id targetNode, final long maxTimestamp,
+                               final SearchByEdgeType.Order order, final 
Optional<Edge> last,
+                               final boolean filterMarked ) {
 
-        ValidationUtils.verifyIdentity(sourceNode);
-        ValidationUtils.verifyIdentity(targetNode);
-        ValidationUtils.verifyString(type, "type");
-        GraphValidation.validateTimestamp(maxTimestamp, "maxTimestamp");
-        Preconditions.checkNotNull(order, "order must not be null");
-        Preconditions.checkNotNull(last, "last can never be null");
+
+        ValidationUtils.verifyIdentity( sourceNode );
+        ValidationUtils.verifyIdentity( targetNode );
+        ValidationUtils.verifyString( type, "type" );
+        GraphValidation.validateTimestamp( maxTimestamp, "maxTimestamp" );
+        Preconditions.checkNotNull( order, "order must not be null" );
+        Preconditions.checkNotNull( last, "last can never be null" );
 
 
         this.sourceNode = sourceNode;
@@ -69,6 +88,7 @@ public class SimpleSearchByEdge implements SearchByEdge {
         this.maxTimestamp = maxTimestamp;
         this.order = order;
         this.last = last;
+        this.filterMarked = filterMarked;
     }
 
 
@@ -97,6 +117,10 @@ public class SimpleSearchByEdge implements SearchByEdge {
 
 
     @Override
+    public boolean filterMarked() { return filterMarked; }
+
+
+    @Override
     public Optional<Edge> last() {
         return last;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
index 1687162..9392dbc 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
@@ -41,6 +41,7 @@ public class SimpleSearchByEdgeType implements 
SearchByEdgeType{
     private final long maxTimestamp;
     private final Optional<Edge> last;
     private final Order order;
+    private final boolean filterMarked;
 
 
     /**
@@ -55,7 +56,7 @@ public class SimpleSearchByEdgeType implements 
SearchByEdgeType{
      * //TODO, make last an optional
      */
     public SimpleSearchByEdgeType( final Id node, final String type, final 
long maxTimestamp, final Order order, final Edge last ) {
-        this(node, type, maxTimestamp, order, Optional.fromNullable(last));
+        this(node, type, maxTimestamp, order, Optional.fromNullable(last), 
true);
     }
 
 
@@ -70,7 +71,24 @@ public class SimpleSearchByEdgeType implements 
SearchByEdgeType{
      *
      * //TODO, make last an optional
      */
-    public SimpleSearchByEdgeType( final Id node, final String type, final 
long maxTimestamp, final Order order, final Optional<Edge> last ) {
+    public SimpleSearchByEdgeType( final Id node, final String type, final 
long maxTimestamp, final Order order,
+                                   final Optional<Edge> last ) {
+        this( node, type, maxTimestamp, order, last, true );
+    }
+
+
+    /**
+     * Create the search modules
+     * @param node The node to search from
+     * @param type The edge type
+     * @param maxTimestamp The maximum timestamp to return
+     * @param order The order order.  Descending is most efficient
+     * @param last The value to start seeking from.  Must be >= this value
+     * @param filterMarked
+     */
+    public SimpleSearchByEdgeType( final Id node, final String type, final 
long maxTimestamp, final Order order,
+                                   final Optional<Edge> last, final boolean 
filterMarked ) {
+
 
         Preconditions.checkNotNull( order, "order is required");
         ValidationUtils.verifyIdentity( node );
@@ -84,6 +102,7 @@ public class SimpleSearchByEdgeType implements 
SearchByEdgeType{
         this.maxTimestamp = maxTimestamp;
         this.order = order;
         this.last = last;
+        this.filterMarked = filterMarked;
     }
 
 
@@ -118,6 +137,12 @@ public class SimpleSearchByEdgeType implements 
SearchByEdgeType{
 
 
     @Override
+    public boolean filterMarked() {
+        return filterMarked;
+    }
+
+
+    @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {
             return true;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 6135121..619e65d 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -29,7 +29,9 @@ import java.util.Iterator;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
 
 import javax.annotation.Nullable;
 
@@ -37,13 +39,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.guice.GraphTaskExecutor;
 import 
org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import 
org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import 
org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
@@ -63,6 +63,8 @@ import com.google.common.hash.PrimitiveSink;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
@@ -85,7 +87,7 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
     private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
 
 
-    private final TaskExecutor taskExecutor;
+    private final ListeningExecutorService taskExecutor;
     private final TimeService timeService;
     private final GraphFig graphFig;
     private final NodeShardAllocation nodeShardAllocation;
@@ -104,8 +106,7 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
                                      final NodeShardAllocation 
nodeShardAllocation,
                                      final ShardedEdgeSerialization 
shardedEdgeSerialization,
                                      final EdgeColumnFamilies 
edgeColumnFamilies, final Keyspace keyspace,
-                                     final EdgeShardSerialization 
edgeShardSerialization,
-                                     @GraphTaskExecutor final TaskExecutor 
taskExecutor ) {
+                                     final EdgeShardSerialization 
edgeShardSerialization) {
 
         this.timeService = timeService;
         this.graphFig = graphFig;
@@ -119,7 +120,10 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
         this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
         this.shardAuditTaskTracker = new ShardAuditTaskTracker();
 
-        this.taskExecutor = taskExecutor;
+
+        this.taskExecutor = MoreExecutors.listeningDecorator( 
TaskExecutorFactory
+            .createTaskExecutor( "ShardCompaction", 
graphFig.getShardAuditWorkerCount(),
+                graphFig.getShardAuditWorkerQueueSize() ) );
     }
 
 
@@ -139,8 +143,8 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
 
         Preconditions.checkNotNull( group, "group cannot be null" );
         Preconditions.checkArgument( group.isCompactionPending(), "Compaction 
is pending" );
-        Preconditions.checkArgument( group.shouldCompact( startTime ),
-                "Compaction cannot be run yet.  Ignoring compaction." );
+        Preconditions
+            .checkArgument( group.shouldCompact( startTime ), "Compaction 
cannot be run yet.  Ignoring compaction." );
 
 
         final CompactionResult.CompactionBuilder resultBuilder = 
CompactionResult.builder();
@@ -170,8 +174,9 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
 
 
         for ( Shard sourceShard : sourceShards ) {
-            Iterator<MarkedEdge> edges = edgeMeta.loadEdges( 
shardedEdgeSerialization, edgeColumnFamilies, scope,
-                    Collections.singleton( sourceShard ), Long.MAX_VALUE, 
SearchByEdgeType.Order.DESCENDING );
+            Iterator<MarkedEdge> edges = edgeMeta
+                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, 
scope, Collections.singleton( sourceShard ),
+                    Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING );
 
             while ( edges.hasNext() ) {
                 final MarkedEdge edge = edges.next();
@@ -186,13 +191,13 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
                 }
 
 
-                newRowBatch.mergeShallow(
-                        edgeMeta.writeEdge( shardedEdgeSerialization, 
edgeColumnFamilies, scope, targetShard, edge,
-                                timestamp ) );
+                newRowBatch.mergeShallow( edgeMeta
+                        .writeEdge( shardedEdgeSerialization, 
edgeColumnFamilies, scope, targetShard, edge,
+                            timestamp ) );
 
-                deleteRowBatch.mergeShallow(
-                        edgeMeta.deleteEdge( shardedEdgeSerialization, 
edgeColumnFamilies, scope, sourceShard, edge,
-                                timestamp ) );
+                deleteRowBatch.mergeShallow( edgeMeta
+                        .deleteEdge( shardedEdgeSerialization, 
edgeColumnFamilies, scope, sourceShard, edge,
+                            timestamp ) );
 
                 edgeCount++;
 
@@ -298,7 +303,18 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
          * Try and submit.  During back pressure, we may not be able to 
submit, that's ok.  Better to drop than to
          * hose the system
          */
-        ListenableFuture<AuditResult> future = taskExecutor.submit( new 
ShardAuditTask( scope, edgeMeta, group ) );
+        final ListenableFuture<AuditResult> future;
+
+        try {
+            future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, 
group ) );
+        }
+        catch ( RejectedExecutionException ree ) {
+
+            //ignore, if this happens we don't care, we're saturated, we can 
check later
+            LOG.error( "Rejected audit for shard of scope {} edge, meta {} and 
group {}", scope, edgeMeta, group );
+
+            return Futures.immediateFuture( AuditResult.NOT_CHECKED );
+        }
 
         /**
          * Log our success or failures for debugging purposes
@@ -320,7 +336,7 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
     }
 
 
-    private final class ShardAuditTask implements Task<AuditResult> {
+    private final class ShardAuditTask implements Callable<AuditResult> {
 
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
@@ -334,20 +350,6 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
             this.group = group;
         }
 
-         @Override
-        public void exceptionThrown( final Throwable throwable ) {
-            LOG.error( "Unable to execute audit for shard of {}", throwable );
-        }
-
-
-        @Override
-        public AuditResult rejected() {
-            //ignore, if this happens we don't care, we're saturated, we can 
check later
-            LOG.error( "Rejected audit for shard of scope {} edge, meta {} and 
group {}", scope, edgeMeta, group );
-
-            return AuditResult.NOT_CHECKED;
-        }
-
 
         @Override
         public AuditResult call() throws Exception {
@@ -401,10 +403,8 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
                  */
                 try {
                     CompactionResult result = compact( scope, edgeMeta, group 
);
-                    LOG.info(
-                            "Compaction result for compaction of scope {} with 
edge meta data of {} and shard group " +
-                                    "{} is {}",
-                            new Object[] { scope, edgeMeta, group, result } );
+                    LOG.info( "Compaction result for compaction of scope {} 
with edge meta data of {} and shard group "
+                            + "{} is {}", new Object[] { scope, edgeMeta, 
group, result } );
                 }
                 finally {
                     shardCompactionTaskTracker.complete( scope, edgeMeta, 
group );
@@ -418,8 +418,6 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
     }
 
 
-
-
     /**
      * Inner class used to track running tasks per instance
      */
@@ -534,8 +532,6 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
     }
 
 
-
-
     public static final class CompactionResult {
 
         public final long copiedEdges;
@@ -566,12 +562,12 @@ public class ShardGroupCompactionImpl implements 
ShardGroupCompaction {
         @Override
         public String toString() {
             return "CompactionResult{" +
-                    "copiedEdges=" + copiedEdges +
-                    ", targetShard=" + targetShard +
-                    ", sourceShards=" + sourceShards +
-                    ", removedShards=" + removedShards +
-                    ", compactedShard=" + compactedShard +
-                    '}';
+                "copiedEdges=" + copiedEdges +
+                ", targetShard=" + targetShard +
+                ", sourceShards=" + sourceShards +
+                ", removedShards=" + removedShards +
+                ", compactedShard=" + compactedShard +
+                '}';
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
index b471119..658f4bf 100644
--- 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
+++ 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
@@ -66,14 +66,14 @@ public class CommittedGraphManagerIT extends GraphManagerIT 
{
 
 
         @Override
-        public Observable<Edge> deleteEdge( final Edge edge ) {
-            return graphManager.deleteEdge( edge );
+        public Observable<Edge> markEdge( final Edge edge ) {
+            return graphManager.markEdge( edge );
         }
 
 
         @Override
-        public Observable<Id> deleteNode( final Id node, final long timestamp) 
{
-            return graphManager.deleteNode( node, timestamp );
+        public Observable<Id> markNode( final Id node, final long timestamp ) {
+            return graphManager.markNode( node, timestamp );
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index 2ed4f13..340c712 100644
--- 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -584,7 +584,7 @@ public abstract class GraphManagerIT {
 
 
         //now delete it
-        returned = gm.deleteEdge( edge ).toBlocking().last();
+        returned = gm.markEdge( edge ).toBlocking().last();
 
 
         //now test retrieval, should be null
@@ -645,7 +645,7 @@ public abstract class GraphManagerIT {
 
 
         //now delete it
-        gm.deleteEdge( edge ).toBlocking().last();
+        gm.markEdge( edge ).toBlocking().last();
 
         //now test retrieval, should be null
         edges = gm.loadEdgesToTarget( search );
@@ -1005,7 +1005,7 @@ public abstract class GraphManagerIT {
 
         System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
 
-        gm.deleteEdge( edge1 ).toBlocking().last();
+        gm.markEdge( edge1 ).toBlocking().last();
 
         System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
 
@@ -1025,7 +1025,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge2 ).toBlocking().last();
+        gm.markEdge( edge2 ).toBlocking().last();
 
         System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
 
@@ -1081,7 +1081,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge1 ).toBlocking().last();
+        gm.markEdge( edge1 ).toBlocking().last();
 
 
         edges = gm.loadEdgesToTarget( createSearchByEdge( 
edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
@@ -1096,7 +1096,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge2 ).toBlocking().last();
+        gm.markEdge( edge2 ).toBlocking().last();
 
         edges = gm.loadEdgesToTarget( createSearchByEdge( 
edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
 
@@ -1146,7 +1146,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge1 ).toBlocking().last();
+        gm.markEdge( edge1 ).toBlocking().last();
 
 
         edges = gm.loadEdgesFromSourceByType(
@@ -1171,7 +1171,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge2 ).toBlocking().last();
+        gm.markEdge( edge2 ).toBlocking().last();
 
 
         edges = gm.loadEdgesFromSourceByType(
@@ -1223,7 +1223,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge1 ).toBlocking().last();
+        gm.markEdge( edge1 ).toBlocking().last();
 
 
         edges = gm.loadEdgesToTargetByType(
@@ -1249,7 +1249,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge2 ).toBlocking().last();
+        gm.markEdge( edge2 ).toBlocking().last();
 
 
         edges = gm.loadEdgesToTargetByType(
@@ -1320,7 +1320,7 @@ public abstract class GraphManagerIT {
         assertFalse( "No more edges", results.hasNext() );
 
         //mark the source node
-        gm.deleteNode( sourceId, edge2.getTimestamp() ).toBlocking().last();
+        gm.markNode( sourceId, edge2.getTimestamp() ).toBlocking().last();
 
 
         //now re-read, nothing should be there since they're marked
@@ -1402,7 +1402,7 @@ public abstract class GraphManagerIT {
         assertFalse( "No more edges", results.hasNext() );
 
         //mark the source node
-        gm.deleteNode( targetId, edge2.getTimestamp() ).toBlocking().last();
+        gm.markNode( targetId, edge2.getTimestamp() ).toBlocking().last();
 
 
         //now re-read, nothing should be there since they're marked
@@ -1641,7 +1641,7 @@ public abstract class GraphManagerIT {
     public void invalidEdgeTypesDelete( ) {
         final GraphManager em = emf.createEdgeManager( scope );
 
-        em.deleteEdge( null );
+        em.markEdge( null );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
index 6f3d388..84825aa 100644
--- 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
+++ 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
@@ -116,16 +116,16 @@ public class StorageGraphManagerIT extends GraphManagerIT 
{
 
 
         @Override
-        public Observable<Edge> deleteEdge( final Edge edge ) {
+        public Observable<Edge> markEdge( final Edge edge ) {
             waitForComplete();
-            return graphManager.deleteEdge( edge );
+            return graphManager.markEdge( edge );
         }
 
 
         @Override
-        public Observable<Id> deleteNode( final Id node, final long timestamp 
) {
+        public Observable<Id> markNode( final Id node, final long timestamp ) {
             waitForComplete();
-            return graphManager.deleteNode( node, timestamp );
+            return graphManager.markNode( node, timestamp );
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 68b6f8f..7d4b7f6 100644
--- 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -32,14 +32,12 @@ import org.junit.Test;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.core.util.IdGenerator;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import 
org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
 
 import com.netflix.astyanax.Keyspace;
 
-import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -79,8 +77,6 @@ public class ShardGroupCompactionTest {
 
         final EdgeShardSerialization edgeShardSerialization = mock( 
EdgeShardSerialization.class );
 
-        final TaskExecutor taskExecutor = mock( TaskExecutor.class );
-
         final long delta = 10000;
 
         final long createTime = 20000;
@@ -97,7 +93,7 @@ public class ShardGroupCompactionTest {
 
         ShardGroupCompactionImpl compaction =
                 new ShardGroupCompactionImpl( timeService, graphFig, 
nodeShardAllocation, shardedEdgeSerialization,
-                        edgeColumnFamilies, keyspace, edgeShardSerialization, 
taskExecutor );
+                        edgeColumnFamilies, keyspace, edgeShardSerialization );
 
 
         DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( 
IdGenerator.createId( "source" ), "test" );

Reply via email to