Change read repair to interact with c* directly and only fire an index operation message to get the ES document removed from all regions.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1b43bda3 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1b43bda3 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1b43bda3 Branch: refs/heads/USERGRID-909 Commit: 1b43bda3f801172b0e59f927ff3ae52a559d36cc Parents: 70d7a95 Author: Michael Russo <[email protected]> Authored: Tue Oct 27 10:50:16 2015 -0700 Committer: Michael Russo <[email protected]> Committed: Tue Oct 27 12:56:13 2015 -0700 ---------------------------------------------------------------------- .../asyncevents/AsyncEventService.java | 5 ++ .../asyncevents/InMemoryAsyncEventService.java | 5 ++ .../read/traverse/AbstractReadGraphFilter.java | 69 +++++++++++++++++--- .../traverse/ReadGraphCollectionFilter.java | 10 ++- .../traverse/ReadGraphConnectionFilter.java | 10 ++- .../impl/stage/NodeDeleteListenerImpl.java | 27 ++++---- 6 files changed, 97 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index dcfffcb..dbf8996 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -77,6 +77,11 @@ public interface AsyncEventService extends ReIndexAction { */ void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId); + /** + * + * @param indexOperationMessage + */ + void queueIndexOperationMessage( final 
IndexOperationMessage indexOperationMessage ); /** * current queue depth http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java index fc6385c..d8334b3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java @@ -105,6 +105,11 @@ public class InMemoryAsyncEventService implements AsyncEventService { run( results.getCompactedNode() ); } + @Override + public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage){ + //this is not used locally + } + public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) { final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java index 9d050c8..89230d7 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java @@ -20,7 +20,11 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; +import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; +import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +46,7 @@ import com.google.common.base.Optional; import rx.Observable; + /** * Command for reading graph edges */ @@ -51,15 +56,21 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, private final GraphManagerFactory graphManagerFactory; private final RxTaskScheduler rxTaskScheduler; + private final EventBuilder eventBuilder; + private final AsyncEventService asyncEventService; /** * Create a new instance of our command */ public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory, - final RxTaskScheduler rxTaskScheduler) { + final RxTaskScheduler rxTaskScheduler, + final EventBuilder eventBuilder, + final AsyncEventService asyncEventService ) { this.graphManagerFactory = graphManagerFactory; this.rxTaskScheduler = rxTaskScheduler; + this.eventBuilder = eventBuilder; + this.asyncEventService = asyncEventService; } @@ -107,28 +118,56 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted(); + if (isDeleted) { - if(isDeleted){ logger.trace("Edge {} is deleted, deleting the edge", markedEdge); - graphManager.deleteEdge(markedEdge).subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) + final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge); + + 
indexMessageObservable + .compose(applyCollector()) + .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) .subscribe(); + } - if(isSourceNodeDeleted){ - final Id sourceNodeId = markedEdge.getSourceNode(); + if (isSourceNodeDeleted) { + final Id sourceNodeId = markedEdge.getSourceNode(); logger.trace("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId); - graphManager.compactNode(sourceNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) + + final EventBuilderImpl.EntityDeleteResults + entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId); + + entityDeleteResults.getIndexObservable() + .compose(applyCollector()) + .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) .subscribe(); + + Observable.merge(entityDeleteResults.getEntitiesDeleted(), + entityDeleteResults.getCompactedNode()) + .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()). + subscribe(); + } - if(isTargetNodeDelete){ + if (isTargetNodeDelete) { final Id targetNodeId = markedEdge.getTargetNode(); + logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId); - logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId ); - graphManager.compactNode(targetNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) + final EventBuilderImpl.EntityDeleteResults + entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId); + + entityDeleteResults.getIndexObservable() + .compose(applyCollector()) + .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) .subscribe(); + + Observable.merge(entityDeleteResults.getEntitiesDeleted(), + entityDeleteResults.getCompactedNode()) + .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()). 
+ subscribe(); + } @@ -202,4 +241,16 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, return cursorEdge; } } + + private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() { + + return observable -> observable + .filter((IndexOperationMessage msg) -> !msg.isEmpty()) + .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single)) + .doOnNext(indexOperation -> { + asyncEventService.queueIndexOperationMessage(indexOperation); + }); + + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java index 1d63bc6..3d7df3b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java @@ -20,6 +20,8 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; +import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.graph.GraphManagerFactory; @@ -41,8 +43,12 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter { * Create a new instance of our command */ @Inject - public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String collectionName ) { - super( 
graphManagerFactory, rxTaskScheduler ); + public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, + final RxTaskScheduler rxTaskScheduler, + final EventBuilder eventBuilder, + final AsyncEventService asyncEventService, + @Assisted final String collectionName ) { + super( graphManagerFactory, rxTaskScheduler, eventBuilder, asyncEventService ); this.collectionName = collectionName; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java index efe94db..b2d368b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java @@ -20,6 +20,8 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; +import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.graph.GraphManagerFactory; @@ -41,8 +43,12 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter { * Create a new instance of our command */ @Inject - public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String connectionName ) { - super( graphManagerFactory, rxTaskScheduler ); + public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, + final RxTaskScheduler 
rxTaskScheduler, + final EventBuilder eventBuilder, + final AsyncEventService asyncEventService, + @Assisted final String connectionName ) { + super( graphManagerFactory, rxTaskScheduler, eventBuilder, asyncEventService ); this.connectionName = connectionName; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b43bda3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java index e4eb5fc..343cc77 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java @@ -157,26 +157,26 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener { //get all edges pointing to the target node and buffer then into groups for deletion Observable<MarkedEdge> targetEdges = - getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null, null ) ) - .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create( new ObservableIterator<MarkedEdge>( "getTargetEdges" ) { + getEdgesTypesToTarget(scope, new SimpleSearchEdgeType(node, null, null)) + .flatMap(edgeType -> Observable.create(new ObservableIterator<MarkedEdge>("getTargetEdges") { @Override protected Iterator<MarkedEdge> getIterator() { - return storageSerialization.getEdgesToTarget( scope, - new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); + return storageSerialization.getEdgesToTarget(scope, + new SimpleSearchByEdgeType(node, edgeType, maxVersion, 
SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent())); } - } ) ); + })); //get all edges pointing to the source node and buffer them into groups for deletion Observable<MarkedEdge> sourceEdges = - getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null, null ) ) - .subscribeOn( Schedulers.io() ).flatMap( edgeType -> Observable.create( new ObservableIterator<MarkedEdge>( "getSourceEdges" ) { + getEdgesTypesFromSource(scope, new SimpleSearchEdgeType(node, null, null)) + .flatMap(edgeType -> Observable.create(new ObservableIterator<MarkedEdge>("getSourceEdges") { @Override protected Iterator<MarkedEdge> getIterator() { - return storageSerialization.getEdgesFromSource( scope, - new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); + return storageSerialization.getEdgesFromSource(scope, + new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent())); } - } ) ); + })); //merge both source and target into 1 observable. We'll need to check them all regardless of order return Observable.merge( targetEdges, sourceEdges ) @@ -235,12 +235,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener { //run both the source/target edge type cleanup, then proceed return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null ) - .flatMap( new Func1<Integer, Observable<MarkedEdge>>() { - @Override - public Observable<MarkedEdge> call( final Integer integer ) { - return Observable.from( markedEdges ); - } - } ); + .flatMap(integer -> Observable.from( markedEdges )); } ); }
