Changes in message fetching logic.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2a867951 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2a867951 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2a867951 Branch: refs/heads/usergrid-1318-queue Commit: 2a86795123cf3899bdcd899ebf25608be9416b1b Parents: f6e6d5d Author: Dave Johnson <[email protected]> Authored: Wed Nov 9 10:46:59 2016 -0500 Committer: Dave Johnson <[email protected]> Committed: Wed Nov 9 10:46:59 2016 -0500 ---------------------------------------------------------------------- .../qakka/core/impl/InMemoryQueue.java | 4 ++ .../qakka/distributed/actors/QueueActor.java | 47 ++++++++---- .../distributed/actors/QueueActorHelper.java | 76 ++++++++++++-------- 3 files changed, 85 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a867951/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java index 4315df0..09bb8de 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java @@ -94,4 +94,8 @@ public class InMemoryQueue { public int size( String queueName ) { return getQueue( queueName ).size(); } + + public void clear( String queueName ) { + getQueue( queueName ).clear(); + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a867951/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java index 92e8607..8c2b26e 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java @@ -24,8 +24,11 @@ import akka.actor.Props; import akka.actor.UntypedActor; import com.codahale.metrics.Timer; import com.google.inject.Inject; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; import org.apache.usergrid.persistence.qakka.MetricsService; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; import org.apache.usergrid.persistence.qakka.distributed.messages.*; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; @@ -38,10 +41,14 @@ import java.util.*; public class QueueActor extends UntypedActor { private static final Logger logger = LoggerFactory.getLogger( QueueActor.class ); + private final String name = RandomStringUtils.randomAlphanumeric( 4 ); + + private final QakkaFig qakkaFig; + private final InMemoryQueue inMemoryQueue; private final QueueActorHelper queueActorHelper; private final MetricsService metricsService; - //private final Map<String, ActorRef> queueReadersByQueueName = new HashMap<>(); + private final Map<String, ActorRef> queueReadersByQueueName = new HashMap<>(); private final Map<String, ActorRef> queueTimeoutersByQueueName = new HashMap<>(); private final Map<String, ActorRef> shardAllocatorsByQueueName = new HashMap<>(); @@ -49,10 +56,14 @@ public class QueueActor extends UntypedActor { @Inject public QueueActor( QueueActorHelper queueActorHelper, - MetricsService metricsService + MetricsService metricsService, + QakkaFig qakkaFig, + InMemoryQueue inMemoryQueue ) { this.queueActorHelper = queueActorHelper; this.metricsService = metricsService; + this.qakkaFig = qakkaFig; + this.inMemoryQueue = inMemoryQueue; } @@ -60,21 +71,26 @@ public class QueueActor extends UntypedActor { public void onReceive(Object message) { if ( message instanceof QueueRefreshRequest ) { - QueueRefreshRequest request = (QueueRefreshRequest)message; + QueueRefreshRequest request = (QueueRefreshRequest) message; // NOT asynchronous because we want this to happen locally in this JVM - queueActorHelper.queueRefresh( request.getQueueName() ); - - /* if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) { - if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) { - ActorRef readerRef = getContext().actorOf( - Props.create( GuiceActorProducer.class, QueueRefresher.class ), - request.getQueueName() + "_reader"); - queueReadersByQueueName.put( request.getQueueName(), readerRef ); + + + if ( qakkaFig.getInMemoryCache() && qakkaFig.getInMemoryRefreshAsync()) { + if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) { + if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) { + ActorRef readerRef = getContext().actorOf( + Props.create( GuiceActorProducer.class, QueueRefresher.class ), + request.getQueueName() + "_reader"); + queueReadersByQueueName.put( request.getQueueName(), readerRef ); + } } + // hand-off to queue's reader + queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() ); + + } else { + queueActorHelper.queueRefresh( request.getQueueName() ); } - // hand-off to queue's reader - queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() ); */ } else if ( message instanceof QueueTimeoutRequest ) { @@ -116,7 +132,10 @@ public class QueueActor extends UntypedActor { try { Collection<DatabaseQueueMessage> messages = queueActorHelper.getMessages( queueName, numRequested); - logger.trace("Returning queue {} messages {}", queueName, messages.size() ); + + if ( !messages.isEmpty() ) { + logger.trace("{}: Returning queue {} messages {}", name, queueName, messages.size() ); + } getSender().tell( new QueueGetResponse( DistributedQueueService.Status.SUCCESS, messages, queueName ), getSender() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a867951/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java index 4696a67..89c79ec 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java @@ -22,7 +22,9 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import com.codahale.metrics.Timer; import com.google.inject.Inject; import com.google.inject.Singleton; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.persistence.qakka.MetricsService; import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.CassandraClient; @@ -45,6 +47,8 @@ import java.util.*; public class QueueActorHelper { private static final Logger logger = LoggerFactory.getLogger( QueueActorHelper.class ); + private final String name = RandomStringUtils.randomAlphanumeric( 4 ); + private final ActorSystemFig actorSystemFig; private final QueueMessageSerialization messageSerialization; private final AuditLogSerialization auditLogSerialization; @@ -53,7 +57,9 @@ public class QueueActorHelper { private final MetricsService metricsService; private final CassandraClient cassandraClient; - Map<String, Long> startingShards = new HashMap<>(); + private Map<String, Long> startingShards = new HashMap<>(); + private Map<String, Long> lastRefreshTimeMillis = new HashMap<>(); + private Map<String, UUID> newestFetchedUuid = new HashMap<>(); @Inject @@ -106,7 +112,7 @@ public class QueueActorHelper { } - Collection<DatabaseQueueMessage> getMessagesFromMemory(String queueName, int numRequested ) { + private Collection<DatabaseQueueMessage> getMessagesFromMemory(String queueName, int numRequested ) { Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>(); @@ -131,28 +137,30 @@ public class QueueActorHelper { } - Collection<DatabaseQueueMessage> getMessagesFromStorage(String queueName, int numRequested ) { + private Collection<DatabaseQueueMessage> getMessagesFromStorage(String queueName, int numRequested ) { Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>(); - final Optional shardIdOptional; - final String shardKey = - createShardKey( queueName, Shard.Type.DEFAULT, actorSystemFig.getRegionLocal() ); +// final Optional shardIdOptional; +// final String shardKey = +// createShardKey( queueName, Shard.Type.DEFAULT, actorSystemFig.getRegionLocal() ); +// +// Long shardId = startingShards.get( shardKey ); +// if ( shardId != null ) { +// shardIdOptional = Optional.of( shardId ); +// } else { +// shardIdOptional = Optional.empty(); +// } - Long shardId = startingShards.get( shardKey ); - if ( shardId != null ) { - shardIdOptional = Optional.of( shardId ); - } else { - shardIdOptional = Optional.empty(); - } + UUID since = newestFetchedUuid.get( queueName ); String region = actorSystemFig.getRegionLocal(); ShardIterator shardIterator = new ShardIterator( - cassandraClient, queueName, region, Shard.Type.DEFAULT, shardIdOptional ); + cassandraClient, queueName, region, Shard.Type.DEFAULT, Optional.empty() ); MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( - cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT, shardIterator, null); + cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT, shardIterator, since); int count = 0; @@ -160,16 +168,22 @@ public class QueueActorHelper { DatabaseQueueMessage queueMessage = multiShardIterator.next(); if ( queueMessage != null && putInflight( queueMessage ) ) { + long timestamp = queueMessage.getQueueMessageId().timestamp(); + if ( since != null && timestamp > since.timestamp() ) { + since = queueMessage.getQueueMessageId(); + } queueMessages.add( queueMessage ); count++; } } - Shard currentShard = multiShardIterator.getCurrentShard(); - if ( currentShard != null ) { - shardId = currentShard.getShardId(); - startingShards.put( shardKey, shardId ); - } + newestFetchedUuid.put( queueName, since ); + +// Shard currentShard = multiShardIterator.getCurrentShard(); +// if ( currentShard != null ) { +// shardId = currentShard.getShardId(); +// startingShards.put( shardKey, shardId ); +// } //logger.debug("{} returning {} for queue {}", this, queueMessages.size(), queueName); return queueMessages; @@ -273,6 +287,13 @@ public class QueueActorHelper { if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) { + // if queue has not been refreshed in 5 x queue refresh time, then consider it stale + long now = System.currentTimeMillis(); + Long lastRefreshed = lastRefreshTimeMillis.get( queueName ); + if ( lastRefreshed != null && now - lastRefreshed > qakkaFig.getQueueRefreshMilliseconds() * 5 ) { + inMemoryQueue.clear( queueName ); + } + final Optional shardIdOptional; final String shardKey = createShardKey( queueName, Shard.Type.DEFAULT, actorSystemFig.getRegionLocal() ); @@ -289,14 +310,12 @@ public class QueueActorHelper { Shard.Type.DEFAULT, shardIdOptional ); UUID since = inMemoryQueue.getNewest( queueName ); - String region = actorSystemFig.getRegionLocal(); MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT, shardIterator, since); - int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName ); int count = 0; @@ -306,14 +325,15 @@ public class QueueActorHelper { count++; } - Shard currentShard = multiShardIterator.getCurrentShard(); - if ( currentShard != null ) { - shardId = currentShard.getShardId(); - startingShards.put( shardKey, shardId ); - } + startingShards.put( shardKey, shardId ); + + lastRefreshTimeMillis.put( queueName, System.currentTimeMillis() ); - logger.trace("Refreshed queue {} region {} shard {} since {} found {}", - queueName, region, shardId, since, count ); + if ( count > 0 ) { + Object shard = shardIdOptional.isPresent() ? shardIdOptional.get() : "null"; + logger.trace( "Refreshed queue {} region {} shard {} since {} found {} inmemory {}", + queueName, region, shard, since, count, inMemoryQueue.size( queueName ) ); + } } } finally {
