Add total-time metrics, handle Ack requests via WriterRouter, use C* batch-statement for Ack operation
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7be8c274 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7be8c274 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7be8c274 Branch: refs/heads/master Commit: 7be8c274e83e1680221e06c327ad04e697c61ca6 Parents: 71fe06f Author: Dave Johnson <[email protected]> Authored: Mon Oct 10 09:37:32 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Mon Oct 10 09:37:32 2016 -0400 ---------------------------------------------------------------------- .../usergrid/persistence/qakka/QakkaFig.java | 2 +- .../persistence/qakka/api/QueueResource.java | 83 ++++------ .../qakka/distributed/actors/QueueActor.java | 40 +---- .../distributed/actors/QueueActorHelper.java | 150 +++++------------ .../distributed/actors/QueueActorRouter.java | 50 +----- .../distributed/actors/QueueRefresher.java | 103 +++++++++++- .../qakka/distributed/actors/QueueSender.java | 16 +- .../distributed/actors/QueueTimeouter.java | 13 -- .../qakka/distributed/actors/QueueWriter.java | 49 ++++-- .../distributed/actors/QueueWriterRouter.java | 7 +- .../impl/DistributedQueueServiceImpl.java | 161 +++++++++++-------- .../impl/QueueActorRouterProducer.java | 3 +- .../impl/QueueWriterRouterProducer.java | 11 +- .../distributed/messages/QakkaMessage.java | 6 +- .../distributed/messages/QueueGetResponse.java | 14 +- .../distributed/messages/QueueSendResponse.java | 9 +- .../messages/QueueWriteResponse.java | 8 +- .../MultiShardMessageIterator.java | 6 + .../QueueMessageSerialization.java | 5 + .../impl/QueueMessageSerializationImpl.java | 151 ++++++++++++----- .../distributed/QueueActorServiceTest.java | 3 - .../actors/QueueActorHelperTest.java | 2 +- .../distributed/actors/QueueReaderTest.java | 10 +- .../DatabaseQueueMessageSerializationTest.java | 22 ++- .../queue/src/test/resources/log4j.properties | 4 +- 25 files changed, 507 insertions(+), 421 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java index 7d89187..3093c39 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -109,7 +109,7 @@ public interface QakkaFig extends GuicyFig, Serializable { /** How long to wait for response from queue actor before timing out and trying again */ @Key(QUEUE_GET_TIMEOUT) - @Default("4") + @Default("1") int getGetTimeoutSeconds(); /** Max number of times to retry call to queue writer for queue send operation */ http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java index 10dae04..b609de3 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java @@ -19,13 +19,10 @@ package org.apache.usergrid.persistence.qakka.api; -import com.codahale.metrics.Timer; import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; import com.google.common.base.Preconditions; import com.google.common.io.ByteStreams; -import org.apache.usergrid.persistence.qakka.MetricsService; import org.apache.usergrid.persistence.qakka.core.*; -import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +43,6 @@ public class QueueResource { private final QueueManager queueManager; private final QueueMessageManager queueMessageManager; - private final MetricsService metricsService; private final URIStrategy uriStrategy; private final Regions regions; @@ -55,13 +51,11 @@ public class QueueResource { public QueueResource( QueueManager queueManager, QueueMessageManager queueMessageManager, - MetricsService metricsService, URIStrategy uriStrategy, Regions regions ) { this.queueManager = queueManager; this.queueMessageManager = queueMessageManager; - this.metricsService = metricsService; this.uriStrategy = uriStrategy; this.regions = regions; } @@ -262,8 +256,6 @@ public class QueueResource { String contentType, ByteBuffer byteBuffer) { - Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_TOTAL ).time(); - try { Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" ); @@ -285,9 +277,6 @@ public class QueueResource { apiResponse.setCount( 1 ); return Response.ok().entity( apiResponse ).build(); - } finally { - timer.close(); - } } @@ -297,38 +286,31 @@ public class QueueResource { public Response getNextMessages( @PathParam("queueName") String queueName, @QueryParam("count") @DefaultValue("1") String countParam) throws Exception { - Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_TOTAL ).time(); - try { - - Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" ); + Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" ); - int count = 1; - try { - count = Integer.parseInt( countParam ); - } catch (Exception e) { - throw new IllegalArgumentException( "Invalid count parameter" ); - } - if (count <= 0) { - // invalid count - throw new IllegalArgumentException( "Count must be >= 1" ); - } - - List<QueueMessage> messages = queueMessageManager.getNextMessages( queueName, count ); + int count = 1; + try { + count = Integer.parseInt( countParam ); + } catch (Exception e) { + throw new IllegalArgumentException( "Invalid count parameter" ); + } + if (count <= 0) { + // invalid count + throw new IllegalArgumentException( "Count must be >= 1" ); + } - ApiResponse apiResponse = new ApiResponse(); + List<QueueMessage> messages = queueMessageManager.getNextMessages( queueName, count ); - if (messages != null && !messages.isEmpty()) { - apiResponse.setQueueMessages( messages ); + ApiResponse apiResponse = new ApiResponse(); - } else { // always return queueMessages field - apiResponse.setQueueMessages( Collections.EMPTY_LIST ); - } - apiResponse.setCount( apiResponse.getQueueMessages().size() ); - return Response.ok().entity( apiResponse ).build(); + if (messages != null && !messages.isEmpty()) { + apiResponse.setQueueMessages( messages ); - } finally { - timer.close(); + } else { // always return queueMessages field + apiResponse.setQueueMessages( Collections.EMPTY_LIST ); } + apiResponse.setCount( apiResponse.getQueueMessages().size() ); + return Response.ok().entity( apiResponse ).build(); } @@ -338,25 +320,18 @@ public class QueueResource { public Response ackMessage( @PathParam("queueName") String queueName, @PathParam("queueMessageId") String queueMessageId) throws Exception { - Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_TOTAL ).time(); - try { - - Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" ); + Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" ); - UUID messageUuid; - try { - messageUuid = UUID.fromString( queueMessageId ); - } catch (Exception e) { - throw new IllegalArgumentException( "Invalid queue message UUID" ); - } - queueMessageManager.ackMessage( queueName, messageUuid ); - - ApiResponse apiResponse = new ApiResponse(); - return Response.ok().entity( apiResponse ).build(); - - } finally { - timer.close(); + UUID messageUuid; + try { + messageUuid = UUID.fromString( queueMessageId ); + } catch (Exception e) { + throw new IllegalArgumentException( "Invalid queue message UUID" ); } + queueMessageManager.ackMessage( queueName, messageUuid ); + + ApiResponse apiResponse = new ApiResponse(); + return Response.ok().entity( apiResponse ).build(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java index 9ce38ef..248f9cd 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java @@ -64,32 +64,23 @@ public class QueueActor extends UntypedActor { private final Set<String> queuesSeen = new HashSet<>(); - //private final Injector injector; - @Inject public QueueActor( - //Injector injector, QakkaFig qakkaFig, InMemoryQueue inMemoryQueue, QueueActorHelper queueActorHelper, MetricsService metricsService, MessageCounterSerialization messageCounterSerialization ) { - //this.injector = injector; this.qakkaFig = qakkaFig; this.inMemoryQueue = inMemoryQueue; this.queueActorHelper = queueActorHelper; this.metricsService = metricsService; this.messageCounterSerialization = messageCounterSerialization; - -// qakkaFig = injector.getInstance( QakkaFig.class ); -// inMemoryQueue = injector.getInstance( InMemoryQueue.class ); -// queueActorHelper = injector.getInstance( QueueActorHelper.class ); -// metricsService = injector.getInstance( MetricsService.class ); -// messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class ); } + @Override public void onReceive(Object message) { @@ -134,6 +125,7 @@ public class QueueActor extends UntypedActor { logger.debug("Created shard allocater for queue {}", request.getQueueName() ); } + } else if ( message instanceof QueueRefreshRequest ) { QueueRefreshRequest request = (QueueRefreshRequest)message; queuesSeen.add( request.getQueueName() ); @@ -154,6 +146,7 @@ public class QueueActor extends UntypedActor { // hand-off to queue's reader queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() ); + } else if ( message instanceof QueueTimeoutRequest ) { QueueTimeoutRequest request = (QueueTimeoutRequest)message; @@ -169,6 +162,7 @@ public class QueueActor extends UntypedActor { // ASYNCHRONOUS -> hand-off to queue's timeouter queueTimeoutersByQueueName.get( request.getQueueName() ).tell( request, self() ); + } else if ( message instanceof ShardCheckRequest ) { ShardCheckRequest request = (ShardCheckRequest)message; @@ -184,6 +178,7 @@ public class QueueActor extends UntypedActor { // ASYNCHRONOUS -> hand-off to queue's shard allocator shardAllocatorsByQueueName.get( request.getQueueName() ).tell( request, self() ); + } else if ( message instanceof QueueGetRequest) { QueueGetRequest queueGetRequest = (QueueGetRequest) message; @@ -199,30 +194,7 @@ public class QueueActor extends UntypedActor { Collection<DatabaseQueueMessage> messages = queueActorHelper.getMessages( queueName, numRequested); getSender().tell( new QueueGetResponse( - DistributedQueueService.Status.SUCCESS, messages ), getSender() ); - - } finally { - timer.close(); - } - - - } else if ( message instanceof QueueAckRequest) { - - Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_ACK ).time(); - try { - - QueueAckRequest queueAckRequest = (QueueAckRequest) message; - - queuesSeen.add( queueAckRequest.getQueueName() ); - - DistributedQueueService.Status status = queueActorHelper.ackQueueMessage( - queueAckRequest.getQueueName(), - queueAckRequest.getQueueMessageId() ); - - getSender().tell( new QueueAckResponse( - queueAckRequest.getQueueName(), - queueAckRequest.getQueueMessageId(), - status ), getSender() ); + DistributedQueueService.Status.SUCCESS, messages, queueName ), getSender() ); } finally { timer.close(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java index e3996c5..fcb3fba 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java @@ -41,7 +41,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; public class QueueActorHelper { @@ -55,9 +54,6 @@ public class QueueActorHelper { private final MetricsService metricsService; private final CassandraClient cassandraClient; - private final AtomicLong runCount = new AtomicLong(0); - private final AtomicLong totalRead = new AtomicLong(0); - @Inject public QueueActorHelper( @@ -109,7 +105,7 @@ public class QueueActorHelper { if (queueMessage != null) { - if (putInflight( queueName, queueMessage )) { + if (putInflight( queueMessage )) { queueMessages.add( queueMessage ); } @@ -125,10 +121,48 @@ public class QueueActorHelper { } + boolean putInflight( DatabaseQueueMessage queueMessage ) { + + UUID qmid = queueMessage.getQueueMessageId(); + try { + + messageSerialization.putInflight( queueMessage ); + + } catch ( Throwable t ) { + logger.error("Error putting inflight queue message " + + qmid + " queue name: " + queueMessage.getQueueName(), t); + + auditLogSerialization.recordAuditLog( + AuditLog.Action.GET, + AuditLog.Status.ERROR, + queueMessage.getQueueName(), + actorSystemFig.getRegionLocal(), + queueMessage.getMessageId(), + qmid); + + return false; + } + + auditLogSerialization.recordAuditLog( + AuditLog.Action.GET, + AuditLog.Status.SUCCESS, + queueMessage.getQueueName(), + actorSystemFig.getRegionLocal(), + queueMessage.getMessageId(), + qmid); + + return true; + } + + DistributedQueueService.Status ackQueueMessage(String queueName, UUID queueMessageId ) { - DatabaseQueueMessage queueMessage = loadDatabaseQueueMessage( - queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT ); + DatabaseQueueMessage queueMessage = messageSerialization.loadMessage( + queueName, + actorSystemFig.getRegionLocal(), + null, + DatabaseQueueMessage.Type.INFLIGHT, + queueMessageId ); if ( queueMessage == null ) { logger.error("Queue {} queue message id {} not found in inflight table", queueName, queueMessageId); @@ -174,106 +208,4 @@ public class QueueActorHelper { return DistributedQueueService.Status.ERROR; } } - - - boolean putInflight( String queueName, DatabaseQueueMessage queueMessage ) { - - UUID qmid = queueMessage.getQueueMessageId(); - try { - - DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage( - queueMessage.getMessageId(), - DatabaseQueueMessage.Type.INFLIGHT, - queueName, - actorSystemFig.getRegionLocal(), - null, // let serialization select the shard - queueMessage.getQueuedAt(), - System.currentTimeMillis(), - qmid); - - messageSerialization.writeMessage( inflightMessage ); - - DatabaseQueueMessage retrieved = loadDatabaseQueueMessage( - queueName, qmid, DatabaseQueueMessage.Type.INFLIGHT ); - if ( retrieved == null ) { - logger.error("Failed ot write queue message id {} to inflight table", qmid); - return false; - } - - messageSerialization.deleteMessage( - queueName, - actorSystemFig.getRegionLocal(), - null, - DatabaseQueueMessage.Type.DEFAULT, - qmid); - - //logger.debug("Put message {} inflight for queue name {}", qmid, queueName); - - } catch ( Throwable t ) { - logger.error("Error putting inflight queue message " + qmid + " queue name: " + queueName, t); - - auditLogSerialization.recordAuditLog( - AuditLog.Action.GET, - AuditLog.Status.ERROR, - queueName, - actorSystemFig.getRegionLocal(), - queueMessage.getMessageId(), - qmid); - - return false; - } - - auditLogSerialization.recordAuditLog( - AuditLog.Action.GET, - AuditLog.Status.SUCCESS, - queueName, - actorSystemFig.getRegionLocal(), - queueMessage.getMessageId(), - qmid); - - return true; - } - - - void queueRefresh( String queueName ) { - - Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time(); - - try { - - if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) { - - // TODO: need to track the starting shard - - ShardIterator shardIterator = new ShardIterator( - cassandraClient, queueName, actorSystemFig.getRegionLocal(), - Shard.Type.DEFAULT, Optional.empty() ); - - UUID since = inMemoryQueue.getNewest( queueName ); - - String region = actorSystemFig.getRegionLocal(); - MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( - cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT, - shardIterator, since); - - int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName ); - int count = 0; - - while ( multiShardIterator.hasNext() && count < need ) { - DatabaseQueueMessage queueMessage = multiShardIterator.next(); - inMemoryQueue.add( queueName, queueMessage ); - count++; - } - - if ( count > 0 ) { - logger.debug( "Added {} in-memory for queue {}, new size = {}", - count, queueName, inMemoryQueue.size( queueName ) ); - } - } - - } finally { - timer.close(); - } - - } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java index c40a3d9..f908e7f 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java @@ -27,6 +27,7 @@ import akka.routing.FromConfig; import com.google.inject.Inject; import com.google.inject.Injector; import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; +import org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer; import org.apache.usergrid.persistence.qakka.distributed.messages.*; @@ -36,10 +37,13 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.*; public class QueueActorRouter extends UntypedActor { private final ActorRef routerRef; + private final QueueActorRouterProducer queueActorRouterProducer; @Inject - public QueueActorRouter( Injector injector ) { + public QueueActorRouter( QueueActorRouterProducer queueActorRouterProducer ) { + + this.queueActorRouterProducer = queueActorRouterProducer; this.routerRef = getContext().actorOf( FromConfig.getInstance().props( Props.create(GuiceActorProducer.class, QueueActor.class)), "router"); @@ -48,51 +52,13 @@ public class QueueActorRouter extends UntypedActor { @Override public void onReceive(Object message) { - // TODO: can we do something smarter than this if-then-else structure - // e.g. if message is recognized as one of ours, then we just pass it on? - - if ( message instanceof QueueGetRequest) { - QueueGetRequest qgr = (QueueGetRequest) message; + if ( queueActorRouterProducer.getMessageTypes().contains( message.getClass() ) ) { + QakkaMessage qakkaMessage = (QakkaMessage) message; ConsistentHashingRouter.ConsistentHashableEnvelope envelope = - new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qgr.getQueueName() ); + new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qakkaMessage.getQueueName() ); routerRef.tell( envelope, getSender() ); - } else if ( message instanceof QueueAckRequest) { - QueueAckRequest qar = (QueueAckRequest)message; - - ConsistentHashingRouter.ConsistentHashableEnvelope envelope = - new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() ); - routerRef.tell( envelope, getSender()); - - } else if ( message instanceof QueueInitRequest) { - QueueInitRequest qar = (QueueInitRequest)message; - - ConsistentHashingRouter.ConsistentHashableEnvelope envelope = - new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() ); - routerRef.tell( envelope, getSender()); - - } else if ( message instanceof QueueRefreshRequest) { - QueueRefreshRequest qar = (QueueRefreshRequest)message; - - ConsistentHashingRouter.ConsistentHashableEnvelope envelope = - new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() ); - routerRef.tell( envelope, getSender()); - - } else if ( message instanceof QueueTimeoutRequest) { - QueueTimeoutRequest qar = (QueueTimeoutRequest)message; - - ConsistentHashingRouter.ConsistentHashableEnvelope envelope = - new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() ); - routerRef.tell( envelope, getSender()); - - } else if ( message instanceof ShardCheckRequest) { - ShardCheckRequest qar = (ShardCheckRequest)message; - - ConsistentHashingRouter.ConsistentHashableEnvelope envelope = - new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() ); - routerRef.tell( envelope, getSender()); - } else { unhandled(message); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java index ae9969c..afd5640 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java @@ -20,20 +20,52 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import akka.actor.UntypedActor; +import com.codahale.metrics.Timer; import com.google.inject.Inject; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.qakka.MetricsService; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest; +import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + public class QueueRefresher extends UntypedActor { private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class ); - final QueueActorHelper helper; + private final ActorSystemFig actorSystemFig; + private final InMemoryQueue inMemoryQueue; + private final QakkaFig qakkaFig; + private final MetricsService metricsService; + private final CassandraClient cassandraClient; + @Inject - public QueueRefresher( QueueActorHelper helper ) { - this.helper = helper; + public QueueRefresher( + ActorSystemFig actorSystemFig, + InMemoryQueue inMemoryQueue, + QakkaFig qakkaFig, + MetricsService metricsService, + CassandraClient cassandraClient + ) { + this.actorSystemFig = actorSystemFig; + this.inMemoryQueue = inMemoryQueue; + this.qakkaFig = qakkaFig; + this.metricsService = metricsService; + this.cassandraClient = cassandraClient; } @@ -44,10 +76,73 @@ public class QueueRefresher extends UntypedActor { QueueRefreshRequest request = (QueueRefreshRequest) message; String queueName = request.getQueueName(); - helper.queueRefresh( queueName ); + queueRefresh( queueName ); } else { unhandled( message ); } } + + Map<String, Long> startingShards = new HashMap<>(); + + + void queueRefresh( String queueName ) { + + Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time(); + + try { + + if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) { + + final Optional shardIdOptional; + final String shardKey = + createShardKey( queueName, Shard.Type.DEFAULT, actorSystemFig.getRegionLocal() ); + Long shardId = startingShards.get( shardKey ); + + if ( shardId != null ) { + shardIdOptional = Optional.of( shardId ); + } else { + shardIdOptional = Optional.empty(); + } + + ShardIterator shardIterator = new ShardIterator( + cassandraClient, queueName, actorSystemFig.getRegionLocal(), + Shard.Type.DEFAULT, Optional.empty() ); + + UUID since = inMemoryQueue.getNewest( queueName ); + + String region = actorSystemFig.getRegionLocal(); + MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( + cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT, + shardIterator, since); + + int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName ); + int count = 0; + + while ( multiShardIterator.hasNext() && count < need ) { + DatabaseQueueMessage queueMessage = multiShardIterator.next(); + inMemoryQueue.add( queueName, queueMessage ); + count++; + } + + if ( multiShardIterator.getCurrentShard() != null ) { + startingShards.put( shardKey, multiShardIterator.getCurrentShard().getShardId() ); + } + + if ( count > 0 ) { + logger.debug( "Added {} in-memory for queue {}, new size = {}", + count, queueName, inMemoryQueue.size( queueName ) ); + } + } + + } finally { + timer.close(); + } + + } + + private String createShardKey(String queueName, Shard.Type type, String region ) { + return queueName + "_" + type + region; + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java index 3dc695e..ccc39f5 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java @@ -26,18 +26,13 @@ import akka.pattern.Patterns; import akka.util.Timeout; import com.codahale.metrics.Timer; import com.google.inject.Inject; -import com.google.inject.Injector; import org.apache.commons.lang3.RandomStringUtils; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; -import org.apache.usergrid.persistence.qakka.App; import org.apache.usergrid.persistence.qakka.MetricsService; import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; -import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest; -import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendResponse; -import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest; -import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse; +import org.apache.usergrid.persistence.qakka.distributed.messages.*; import org.apache.usergrid.persistence.qakka.exceptions.QakkaException; import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog; @@ -80,13 +75,6 @@ public class QueueSender extends UntypedActor { this.actorSystemFig = actorSystemFig; this.qakkaFig = qakkaFig; this.metricsService = metricsService; - -// actorSystemManager = injector.getInstance( ActorSystemManager.class ); -// transferLogSerialization = injector.getInstance( TransferLogSerialization.class ); -// auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); -// actorSystemFig = injector.getInstance( ActorSystemFig.class ); -// qakkaFig = injector.getInstance( QakkaFig.class ); -// metricsService = injector.getInstance( MetricsService.class ); } @Override @@ -97,7 +85,7 @@ public class QueueSender extends UntypedActor { // as far as caller is concerned, we are done. getSender().tell( new QueueSendResponse( - DistributedQueueService.Status.SUCCESS ), getSender() ); + DistributedQueueService.Status.SUCCESS, qa.getQueueName() ), getSender() ); final QueueWriter.WriteStatus writeStatus = sendMessageToRegion( qa.getQueueName(), http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java index 33f1dd9..b7a95df 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java @@ -22,29 +22,22 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import akka.actor.UntypedActor; import com.codahale.metrics.Timer; import com.google.inject.Inject; -import com.google.inject.Injector; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; -import org.apache.usergrid.persistence.qakka.App; import org.apache.usergrid.persistence.qakka.MetricsService; import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.CassandraClient; -import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; import org.apache.usergrid.persistence.qakka.core.QakkaUtils; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRequest; -import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; -import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.text.DecimalFormat; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; public class QueueTimeouter extends UntypedActor { @@ -70,12 +63,6 @@ public class QueueTimeouter extends UntypedActor { this.actorSystemFig = actorSystemFig; this.qakkaFig = qakkaFig; this.cassandraClient = cassandraClient; - -// messageSerialization = injector.getInstance( QueueMessageSerialization.class ); -// actorSystemFig = injector.getInstance( ActorSystemFig.class ); -// qakkaFig = injector.getInstance( QakkaFig.class ); -// metricsService = injector.getInstance( MetricsService.class ); -// cassandraClient = injector.getInstance( CassandraClientImpl.class ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java index e014d59..a7dbbd0 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java @@ -22,17 +22,16 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import akka.actor.UntypedActor; import com.codahale.metrics.Timer; import com.google.inject.Inject; -import com.google.inject.Injector; -import org.apache.usergrid.persistence.qakka.App; import org.apache.usergrid.persistence.qakka.MetricsService; import org.apache.usergrid.persistence.qakka.core.QakkaUtils; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; +import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckRequest; +import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckResponse; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse; import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog; import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; -import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization; import org.slf4j.Logger; @@ -50,24 +49,21 @@ public class QueueWriter extends UntypedActor { private final TransferLogSerialization transferLogSerialization; private final AuditLogSerialization auditLogSerialization; private final MetricsService metricsService; - + private final QueueActorHelper queueActorHelper; @Inject public QueueWriter( QueueMessageSerialization messageSerialization, TransferLogSerialization transferLogSerialization, AuditLogSerialization auditLogSerialization, - MetricsService metricsService + MetricsService metricsService, + QueueActorHelper queueActorHelper ) { - this.messageSerialization = messageSerialization; + this.messageSerialization = messageSerialization; this.transferLogSerialization = transferLogSerialization; - this.auditLogSerialization = auditLogSerialization; - this.metricsService = metricsService; - -// messageSerialization = injector.getInstance( QueueMessageSerialization.class ); -// transferLogSerialization = injector.getInstance( TransferLogSerialization.class ); -// auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); -// metricsService = injector.getInstance( MetricsService.class ); + this.auditLogSerialization = auditLogSerialization; + this.metricsService = metricsService; + this.queueActorHelper = queueActorHelper; } @Override @@ -86,6 +82,7 @@ public class QueueWriter extends UntypedActor { DatabaseQueueMessage dbqm = null; long currentTime = System.currentTimeMillis(); + String queueName = qa.getQueueName(); try { dbqm = new DatabaseQueueMessage( @@ -115,7 +112,7 @@ public class QueueWriter extends UntypedActor { dbqm.getMessageId() ); getSender().tell( new QueueWriteResponse( - QueueWriter.WriteStatus.ERROR ), getSender() ); + QueueWriter.WriteStatus.ERROR, queueName ), getSender() ); return; } @@ -136,7 +133,7 @@ public class QueueWriter extends UntypedActor { qa.getMessageId() ); getSender().tell( new QueueWriteResponse( - QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() ); + QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED, queueName ), getSender() ); } catch (Throwable e) { logger.debug( "Unable to delete transfer log for {} {} {} {}", @@ -147,13 +144,33 @@ public class QueueWriter extends UntypedActor { logger.debug("Error deleting transferlog", e); getSender().tell( new QueueWriteResponse( - QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() ); + QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED, queueName ), getSender() ); } } finally { timer.close(); } + } else if ( message instanceof QueueAckRequest ){ + + Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_ACK ).time(); + try { + + QueueAckRequest queueAckRequest = (QueueAckRequest) message; + + DistributedQueueService.Status status = queueActorHelper.ackQueueMessage( + queueAckRequest.getQueueName(), + queueAckRequest.getQueueMessageId() ); + + getSender().tell( new QueueAckResponse( + queueAckRequest.getQueueName(), + queueAckRequest.getQueueMessageId(), + status ), getSender() ); + + } finally { + timer.close(); + } + } else { unhandled( message ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java index 0e3e981..cb06c1d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java @@ -24,8 +24,8 @@ import akka.actor.Props; import akka.actor.UntypedActor; import akka.routing.FromConfig; import com.google.inject.Inject; -import com.google.inject.Injector; import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; +import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckRequest; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest; @@ -37,7 +37,7 @@ public class QueueWriterRouter extends UntypedActor { private final ActorRef router; @Inject - public QueueWriterRouter( Injector injector ) { + public QueueWriterRouter() { this.router = getContext().actorOf( FromConfig.getInstance().props( Props.create( GuiceActorProducer.class, QueueWriter.class )), "router"); @@ -46,7 +46,8 @@ public class QueueWriterRouter extends UntypedActor { @Override public void onReceive(Object message) { - if ( message instanceof QueueWriteRequest) { + if ( message instanceof QueueWriteRequest || message instanceof QueueAckRequest ) { + router.tell( message, getSender() ); } else { http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index 9063242..20bf608 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -22,14 +22,16 @@ package org.apache.usergrid.persistence.qakka.distributed.impl; import akka.actor.ActorRef; import akka.pattern.Patterns; import akka.util.Timeout; +import com.codahale.metrics.*; +import com.codahale.metrics.Timer; import com.datastax.driver.core.exceptions.InvalidQueryException; -import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Singleton; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import org.apache.usergrid.persistence.actorsystem.ClientActor; import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; +import org.apache.usergrid.persistence.qakka.MetricsService; import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.QueueManager; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; @@ -37,7 +39,6 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.*; import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; -import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; @@ -56,19 +57,21 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { private final ActorSystemManager actorSystemManager; private final QueueManager queueManager; private final QakkaFig qakkaFig; - + private final MetricsService metricsService; @Inject public DistributedQueueServiceImpl( Injector injector, ActorSystemManager actorSystemManager, QueueManager queueManager, - QakkaFig qakkaFig + QakkaFig qakkaFig, + MetricsService metricsService ) { this.actorSystemManager = actorSystemManager; this.queueManager = queueManager; this.qakkaFig = qakkaFig; + this.metricsService = metricsService; GuiceActorProducer.INJECTOR = injector; } @@ -154,59 +157,66 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { String queueName, String sourceRegion, String destRegion, UUID messageId, Long deliveryTime, Long expirationTime ) { - if ( queueManager.getQueueConfig( queueName ) == null ) { - throw new NotFoundException( "Queue not found: " + queueName ); - } + Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_TOTAL ).time(); + try { - int maxRetries = qakkaFig.getMaxSendRetries(); - int retries = 0; + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue not found: " + queueName ); + } - QueueSendRequest request = new QueueSendRequest( - queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime ); + int maxRetries = qakkaFig.getMaxSendRetries(); + int retries = 0; - while ( retries++ < maxRetries ) { - try { - Timeout t = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS ); + QueueSendRequest request = new QueueSendRequest( + queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime ); - // send to current region via local clientActor - ActorRef clientActor = actorSystemManager.getClientActor(); - Future<Object> fut = Patterns.ask( clientActor, request, t ); + while ( retries++ < maxRetries ) { + try { + Timeout t = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS ); - // wait for response... - final Object response = Await.result( fut, t.duration() ); + // send to current region via local clientActor + ActorRef clientActor = actorSystemManager.getClientActor(); + Future<Object> fut = Patterns.ask( clientActor, request, t ); - if ( response != null && response instanceof QueueSendResponse) { - QueueSendResponse qarm = (QueueSendResponse)response; + // wait for response... + final Object response = Await.result( fut, t.duration() ); - if ( !DistributedQueueService.Status.ERROR.equals( qarm.getSendStatus() )) { + if ( response != null && response instanceof QueueSendResponse) { + QueueSendResponse qarm = (QueueSendResponse)response; - if ( retries > 1 ) { - logger.debug("SUCCESS after {} retries", retries ); - } + if ( !DistributedQueueService.Status.ERROR.equals( qarm.getSendStatus() )) { - // send refresh-queue-if-empty message - QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false ); - clientActor.tell( qrr, null ); + if ( retries > 1 ) { + logger.debug("SUCCESS after {} retries", retries ); + } + + // send refresh-queue-if-empty message + QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false ); + clientActor.tell( qrr, null ); + + return qarm.getSendStatus(); + + } else { + logger.debug("ERROR STATUS sending to queue, retrying {}", retries ); + } - return qarm.getSendStatus(); + } else if ( response != null ) { + logger.debug("NULL RESPONSE sending to queue, retrying {}", retries ); } else { - logger.debug("ERROR STATUS sending to queue, retrying {}", retries ); + logger.debug("TIMEOUT sending to queue, retrying {}", retries ); } - } else if ( response != null ) { - logger.debug("NULL RESPONSE sending to queue, retrying {}", retries ); - - } else { - logger.debug("TIMEOUT sending to queue, retrying {}", retries ); + } catch ( Exception e ) { + logger.debug("ERROR sending to queue, retrying " + retries, e ); } - - } catch ( Exception e ) { - logger.debug("ERROR sending to queue, retrying " + retries, e ); } - } - throw new QakkaRuntimeException( "Error sending to queue after " + retries ); + throw new QakkaRuntimeException( "Error sending to queue after " + retries ); + + } finally { + timer.close(); + } } @@ -214,19 +224,32 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { public Collection<DatabaseQueueMessage> getNextMessages( String queueName, int count ) { List<DatabaseQueueMessage> ret = new ArrayList<>(); - long startTime = System.currentTimeMillis(); + com.codahale.metrics.Timer.Context timer = + metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_TOTAL ).time(); + + try { - while ( ret.size() < count - && System.currentTimeMillis() - startTime < qakkaFig.getLongPollTimeMillis()) { + long startTime = System.currentTimeMillis(); - ret.addAll( getNextMessagesInternal( queueName, count )); + while ( ret.size() < count + && System.currentTimeMillis() - startTime < qakkaFig.getLongPollTimeMillis()) { - if ( ret.size() < count ) { - try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 2 ); } catch (Exception ignored) {} + ret.addAll( getNextMessagesInternal( queueName, count )); + + if ( ret.size() < count ) { + try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 2 ); } catch (Exception ignored) {} + } } + + if ( ret.isEmpty() ) { + logger.info( "Requested {} but queue '{}' is empty", count, queueName); + } + return ret; + + } finally { + timer.close(); } - return ret; } @@ -242,10 +265,10 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { } int maxRetries = qakkaFig.getMaxGetRetries(); - int retries = 0; + int tries = 0; QueueGetRequest request = new QueueGetRequest( queueName, count ); - while ( ++retries < maxRetries ) { + while ( ++tries < maxRetries ) { try { Timeout t = new Timeout( qakkaFig.getGetTimeoutSeconds(), TimeUnit.SECONDS ); @@ -261,8 +284,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { if ( response != null && response instanceof QueueGetResponse) { QueueGetResponse qprm = (QueueGetResponse)response; if ( qprm.isSuccess() ) { - if (retries > 1) { - logger.debug( "getNextMessage {} SUCCESS after {} retries", queueName, retries ); + if (tries > 1) { + logger.warn( "getNextMessage {} SUCCESS after {} tries", queueName, tries ); } } logger.debug("Returning queue {} messages {}", queueName, qprm.getQueueMessages().size()); @@ -270,41 +293,49 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { } else if ( response != null ) { - logger.debug("ERROR RESPONSE (1) popping queue {}, retrying {}", queueName, retries ); + logger.debug("ERROR RESPONSE (1) popping queue {}, retrying {}", queueName, tries ); } else { - logger.debug("TIMEOUT popping from queue {}, retrying {}", queueName, retries ); + logger.debug("TIMEOUT popping from queue {}, retrying {}", queueName, tries ); } } else if ( responseObject instanceof ClientActor.ErrorResponse ) { final ClientActor.ErrorResponse errorResponse = (ClientActor.ErrorResponse)responseObject; logger.debug("ACTORSYSTEM ERROR popping queue: {}, retrying {}", - errorResponse.getMessage(), retries ); + errorResponse.getMessage(), tries ); } else { - logger.debug("UNKNOWN RESPONSE popping queue {}, retrying {}", queueName, retries ); + logger.debug("UNKNOWN RESPONSE popping queue {}, retrying {}", queueName, tries ); } } catch ( Exception e ) { - logger.debug("ERROR popping to queue " + queueName + " retrying " + retries, e ); + logger.error("ERROR popping to queue " + queueName + " retrying " + tries, e ); } } throw new QakkaRuntimeException( - "Error getting from queue " + queueName + " after " + retries + " tries"); + "Error getting from queue " + queueName + " after " + tries + " tries"); } @Override public Status ackMessage(String queueName, UUID queueMessageId ) { - if ( queueManager.getQueueConfig( queueName ) == null ) { - throw new NotFoundException( "Queue not found: " + queueName ); - } + Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_TOTAL ).time(); + try { - QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId ); - return sendMessageToLocalQueueActors( message ); + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue not found: " + queueName ); + } + + QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId ); + return sendMessageToLocalRouters( message ); + + + } finally { + timer.close(); + } } @@ -316,7 +347,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { } QueueAckRequest message = new QueueAckRequest( queueName, messageId ); - return sendMessageToLocalQueueActors( message ); + return sendMessageToLocalRouters( message ); } @@ -332,7 +363,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { } - private Status sendMessageToLocalQueueActors( QakkaMessage message ) { + private Status sendMessageToLocalRouters( QakkaMessage message ) { int maxRetries = 5; int retries = 0; @@ -367,6 +398,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { } public void shutdown() { - actorSystemManager.shutdownAll(); + // no op } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java index d74936b..3bf6180 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java @@ -28,6 +28,7 @@ import akka.cluster.singleton.ClusterSingletonProxy; import akka.cluster.singleton.ClusterSingletonProxySettings; import com.google.inject.Inject; import com.google.inject.Injector; +import com.google.inject.Singleton; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; import org.apache.usergrid.persistence.actorsystem.RouterProducer; @@ -41,6 +42,7 @@ import java.util.HashMap; import java.util.Map; +@Singleton public class QueueActorRouterProducer implements RouterProducer { static Injector injector; @@ -131,7 +133,6 @@ public class QueueActorRouterProducer implements RouterProducer { public Collection<Class> getMessageTypes() { return new ArrayList() {{ add( QueueGetRequest.class ); - add( QueueAckRequest.class ); add( QueueInitRequest.class ); add( QueueRefreshRequest.class ); add( QueueTimeoutRequest.class ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java index 006f1a7..c4e7acc 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java @@ -28,19 +28,22 @@ import akka.cluster.singleton.ClusterSingletonProxy; import akka.cluster.singleton.ClusterSingletonProxySettings; import com.google.inject.Inject; import com.google.inject.Injector; +import com.google.inject.Singleton; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; import org.apache.usergrid.persistence.actorsystem.RouterProducer; import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriterRouter; +import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckRequest; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest; +import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Map; +@Singleton public class QueueWriterRouterProducer implements RouterProducer { static Injector injector; @@ -128,7 +131,11 @@ public class QueueWriterRouterProducer implements RouterProducer { @Override public Collection<Class> getMessageTypes() { - return Collections.singletonList( QueueWriteRequest.class ); + return new ArrayList() {{ + add( QueueAckRequest.class ); + add( QueueWriteRequest.class ); + }}; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java index a1bbf14..a3919ea 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java @@ -21,8 +21,8 @@ package org.apache.usergrid.persistence.qakka.distributed.messages; import java.io.Serializable; -/** - * Marker interface - */ + public interface QakkaMessage extends Serializable { + + String getQueueName(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java index c8004fb..1776360 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java @@ -30,16 +30,19 @@ import java.util.Collections; public class QueueGetResponse implements QakkaMessage { private final Collection<DatabaseQueueMessage> queueMessages; private final DistributedQueueService.Status status; + private final String queueName; - - public QueueGetResponse(DistributedQueueService.Status status ) { + public QueueGetResponse(DistributedQueueService.Status status, String queueName ) { this.status = status; this.queueMessages = Collections.emptyList(); + this.queueName = queueName; } - public QueueGetResponse(DistributedQueueService.Status status, Collection<DatabaseQueueMessage> queueMessages) { + public QueueGetResponse( + DistributedQueueService.Status status, Collection<DatabaseQueueMessage> queueMessages, String queueName) { this.status = status; this.queueMessages = queueMessages; + this.queueName = queueName; } public DistributedQueueService.Status getStatus() { @@ -60,4 +63,9 @@ public class QueueGetResponse implements QakkaMessage { .append( "status", status ) .toString(); } + + @Override + public String getQueueName() { + return queueName; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java index 0c295a0..9b065af 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java @@ -25,9 +25,12 @@ import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService public class QueueSendResponse implements QakkaMessage { private final DistributedQueueService.Status status; + private final String queueName; - public QueueSendResponse(DistributedQueueService.Status status) { + + public QueueSendResponse( DistributedQueueService.Status status, String queueName ) { this.status = status; + this.queueName = queueName; } public DistributedQueueService.Status getSendStatus() { @@ -40,4 +43,8 @@ public class QueueSendResponse implements QakkaMessage { .toString(); } + @Override + public String getQueueName() { + return queueName; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java index 1eb513c..463b4df 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java @@ -25,9 +25,11 @@ import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriter; public class QueueWriteResponse implements QakkaMessage { private final QueueWriter.WriteStatus status; + private String queueName; - public QueueWriteResponse(QueueWriter.WriteStatus status) { + public QueueWriteResponse(QueueWriter.WriteStatus status, String queueName ) { this.status = status; + this.queueName = queueName; } public QueueWriter.WriteStatus getSendStatus() { @@ -40,4 +42,8 @@ public class QueueWriteResponse implements QakkaMessage { .toString(); } + @Override + public String getQueueName() { + return queueName; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java index 6ec0774..29327e2 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java @@ -49,6 +49,7 @@ public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage> private final Iterator<Shard> shardIterator; private Iterator<DatabaseQueueMessage> currentIterator; + private Shard currentShard; private UUID nextStart; @@ -184,4 +185,9 @@ public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage> } + + public Shard getCurrentShard() { + return currentShard; + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java index 3ebe735..86c50a5 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java @@ -51,4 +51,9 @@ public interface QueueMessageSerialization extends Migration { DatabaseQueueMessageBody loadMessageData(final UUID messageId); void deleteMessageData(final UUID messageId); + + /** + * Write message to inflight table and remove from available table + */ + void putInflight( DatabaseQueueMessage queueMessage ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java index d9a2543..33de7bc 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java @@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl; +import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.Clause; @@ -137,12 +138,6 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization final UUID queueMessageId = message.getQueueMessageId() == null ? QakkaUtils.getTimeUuid() : message.getQueueMessageId(); - long queuedAt = message.getQueuedAt() == null ? - System.currentTimeMillis() : message.getQueuedAt(); - - long inflightAt = message.getInflightAt() == null ? - message.getQueuedAt() : message.getInflightAt(); - Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( message.getType() ) ? Shard.Type.DEFAULT : Shard.Type.INFLIGHT; @@ -152,16 +147,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization message.setShardId( shard.getShardId() ); } - Statement insert = QueryBuilder.insertInto(getTableName(message.getType())) - .value( COLUMN_QUEUE_NAME, message.getQueueName()) - .value( COLUMN_REGION, message.getRegion()) - .value( COLUMN_SHARD_ID, message.getShardId()) - .value( COLUMN_MESSAGE_ID, message.getMessageId()) - .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId) - .value( COLUMN_INFLIGHT_AT, inflightAt ) - .value( COLUMN_QUEUED_AT, queuedAt) - .using( QueryBuilder.ttl( maxTtl ) ); - + Statement insert = createWriteMessageStatement( message ); cassandraClient.getQueueMessageSession().execute(insert); shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 ); @@ -233,28 +219,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization final DatabaseQueueMessage.Type type, final UUID queueMessageId ) { - final long shardId; - if ( shardIdOrNull == null ) { - Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ? - Shard.Type.DEFAULT : Shard.Type.INFLIGHT; - Shard shard = shardStrategy.selectShard( - queueName, actorSystemFig.getRegionLocal(), shardType, queueMessageId ); - shardId = shard.getShardId(); - } else { - shardId = shardIdOrNull; - } - - Clause queueNameClause = QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ); - Clause regionClause = QueryBuilder.eq( COLUMN_REGION, region ); - Clause shardIdClause = QueryBuilder.eq( COLUMN_SHARD_ID, shardId ); - Clause queueMessageIdClause = QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId); - - Statement delete = QueryBuilder.delete().from(getTableName( type )) - .where(queueNameClause) - .and(regionClause) - .and(shardIdClause) - .and(queueMessageIdClause); - + Statement delete = createDeleteMessageStatement( queueName, region, null, type,queueMessageId); cassandraClient.getQueueMessageSession().execute( delete ); messageCounterSerialization.decrementCounter( queueName, type, 1L ); @@ -297,14 +262,118 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization public void deleteMessageData( final UUID messageId ) { Clause messageIdClause = QueryBuilder.eq(COLUMN_MESSAGE_ID, messageId); - - Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA) - .where(messageIdClause); + Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA).where(messageIdClause); cassandraClient.getApplicationSession().execute(delete); } + @Override + public void putInflight( DatabaseQueueMessage message ) { + + // create statement to write new queue message to inflight table + + Shard.Type shardType = Shard.Type.INFLIGHT; + Shard shard = shardStrategy.selectShard( + message.getQueueName(), message.getRegion(), shardType, message.getQueueMessageId() ); + + DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage( + message.getMessageId(), + DatabaseQueueMessage.Type.INFLIGHT, + message.getQueueName(), + message.getRegion(), + shard.getShardId(), + message.getQueuedAt(), + System.currentTimeMillis(), + message.getQueueMessageId() ); + + Statement insert = createWriteMessageStatement( inflightMessage ); + + // create statement to delete queue message from available table + + Statement delete = createDeleteMessageStatement( + message.getQueueName(), + message.getRegion(), + null, + DatabaseQueueMessage.Type.DEFAULT, + message.getQueueMessageId()); + + // execute statements as a batch + + BatchStatement batchStatement = new BatchStatement(); + batchStatement.add( insert ); + batchStatement.add( delete ); + cassandraClient.getQueueMessageSession().execute( batchStatement ); + + // bump counters + + shardCounterSerialization.incrementCounter( + message.getQueueName(), Shard.Type.INFLIGHT, message.getShardId(), 1 ); + + messageCounterSerialization.incrementCounter( + message.getQueueName(), DatabaseQueueMessage.Type.INFLIGHT, 1L ); + + messageCounterSerialization.decrementCounter( + message.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1L ); + } + + + private Statement createDeleteMessageStatement( final String queueName, + final String region, + final Long shardIdOrNull, + final DatabaseQueueMessage.Type type, + final UUID queueMessageId ) { + final long shardId; + if ( shardIdOrNull == null ) { + Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ? + Shard.Type.DEFAULT : Shard.Type.INFLIGHT; + Shard shard = shardStrategy.selectShard( + queueName, region, shardType, queueMessageId ); + shardId = shard.getShardId(); + } else { + shardId = shardIdOrNull; + } + + Clause queueNameClause = QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ); + Clause regionClause = QueryBuilder.eq( COLUMN_REGION, region ); + Clause shardIdClause = QueryBuilder.eq( COLUMN_SHARD_ID, shardId ); + Clause queueMessageIdClause = QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId); + + Statement delete = QueryBuilder.delete().from(getTableName( type )) + .where(queueNameClause) + .and(regionClause) + .and(shardIdClause) + .and(queueMessageIdClause); + + return delete; + } + + + private Statement createWriteMessageStatement( DatabaseQueueMessage message ) { + + final UUID queueMessageId = message.getQueueMessageId() == null ? + QakkaUtils.getTimeUuid() : message.getQueueMessageId(); + + long queuedAt = message.getQueuedAt() == null ? + System.currentTimeMillis() : message.getQueuedAt(); + + long inflightAt = message.getInflightAt() == null ? + message.getQueuedAt() : message.getInflightAt(); + + Statement insert = QueryBuilder.insertInto(getTableName(message.getType())) + .value( COLUMN_QUEUE_NAME, message.getQueueName()) + .value( COLUMN_REGION, message.getRegion()) + .value( COLUMN_SHARD_ID, message.getShardId()) + .value( COLUMN_MESSAGE_ID, message.getMessageId()) + .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId) + .value( COLUMN_INFLIGHT_AT, inflightAt ) + .value( COLUMN_QUEUED_AT, queuedAt) + .using( QueryBuilder.ttl( maxTtl ) ); + + return insert; + } + + public static String getTableName(DatabaseQueueMessage.Type messageType){ String table; http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java index a5c95bd..5bd2b05 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java @@ -102,9 +102,6 @@ public class QueueActorServiceTest extends AbstractTest { ByteBuffer blob = dqmb.getBlob(); String returnedData = new String( blob.array(), "UTF-8" ); -// ByteArrayInputStream bais = new ByteArrayInputStream( blob.array() ); -// ObjectInputStream ios = new ObjectInputStream( bais ); -// String returnedData = (String)ios.readObject(); Assert.assertEquals( data, returnedData ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java index 791650e..0f5b46f 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java @@ -177,7 +177,7 @@ public class QueueActorHelperTest extends AbstractTest { // put message inflight QueueActorHelper helper = injector.getInstance( QueueActorHelper.class ); - helper.putInflight( queueName, message ); + helper.putInflight( message ); // message must be gone from messages_available table http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java index 5b42184..9fb8f29 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java @@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.qakka.AbstractTest; import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.QakkaUtils; import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; @@ -53,6 +54,8 @@ public class QueueReaderTest extends AbstractTest { Injector injector = getInjector(); + injector.getInstance( DistributedQueueService.class ); // init the INJECTOR + QakkaFig qakkaFig = injector.getInstance( QakkaFig.class ); ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class ); ShardSerialization shardSerialization = injector.getInstance( ShardSerialization.class ); @@ -89,13 +92,16 @@ public class QueueReaderTest extends AbstractTest { // run the QueueRefresher to fill up the in-memory queue - QueueActorHelper helper = injector.getInstance( QueueActorHelper.class ); + ActorSystem system = ActorSystem.create("Test-" + queueName); + ActorRef queueReaderRef = system.actorOf( + Props.create( GuiceActorProducer.class, QueueRefresher.class ), "queueReader"); + QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false ); // need to wait for refresh to complete int maxRetries = 10; int retries = 0; while ( inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize() && retries++ < maxRetries ) { - helper.queueRefresh( queueName ); + queueReaderRef.tell( refreshRequest, null ); // tell sends message, returns immediately Thread.sleep(1000); }
