Refactor of InMemoryAsyncEventService into EventBuilderImpl. All async scheduling mechanisms will need to invoke this central logic flow
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3f64b29a Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3f64b29a Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3f64b29a Branch: refs/heads/USERGRID-641 Commit: 3f64b29a44a7e192b5866ce35565b91ea6a838f8 Parents: 2e5937b Author: Todd Nine <[email protected]> Authored: Sun May 10 19:02:57 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Sun May 10 19:02:57 2015 -0600 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 9 +- .../asyncevents/AsyncIndexProvider.java | 8 +- .../asyncevents/EventBuilder.java | 105 +++++++++++++ .../asyncevents/EventBuilderImpl.java | 154 +++++++++++++++++++ .../asyncevents/InMemoryAsyncEventService.java | 59 +++---- .../index/InMemoryAsycIndexServiceTest.java | 5 +- 6 files changed, 293 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index f1e1596..a02bffd 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -20,6 +20,8 @@ import org.safehaus.guicyfig.GuicyFigModule; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.asyncevents.AsyncIndexProvider; +import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; +import 
org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl; import org.apache.usergrid.corepersistence.index.IndexProcessorFig; import org.apache.usergrid.corepersistence.index.IndexService; import org.apache.usergrid.corepersistence.index.IndexServiceImpl; @@ -132,9 +134,14 @@ public class CoreModule extends AbstractModule { bind( IndexService.class ).to( IndexServiceImpl.class ); + + //bind the event handlers + bind( EventBuilder.class).to( EventBuilderImpl.class ); + //bind the queue provider + bind( AsyncEventService.class ).toProvider( AsyncIndexProvider.class ); + - bind( AsyncEventService.class).toProvider( AsyncIndexProvider.class ); install( new GuicyFigModule( IndexProcessorFig.class ) ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index 9f801b4..ec968af 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -45,6 +45,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { private final IndexService indexService; private final RxTaskScheduler rxTaskScheduler; private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final EventBuilder eventBuilder; private AsyncEventService asyncEventService; @@ -53,13 +54,15 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory, final MetricsFactory metricsFactory, final 
IndexService indexService, final RxTaskScheduler rxTaskScheduler, - final EntityCollectionManagerFactory entityCollectionManagerFactory ) { + final EntityCollectionManagerFactory entityCollectionManagerFactory, + final EventBuilder eventBuilder ) { this.indexProcessorFig = indexProcessorFig; this.queueManagerFactory = queueManagerFactory; this.metricsFactory = metricsFactory; this.indexService = indexService; this.rxTaskScheduler = rxTaskScheduler; this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.eventBuilder = eventBuilder; } @@ -82,8 +85,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { switch ( impl ) { case LOCAL: - return new InMemoryAsyncEventService( indexService, rxTaskScheduler, - entityCollectionManagerFactory, indexProcessorFig.resolveSynchronously()); + return new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously()); case SQS: return new SQSAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService, entityCollectionManagerFactory, rxTaskScheduler ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java new file mode 100644 index 0000000..f48451c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents; + + +import java.util.List; + +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; + + +/** + * Interface for constructing an observable stream to perform asynchonous events + */ +public interface EventBuilder { + /** + * Return the cold observable of entity index update operations + * @param applicationScope + * @param entity + * @return + */ + Observable<IndexOperationMessage> queueEntityIndexUpdate( ApplicationScope applicationScope, Entity entity ); + + /** + * Return the cold observable of the new edge operation + * @param applicationScope + * @param entity + * @param newEdge + * @return + */ + Observable<IndexOperationMessage> queueNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge ); + + /** + * Return the cold observable of the deleted edge operations + * @param applicationScope + * @param edge + * @return + */ + Observable<IndexOperationMessage> 
queueDeleteEdge( ApplicationScope applicationScope, Edge edge ); + + /** + * Return a ben with 2 obervable streams for entity delete. + * @param applicationScope + * @param entityId + * @return + */ + EventBuilderImpl.EntityDeleteResults queueEntityDelete( ApplicationScope applicationScope, Id entityId ); + + /** + * Re-index an entity in the scope provided + * @param entityIdScope + * @return + */ + Observable<IndexOperationMessage> index( EntityIdScope entityIdScope ); + + /** + * A bean to hold both our observables so the caller can choose the subscription mechanism. Note that + * indexOperationMessages should be subscribed and completed BEFORE the getEntitiesCompacted is subscribed + */ + final class EntityDeleteResults { + private final Observable<IndexOperationMessage> indexOperationMessageObservable; + private final Observable<List<MvccLogEntry>> entitiesCompacted; + + + public EntityDeleteResults( final Observable<IndexOperationMessage> indexOperationMessageObservable, + final Observable<List<MvccLogEntry>> entitiesCompacted ) { + this.indexOperationMessageObservable = indexOperationMessageObservable; + this.entitiesCompacted = entitiesCompacted; + } + + + public Observable<IndexOperationMessage> getIndexObservable() { + return indexOperationMessageObservable; + } + + + public Observable<List<MvccLogEntry>> getEntitiesCompacted() { + return entitiesCompacted; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java new file mode 100644 index 0000000..c0d82d2 --- /dev/null +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents; + + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.index.IndexService; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + 
+import com.google.inject.Inject; +import com.google.inject.Singleton; + +import rx.Observable; + + +/** + * Service that executes event flows + */ +@Singleton +public class EventBuilderImpl implements EventBuilder { + + private static final Logger log = LoggerFactory.getLogger( EventBuilderImpl.class ); + + private final IndexService indexService; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final GraphManagerFactory graphManagerFactory; + private final SerializationFig serializationFig; + + + @Inject + public EventBuilderImpl( final IndexService indexService, + final EntityCollectionManagerFactory entityCollectionManagerFactory, + final GraphManagerFactory graphManagerFactory, final SerializationFig serializationFig ) { + this.indexService = indexService; + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.graphManagerFactory = graphManagerFactory; + this.serializationFig = serializationFig; + } + + + @Override + public Observable<IndexOperationMessage> queueEntityIndexUpdate( final ApplicationScope applicationScope, + final Entity entity ) { + + //process the entity immediately + //only process the same version, otherwise ignore + + + log.debug( "Indexing in app scope {} entity {}", entity, applicationScope ); + + final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity ); + + return edgeObservable; + } + + + @Override + public Observable<IndexOperationMessage> queueNewEdge( final ApplicationScope applicationScope, final Entity entity, + final Edge newEdge ) { + + log.debug( "Indexing in app scope {} with entity {} and new edge {}", + new Object[] { entity, applicationScope, newEdge } ); + + final Observable<IndexOperationMessage> edgeObservable = + indexService.indexEdge( applicationScope, entity, newEdge ); + + return edgeObservable; + } + + + @Override + public Observable<IndexOperationMessage> queueDeleteEdge( final ApplicationScope 
applicationScope, + final Edge edge ) { + log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge ); + + final Observable<IndexOperationMessage> edgeObservable = + indexService.deleteIndexEdge( applicationScope, edge ).doOnCompleted( () -> { + final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); + gm.deleteEdge( edge ); + } ); + + return edgeObservable; + } + + + @Override + public EntityDeleteResults queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) { + log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId ); + + final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); + + final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); + + + //observable of index operation messages + final Observable<IndexOperationMessage> edgeObservable = + indexService.deleteEntityIndexes( applicationScope, entityId ); + + + //observable of entries as the batches are deleted + final Observable<List<MvccLogEntry>> entries = + ecm.getVersions( entityId ).buffer( serializationFig.getBufferSize() ) + .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ) ); + + + return new EntityDeleteResults( edgeObservable, entries ); + } + + + @Override + public Observable<IndexOperationMessage> index( final EntityIdScope entityIdScope ) { + + final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); + + final Id entityId = entityIdScope.getId(); + + //load the entity + return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ) + //perform indexing on the task scheduler and start it + .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) ); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java index 8d0e8c3..6faa695 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java @@ -23,13 +23,10 @@ package org.apache.usergrid.corepersistence.asyncevents; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.usergrid.corepersistence.index.IndexService; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -39,24 +36,27 @@ import com.google.inject.Singleton; import rx.Observable; +/** + * TODO refactor this implementation into another class. The AsyncEventService impl will then invoke this class + * + * Performs in memory asynchronous execution using a task scheduler to limit throughput via RX. 
+ */ @Singleton public class InMemoryAsyncEventService implements AsyncEventService { private static final Logger log = LoggerFactory.getLogger( InMemoryAsyncEventService.class ); - private final IndexService indexService; + private final EventBuilder eventBuilder; private final RxTaskScheduler rxTaskScheduler; - private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final boolean resolveSynchronously; + @Inject - public InMemoryAsyncEventService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler, - final EntityCollectionManagerFactory entityCollectionManagerFactory, - boolean resolveSynchronously ) { - this.indexService = indexService; + public InMemoryAsyncEventService( final EventBuilder eventBuilder, final RxTaskScheduler rxTaskScheduler, boolean + resolveSynchronously ) { + this.eventBuilder = eventBuilder; this.rxTaskScheduler = rxTaskScheduler; - this.entityCollectionManagerFactory = entityCollectionManagerFactory; this.resolveSynchronously = resolveSynchronously; } @@ -68,62 +68,39 @@ public class InMemoryAsyncEventService implements AsyncEventService { //only process the same version, otherwise ignore - log.debug( "Indexing in app scope {} entity {}", entity, applicationScope ); - - final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity ); - - - run( edgeObservable ); + run( eventBuilder.queueEntityIndexUpdate( applicationScope, entity ) ); } @Override public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) { - - log.debug( "Indexing in app scope {} with entity {} and new edge {}", - new Object[] { entity, applicationScope, newEdge } ); - - final Observable<IndexOperationMessage> edgeObservable = indexService.indexEdge( applicationScope, entity, newEdge ); - - run( edgeObservable ); + run( eventBuilder.queueNewEdge( applicationScope, entity, newEdge ) ); } @Override public void queueDeleteEdge( final 
ApplicationScope applicationScope, final Edge edge ) { - log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge ); - final Observable<IndexOperationMessage> edgeObservable = indexService.deleteIndexEdge( applicationScope, edge ); - - run( edgeObservable ); + run( eventBuilder.queueDeleteEdge( applicationScope, edge ) ); } @Override public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) { - log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId ); - - final Observable<IndexOperationMessage> edgeObservable = - indexService.deleteEntityIndexes( applicationScope, entityId ); - //TODO chain graph operations here + final EventBuilderImpl.EntityDeleteResults results = + eventBuilder.queueEntityDelete( applicationScope, entityId ); - run( edgeObservable ); + run( results.getIndexObservable() ); + run( results.getEntitiesCompacted() ); } @Override public void index( final EntityIdScope entityIdScope ) { - final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); - - final Id entityId = entityIdScope.getId(); - //load the entity - entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ) - //perform indexing on the task scheduler and start it - .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) ) - .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); + run(eventBuilder.index( entityIdScope )); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java index 2860c89..77d7cab 100644 
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java @@ -25,6 +25,7 @@ import org.junit.runner.RunWith; import org.apache.usergrid.corepersistence.TestIndexModule; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; +import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; import org.apache.usergrid.corepersistence.asyncevents.InMemoryAsyncEventService; import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; @@ -46,7 +47,7 @@ public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest { @Inject - public IndexService indexService; + public EventBuilder eventBuilder; @Inject public RxTaskScheduler rxTaskScheduler; @@ -54,7 +55,7 @@ public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest { @Override protected AsyncEventService getAsyncEventService() { - return new InMemoryAsyncEventService( indexService, rxTaskScheduler, entityCollectionManagerFactory,false ); + return new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler, false ); }
