Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue 775257d27 -> f56e1b0d1


Use CQL batch statement for timeout operation as well.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2afaa920
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2afaa920
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2afaa920

Branch: refs/heads/usergrid-1318-queue
Commit: 2afaa9201776bbfa8d999a6532a1d5eae3b650bc
Parents: 775257d
Author: Dave Johnson <snoopd...@apache.org>
Authored: Tue Oct 11 15:23:02 2016 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Tue Oct 11 15:23:02 2016 -0400

----------------------------------------------------------------------
 .../distributed/actors/QueueTimeouter.java      | 25 +-----
 .../qakka/distributed/actors/QueueWriter.java   |  2 +-
 .../QueueMessageSerialization.java              |  5 ++
 .../impl/QueueMessageSerializationImpl.java     | 81 ++++++++++++++++----
 4 files changed, 75 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index b7a95df..58afc76 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -99,30 +99,7 @@ public class QueueTimeouter extends UntypedActor {
                     if ((currentTime - queueMessage.getInflightAt()) > 
qakkaFig.getQueueTimeoutSeconds() * 1000) {
 
                         // put message back in messages_available table as new 
queue message with new UUID
-
-                        UUID newQueueMessageId = QakkaUtils.getTimeUuid();
-
-                        DatabaseQueueMessage newMessage = new 
DatabaseQueueMessage(
-                                queueMessage.getMessageId(),
-                                DatabaseQueueMessage.Type.DEFAULT,
-                                queueMessage.getQueueName(),
-                                queueMessage.getRegion(),
-                                null,
-                                queueMessage.getQueuedAt(),
-                                queueMessage.getInflightAt(),
-                                newQueueMessageId );
-
-                        messageSerialization.writeMessage( newMessage );
-
-                        // remove message from inflight table
-
-                        messageSerialization.deleteMessage(
-                                queueName,
-                                actorSystemFig.getRegionLocal(),
-                                null,
-                                DatabaseQueueMessage.Type.INFLIGHT,
-                                queueMessage.getQueueMessageId() );
-
+                        messageSerialization.timeoutInflight( queueMessage );
                         count++;
                     }
                 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index a7dbbd0..c9be47f 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -92,7 +92,7 @@ public class QueueWriter extends UntypedActor {
                             qa.getDestRegion(),
                             null,
                             currentTime,
-                            currentTime,
+                            -1L,
                             queueMessageId );
 
                     messageSerialization.writeMessage( dbqm );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
index 86c50a5..434c965 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
@@ -56,4 +56,9 @@ public interface QueueMessageSerialization extends Migration {
      * Write message to inflight table and remove from available table
      */
     void putInflight( DatabaseQueueMessage queueMessage );
+
+    /**
+     * Remove message from inflight table, write message to available table.
+     */
+    void timeoutInflight( DatabaseQueueMessage queueMessage );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index fba2bed..708132c 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -44,6 +44,7 @@ import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounter
 import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.java8.FuturesConvertersImpl;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -274,18 +275,14 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
     @Override
     public void putInflight( DatabaseQueueMessage message ) {
 
-        // create statement to write new queue message to inflight table
-
-        Shard.Type shardType = Shard.Type.INFLIGHT;
-        Shard shard = shardStrategy.selectShard(
-            message.getQueueName(), message.getRegion(), shardType, 
message.getQueueMessageId() );
+        // create statement to write queue message to inflight table
 
         DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage(
             message.getMessageId(),
             DatabaseQueueMessage.Type.INFLIGHT,
             message.getQueueName(),
             message.getRegion(),
-            shard.getShardId(),
+            null,
             message.getQueuedAt(),
             System.currentTimeMillis(),
             message.getQueueMessageId() );
@@ -321,6 +318,54 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
     }
 
 
+    @Override
+    public void timeoutInflight( DatabaseQueueMessage message ) {
+
+        // create statement to write queue message back to available table, 
with new UUID
+
+        UUID newQueueMessageId = QakkaUtils.getTimeUuid();
+
+        DatabaseQueueMessage newMessage = new DatabaseQueueMessage(
+            message.getMessageId(),
+            DatabaseQueueMessage.Type.DEFAULT,
+            message.getQueueName(),
+            message.getRegion(),
+            null,
+            System.currentTimeMillis(),
+            -1L,
+            newQueueMessageId );
+
+        Statement write = createWriteMessageStatement( newMessage );
+
+        // create statement to remove message from inflight table
+
+        Statement delete = createDeleteMessageStatement(
+            message.getQueueName(),
+            message.getRegion(),
+            message.getShardId(),
+            message.getType(),
+            message.getQueueMessageId());
+
+        // execute statements as a batch
+
+        BatchStatement batchStatement = new BatchStatement();
+        batchStatement.add( write );
+        batchStatement.add( delete );
+        cassandraClient.getQueueMessageSession().execute( batchStatement );
+
+        // bump counters
+
+        shardCounterSerialization.incrementCounter(
+            message.getQueueName(), Shard.Type.DEFAULT, message.getShardId(), 
1 );
+
+        messageCounterSerialization.incrementCounter(
+            message.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1L );
+
+        messageCounterSerialization.decrementCounter(
+            message.getQueueName(), DatabaseQueueMessage.Type.INFLIGHT, 1L );
+    }
+
+
     private Statement createDeleteMessageStatement( final String queueName,
                                                     final String region,
                                                     final Long shardIdOrNull,
@@ -357,20 +402,30 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
         final UUID queueMessageId =  message.getQueueMessageId() == null ?
             QakkaUtils.getTimeUuid() : message.getQueueMessageId();
 
-        long queuedAt = message.getQueuedAt() == null ?
-            System.currentTimeMillis() : message.getQueuedAt();
+        final long shardId;
+
+        if ( message.getShardId() != null ) {
+            shardId = message.getShardId();
 
-        long inflightAt = message.getInflightAt() == null ?
-            message.getQueuedAt() : message.getInflightAt();
+        } else if ( DatabaseQueueMessage.Type.DEFAULT.equals( 
message.getType() )) {
+            Shard shard = shardStrategy.selectShard(
+                message.getQueueName(), message.getRegion(), 
Shard.Type.DEFAULT, message.getQueueMessageId() );
+            shardId = shard.getShardId();
+
+        } else {
+            Shard shard = shardStrategy.selectShard(
+                message.getQueueName(), message.getRegion(), 
Shard.Type.INFLIGHT, message.getQueueMessageId() );
+            shardId = shard.getShardId();
+        }
 
         Statement insert = 
QueryBuilder.insertInto(getTableName(message.getType()))
             .value( COLUMN_QUEUE_NAME,       message.getQueueName())
             .value( COLUMN_REGION,           message.getRegion())
-            .value( COLUMN_SHARD_ID,         message.getShardId())
+            .value( COLUMN_SHARD_ID,         shardId)
             .value( COLUMN_MESSAGE_ID,       message.getMessageId())
             .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
-            .value( COLUMN_INFLIGHT_AT,      inflightAt )
-            .value( COLUMN_QUEUED_AT,        queuedAt)
+            .value( COLUMN_INFLIGHT_AT,      message.getInflightAt())
+            .value( COLUMN_QUEUED_AT,        message.getQueuedAt())
             .using( QueryBuilder.ttl( maxTtl ) );
 
         return insert;

Reply via email to