Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev 549f637e1 -> 7f2a4bbf4
Wired to indexing service Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7f2a4bbf Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7f2a4bbf Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7f2a4bbf Branch: refs/heads/two-dot-o-dev Commit: 7f2a4bbf4e81f18005c57140d4cc49156b2585c9 Parents: 549f637 Author: Todd Nine <[email protected]> Authored: Mon Apr 27 17:09:27 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Mon Apr 27 17:09:27 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/InMemoryAsyncEventService.java | 38 ++++++++++++++------ 1 file changed, 28 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f2a4bbf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java index 41e98be..8574e89 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java @@ -24,7 +24,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.corepersistence.index.IndexService; -import org.apache.usergrid.exception.NotImplementedException; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; @@ -63,13 +62,13 @@ public class InMemoryAsyncEventService implements AsyncEventService { @Override - public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity) { + public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) { //process the entity immediately //only process the same version, otherwise ignore - log.debug( "Indexing entity {} in app scope {} ", entity, applicationScope ); + log.debug( "Indexing in app scope {} entity {}", entity, applicationScope ); final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity ); @@ -80,19 +79,36 @@ public class InMemoryAsyncEventService implements AsyncEventService { @Override public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) { - throw new NotImplementedException( "Implement me" ); + + log.debug( "Indexing in app scope {} with entity {} and new edge {}", + new Object[] { entity, applicationScope, newEdge } ); + + final Observable<IndexOperationMessage> edgeObservable = indexService.indexEdge( applicationScope, entity, newEdge ); + + run( edgeObservable ); } @Override public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) { - throw new NotImplementedException( "Implement me" ); + log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge ); + + final Observable<IndexOperationMessage> edgeObservable = indexService.deleteIndexEdge( applicationScope, edge ); + + run( edgeObservable ); } @Override public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) { - throw new NotImplementedException( "Implement me" ); + log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId ); + + final Observable<IndexOperationMessage> edgeObservable = + indexService.deleteEntityIndexes( applicationScope, entityId ); + + //TODO chain graph operations here + + run( edgeObservable ); } @@ -110,11 +126,13 @@ public class InMemoryAsyncEventService implements AsyncEventService { .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); } - public void run( Observable<?> observable ){ - //start it in the background on an i/o thread - if(!resolveSynchronously){ + + public void run( Observable<?> observable ) { + //start it in the background on an i/o thread + if ( !resolveSynchronously ) { observable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); - }else { + } + else { observable.toBlocking().last(); } }
