Changes in message fetching logic.

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2a867951
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2a867951
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2a867951

Branch: refs/heads/usergrid-1318-queue
Commit: 2a86795123cf3899bdcd899ebf25608be9416b1b
Parents: f6e6d5d
Author: Dave Johnson <[email protected]>
Authored: Wed Nov 9 10:46:59 2016 -0500
Committer: Dave Johnson <[email protected]>
Committed: Wed Nov 9 10:46:59 2016 -0500

----------------------------------------------------------------------
 .../qakka/core/impl/InMemoryQueue.java          |  4 ++
 .../qakka/distributed/actors/QueueActor.java    | 47 ++++++++----
 .../distributed/actors/QueueActorHelper.java    | 76 ++++++++++++--------
 3 files changed, 85 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a867951/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 4315df0..09bb8de 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -94,4 +94,8 @@ public class InMemoryQueue {
     public int size( String queueName ) {
         return getQueue( queueName ).size();
     }
+
+    public void clear( String queueName ) {
+        getQueue( queueName ).clear();
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a867951/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 92e8607..8c2b26e 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -24,8 +24,11 @@ import akka.actor.Props;
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
 import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
@@ -38,10 +41,14 @@ import java.util.*;
 public class QueueActor extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( 
QueueActor.class );
 
+    private final String name = RandomStringUtils.randomAlphanumeric( 4 );
+
+    private final QakkaFig         qakkaFig;
+    private final InMemoryQueue    inMemoryQueue;
     private final QueueActorHelper queueActorHelper;
     private final MetricsService   metricsService;
 
-    //private final Map<String, ActorRef> queueReadersByQueueName    = new 
HashMap<>();
+    private final Map<String, ActorRef> queueReadersByQueueName    = new 
HashMap<>();
     private final Map<String, ActorRef> queueTimeoutersByQueueName = new 
HashMap<>();
     private final Map<String, ActorRef> shardAllocatorsByQueueName = new 
HashMap<>();
 
@@ -49,10 +56,14 @@ public class QueueActor extends UntypedActor {
     @Inject
     public QueueActor(
         QueueActorHelper queueActorHelper,
-        MetricsService   metricsService
+        MetricsService   metricsService,
+        QakkaFig         qakkaFig,
+        InMemoryQueue    inMemoryQueue
     ) {
         this.queueActorHelper = queueActorHelper;
         this.metricsService = metricsService;
+        this.qakkaFig = qakkaFig;
+        this.inMemoryQueue = inMemoryQueue;
     }
 
 
@@ -60,21 +71,26 @@ public class QueueActor extends UntypedActor {
     public void onReceive(Object message) {
 
         if ( message instanceof QueueRefreshRequest ) {
-            QueueRefreshRequest request = (QueueRefreshRequest)message;
+            QueueRefreshRequest request = (QueueRefreshRequest) message;
 
             // NOT asynchronous because we want this to happen locally in this 
JVM
-            queueActorHelper.queueRefresh( request.getQueueName() );
-
-            /* if ( queueReadersByQueueName.get( request.getQueueName() ) == 
null ) {
-                if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( 
request.getQueueName()) == null ) {
-                    ActorRef readerRef = getContext().actorOf(
-                        Props.create( GuiceActorProducer.class, 
QueueRefresher.class ),
-                        request.getQueueName() + "_reader");
-                    queueReadersByQueueName.put( request.getQueueName(), 
readerRef );
+
+
+            if ( qakkaFig.getInMemoryCache() && 
qakkaFig.getInMemoryRefreshAsync()) {
+                 if ( queueReadersByQueueName.get( request.getQueueName() ) == 
null ) {
+                    if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( 
request.getQueueName()) == null ) {
+                        ActorRef readerRef = getContext().actorOf(
+                            Props.create( GuiceActorProducer.class, 
QueueRefresher.class ),
+                            request.getQueueName() + "_reader");
+                        queueReadersByQueueName.put( request.getQueueName(), 
readerRef );
+                    }
                 }
+                // hand-off to queue's reader
+                queueReadersByQueueName.get( request.getQueueName() ).tell( 
request, self() );
+
+            } else {
+                queueActorHelper.queueRefresh( request.getQueueName() );
             }
-            // hand-off to queue's reader
-            queueReadersByQueueName.get( request.getQueueName() ).tell( 
request, self() ); */
 
 
         } else if ( message instanceof QueueTimeoutRequest ) {
@@ -116,7 +132,10 @@ public class QueueActor extends UntypedActor {
             try {
 
                 Collection<DatabaseQueueMessage> messages = 
queueActorHelper.getMessages( queueName, numRequested);
-                logger.trace("Returning queue {} messages {}", queueName, 
messages.size() );
+
+                if ( !messages.isEmpty() ) {
+                    logger.trace("{}: Returning queue {} messages {}", name, 
queueName, messages.size() );
+                }
 
                 getSender().tell( new QueueGetResponse(
                         DistributedQueueService.Status.SUCCESS, messages, 
queueName ), getSender() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a867951/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 4696a67..89c79ec 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -22,7 +22,9 @@ package 
org.apache.usergrid.persistence.qakka.distributed.actors;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
@@ -45,6 +47,8 @@ import java.util.*;
 public class QueueActorHelper {
     private static final Logger logger = LoggerFactory.getLogger( 
QueueActorHelper.class );
 
+    private final String name = RandomStringUtils.randomAlphanumeric( 4 );
+
     private final ActorSystemFig            actorSystemFig;
     private final QueueMessageSerialization messageSerialization;
     private final AuditLogSerialization     auditLogSerialization;
@@ -53,7 +57,9 @@ public class QueueActorHelper {
     private final MetricsService            metricsService;
     private final CassandraClient           cassandraClient;
 
-    Map<String, Long> startingShards = new HashMap<>();
+    private Map<String, Long> startingShards = new HashMap<>();
+    private Map<String, Long> lastRefreshTimeMillis = new HashMap<>();
+    private Map<String, UUID> newestFetchedUuid = new HashMap<>();
 
 
     @Inject
@@ -106,7 +112,7 @@ public class QueueActorHelper {
     }
 
 
-    Collection<DatabaseQueueMessage> getMessagesFromMemory(String queueName, 
int numRequested ) {
+    private Collection<DatabaseQueueMessage> getMessagesFromMemory(String 
queueName, int numRequested ) {
 
         Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>();
 
@@ -131,28 +137,30 @@ public class QueueActorHelper {
     }
 
 
-    Collection<DatabaseQueueMessage> getMessagesFromStorage(String queueName, 
int numRequested ) {
+    private Collection<DatabaseQueueMessage> getMessagesFromStorage(String 
queueName, int numRequested ) {
 
         Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>();
 
-        final Optional shardIdOptional;
-        final String shardKey =
-            createShardKey( queueName, Shard.Type.DEFAULT, 
actorSystemFig.getRegionLocal() );
+//        final Optional shardIdOptional;
+//        final String shardKey =
+//            createShardKey( queueName, Shard.Type.DEFAULT, 
actorSystemFig.getRegionLocal() );
+//
+//        Long shardId = startingShards.get( shardKey );
+//        if ( shardId != null ) {
+//            shardIdOptional = Optional.of( shardId );
+//        } else {
+//            shardIdOptional = Optional.empty();
+//        }
 
-        Long shardId = startingShards.get( shardKey );
-        if ( shardId != null ) {
-            shardIdOptional = Optional.of( shardId );
-        } else {
-            shardIdOptional = Optional.empty();
-        }
+        UUID since = newestFetchedUuid.get( queueName );
 
         String region = actorSystemFig.getRegionLocal();
 
         ShardIterator shardIterator = new ShardIterator(
-            cassandraClient, queueName, region, Shard.Type.DEFAULT, 
shardIdOptional );
+            cassandraClient, queueName, region, Shard.Type.DEFAULT, 
Optional.empty() );
 
         MultiShardMessageIterator multiShardIterator = new 
MultiShardMessageIterator(
-            cassandraClient, queueName, region, 
DatabaseQueueMessage.Type.DEFAULT, shardIterator, null);
+            cassandraClient, queueName, region, 
DatabaseQueueMessage.Type.DEFAULT, shardIterator, since);
 
         int count = 0;
 
@@ -160,16 +168,22 @@ public class QueueActorHelper {
             DatabaseQueueMessage queueMessage = multiShardIterator.next();
 
             if ( queueMessage != null && putInflight( queueMessage ) ) {
+                long timestamp = queueMessage.getQueueMessageId().timestamp();
+                if ( since != null && timestamp > since.timestamp() ) {
+                    since = queueMessage.getQueueMessageId();
+                }
                 queueMessages.add( queueMessage );
                 count++;
             }
         }
 
-        Shard currentShard = multiShardIterator.getCurrentShard();
-        if ( currentShard != null ) {
-            shardId = currentShard.getShardId();
-            startingShards.put( shardKey, shardId );
-        }
+        newestFetchedUuid.put( queueName, since );
+
+//        Shard currentShard = multiShardIterator.getCurrentShard();
+//        if ( currentShard != null ) {
+//            shardId = currentShard.getShardId();
+//            startingShards.put( shardKey, shardId );
+//        }
 
         //logger.debug("{} returning {} for queue {}", this, 
queueMessages.size(), queueName);
         return queueMessages;
@@ -273,6 +287,13 @@ public class QueueActorHelper {
 
             if (inMemoryQueue.size( queueName ) < 
qakkaFig.getQueueInMemorySize()) {
 
+                // if queue has not been refreshed in 5 x queue refresh time, 
then consider it stale
+                long now = System.currentTimeMillis();
+                Long lastRefreshed = lastRefreshTimeMillis.get( queueName );
+                if ( lastRefreshed != null && now - lastRefreshed > 
qakkaFig.getQueueRefreshMilliseconds() * 5 ) {
+                    inMemoryQueue.clear( queueName );
+                }
+
                 final Optional shardIdOptional;
                 final String shardKey =
                     createShardKey( queueName, Shard.Type.DEFAULT, 
actorSystemFig.getRegionLocal() );
@@ -289,14 +310,12 @@ public class QueueActorHelper {
                     Shard.Type.DEFAULT, shardIdOptional );
 
                 UUID since = inMemoryQueue.getNewest( queueName );
-
                 String region = actorSystemFig.getRegionLocal();
 
                 MultiShardMessageIterator multiShardIterator = new 
MultiShardMessageIterator(
                     cassandraClient, queueName, region, 
DatabaseQueueMessage.Type.DEFAULT,
                     shardIterator, since);
 
-
                 int need = qakkaFig.getQueueInMemorySize() - 
inMemoryQueue.size( queueName );
                 int count = 0;
 
@@ -306,14 +325,15 @@ public class QueueActorHelper {
                     count++;
                 }
 
-                Shard currentShard = multiShardIterator.getCurrentShard();
-                if ( currentShard != null ) {
-                    shardId = currentShard.getShardId();
-                    startingShards.put( shardKey, shardId );
-                }
+                startingShards.put( shardKey, shardId );
+
+                lastRefreshTimeMillis.put( queueName, 
System.currentTimeMillis() );
 
-                logger.trace("Refreshed queue {} region {} shard {} since {} 
found {}",
-                    queueName, region, shardId, since, count );
+                if ( count > 0 ) {
+                    Object shard = shardIdOptional.isPresent() ? 
shardIdOptional.get() : "null";
+                    logger.trace( "Refreshed queue {} region {} shard {} since 
{} found {} inmemory {}",
+                        queueName, region, shard, since, count, 
inMemoryQueue.size( queueName ) );
+                }
             }
 
         } finally {

Reply via email to