Repository: usergrid Updated Branches: refs/heads/usergrid-1318-queue e09e74932 -> 03eb31246
Adding distributedQueueService.init() to CpEntityManagerFactory startup, needed to start queue refreshers, time-outers and shard allocators. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/03eb3124 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/03eb3124 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/03eb3124 Branch: refs/heads/usergrid-1318-queue Commit: 03eb31246839bca237c7f761cc4cdece74596bb3 Parents: e09e749 Author: Dave Johnson <snoopd...@apache.org> Authored: Sat Sep 17 13:01:02 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Sat Sep 17 13:01:02 2016 -0400 ---------------------------------------------------------------------- .../usergrid/corepersistence/CpEntityManagerFactory.java | 7 ++++++- .../persistence/actorsystem/ActorSystemManagerImpl.java | 1 - .../persistence/qakka/distributed/actors/QueueActor.java | 7 +++++-- .../persistence/qakka/distributed/actors/QueueRefresher.java | 4 +++- .../apache/usergrid/persistence/queue/guice/QueueModule.java | 1 + .../persistence/queue/impl/QueueManagerFactoryImpl.java | 1 + 6 files changed, 16 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/03eb3124/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 4bec92d..9caa67e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -61,6 +61,7 @@ import org.apache.usergrid.persistence.model.entity.Id; import 
org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.persistence.qakka.App; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; import org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer; import org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterProducer; import org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer; @@ -160,10 +161,14 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application actorSystemManager.registerRouterProducer( injector.getInstance( QueueActorRouterProducer.class ) ); actorSystemManager.registerRouterProducer( injector.getInstance( QueueWriterRouterProducer.class ) ); actorSystemManager.registerRouterProducer( injector.getInstance( QueueSenderRouterProducer.class ) ); - actorSystemManager.start(); actorSystemManager.waitForClientActor(); + DistributedQueueService distributedQueueService = + injector.getInstance( DistributedQueueService.class ); + + distributedQueueService.init(); + } catch (Throwable t) { logger.error("Error starting Akka", t); throw t; http://git-wip-us.apache.org/repos/asf/usergrid/blob/03eb3124/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index ed9344c..9fb39b8 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -390,7 +390,6 @@ public class ActorSystemManagerImpl implements ActorSystemManager { } else { - List<String> regionSeeds = getSeedsByRegion().get( region ); Set<ActorPath> seedPaths = new HashSet<>(20); for ( String seed : getSeedsByRegion().get( region ) ) { seedPaths.add( ActorPaths.fromString( seed + "/system/receptionist") ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/03eb3124/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java index 6ecffba..6fed13b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java @@ -85,6 +85,7 @@ public class QueueActor extends UntypedActor { getContext().dispatcher(), getSelf()); refreshSchedulersByQueueName.put( request.getQueueName(), scheduler ); + logger.debug("Created refresher for queue {}", request.getQueueName() ); } if ( timeoutSchedulersByQueueName.get( request.getQueueName() ) == null ) { @@ -96,6 +97,7 @@ public class QueueActor extends UntypedActor { getContext().dispatcher(), getSelf()); timeoutSchedulersByQueueName.put( request.getQueueName(), scheduler ); + logger.debug("Created scheduler for queue {}", request.getQueueName() ); } if ( shardAllocationSchedulersByQueueName.get( request.getQueueName() ) == null ) { @@ -107,6 +109,7 @@ public class QueueActor extends UntypedActor { getContext().dispatcher(), getSelf()); 
shardAllocationSchedulersByQueueName.put( request.getQueueName(), scheduler ); + logger.debug("Created shard allocater for queue {}", request.getQueueName() ); } } else if ( message instanceof QueueRefreshRequest ) { @@ -164,8 +167,8 @@ public class QueueActor extends UntypedActor { queueMessages.add( queueMessage ); } } else { - logger.debug("in-memory queue for {} is empty, object is: {}", - queueGetRequest.getQueueName(), inMemoryQueue ); +// logger.debug("in-memory queue for {} is empty, object is: {}", +// queueGetRequest.getQueueName(), inMemoryQueue ); break; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/03eb3124/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java index 96ed658..ebc328f 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java @@ -76,6 +76,8 @@ public class QueueRefresher extends UntypedActor { QueueRefreshRequest request = (QueueRefreshRequest) message; + logger.debug( "running for queue {}", queueName ); + if (!request.getQueueName().equals( queueName )) { throw new QakkaRuntimeException( "QueueWriter for " + queueName + ": Incorrect queueName " + request.getQueueName() ); @@ -108,7 +110,7 @@ public class QueueRefresher extends UntypedActor { } if ( count > 0 ) { - logger.info( "Added {} in-memory for queue {}, new size = {}", + logger.debug( "Added {} in-memory for queue {}, new size = {}", count, queueName, inMemoryQueue.size( queueName ) ); } } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/03eb3124/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java index fff187e..f0e0900 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java @@ -42,6 +42,7 @@ public class QueueModule extends AbstractModule { install(new GuicyFigModule(LegacyQueueFig.class)); bind(LegacyQueueManagerFactory.class).to(QueueManagerFactoryImpl.class); + install( new FactoryModuleBuilder().implement(LegacyQueueManager.class, QakkaQueueManager.class) .build(LegacyQueueManagerInternalFactory.class)); http://git-wip-us.apache.org/repos/asf/usergrid/blob/03eb3124/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java index c1bdc72..e1bf239 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java @@ -53,6 +53,7 @@ public class QueueManagerFactoryImpl implements LegacyQueueManagerFactory { if ( queueFig.overrideQueueForDefault() ){ LegacyQueueManager manager = defaultManager.get( 
scope.getName() ); + logger.info("Using Queue Implemention: {}", manager.getClass().getSimpleName()); if ( manager == null ) { manager = new LocalQueueManager(); defaultManager.put( scope.getName(), manager );