Repository: usergrid Updated Branches: refs/heads/usergrid-1318-queue 775257d27 -> f56e1b0d1
Use CQL batch statement for timeout operation as well. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2afaa920 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2afaa920 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2afaa920 Branch: refs/heads/usergrid-1318-queue Commit: 2afaa9201776bbfa8d999a6532a1d5eae3b650bc Parents: 775257d Author: Dave Johnson <[email protected]> Authored: Tue Oct 11 15:23:02 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Tue Oct 11 15:23:02 2016 -0400 ---------------------------------------------------------------------- .../distributed/actors/QueueTimeouter.java | 25 +----- .../qakka/distributed/actors/QueueWriter.java | 2 +- .../QueueMessageSerialization.java | 5 ++ .../impl/QueueMessageSerializationImpl.java | 81 ++++++++++++++++---- 4 files changed, 75 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java index b7a95df..58afc76 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java @@ -99,30 +99,7 @@ public class QueueTimeouter extends UntypedActor { if ((currentTime - queueMessage.getInflightAt()) > qakkaFig.getQueueTimeoutSeconds() * 1000) { // put message back in messages_available table as new queue message with new UUID - - UUID newQueueMessageId = QakkaUtils.getTimeUuid(); - - DatabaseQueueMessage newMessage = new DatabaseQueueMessage( - queueMessage.getMessageId(), - DatabaseQueueMessage.Type.DEFAULT, - queueMessage.getQueueName(), - queueMessage.getRegion(), - null, - queueMessage.getQueuedAt(), - queueMessage.getInflightAt(), - newQueueMessageId ); - - messageSerialization.writeMessage( newMessage ); - - // remove message from inflight table - - messageSerialization.deleteMessage( - queueName, - actorSystemFig.getRegionLocal(), - null, - DatabaseQueueMessage.Type.INFLIGHT, - queueMessage.getQueueMessageId() ); - + messageSerialization.timeoutInflight( queueMessage ); count++; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java index a7dbbd0..c9be47f 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java @@ -92,7 +92,7 @@ public class QueueWriter extends UntypedActor { qa.getDestRegion(), null, currentTime, - currentTime, + -1L, queueMessageId ); messageSerialization.writeMessage( dbqm ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java index 86c50a5..434c965 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java @@ -56,4 +56,9 @@ public interface QueueMessageSerialization extends Migration { * Write message to inflight table and remove from available table */ void putInflight( DatabaseQueueMessage queueMessage ); + + /** + * Remove message from inflight table, write message to available table. + */ + void timeoutInflight( DatabaseQueueMessage queueMessage ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2afaa920/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java index fba2bed..708132c 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java @@ -44,6 +44,7 @@ import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounter import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.java8.FuturesConvertersImpl; import java.util.Collection; import java.util.Collections; @@ -274,18 +275,14 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization @Override public void putInflight( DatabaseQueueMessage message ) { - // create statement to write new queue message to inflight table - - Shard.Type shardType = Shard.Type.INFLIGHT; - Shard shard = shardStrategy.selectShard( - message.getQueueName(), message.getRegion(), shardType, message.getQueueMessageId() ); + // create statement to write queue message to inflight table DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage( message.getMessageId(), DatabaseQueueMessage.Type.INFLIGHT, message.getQueueName(), message.getRegion(), - shard.getShardId(), + null, message.getQueuedAt(), System.currentTimeMillis(), message.getQueueMessageId() ); @@ -321,6 +318,54 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization } + @Override + public void timeoutInflight( DatabaseQueueMessage message ) { + + // create statement to write queue message back to available table, with new UUID + + UUID newQueueMessageId = QakkaUtils.getTimeUuid(); + + DatabaseQueueMessage newMessage = new DatabaseQueueMessage( + message.getMessageId(), + DatabaseQueueMessage.Type.DEFAULT, + message.getQueueName(), + message.getRegion(), + null, + System.currentTimeMillis(), + -1L, + newQueueMessageId ); + + Statement write = createWriteMessageStatement( newMessage ); + + // create statement to remove message from inflight table + + Statement delete = createDeleteMessageStatement( + message.getQueueName(), + message.getRegion(), + message.getShardId(), + message.getType(), + message.getQueueMessageId()); + + // execute statements as a batch + + BatchStatement batchStatement = new BatchStatement(); + batchStatement.add( write ); + batchStatement.add( delete ); + cassandraClient.getQueueMessageSession().execute( batchStatement ); + + // bump counters + + shardCounterSerialization.incrementCounter( + message.getQueueName(), Shard.Type.DEFAULT, message.getShardId(), 1 ); + + messageCounterSerialization.incrementCounter( + message.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1L ); + + messageCounterSerialization.decrementCounter( + message.getQueueName(), DatabaseQueueMessage.Type.INFLIGHT, 1L ); + } + + private Statement createDeleteMessageStatement( final String queueName, final String region, final Long shardIdOrNull, @@ -357,20 +402,30 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization final UUID queueMessageId = message.getQueueMessageId() == null ? QakkaUtils.getTimeUuid() : message.getQueueMessageId(); - long queuedAt = message.getQueuedAt() == null ? - System.currentTimeMillis() : message.getQueuedAt(); + final long shardId; + + if ( message.getShardId() != null ) { + shardId = message.getShardId(); - long inflightAt = message.getInflightAt() == null ? - message.getQueuedAt() : message.getInflightAt(); + } else if ( DatabaseQueueMessage.Type.DEFAULT.equals( message.getType() )) { + Shard shard = shardStrategy.selectShard( + message.getQueueName(), message.getRegion(), Shard.Type.DEFAULT, message.getQueueMessageId() ); + shardId = shard.getShardId(); + + } else { + Shard shard = shardStrategy.selectShard( + message.getQueueName(), message.getRegion(), Shard.Type.INFLIGHT, message.getQueueMessageId() ); + shardId = shard.getShardId(); + } Statement insert = QueryBuilder.insertInto(getTableName(message.getType())) .value( COLUMN_QUEUE_NAME, message.getQueueName()) .value( COLUMN_REGION, message.getRegion()) - .value( COLUMN_SHARD_ID, message.getShardId()) + .value( COLUMN_SHARD_ID, shardId) .value( COLUMN_MESSAGE_ID, message.getMessageId()) .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId) - .value( COLUMN_INFLIGHT_AT, inflightAt ) - .value( COLUMN_QUEUED_AT, queuedAt) + .value( COLUMN_INFLIGHT_AT, message.getInflightAt()) + .value( COLUMN_QUEUED_AT, message.getQueuedAt()) .using( QueryBuilder.ttl( maxTtl ) ); return insert;
