Remove explicit queue-init message; support option to turn off in-memory queue cache.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/522a5515 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/522a5515 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/522a5515 Branch: refs/heads/usergrid-1318-queue Commit: 522a5515baf016bafaf6ee5e819dfa4437fc1412 Parents: 521047c Author: Dave Johnson <[email protected]> Authored: Thu Nov 3 17:56:03 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Thu Nov 3 17:56:03 2016 -0400 ---------------------------------------------------------------------- .../qakka/core/impl/QueueManagerImpl.java | 4 +- .../distributed/DistributedQueueService.java | 4 - .../qakka/distributed/actors/QueueActor.java | 93 ++------------ .../distributed/actors/QueueActorHelper.java | 126 ++++++++++++++++++- .../distributed/actors/QueueActorRouter.java | 76 ++++++++++- .../distributed/actors/QueueRefresher.java | 95 +------------- .../impl/DistributedQueueServiceImpl.java | 55 +++----- 7 files changed, 227 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java index d51fe2d..88d307c 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java @@ -96,7 +96,7 @@ public class QueueManagerImpl implements QueueManager { 
messageCounterSerialization.incrementCounter( queue.getName(), DatabaseQueueMessage.Type.DEFAULT, 0L ); messageCounterSerialization.incrementCounter( queue.getName(), DatabaseQueueMessage.Type.INFLIGHT, 0L ); - distributedQueueService.initQueue( queue.getName() ); + //distributedQueueService.initQueue( queue.getName() ); distributedQueueService.refreshQueue( queue.getName() ); } @@ -105,7 +105,7 @@ public class QueueManagerImpl implements QueueManager { queueSerialization.writeQueue(queue.toDatabaseQueue()); - distributedQueueService.initQueue( queue.getName() ); + //distributedQueueService.initQueue( queue.getName() ); distributedQueueService.refreshQueue( queue.getName() ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java index b11dcff..984cea2 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java @@ -34,8 +34,6 @@ public interface DistributedQueueService { void init(); - void initQueue(String queueName); - void refresh(); void shutdown(); @@ -57,6 +55,4 @@ public interface DistributedQueueService { Status ackMessage(String queueName, UUID messageId); Status requeueMessage(String queueName, UUID messageId); - - Status clearMessages(String queueName); } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java index 248f9cd..92e8607 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java @@ -20,121 +20,52 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import akka.actor.ActorRef; -import akka.actor.Cancellable; import akka.actor.Props; import akka.actor.UntypedActor; import com.codahale.metrics.Timer; import com.google.inject.Inject; -import com.google.inject.Injector; import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; -import org.apache.usergrid.persistence.qakka.App; import org.apache.usergrid.persistence.qakka.MetricsService; -import org.apache.usergrid.persistence.qakka.QakkaFig; -import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; import org.apache.usergrid.persistence.qakka.distributed.messages.*; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; -import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.duration.Duration; -import java.text.DecimalFormat; import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; public class QueueActor 
extends UntypedActor { private static final Logger logger = LoggerFactory.getLogger( QueueActor.class ); - private final QakkaFig qakkaFig; - private final InMemoryQueue inMemoryQueue; private final QueueActorHelper queueActorHelper; private final MetricsService metricsService; - private final MessageCounterSerialization messageCounterSerialization; - private final Map<String, Cancellable> refreshSchedulersByQueueName = new HashMap<>(); - private final Map<String, Cancellable> timeoutSchedulersByQueueName = new HashMap<>(); - private final Map<String, Cancellable> shardAllocationSchedulersByQueueName = new HashMap<>(); - - private final Map<String, ActorRef> queueReadersByQueueName = new HashMap<>(); + //private final Map<String, ActorRef> queueReadersByQueueName = new HashMap<>(); private final Map<String, ActorRef> queueTimeoutersByQueueName = new HashMap<>(); private final Map<String, ActorRef> shardAllocatorsByQueueName = new HashMap<>(); - private final Set<String> queuesSeen = new HashSet<>(); - @Inject public QueueActor( - QakkaFig qakkaFig, - InMemoryQueue inMemoryQueue, QueueActorHelper queueActorHelper, - MetricsService metricsService, - MessageCounterSerialization messageCounterSerialization + MetricsService metricsService ) { - this.qakkaFig = qakkaFig; - this.inMemoryQueue = inMemoryQueue; this.queueActorHelper = queueActorHelper; this.metricsService = metricsService; - this.messageCounterSerialization = messageCounterSerialization; } @Override public void onReceive(Object message) { - if ( message instanceof QueueInitRequest) { - QueueInitRequest request = (QueueInitRequest)message; - - queuesSeen.add( request.getQueueName() ); - - if ( refreshSchedulersByQueueName.get( request.getQueueName() ) == null ) { - Cancellable scheduler = getContext().system().scheduler().schedule( - Duration.create( 0, TimeUnit.MILLISECONDS), - Duration.create( qakkaFig.getQueueRefreshMilliseconds(), TimeUnit.MILLISECONDS), - self(), - new QueueRefreshRequest( 
request.getQueueName(), false ), - getContext().dispatcher(), - getSelf()); - refreshSchedulersByQueueName.put( request.getQueueName(), scheduler ); - logger.debug("Created refresher for queue {}", request.getQueueName() ); - } - - if ( timeoutSchedulersByQueueName.get( request.getQueueName() ) == null ) { - Cancellable scheduler = getContext().system().scheduler().schedule( - Duration.create( 0, TimeUnit.MILLISECONDS), - Duration.create( qakkaFig.getQueueTimeoutSeconds()/2, TimeUnit.SECONDS), - self(), - new QueueTimeoutRequest( request.getQueueName() ), - getContext().dispatcher(), - getSelf()); - timeoutSchedulersByQueueName.put( request.getQueueName(), scheduler ); - logger.debug("Created timeouter for queue {}", request.getQueueName() ); - } - - if ( shardAllocationSchedulersByQueueName.get( request.getQueueName() ) == null ) { - Cancellable scheduler = getContext().system().scheduler().schedule( - Duration.create( 0, TimeUnit.MILLISECONDS), - Duration.create( qakkaFig.getShardAllocationCheckFrequencyMillis(), TimeUnit.MILLISECONDS), - self(), - new ShardCheckRequest( request.getQueueName() ), - getContext().dispatcher(), - getSelf()); - shardAllocationSchedulersByQueueName.put( request.getQueueName(), scheduler ); - logger.debug("Created shard allocater for queue {}", request.getQueueName() ); - } - - - } else if ( message instanceof QueueRefreshRequest ) { + if ( message instanceof QueueRefreshRequest ) { QueueRefreshRequest request = (QueueRefreshRequest)message; - queuesSeen.add( request.getQueueName() ); - -// // NOT asynchronous -// queueActorHelper.queueRefresh( request.getQueueName() ); - if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) { + // NOT asynchronous because we want this to happen locally in this JVM + queueActorHelper.queueRefresh( request.getQueueName() ); + /* if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) { if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) { 
ActorRef readerRef = getContext().actorOf( Props.create( GuiceActorProducer.class, QueueRefresher.class ), @@ -142,16 +73,13 @@ public class QueueActor extends UntypedActor { queueReadersByQueueName.put( request.getQueueName(), readerRef ); } } - // hand-off to queue's reader - queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() ); + queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() ); */ } else if ( message instanceof QueueTimeoutRequest ) { QueueTimeoutRequest request = (QueueTimeoutRequest)message; - queuesSeen.add( request.getQueueName() ); - if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) { ActorRef readerRef = getContext().actorOf( Props.create( GuiceActorProducer.class, QueueTimeouter.class), @@ -166,8 +94,6 @@ public class QueueActor extends UntypedActor { } else if ( message instanceof ShardCheckRequest ) { ShardCheckRequest request = (ShardCheckRequest)message; - queuesSeen.add( request.getQueueName() ); - if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) { ActorRef readerRef = getContext().actorOf( Props.create( GuiceActorProducer.class, ShardAllocator.class), @@ -186,12 +112,11 @@ public class QueueActor extends UntypedActor { String queueName = queueGetRequest.getQueueName(); int numRequested = queueGetRequest.getNumRequested(); - queuesSeen.add( queueName ); - Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_GET ).time(); try { Collection<DatabaseQueueMessage> messages = queueActorHelper.getMessages( queueName, numRequested); + logger.trace("Returning queue {} messages {}", queueName, messages.size() ); getSender().tell( new QueueGetResponse( DistributedQueueService.Status.SUCCESS, messages, queueName ), getSender() ); @@ -206,4 +131,6 @@ public class QueueActor extends UntypedActor { } + + } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java index 6382661..4696a67 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java @@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import com.codahale.metrics.Timer; import com.google.inject.Inject; +import com.google.inject.Singleton; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.qakka.MetricsService; import org.apache.usergrid.persistence.qakka.QakkaFig; @@ -37,12 +38,10 @@ import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterato import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Optional; -import java.util.UUID; +import java.util.*; +@Singleton public class QueueActorHelper { private static final Logger logger = LoggerFactory.getLogger( QueueActorHelper.class ); @@ -54,6 +53,8 @@ public class QueueActorHelper { private final MetricsService metricsService; private final CassandraClient cassandraClient; + Map<String, Long> startingShards = new HashMap<>(); + @Inject public QueueActorHelper( @@ -97,6 +98,16 @@ public class QueueActorHelper { Collection<DatabaseQueueMessage> getMessages(String queueName, int numRequested ) { + if ( qakkaFig.getInMemoryCache() ) { + 
return getMessagesFromMemory( queueName, numRequested ); + } else { + return getMessagesFromStorage( queueName, numRequested ); + } + } + + + Collection<DatabaseQueueMessage> getMessagesFromMemory(String queueName, int numRequested ) { + Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>(); while (queueMessages.size() < numRequested) { @@ -117,7 +128,51 @@ public class QueueActorHelper { //logger.debug("{} returning {} for queue {}", this, queueMessages.size(), queueName); return queueMessages; + } + + + Collection<DatabaseQueueMessage> getMessagesFromStorage(String queueName, int numRequested ) { + + Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>(); + + final Optional shardIdOptional; + final String shardKey = + createShardKey( queueName, Shard.Type.DEFAULT, actorSystemFig.getRegionLocal() ); + + Long shardId = startingShards.get( shardKey ); + if ( shardId != null ) { + shardIdOptional = Optional.of( shardId ); + } else { + shardIdOptional = Optional.empty(); + } + + String region = actorSystemFig.getRegionLocal(); + + ShardIterator shardIterator = new ShardIterator( + cassandraClient, queueName, region, Shard.Type.DEFAULT, shardIdOptional ); + + MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( + cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT, shardIterator, null); + + int count = 0; + + while ( multiShardIterator.hasNext() && count < numRequested ) { + DatabaseQueueMessage queueMessage = multiShardIterator.next(); + if ( queueMessage != null && putInflight( queueMessage ) ) { + queueMessages.add( queueMessage ); + count++; + } + } + + Shard currentShard = multiShardIterator.getCurrentShard(); + if ( currentShard != null ) { + shardId = currentShard.getShardId(); + startingShards.put( shardKey, shardId ); + } + + //logger.debug("{} returning {} for queue {}", this, queueMessages.size(), queueName); + return queueMessages; } @@ -208,4 +263,67 @@ public class QueueActorHelper { 
return DistributedQueueService.Status.ERROR; } } + + + void queueRefresh( String queueName ) { + + Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time(); + + try { + + if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) { + + final Optional shardIdOptional; + final String shardKey = + createShardKey( queueName, Shard.Type.DEFAULT, actorSystemFig.getRegionLocal() ); + Long shardId = startingShards.get( shardKey ); + + if ( shardId != null ) { + shardIdOptional = Optional.of( shardId ); + } else { + shardIdOptional = Optional.empty(); + } + + ShardIterator shardIterator = new ShardIterator( + cassandraClient, queueName, actorSystemFig.getRegionLocal(), + Shard.Type.DEFAULT, shardIdOptional ); + + UUID since = inMemoryQueue.getNewest( queueName ); + + String region = actorSystemFig.getRegionLocal(); + + MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( + cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT, + shardIterator, since); + + + int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName ); + int count = 0; + + while ( multiShardIterator.hasNext() && count < need ) { + DatabaseQueueMessage queueMessage = multiShardIterator.next(); + inMemoryQueue.add( queueName, queueMessage ); + count++; + } + + Shard currentShard = multiShardIterator.getCurrentShard(); + if ( currentShard != null ) { + shardId = currentShard.getShardId(); + startingShards.put( shardKey, shardId ); + } + + logger.trace("Refreshed queue {} region {} shard {} since {} found {}", + queueName, region, shardId, since, count ); + } + + } finally { + timer.close(); + } + + } + + private String createShardKey(String queueName, Shard.Type type, String region ) { + return queueName + "_" + type + region; + } + } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java index b5b9c30..71cf332 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java @@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import akka.actor.ActorRef; +import akka.actor.Cancellable; import akka.actor.Props; import akka.actor.UntypedActor; import akka.routing.ConsistentHashingRouter; @@ -27,23 +28,43 @@ import akka.routing.FromConfig; import com.google.inject.Inject; import com.google.inject.Injector; import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; import org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer; import org.apache.usergrid.persistence.qakka.distributed.messages.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; /** * Use consistent hashing to route messages to QueueActors */ public class QueueActorRouter extends UntypedActor { + private static final Logger logger = LoggerFactory.getLogger( QueueActorRouter.class ); private final ActorRef 
routerRef; private final QueueActorRouterProducer queueActorRouterProducer; + private final QakkaFig qakkaFig; + private final Set<String> queuesSeen = new HashSet<>(); + + private final Map<String, Cancellable> refreshSchedulersByQueueName = new HashMap<>(); + private final Map<String, Cancellable> timeoutSchedulersByQueueName = new HashMap<>(); + private final Map<String, Cancellable> shardAllocationSchedulersByQueueName = new HashMap<>(); + @Inject - public QueueActorRouter( QueueActorRouterProducer queueActorRouterProducer ) { + public QueueActorRouter( QueueActorRouterProducer queueActorRouterProducer, QakkaFig qakkaFig ) { this.queueActorRouterProducer = queueActorRouterProducer; + this.qakkaFig = qakkaFig; this.routerRef = getContext().actorOf( FromConfig.getInstance().props( Props.create( GuiceActorProducer.class, QueueActor.class) @@ -56,6 +77,10 @@ public class QueueActorRouter extends UntypedActor { if ( queueActorRouterProducer.getMessageTypes().contains( message.getClass() ) ) { QakkaMessage qakkaMessage = (QakkaMessage) message; + if ( qakkaMessage.getQueueName() != null ) { + initIfNeeded( qakkaMessage.getQueueName() ); + } + ConsistentHashingRouter.ConsistentHashableEnvelope envelope = new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qakkaMessage.getQueueName() ); routerRef.tell( envelope, getSender() ); @@ -64,4 +89,53 @@ public class QueueActorRouter extends UntypedActor { unhandled(message); } } + + + /** + * Create scheduled refresh, timeout and shard-allocation tasks just in time. 
+ */ + private void initIfNeeded( String queueName ) { + + if (!queuesSeen.contains( queueName )) { + + queuesSeen.add( queueName ); + + if ( qakkaFig.getInMemoryCache() && refreshSchedulersByQueueName.get( queueName ) == null) { + Cancellable scheduler = getContext().system().scheduler().schedule( + Duration.create( 0, TimeUnit.MILLISECONDS ), + Duration.create( qakkaFig.getQueueRefreshMilliseconds(), TimeUnit.MILLISECONDS ), + self(), + new QueueRefreshRequest( queueName, false ), + getContext().dispatcher(), + getSelf() ); + refreshSchedulersByQueueName.put( queueName, scheduler ); + logger.debug( "Created refresher for queue {}", queueName ); + } + + if ( timeoutSchedulersByQueueName.get( queueName ) == null) { + Cancellable scheduler = getContext().system().scheduler().schedule( + Duration.create( 0, TimeUnit.MILLISECONDS ), + Duration.create( qakkaFig.getQueueTimeoutSeconds() / 2, TimeUnit.SECONDS ), + self(), + new QueueTimeoutRequest( queueName ), + getContext().dispatcher(), + getSelf() ); + timeoutSchedulersByQueueName.put( queueName, scheduler ); + logger.debug( "Created timeouter for queue {}", queueName ); + } + + if ( shardAllocationSchedulersByQueueName.get( queueName ) == null) { + Cancellable scheduler = getContext().system().scheduler().schedule( + Duration.create( 0, TimeUnit.MILLISECONDS ), + Duration.create( qakkaFig.getShardAllocationCheckFrequencyMillis(), TimeUnit.MILLISECONDS ), + self(), + new ShardCheckRequest( queueName ), + getContext().dispatcher(), + getSelf() ); + shardAllocationSchedulersByQueueName.put( queueName, scheduler ); + logger.debug( "Created shard allocater for queue {}", queueName ); + } + } + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java index 509ccd9..86f94f1 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java @@ -20,52 +20,21 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import akka.actor.UntypedActor; -import com.codahale.metrics.Timer; import com.google.inject.Inject; -import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; -import org.apache.usergrid.persistence.qakka.MetricsService; -import org.apache.usergrid.persistence.qakka.QakkaFig; -import org.apache.usergrid.persistence.qakka.core.CassandraClient; -import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest; -import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator; -import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; -import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; -import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; -import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; -import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; - public class QueueRefresher extends UntypedActor { private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class ); - private final ActorSystemFig 
actorSystemFig; - private final InMemoryQueue inMemoryQueue; - private final QakkaFig qakkaFig; - private final MetricsService metricsService; - private final CassandraClient cassandraClient; + private final QueueActorHelper queueActorHelper; @Inject - public QueueRefresher( - ActorSystemFig actorSystemFig, - InMemoryQueue inMemoryQueue, - QakkaFig qakkaFig, - MetricsService metricsService, - CassandraClient cassandraClient - ) { - this.actorSystemFig = actorSystemFig; - this.inMemoryQueue = inMemoryQueue; - this.qakkaFig = qakkaFig; - this.metricsService = metricsService; - this.cassandraClient = cassandraClient; + public QueueRefresher( QueueActorHelper queueActorHelper ) { + this.queueActorHelper = queueActorHelper; } @@ -83,64 +52,8 @@ public class QueueRefresher extends UntypedActor { } } - Map<String, Long> startingShards = new HashMap<>(); - void queueRefresh( String queueName ) { - - Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time(); - - try { - - if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) { - - final Optional shardIdOptional; - final String shardKey = - createShardKey( queueName, Shard.Type.DEFAULT, actorSystemFig.getRegionLocal() ); - Long shardId = startingShards.get( shardKey ); - - if ( shardId != null ) { - shardIdOptional = Optional.of( shardId ); - } else { - shardIdOptional = Optional.empty(); - } - - ShardIterator shardIterator = new ShardIterator( - cassandraClient, queueName, actorSystemFig.getRegionLocal(), - Shard.Type.DEFAULT, shardIdOptional ); - - UUID since = inMemoryQueue.getNewest( queueName ); - - String region = actorSystemFig.getRegionLocal(); - - MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( - cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT, - shardIterator, since); - - - int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName ); - int count = 0; - - while ( 
multiShardIterator.hasNext() && count < need ) { - DatabaseQueueMessage queueMessage = multiShardIterator.next(); - inMemoryQueue.add( queueName, queueMessage ); - count++; - } - - startingShards.put( shardKey, shardId ); - - logger.debug("Refreshed queue {} region {} shard {} since {} found {}", - queueName, region, shardId, since, count ); - } - - } finally { - timer.close(); - } - + queueActorHelper.queueRefresh( queueName ); } - - private String createShardKey(String queueName, Shard.Type type, String region ) { - return queueName + "_" + type + region; - } - } http://git-wip-us.apache.org/repos/asf/usergrid/blob/522a5515/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index 51f6fd3..96e8cab 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -80,22 +80,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { @Override public void init() { - try { - List<String> queues = queueManager.getListOfQueues(); - for (String queueName : queues) { - initQueue( queueName ); - } - } catch (InvalidQueryException e) { - - if (e.getMessage().contains( "unconfigured columnfamily" )) { - logger.info( "Unable to initialize queues since system is bootstrapping. 
" + - "Queues will be initialized when created" ); - } else { - throw e; - } - - } - StringBuilder logMessage = new StringBuilder(); logMessage.append( "DistributedQueueServiceImpl initialized with config:\n" ); Method[] methods = qakkaFig.getClass().getMethods(); @@ -114,15 +98,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { @Override - public void initQueue(String queueName) { - logger.info("Initializing queue: {}", queueName); - QueueInitRequest request = new QueueInitRequest( queueName ); - ActorRef clientActor = actorSystemManager.getClientActor(); - clientActor.tell( request, null ); - } - - - @Override public void refresh() { for ( String queueName : queueManager.getListOfQueues() ) { refreshQueue( queueName ); @@ -132,10 +107,13 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { @Override public void refreshQueue(String queueName) { - logger.info("{} Requesting refresh for queue: {}", this, queueName); - QueueRefreshRequest request = new QueueRefreshRequest( queueName, false ); - ActorRef clientActor = actorSystemManager.getClientActor(); - clientActor.tell( request, null ); + if ( qakkaFig.getInMemoryCache() ) { + logger.trace( "{} Requesting refresh for queue: {}", this, queueName ); + QueueRefreshRequest request = new QueueRefreshRequest( queueName, false ); + ActorRef clientActor = actorSystemManager.getClientActor(); + clientActor.tell( request, null ); + } + } @@ -157,6 +135,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { String queueName, String sourceRegion, String destRegion, UUID messageId, Long deliveryTime, Long expirationTime ) { + logger.trace("Sending message to queue {} region {}", queueName, destRegion); + Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_TOTAL ).time(); try { @@ -186,9 +166,11 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { logger.debug("SUCCESS after {} 
retries", retries ); } - // send refresh-queue-if-empty message - QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false ); - clientActor.tell( qrr, null ); + if ( qakkaFig.getInMemoryCache() ) { + // send refresh-queue-if-empty message + QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false ); + clientActor.tell( qrr, null ); + } return qarm.getSendStatus(); @@ -280,7 +262,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { logger.warn( "getNextMessage {} SUCCESS after {} tries", queueName, tries ); } } - logger.trace("Returning queue {} messages {}", queueName, qprm.getQueueMessages().size()); return qprm.getQueueMessages(); @@ -335,14 +316,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { } - @Override - public Status clearMessages(String queueName) { - - // TODO: implement clear queue - throw new UnsupportedOperationException(); - } - - private Status sendMessageToLocalRouters( QakkaMessage message ) { int maxRetries = 5;
