Move router producer registration to CpEntityManagerFactory
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/79c54997 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/79c54997 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/79c54997 Branch: refs/heads/usergrid-1318-queue Commit: 79c5499799007e24ea1784f6fd8b21f2225c668b Parents: 99dbfc2 Author: Dave Johnson <[email protected]> Authored: Fri Sep 16 14:52:32 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Fri Sep 16 14:52:32 2016 -0400 ---------------------------------------------------------------------- .../corepersistence/CpEntityManagerFactory.java | 32 ++++++++++++-------- .../impl/DistributedQueueServiceImpl.java | 9 +----- 2 files changed, 20 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/79c54997/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 2a88302..5d8c417 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -60,6 +60,9 @@ import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer; +import org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterProducer; +import org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer; import org.apache.usergrid.utils.UUIDUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,7 +116,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private final GraphManagerFactory graphManagerFactory; private final CollectionSettingsFactory collectionSettingsFactory; private ActorSystemManager actorSystemManager; - private UniqueValuesService uniqueValuesService; private final LockManager lockManager; private final QueueManagerFactory queueManagerFactory; @@ -128,16 +130,17 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.cassandraService = cassandraService; this.counterUtils = counterUtils; this.injector = injector; - this.reIndexService = injector.getInstance(ReIndexService.class); - this.entityManagerFig = injector.getInstance(EntityManagerFig.class); - this.actorSystemFig = injector.getInstance( ActorSystemFig.class ); - this.managerCache = injector.getInstance( ManagerCache.class ); - this.metricsFactory = injector.getInstance( MetricsFactory.class ); - this.indexService = injector.getInstance( AsyncEventService.class ); - this.graphManagerFactory = injector.getInstance( GraphManagerFactory.class ); - this.collectionService = injector.getInstance( CollectionService.class ); - this.connectionService = injector.getInstance( ConnectionService.class ); - this.collectionSettingsFactory = injector.getInstance( CollectionSettingsFactory.class ); + + this.reIndexService = injector.getInstance(ReIndexService.class); + this.entityManagerFig = injector.getInstance(EntityManagerFig.class); + this.actorSystemFig = injector.getInstance( ActorSystemFig.class ); + this.managerCache = injector.getInstance( ManagerCache.class ); + this.metricsFactory = injector.getInstance( MetricsFactory.class ); + this.indexService = injector.getInstance( AsyncEventService.class ); + this.graphManagerFactory = injector.getInstance( GraphManagerFactory.class ); + this.collectionService = injector.getInstance( CollectionService.class ); + this.connectionService = injector.getInstance( ConnectionService.class ); + this.collectionSettingsFactory = injector.getInstance( CollectionSettingsFactory.class ); Properties properties = cassandraService.getProperties(); this.entityManagers = createEntityManagerCache( properties ); @@ -148,10 +151,13 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application try { logger.info("Akka cluster starting..."); - this.uniqueValuesService = injector.getInstance( UniqueValuesService.class ); this.actorSystemManager = injector.getInstance( ActorSystemManager.class ); - actorSystemManager.registerRouterProducer( uniqueValuesService ); + actorSystemManager.registerRouterProducer( injector.getInstance( UniqueValuesService.class ) ); + actorSystemManager.registerRouterProducer( injector.getInstance( QueueActorRouterProducer.class ) ); + actorSystemManager.registerRouterProducer( injector.getInstance( QueueWriterRouterProducer.class ) ); + actorSystemManager.registerRouterProducer( injector.getInstance( QueueSenderRouterProducer.class ) ); + actorSystemManager.start(); actorSystemManager.waitForClientActor(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/79c54997/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index 0b9cf59..1243c23 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -56,18 +56,11 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { public DistributedQueueServiceImpl( ActorSystemManager actorSystemManager, QueueManager queueManager, - QakkaFig qakkaFig, - QueueActorRouterProducer queueActorRouterProducer, - QueueWriterRouterProducer queueWriterRouterProducer, - QueueSenderRouterProducer queueSenderRouterProducer ) { + QakkaFig qakkaFig ) { this.actorSystemManager = actorSystemManager; this.queueManager = queueManager; this.qakkaFig = qakkaFig; - - actorSystemManager.registerRouterProducer( queueActorRouterProducer ); - actorSystemManager.registerRouterProducer( queueWriterRouterProducer ); - actorSystemManager.registerRouterProducer( queueSenderRouterProducer ); }
