De-couple entity deletes from edge removal from graph.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/032ffd48 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/032ffd48 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/032ffd48 Branch: refs/heads/master Commit: 032ffd48127e600d026446535cfb776f92aac000 Parents: d35fea5 Author: Michael Russo <[email protected]> Authored: Fri Oct 23 12:59:57 2015 -0700 Committer: Michael Russo <[email protected]> Committed: Fri Oct 23 12:59:57 2015 -0700 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 7 +++--- .../asyncevents/EventBuilder.java | 25 +++++++++++++++----- .../asyncevents/EventBuilderImpl.java | 10 ++++---- .../asyncevents/InMemoryAsyncEventService.java | 5 ++-- 4 files changed, 31 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/032ffd48/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index 7034a67..fe53776 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -586,9 +586,10 @@ public class AmazonAsyncEventService implements AsyncEventService { entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId ); - entityDeleteResults - .getEntitiesCompacted() - .collect(() -> new ArrayList<>(), (list, item) -> list.add(item)).toBlocking().lastOrDefault(null); + // Delete the entities and remove from graph separately + entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null); + + entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null); return entityDeleteResults.getIndexObservable(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/032ffd48/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java index d246e2f..480756f 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java @@ -80,17 +80,25 @@ public interface EventBuilder { /** * A bean to hold both our observables so the caller can choose the subscription mechanism. Note that - * indexOperationMessages should be subscribed and completed BEFORE the getEntitiesCompacted is subscribed + * indexOperationMessages should be subscribed and completed BEFORE the getEntitiesDeleted is subscribed */ final class EntityDeleteResults { private final Observable<IndexOperationMessage> indexOperationMessageObservable; - private final Observable<List<MvccLogEntry>> entitiesCompacted; + private final Observable<List<MvccLogEntry>> entitiesDeleted; + + + + private final Observable<Id> compactedNode; + + public EntityDeleteResults( final Observable<IndexOperationMessage> indexOperationMessageObservable, - final Observable<List<MvccLogEntry>> entitiesCompacted ) { + final Observable<List<MvccLogEntry>> entitiesDeleted, + final Observable<Id> compactedNode) { this.indexOperationMessageObservable = indexOperationMessageObservable; - this.entitiesCompacted = entitiesCompacted; + this.entitiesDeleted = entitiesDeleted; + this.compactedNode = compactedNode; } @@ -98,9 +106,14 @@ public interface EventBuilder { return indexOperationMessageObservable; } + public Observable<List<MvccLogEntry>> getEntitiesDeleted() { + return entitiesDeleted; + } - public Observable<List<MvccLogEntry>> getEntitiesCompacted() { - return entitiesCompacted; + public Observable<Id> getCompactedNode() { + return compactedNode; } + + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/032ffd48/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index cc0356b..d819f39 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -128,8 +128,6 @@ public class EventBuilderImpl implements EventBuilder { final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); - //needs get versions here. - //TODO: change this to be an observable //so we get these versions and loop through them until we find the MvccLogEntry that is marked as delete. @@ -155,10 +153,14 @@ public class EventBuilderImpl implements EventBuilder { //observable of entries as the batches are deleted final Observable<List<MvccLogEntry>> entries = ecm.getVersions( entityId ).buffer( serializationFig.getBufferSize() ) - .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ) ); + .doOnNext( buffer -> ecm.delete( buffer ) ); + + + // observable of the edge delete from graph + final Observable<Id> compactedNode = gm.compactNode(entityId); - return new EntityDeleteResults( edgeObservable, entries ); + return new EntityDeleteResults( edgeObservable, entries, compactedNode ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/032ffd48/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java index d5a0398..fc6385c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java @@ -22,8 +22,6 @@ package org.apache.usergrid.corepersistence.asyncevents; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.index.impl.IndexProducer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.usergrid.corepersistence.index.EntityIndexOperation; import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; @@ -103,7 +101,8 @@ public class InMemoryAsyncEventService implements AsyncEventService { eventBuilder.buildEntityDelete( applicationScope, entityId ); run( results.getIndexObservable() ); - run( results.getEntitiesCompacted() ); + run( results.getEntitiesDeleted() ); + run( results.getCompactedNode() ); }
