Repository: usergrid Updated Branches: refs/heads/usergrid-1318-queue c08e8f0e7 -> 5a19ba9a7
Implement configurable long polling for Qakka queue gets Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/434e53e7 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/434e53e7 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/434e53e7 Branch: refs/heads/usergrid-1318-queue Commit: 434e53e7eed46fbdc3cdeafde9464c90101fa7e3 Parents: c08e8f0 Author: Dave Johnson <[email protected]> Authored: Tue Sep 20 12:18:33 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Tue Sep 20 12:18:33 2016 -0400 ---------------------------------------------------------------------- .../actorsystem/ActorSystemManagerImpl.java | 2 +- .../apache/usergrid/persistence/qakka/App.java | 25 ++++++--- .../usergrid/persistence/qakka/QakkaFig.java | 13 +++++ .../impl/DistributedQueueServiceImpl.java | 23 ++++++++- .../qakka/common/CassandraClientTest.java | 46 ----------------- .../qakka/core/CassandraClientTest.java | 46 +++++++++++++++++ .../qakka/core/QueueMessageManagerTest.java | 3 +- .../distributed/QueueActorServiceTest.java | 8 +-- .../queue/src/test/resources/qakka.properties | 4 ++ .../services/notifications/QueueListener.java | 54 ++++++++++++-------- 10 files changed, 140 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index 9fb39b8..3fc191d 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -333,7 +333,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager { /** * Create cluster system for this the current region */ - private ActorSystem createClusterSystem( Config config ) { + private synchronized ActorSystem createClusterSystem( Config config ) { // there is only 1 akka system for a Usergrid cluster final String clusterName = getClusterName(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java index 41bc6fa..abbf3da 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java @@ -22,11 +22,16 @@ package org.apache.usergrid.persistence.qakka; import com.codahale.metrics.MetricRegistry; import com.google.inject.Inject; import com.google.inject.Injector; +import com.google.inject.Singleton; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import org.apache.usergrid.persistence.core.migration.schema.MigrationException; import org.apache.usergrid.persistence.core.migration.schema.MigrationManager; +import org.apache.usergrid.persistence.qakka.core.Queue; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; +import org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer; +import org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterProducer; +import org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer; import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +40,7 @@ import org.slf4j.LoggerFactory; /** * Akka queueing application */ +@Singleton public class App implements MetricsService { private static final Logger logger = LoggerFactory.getLogger( App.class ); @@ -50,6 +56,7 @@ public class App implements MetricsService { @Inject public App( Injector injector, + QakkaFig qakkaFig, ActorSystemFig actorSystemFig, ActorSystemManager actorSystemManager, DistributedQueueService distributedQueueService, @@ -59,12 +66,18 @@ public class App implements MetricsService { this.actorSystemFig = actorSystemFig; this.actorSystemManager = actorSystemManager; this.distributedQueueService = distributedQueueService; -// -// try { -// migrationManager.migrate(); -// } catch (MigrationException e) { -// throw new QakkaRuntimeException( "Error running migration", e ); -// } + + if ( qakkaFig.getStandalone() ) { + + try { + migrationManager.migrate(); + } catch (MigrationException e) { + throw new QakkaRuntimeException( "Error running migration", e ); + } + actorSystemManager.registerRouterProducer( injector.getInstance( QueueActorRouterProducer.class ) ); + actorSystemManager.registerRouterProducer( injector.getInstance( QueueWriterRouterProducer.class ) ); + actorSystemManager.registerRouterProducer( injector.getInstance( QueueSenderRouterProducer.class ) ); + } } /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java index 3b901b2..aa4e349 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -30,6 +30,8 @@ import java.io.Serializable; @FigSingleton public interface QakkaFig extends GuicyFig, Serializable { + String QUEUE_STANDALONE = "queue.standalone"; + String QUEUE_NUM_ACTORS = "queue.num.actors"; String QUEUE_SENDER_NUM_ACTORS = "queue.sender.num.actors"; @@ -58,6 +60,13 @@ public interface QakkaFig extends GuicyFig, Serializable { String QUEUE_SHARD_MAX_SIZE = "queue.shard.max.size"; + String QUEUE_LONG_POLL_TIME_MILLIS = "queue.long.polling.time.millis"; + + + /** True if Qakka is running standlone */ + @Key(QUEUE_STANDALONE) + @Default("false") + boolean getStandalone(); /** Queue senders send to queue writers */ @Key(QUEUE_SENDER_NUM_ACTORS) @@ -128,4 +137,8 @@ public interface QakkaFig extends GuicyFig, Serializable { @Key(QUEUE_SHARD_MAX_SIZE) @Default("400000") long getMaxShardSize(); + + @Key(QUEUE_LONG_POLL_TIME_MILLIS) + @Default("5000") + long getLongPollTimeMillis(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index 4737347..be20cde 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -25,6 +25,7 @@ import akka.util.Timeout; import com.datastax.driver.core.exceptions.InvalidQueryException; import com.google.inject.Inject; import com.google.inject.Singleton; +import org.apache.log4j.net.SyslogAppender; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import org.apache.usergrid.persistence.actorsystem.ClientActor; import org.apache.usergrid.persistence.qakka.QakkaFig; @@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.UUID; @@ -107,7 +109,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { @Override public void refreshQueue(String queueName) { - logger.info("Refreshing queue: {}", queueName); + logger.info("{} Requesting refresh for queue: {}", this, queueName); QueueRefreshRequest request = new QueueRefreshRequest( queueName ); ActorRef clientActor = actorSystemManager.getClientActor(); clientActor.tell( request, null ); @@ -186,6 +188,25 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { @Override public Collection<DatabaseQueueMessage> getNextMessages( String queueName, int count ) { + List<DatabaseQueueMessage> ret = new ArrayList<>(); + + long startTime = System.currentTimeMillis(); + + while ( ret.size() < count + && System.currentTimeMillis() - startTime < qakkaFig.getLongPollTimeMillis()) { + + ret.addAll( getNextMessagesInternal( queueName, count )); + + if ( ret.size() < count ) { + try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 5 ); } catch (Exception ignored) {} + } + } + + return ret; + } + + + public Collection<DatabaseQueueMessage> getNextMessagesInternal( String queueName, int count ) { List<String> queueNames = queueManager.getListOfQueues(); if ( !queueNames.contains( queueName ) ) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java deleted file mode 100644 index e1f0c7e..0000000 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.persistence.qakka.common; - -import com.datastax.driver.core.Session; -import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; -import org.apache.usergrid.persistence.qakka.AbstractTest; -import org.apache.usergrid.persistence.qakka.core.CassandraClient; -import org.junit.Test; - - -/** - * Created by russo on 6/8/16. - */ -public class CassandraClientTest extends AbstractTest { - - @Test - public void getClient(){ - - CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - - Session session = cassandraClient.getApplicationSession(); - - session.getLoggedKeyspace(); - - } - - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java new file mode 100644 index 0000000..416de0e --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.core; + +import com.datastax.driver.core.Session; +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.junit.Test; + + +/** + * Created by russo on 6/8/16. + */ +public class CassandraClientTest extends AbstractTest { + + @Test + public void getClient(){ + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + + Session session = cassandraClient.getApplicationSession(); + + session.getLoggedKeyspace(); + + } + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java index 124cb86..0413f81 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java @@ -171,8 +171,7 @@ public class QueueMessageManagerTest extends AbstractTest { int maxRetries = 15; int retries = 0; while ( retries++ < maxRetries ) { - //distributedQueueService.refresh(); - Thread.sleep( 1000 ); + distributedQueueService.refresh(); if (inMemoryQueue.size( queueName ) == 40) { break; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java index 0883650..7423424 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java @@ -65,8 +65,6 @@ public class QueueActorServiceTest extends AbstractTest { Injector injector = getInjector(); - CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class ); String region = actorSystemFig.getRegionLocal(); @@ -117,8 +115,6 @@ public class QueueActorServiceTest extends AbstractTest { Injector injector = getInjector(); - CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class ); String region = actorSystemFig.getRegionLocal(); @@ -151,16 +147,16 @@ public class QueueActorServiceTest extends AbstractTest { queueName, region, region, messageId , null, null); } - int maxRetries = 15; + int maxRetries = 25; int retries = 0; int count = 0; while ( retries++ < maxRetries ) { - Thread.sleep( 1000 ); distributedQueueService.refresh(); if (inMemoryQueue.size( queueName ) == 100) { count = 100; break; } + Thread.sleep(1000); } Assert.assertEquals( 100, count ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/resources/qakka.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties index dc7ef48..aacc187 100644 --- a/stack/corepersistence/queue/src/test/resources/qakka.properties +++ b/stack/corepersistence/queue/src/test/resources/qakka.properties @@ -18,6 +18,8 @@ # Properties for JUnit tests +queue.standalone=true + usergrid.cluster_name=Test Cluster usergrid.cluster.hostname=localhost @@ -43,6 +45,8 @@ queue.shard.allocation.advance.time.millis=200 queue.max.inmemory.shard.counter = 100 +queue.long.polling.time.millis=2000 + cassandra.hosts=localhost cassandra.keyspace.application=qakka_test_application http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java index 20fbd84..796450b 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java @@ -72,12 +72,14 @@ public class QueueListener { private int consecutiveCallsToRemoveDevices; public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Properties props){ - this.queueManagerFactory = smf.getApplicationContext().getBean( Injector.class ).getInstance(LegacyQueueManagerFactory.class); + this.queueManagerFactory = + smf.getApplicationContext().getBean( Injector.class ).getInstance(LegacyQueueManagerFactory.class); this.smf = smf; this.emf = emf; this.metricsService = smf.getApplicationContext().getBean( Injector.class ).getInstance(MetricsFactory.class); this.properties = props; - this.applicationQueueManagerCache = smf.getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class); + this.applicationQueueManagerCache = + smf.getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class); } @@ -94,12 +96,15 @@ public class QueueListener { try { - sleepBetweenRuns = new Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP)); - sleepWhenNoneFound = new Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP)); - consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+200)); + sleepBetweenRuns = new Long(properties.getProperty("usergrid.push.worker.sleep", "" + DEFAULT_SLEEP)); + sleepWhenNoneFound = new Long(properties.getProperty("usergrid.push.worker.sleep", "" + DEFAULT_SLEEP)); + + consecutiveCallsToRemoveDevices = + new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+200)); queueName = ApplicationQueueManagerImpl.getQueueNames(properties); - int maxThreads = new Integer(properties.getProperty("usergrid.push.worker_count", ""+PUSH_CONSUMER_MAX_THREADS)); + int maxThreads = + new Integer(properties.getProperty("usergrid.push.worker_count", ""+PUSH_CONSUMER_MAX_THREADS)); futures = new ArrayList<>(maxThreads); @@ -166,6 +171,13 @@ public class QueueListener { while ( true ) { + if(sleepBetweenRuns > 0) { + if (logger.isTraceEnabled()) { + logger.trace("sleep between rounds...sleep...{}", sleepBetweenRuns); + } + try { Thread.sleep(sleepBetweenRuns); } catch (InterruptedException ignored) { } + } + Timer.Context timerContext = timer.time(); rx.Observable.from( legacyQueueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class)) .buffer(MAX_TAKE) @@ -173,7 +185,7 @@ public class QueueListener { try { if (logger.isTraceEnabled()) { - logger.trace("retrieved batch of {} messages from queue {}", messages.size(), queueName); + logger.trace("retrieved batch of {} messages from queue {}",messages.size(),queueName); } if (messages.size() > 0) { @@ -185,12 +197,13 @@ public class QueueListener { ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody(); UUID applicationId = queueMessage.getApplicationId(); - //Groups queue messages by application Id, ( they are all probably going to the same place ) + // Groups queue messages by application Id, + // (they are all probably going to the same place) if (!messageMap.containsKey(applicationId)) { //For each app id it sends the set. - List<LegacyQueueMessage> applicationQueueMessages = new ArrayList<LegacyQueueMessage>(); - applicationQueueMessages.add(message); - messageMap.put(applicationId, applicationQueueMessages); + List<LegacyQueueMessage> lqms = new ArrayList<LegacyQueueMessage>(); + lqms.add(message); + messageMap.put(applicationId, lqms); } else { messageMap.get(applicationId).add(message); } @@ -207,13 +220,15 @@ public class QueueListener { .getApplicationQueueManager( emf.getEntityManager(applicationId), legacyQueueManager, - new JobScheduler(smf.getServiceManager(applicationId), emf.getEntityManager(applicationId)), + new JobScheduler(smf.getServiceManager(applicationId), + emf.getEntityManager(applicationId)), metricsService, properties ); if (logger.isTraceEnabled()) { - logger.trace("send batch for app {} of {} messages", entry.getKey(), entry.getValue().size()); + logger.trace("send batch for app {} of {} messages", + entry.getKey(), entry.getValue().size()); } Observable current = manager.sendBatchToProviders(entry.getValue(),queueName); @@ -231,20 +246,15 @@ public class QueueListener { meter.mark(messages.size()); if (logger.isTraceEnabled()) { - logger.trace("sent batch {} messages duration {} ms", messages.size(), System.currentTimeMillis() - now); + logger.trace("sent batch {} messages duration {} ms", + messages.size(), System.currentTimeMillis() - now); } - if(sleepBetweenRuns > 0) { - if (logger.isTraceEnabled()) { - logger.trace("sleep between rounds...sleep...{}", sleepBetweenRuns); - } - Thread.sleep(sleepBetweenRuns); - } if(runCount.incrementAndGet() % consecutiveCallsToRemoveDevices == 0){ - for(ApplicationQueueManager applicationQueueManager : applicationQueueManagerCache.asMap().values()){ + for(ApplicationQueueManager aqm : applicationQueueManagerCache.asMap().values()){ try { - applicationQueueManager.asyncCheckForInactiveDevices(); + aqm.asyncCheckForInactiveDevices(); }catch (Exception inactiveDeviceException){ logger.error("Inactive Device Get failed",inactiveDeviceException); }
