Fix logic to compare timestamps, not UUIDs and to zero-out the "since" time 
when queue is empty.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/81233b0b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/81233b0b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/81233b0b

Branch: refs/heads/usergrid-1318-queue
Commit: 81233b0be7f2fda77f8bd4449d0ca93889510429
Parents: 88d429f
Author: Dave Johnson <[email protected]>
Authored: Wed Oct 5 09:58:25 2016 -0400
Committer: Dave Johnson <[email protected]>
Committed: Wed Oct 5 09:58:25 2016 -0400

----------------------------------------------------------------------
 .../qakka/core/impl/InMemoryQueue.java          | 33 +++++++++-----------
 1 file changed, 15 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/81233b0b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 1f6fe6e..e70ec75 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -19,34 +19,31 @@
 
 package org.apache.usergrid.persistence.qakka.core.impl;
 
-import com.datastax.driver.core.utils.UUIDs;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
-import org.apache.usergrid.persistence.qakka.distributed.actors.QueueRefresher;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 
 @Singleton
 public class InMemoryQueue {
-    private static final Logger logger = LoggerFactory.getLogger( 
QueueRefresher.class );
+    private static final Logger logger = LoggerFactory.getLogger( 
InMemoryQueue.class );
 
+    /** In-memory queues by name */
     private final Map<String, Queue<DatabaseQueueMessage>> queuesByName;
+
+    /** Newest message seen by in-memory queue */
     private final Map<String, UUID> newestByQueueName;
 
 
     @Inject
     InMemoryQueue(QakkaFig qakkaFig) {
-        queuesByName = new HashMap<>( qakkaFig.getQueueInMemorySize() );
+        queuesByName      = new HashMap<>( qakkaFig.getQueueInMemorySize() );
         newestByQueueName = new HashMap<>( qakkaFig.getQueueInMemorySize() );
     }
 
@@ -60,27 +57,27 @@ public class InMemoryQueue {
     }
 
     public void add( String queueName, DatabaseQueueMessage 
databaseQueueMessage ) {
+
         UUID newest = newestByQueueName.get( queueName );
         if ( newest == null ) {
             newest = databaseQueueMessage.getQueueMessageId();
+
         } else {
-            if ( databaseQueueMessage.getQueueMessageId().compareTo( newest ) 
> 0 ) {
+            if ( databaseQueueMessage.getQueueMessageId().timestamp() > 
newest.timestamp() ) {
                 newest = databaseQueueMessage.getQueueMessageId();
+                logger.debug("New newest for queue {} is {}", queueName, 
newest.timestamp());
             }
         }
+
         newestByQueueName.put( queueName, newest );
         getQueue( queueName ).add( databaseQueueMessage );
     }
 
     public UUID getNewest( String queueName ) {
-        UUID newest = newestByQueueName.get( queueName );
-//        if ( newest == null ) {
-//            // Create oldest UUID from a UNIX timestamp via DataStax utility
-//            // 
https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/utils/UUIDs.html
-//            newest = UUIDs.startOf( 0L );
-//            newestByQueueName.put( queueName, newest );
-//        }
-        return newest;
+        if ( getQueue( queueName ).isEmpty() ) {
+            newestByQueueName.remove( queueName );
+        }
+        return newestByQueueName.get( queueName );
     }
 
     public DatabaseQueueMessage poll( String queueName ) {

Reply via email to