Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue e09e74932 -> 03eb31246


Adding distributedQueueManager.init() to CpEntityManagerFactry startup, needed 
to start queue refreshers, time-outers and shard allocators.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/03eb3124
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/03eb3124
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/03eb3124

Branch: refs/heads/usergrid-1318-queue
Commit: 03eb31246839bca237c7f761cc4cdece74596bb3
Parents: e09e749
Author: Dave Johnson <snoopd...@apache.org>
Authored: Sat Sep 17 13:01:02 2016 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Sat Sep 17 13:01:02 2016 -0400

----------------------------------------------------------------------
 .../usergrid/corepersistence/CpEntityManagerFactory.java      | 7 ++++++-
 .../persistence/actorsystem/ActorSystemManagerImpl.java       | 1 -
 .../persistence/qakka/distributed/actors/QueueActor.java      | 7 +++++--
 .../persistence/qakka/distributed/actors/QueueRefresher.java  | 4 +++-
 .../apache/usergrid/persistence/queue/guice/QueueModule.java  | 1 +
 .../persistence/queue/impl/QueueManagerFactoryImpl.java       | 1 +
 6 files changed, 16 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/03eb3124/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 4bec92d..9caa67e 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -61,6 +61,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.qakka.App;
+import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import 
org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer;
 import 
org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterProducer;
 import 
org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer;
@@ -160,10 +161,14 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
                 actorSystemManager.registerRouterProducer( 
injector.getInstance( QueueActorRouterProducer.class ) );
                 actorSystemManager.registerRouterProducer( 
injector.getInstance( QueueWriterRouterProducer.class ) );
                 actorSystemManager.registerRouterProducer( 
injector.getInstance( QueueSenderRouterProducer.class ) );
-
                 actorSystemManager.start();
                 actorSystemManager.waitForClientActor();
 
+                DistributedQueueService distributedQueueService =
+                    injector.getInstance( DistributedQueueService.class );
+
+                distributedQueueService.init();
+
             } catch (Throwable t) {
                 logger.error("Error starting Akka", t);
                 throw t;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/03eb3124/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index ed9344c..9fb39b8 100644
--- 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -390,7 +390,6 @@ public class ActorSystemManagerImpl implements 
ActorSystemManager {
 
             } else {
 
-                List<String> regionSeeds = getSeedsByRegion().get( region );
                 Set<ActorPath> seedPaths = new HashSet<>(20);
                 for ( String seed : getSeedsByRegion().get( region ) ) {
                     seedPaths.add( ActorPaths.fromString( seed + 
"/system/receptionist") );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/03eb3124/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 6ecffba..6fed13b 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -85,6 +85,7 @@ public class QueueActor extends UntypedActor {
                     getContext().dispatcher(),
                     getSelf());
                 refreshSchedulersByQueueName.put( request.getQueueName(), 
scheduler );
+                logger.debug("Created refresher for queue {}", 
request.getQueueName() );
             }
 
             if ( timeoutSchedulersByQueueName.get( request.getQueueName() ) == 
null ) {
@@ -96,6 +97,7 @@ public class QueueActor extends UntypedActor {
                         getContext().dispatcher(),
                         getSelf());
                 timeoutSchedulersByQueueName.put( request.getQueueName(), 
scheduler );
+                logger.debug("Created scheduler for queue {}", 
request.getQueueName() );
             }
 
             if ( shardAllocationSchedulersByQueueName.get( 
request.getQueueName() ) == null ) {
@@ -107,6 +109,7 @@ public class QueueActor extends UntypedActor {
                         getContext().dispatcher(),
                         getSelf());
                 shardAllocationSchedulersByQueueName.put( 
request.getQueueName(), scheduler );
+                logger.debug("Created shard allocater for queue {}", 
request.getQueueName() );
             }
 
         } else if ( message instanceof QueueRefreshRequest ) {
@@ -164,8 +167,8 @@ public class QueueActor extends UntypedActor {
                             queueMessages.add( queueMessage );
                         }
                     } else {
-                        logger.debug("in-memory queue for {} is empty, object 
is: {}",
-                                queueGetRequest.getQueueName(), inMemoryQueue 
);
+//                        logger.debug("in-memory queue for {} is empty, 
object is: {}",
+//                                queueGetRequest.getQueueName(), 
inMemoryQueue );
                         break;
                     }
                 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/03eb3124/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index 96ed658..ebc328f 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -76,6 +76,8 @@ public class QueueRefresher extends UntypedActor {
 
             QueueRefreshRequest request = (QueueRefreshRequest) message;
 
+            logger.debug( "running for queue {}", queueName );
+
             if (!request.getQueueName().equals( queueName )) {
                 throw new QakkaRuntimeException(
                         "QueueWriter for " + queueName + ": Incorrect 
queueName " + request.getQueueName() );
@@ -108,7 +110,7 @@ public class QueueRefresher extends UntypedActor {
                     }
 
                     if ( count > 0 ) {
-                        logger.info( "Added {} in-memory for queue {}, new 
size = {}",
+                        logger.debug( "Added {} in-memory for queue {}, new 
size = {}",
                                 count, queueName, inMemoryQueue.size( 
queueName ) );
                     }
                 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/03eb3124/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index fff187e..f0e0900 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -42,6 +42,7 @@ public class QueueModule extends AbstractModule {
         install(new GuicyFigModule(LegacyQueueFig.class));
 
         
bind(LegacyQueueManagerFactory.class).to(QueueManagerFactoryImpl.class);
+
         install( new 
FactoryModuleBuilder().implement(LegacyQueueManager.class, 
QakkaQueueManager.class)
             .build(LegacyQueueManagerInternalFactory.class));
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/03eb3124/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
index c1bdc72..e1bf239 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
@@ -53,6 +53,7 @@ public class QueueManagerFactoryImpl implements 
LegacyQueueManagerFactory {
                     if ( queueFig.overrideQueueForDefault() ){
 
                         LegacyQueueManager manager = defaultManager.get( 
scope.getName() );
+                        logger.info("Using Queue Implemention: {}", 
manager.getClass().getSimpleName());
                         if ( manager == null ) {
                             manager = new LocalQueueManager();
                             defaultManager.put( scope.getName(), manager );

Reply via email to