Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue c08e8f0e7 -> 5a19ba9a7


Implement configurable long polling for Qakka queue gets


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/434e53e7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/434e53e7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/434e53e7

Branch: refs/heads/usergrid-1318-queue
Commit: 434e53e7eed46fbdc3cdeafde9464c90101fa7e3
Parents: c08e8f0
Author: Dave Johnson <[email protected]>
Authored: Tue Sep 20 12:18:33 2016 -0400
Committer: Dave Johnson <[email protected]>
Committed: Tue Sep 20 12:18:33 2016 -0400

----------------------------------------------------------------------
 .../actorsystem/ActorSystemManagerImpl.java     |  2 +-
 .../apache/usergrid/persistence/qakka/App.java  | 25 ++++++---
 .../usergrid/persistence/qakka/QakkaFig.java    | 13 +++++
 .../impl/DistributedQueueServiceImpl.java       | 23 ++++++++-
 .../qakka/common/CassandraClientTest.java       | 46 -----------------
 .../qakka/core/CassandraClientTest.java         | 46 +++++++++++++++++
 .../qakka/core/QueueMessageManagerTest.java     |  3 +-
 .../distributed/QueueActorServiceTest.java      |  8 +--
 .../queue/src/test/resources/qakka.properties   |  4 ++
 .../services/notifications/QueueListener.java   | 54 ++++++++++++--------
 10 files changed, 140 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 9fb39b8..3fc191d 100644
--- 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ 
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -333,7 +333,7 @@ public class ActorSystemManagerImpl implements 
ActorSystemManager {
     /**
      * Create cluster system for this the current region
      */
-    private ActorSystem createClusterSystem( Config config ) {
+    private synchronized ActorSystem createClusterSystem( Config config ) {
 
         // there is only 1 akka system for a Usergrid cluster
         final String clusterName = getClusterName();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
index 41bc6fa..abbf3da 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
@@ -22,11 +22,16 @@ package org.apache.usergrid.persistence.qakka;
 import com.codahale.metrics.MetricRegistry;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import 
org.apache.usergrid.persistence.core.migration.schema.MigrationException;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
+import org.apache.usergrid.persistence.qakka.core.Queue;
 import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import 
org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer;
+import 
org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterProducer;
+import 
org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +40,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Akka queueing application
  */
+@Singleton
 public class App implements MetricsService {
     private static final Logger logger = LoggerFactory.getLogger( App.class );
 
@@ -50,6 +56,7 @@ public class App implements MetricsService {
     @Inject
     public App(
             Injector                  injector,
+            QakkaFig                  qakkaFig,
             ActorSystemFig            actorSystemFig,
             ActorSystemManager        actorSystemManager,
             DistributedQueueService   distributedQueueService,
@@ -59,12 +66,18 @@ public class App implements MetricsService {
         this.actorSystemFig = actorSystemFig;
         this.actorSystemManager = actorSystemManager;
         this.distributedQueueService = distributedQueueService;
-//
-//        try {
-//            migrationManager.migrate();
-//        } catch (MigrationException e) {
-//            throw new QakkaRuntimeException( "Error running migration", e );
-//        }
+
+        if ( qakkaFig.getStandalone() ) {
+
+            try {
+                migrationManager.migrate();
+            } catch (MigrationException e) {
+                throw new QakkaRuntimeException( "Error running migration", e 
);
+            }
+            actorSystemManager.registerRouterProducer( injector.getInstance( 
QueueActorRouterProducer.class ) );
+            actorSystemManager.registerRouterProducer( injector.getInstance( 
QueueWriterRouterProducer.class ) );
+            actorSystemManager.registerRouterProducer( injector.getInstance( 
QueueSenderRouterProducer.class ) );
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 3b901b2..aa4e349 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -30,6 +30,8 @@ import java.io.Serializable;
 @FigSingleton
 public interface QakkaFig extends GuicyFig, Serializable {
 
+    String QUEUE_STANDALONE                       = "queue.standalone";
+
     String QUEUE_NUM_ACTORS                       = "queue.num.actors";
 
     String QUEUE_SENDER_NUM_ACTORS                = "queue.sender.num.actors";
@@ -58,6 +60,13 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     String QUEUE_SHARD_MAX_SIZE                   = "queue.shard.max.size";
 
+    String QUEUE_LONG_POLL_TIME_MILLIS            = 
"queue.long.polling.time.millis";
+
+
+    /** True if Qakka is running standlone */
+    @Key(QUEUE_STANDALONE)
+    @Default("false")
+    boolean getStandalone();
 
     /** Queue senders send to queue writers */
     @Key(QUEUE_SENDER_NUM_ACTORS)
@@ -128,4 +137,8 @@ public interface QakkaFig extends GuicyFig, Serializable {
     @Key(QUEUE_SHARD_MAX_SIZE)
     @Default("400000")
     long getMaxShardSize();
+
+    @Key(QUEUE_LONG_POLL_TIME_MILLIS)
+    @Default("5000")
+    long getLongPollTimeMillis();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 4737347..be20cde 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -25,6 +25,7 @@ import akka.util.Timeout;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.log4j.net.SyslogAppender;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.actorsystem.ClientActor;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
@@ -107,7 +109,7 @@ public class DistributedQueueServiceImpl implements 
DistributedQueueService {
 
     @Override
     public void refreshQueue(String queueName) {
-        logger.info("Refreshing queue: {}", queueName);
+        logger.info("{} Requesting refresh for queue: {}", this, queueName);
         QueueRefreshRequest request = new QueueRefreshRequest( queueName );
         ActorRef clientActor = actorSystemManager.getClientActor();
         clientActor.tell( request, null );
@@ -186,6 +188,25 @@ public class DistributedQueueServiceImpl implements 
DistributedQueueService {
 
     @Override
     public Collection<DatabaseQueueMessage> getNextMessages( String queueName, 
int count ) {
+        List<DatabaseQueueMessage> ret = new ArrayList<>();
+
+        long startTime = System.currentTimeMillis();
+
+        while ( ret.size() < count
+            && System.currentTimeMillis() - startTime < 
qakkaFig.getLongPollTimeMillis()) {
+
+            ret.addAll( getNextMessagesInternal( queueName, count ));
+
+            if ( ret.size() < count ) {
+                try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 5 ); } 
catch (Exception ignored) {}
+            }
+        }
+
+        return ret;
+    }
+
+
+    public Collection<DatabaseQueueMessage> getNextMessagesInternal( String 
queueName, int count ) {
 
         List<String> queueNames = queueManager.getListOfQueues();
         if ( !queueNames.contains( queueName ) ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
deleted file mode 100644
index e1f0c7e..0000000
--- 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.qakka.common;
-
-import com.datastax.driver.core.Session;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
-import org.apache.usergrid.persistence.qakka.AbstractTest;
-import org.apache.usergrid.persistence.qakka.core.CassandraClient;
-import org.junit.Test;
-
-
-/**
- * Created by russo on 6/8/16.
- */
-public class CassandraClientTest extends AbstractTest {
-
-    @Test
-    public void getClient(){
-
-        CassandraClient cassandraClient = getInjector().getInstance( 
CassandraClientImpl.class );
-
-        Session session = cassandraClient.getApplicationSession();
-
-        session.getLoggedKeyspace();
-
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java
new file mode 100644
index 0000000..416de0e
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import com.datastax.driver.core.Session;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.junit.Test;
+
+
+/**
+ * Created by russo on 6/8/16.
+ */
+public class CassandraClientTest extends AbstractTest {
+
+    @Test
+    public void getClient(){
+
+        CassandraClient cassandraClient = getInjector().getInstance( 
CassandraClientImpl.class );
+
+        Session session = cassandraClient.getApplicationSession();
+
+        session.getLoggedKeyspace();
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 124cb86..0413f81 100644
--- 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -171,8 +171,7 @@ public class QueueMessageManagerTest extends AbstractTest {
         int maxRetries = 15;
         int retries = 0;
         while ( retries++ < maxRetries ) {
-            //distributedQueueService.refresh();
-            Thread.sleep( 1000 );
+            distributedQueueService.refresh();
             if (inMemoryQueue.size( queueName ) == 40) {
                 break;
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 0883650..7423424 100644
--- 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -65,8 +65,6 @@ public class QueueActorServiceTest extends AbstractTest {
 
         Injector injector = getInjector();
 
-        CassandraClient cassandraClient = injector.getInstance( 
CassandraClientImpl.class );
-
         ActorSystemFig actorSystemFig = injector.getInstance( 
ActorSystemFig.class );
         String region = actorSystemFig.getRegionLocal();
 
@@ -117,8 +115,6 @@ public class QueueActorServiceTest extends AbstractTest {
 
         Injector injector = getInjector();
 
-        CassandraClient cassandraClient = injector.getInstance( 
CassandraClientImpl.class );
-
         ActorSystemFig actorSystemFig = injector.getInstance( 
ActorSystemFig.class );
         String region = actorSystemFig.getRegionLocal();
 
@@ -151,16 +147,16 @@ public class QueueActorServiceTest extends AbstractTest {
                     queueName, region, region, messageId , null, null);
         }
 
-        int maxRetries = 15;
+        int maxRetries = 25;
         int retries = 0;
         int count = 0;
         while ( retries++ < maxRetries ) {
-            Thread.sleep( 1000 );
             distributedQueueService.refresh();
             if (inMemoryQueue.size( queueName ) == 100) {
                 count = 100;
                 break;
             }
+            Thread.sleep(1000);
         }
 
         Assert.assertEquals( 100, count );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties 
b/stack/corepersistence/queue/src/test/resources/qakka.properties
index dc7ef48..aacc187 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -18,6 +18,8 @@
 
 # Properties for JUnit tests
 
+queue.standalone=true
+
 usergrid.cluster_name=Test Cluster
 
 usergrid.cluster.hostname=localhost
@@ -43,6 +45,8 @@ queue.shard.allocation.advance.time.millis=200
 
 queue.max.inmemory.shard.counter = 100
 
+queue.long.polling.time.millis=2000
+
 cassandra.hosts=localhost
 
 cassandra.keyspace.application=qakka_test_application

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 20fbd84..796450b 100644
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ 
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -72,12 +72,14 @@ public class QueueListener  {
     private int consecutiveCallsToRemoveDevices;
 
     public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, 
Properties props){
-        this.queueManagerFactory = smf.getApplicationContext().getBean( 
Injector.class ).getInstance(LegacyQueueManagerFactory.class);
+        this.queueManagerFactory =
+            smf.getApplicationContext().getBean( Injector.class 
).getInstance(LegacyQueueManagerFactory.class);
         this.smf = smf;
         this.emf = emf;
         this.metricsService = smf.getApplicationContext().getBean( 
Injector.class ).getInstance(MetricsFactory.class);
         this.properties = props;
-        this.applicationQueueManagerCache = 
smf.getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class);
+        this.applicationQueueManagerCache =
+            
smf.getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class);
 
     }
 
@@ -94,12 +96,15 @@ public class QueueListener  {
 
             try {
 
-                sleepBetweenRuns = new 
Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP));
-                sleepWhenNoneFound = new 
Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP));
-                consecutiveCallsToRemoveDevices = new 
Integer(properties.getProperty("usergrid.notifications.inactive.interval", 
""+200));
+                sleepBetweenRuns = new 
Long(properties.getProperty("usergrid.push.worker.sleep", "" + DEFAULT_SLEEP));
+                sleepWhenNoneFound = new 
Long(properties.getProperty("usergrid.push.worker.sleep", "" + DEFAULT_SLEEP));
+
+                consecutiveCallsToRemoveDevices =
+                    new 
Integer(properties.getProperty("usergrid.notifications.inactive.interval", 
""+200));
                 queueName = 
ApplicationQueueManagerImpl.getQueueNames(properties);
 
-                int maxThreads = new 
Integer(properties.getProperty("usergrid.push.worker_count", 
""+PUSH_CONSUMER_MAX_THREADS));
+                int maxThreads =
+                    new 
Integer(properties.getProperty("usergrid.push.worker_count", 
""+PUSH_CONSUMER_MAX_THREADS));
 
                 futures = new ArrayList<>(maxThreads);
 
@@ -166,6 +171,13 @@ public class QueueListener  {
 
         while ( true ) {
 
+                if(sleepBetweenRuns > 0) {
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("sleep between rounds...sleep...{}", 
sleepBetweenRuns);
+                    }
+                    try { Thread.sleep(sleepBetweenRuns); } catch 
(InterruptedException ignored) { }
+                }
+
                 Timer.Context timerContext = timer.time();
                 rx.Observable.from( legacyQueueManager.getMessages(MAX_TAKE, 
ApplicationQueueMessage.class))
                     .buffer(MAX_TAKE)
@@ -173,7 +185,7 @@ public class QueueListener  {
 
                         try {
                             if (logger.isTraceEnabled()) {
-                                logger.trace("retrieved batch of {} messages 
from queue {}", messages.size(), queueName);
+                                logger.trace("retrieved batch of {} messages 
from queue {}",messages.size(),queueName);
                             }
 
                             if (messages.size() > 0) {
@@ -185,12 +197,13 @@ public class QueueListener  {
                                     ApplicationQueueMessage queueMessage = 
(ApplicationQueueMessage) message.getBody();
                                     UUID applicationId = 
queueMessage.getApplicationId();
 
-                                    //Groups queue messages by application Id, 
( they are all probably going to the same place )
+                                    // Groups queue messages by application Id,
+                                    // (they are all probably going to the 
same place)
                                     if 
(!messageMap.containsKey(applicationId)) {
                                         //For each app id it sends the set.
-                                        List<LegacyQueueMessage> 
applicationQueueMessages = new ArrayList<LegacyQueueMessage>();
-                                        applicationQueueMessages.add(message);
-                                        messageMap.put(applicationId, 
applicationQueueMessages);
+                                        List<LegacyQueueMessage> lqms = new 
ArrayList<LegacyQueueMessage>();
+                                        lqms.add(message);
+                                        messageMap.put(applicationId, lqms);
                                     } else {
                                         
messageMap.get(applicationId).add(message);
                                     }
@@ -207,13 +220,15 @@ public class QueueListener  {
                                         .getApplicationQueueManager(
                                             
emf.getEntityManager(applicationId),
                                             legacyQueueManager,
-                                            new 
JobScheduler(smf.getServiceManager(applicationId), 
emf.getEntityManager(applicationId)),
+                                            new 
JobScheduler(smf.getServiceManager(applicationId),
+                                                             
emf.getEntityManager(applicationId)),
                                             metricsService,
                                             properties
                                         );
 
                                     if (logger.isTraceEnabled()) {
-                                        logger.trace("send batch for app {} of 
{} messages", entry.getKey(), entry.getValue().size());
+                                        logger.trace("send batch for app {} of 
{} messages",
+                                            entry.getKey(), 
entry.getValue().size());
                                     }
                                     Observable current = 
manager.sendBatchToProviders(entry.getValue(),queueName);
 
@@ -231,20 +246,15 @@ public class QueueListener  {
 
                                 meter.mark(messages.size());
                                 if (logger.isTraceEnabled()) {
-                                    logger.trace("sent batch {} messages 
duration {} ms", messages.size(), System.currentTimeMillis() - now);
+                                    logger.trace("sent batch {} messages 
duration {} ms",
+                                        messages.size(), 
System.currentTimeMillis() - now);
                                 }
 
-                                if(sleepBetweenRuns > 0) {
-                                    if (logger.isTraceEnabled()) {
-                                        logger.trace("sleep between 
rounds...sleep...{}", sleepBetweenRuns);
-                                    }
-                                    Thread.sleep(sleepBetweenRuns);
-                                }
 
                                 if(runCount.incrementAndGet() % 
consecutiveCallsToRemoveDevices == 0){
-                                    for(ApplicationQueueManager 
applicationQueueManager : applicationQueueManagerCache.asMap().values()){
+                                    for(ApplicationQueueManager aqm : 
applicationQueueManagerCache.asMap().values()){
                                         try {
-                                            
applicationQueueManager.asyncCheckForInactiveDevices();
+                                            aqm.asyncCheckForInactiveDevices();
                                         }catch (Exception 
inactiveDeviceException){
                                             logger.error("Inactive Device Get 
failed",inactiveDeviceException);
                                         }

Reply via email to