Return to using asynchronous actor for in-memory queue refresh job.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a90a0bbd Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a90a0bbd Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a90a0bbd Branch: refs/heads/master Commit: a90a0bbd2fe341395e4bd084e566881feb98d9bf Parents: bd5835b Author: Dave Johnson <[email protected]> Authored: Wed Oct 5 10:02:45 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Wed Oct 5 10:02:45 2016 -0400 ---------------------------------------------------------------------- .../usergrid/persistence/qakka/QakkaFig.java | 2 +- .../qakka/distributed/actors/QueueActor.java | 126 ++++++++----------- .../distributed/actors/QueueActorHelper.java | 118 ++++++++++++----- .../qakka/core/QueueMessageManagerTest.java | 3 - .../distributed/actors/QueueReaderTest.java | 25 ++-- .../queue/src/test/resources/log4j.properties | 4 +- 6 files changed, 152 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java index 472c241..7d89187 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -109,7 +109,7 @@ public interface QakkaFig extends GuicyFig, Serializable { /** How long to wait for response from queue actor before timing out and trying again */ @Key(QUEUE_GET_TIMEOUT) - @Default("2") + @Default("4") int getGetTimeoutSeconds(); /** Max number of times to retry call to queue writer for queue send operation */ http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java index c706f7d..64f12d4 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java @@ -129,22 +129,23 @@ public class QueueActor extends UntypedActor { } else if ( message instanceof QueueRefreshRequest ) { QueueRefreshRequest request = (QueueRefreshRequest)message; - queuesSeen.add( request.getQueueName() ); - queueActorHelper.queueRefresh( request.getQueueName() ); -// if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) { -// -// if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) { -// ActorRef readerRef = getContext().actorOf( -// Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ), -// request.getQueueName() + "_reader"); -// queueReadersByQueueName.put( request.getQueueName(), readerRef ); -// } -// } -// -// // hand-off to queue's reader -// queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() ); +// // NOT asynchronous +// queueActorHelper.queueRefresh( request.getQueueName() ); + + if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) { + + if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) { + ActorRef readerRef = getContext().actorOf( + Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ), + request.getQueueName() + "_reader"); + queueReadersByQueueName.put( request.getQueueName(), readerRef ); + } + } + + // hand-off to queue's reader + queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() ); } else if ( message instanceof QueueTimeoutRequest ) { QueueTimeoutRequest request = (QueueTimeoutRequest)message; @@ -158,10 +159,9 @@ public class QueueActor extends UntypedActor { queueTimeoutersByQueueName.put( request.getQueueName(), readerRef ); } - // hand-off to queue's timeouter + // ASYNCHRONOUS -> hand-off to queue's timeouter queueTimeoutersByQueueName.get( request.getQueueName() ).tell( request, self() ); - } else if ( message instanceof ShardCheckRequest ) { ShardCheckRequest request = (ShardCheckRequest)message; @@ -174,75 +174,59 @@ public class QueueActor extends UntypedActor { shardAllocatorsByQueueName.put( request.getQueueName(), readerRef ); } - // hand-off to queue's shard allocator + // ASYNCHRONOUS -> hand-off to queue's shard allocator shardAllocatorsByQueueName.get( request.getQueueName() ).tell( request, self() ); - } else if ( message instanceof QueueGetRequest) { - Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_GET ).time(); - try { - QueueGetRequest queueGetRequest = (QueueGetRequest) message; - - queuesSeen.add( queueGetRequest.getQueueName() ); + QueueGetRequest queueGetRequest = (QueueGetRequest) message; - Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>(); + String queueName = queueGetRequest.getQueueName(); + int numRequested = queueGetRequest.getNumRequested(); - while (queueMessages.size() < queueGetRequest.getNumRequested()) { + queuesSeen.add( queueName ); - DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() ); + Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_GET ).time(); + try { - if (queueMessage != null) { - if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) { - queueMessages.add( queueMessage ); - } - } else { - logger.debug("in-memory queue for {} is empty, object is: {}", - queueGetRequest.getQueueName(), inMemoryQueue ); - break; - } - } + Collection<DatabaseQueueMessage> messages = queueActorHelper.getMessages( queueName, numRequested); messageCounterSerialization.decrementCounter( - queueGetRequest.getQueueName(), + queueName, DatabaseQueueMessage.Type.DEFAULT, - queueMessages.size()); - - logger.debug("{} returning {} for queue {}", - this, queueMessages.size(), queueGetRequest.getQueueName()); + messages.size()); getSender().tell( new QueueGetResponse( - DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() ); - - long runs = runCount.incrementAndGet(); - long messagesReturned = messageCount.addAndGet( queueMessages.size() ); - - if ( logger.isDebugEnabled() && runs % 100 == 0 ) { - - final DecimalFormat format = new DecimalFormat("##.###"); - final long nano = 1000000000; - Timer t = metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET ); - - logger.debug("QueueActor get stats (queues {}):\n" + - " Num runs={}\n" + - " Messages={}\n" + - " Mean={}\n" + - " One min rate={}\n" + - " Five min rate={}\n" + - " Snapshot mean={}\n" + - " Snapshot min={}\n" + - " Snapshot max={}", - queuesSeen.toArray(), - t.getCount(), - messagesReturned, - format.format( t.getMeanRate() ), - format.format( t.getOneMinuteRate() ), - format.format( t.getFiveMinuteRate() ), - format.format( t.getSnapshot().getMean() / nano ), - format.format( (double) t.getSnapshot().getMin() / nano ), - format.format( (double) t.getSnapshot().getMax() / nano ) ); - } + DistributedQueueService.Status.SUCCESS, messages ), getSender() ); +// long runs = runCount.incrementAndGet(); +// long messagesReturned = messageCount.addAndGet( queueMessages.size() ); +// +// if ( logger.isDebugEnabled() && runs % 100 == 0 ) { +// +// final DecimalFormat format = new DecimalFormat("##.###"); +// final long nano = 1000000000; +// Timer t = metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET ); +// +// logger.debug("QueueActor get stats (queues {}):\n" + +// " Num runs={}\n" + +// " Messages={}\n" + +// " Mean={}\n" + +// " One min rate={}\n" + +// " Five min rate={}\n" + +// " Snapshot mean={}\n" + +// " Snapshot min={}\n" + +// " Snapshot max={}", +// queuesSeen.toArray(), +// t.getCount(), +// messagesReturned, +// format.format( t.getMeanRate() ), +// format.format( t.getOneMinuteRate() ), +// format.format( t.getFiveMinuteRate() ), +// format.format( t.getSnapshot().getMean() / nano ), +// format.format( (double) t.getSnapshot().getMin() / nano ), +// format.format( (double) t.getSnapshot().getMax() / nano ) ); +// } } finally { timer.close(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java index 68250df..0b31e16 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java @@ -37,7 +37,8 @@ import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterato import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Collection; import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; @@ -98,12 +99,39 @@ public class QueueActorHelper { } + Collection<DatabaseQueueMessage> getMessages(String queueName, int numRequested ) { + + Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>(); + + while (queueMessages.size() < numRequested) { + + DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueName ); + + if (queueMessage != null) { + + if (putInflight( queueName, queueMessage )) { + queueMessages.add( queueMessage ); + } + + } else { + //logger.debug("in-memory queue for {} is empty, object is: {}", queueName, inMemoryQueue ); + break; + } + } + + //logger.debug("{} returning {} for queue {}", this, queueMessages.size(), queueName); + return queueMessages; + + } + + DistributedQueueService.Status ackQueueMessage(String queueName, UUID queueMessageId ) { DatabaseQueueMessage queueMessage = loadDatabaseQueueMessage( queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT ); if ( queueMessage == null ) { + logger.error("Queue {} queue message id {} not found in inflight table", queueName, queueMessageId); return DistributedQueueService.Status.NOT_INFLIGHT; } @@ -152,10 +180,25 @@ public class QueueActorHelper { UUID qmid = queueMessage.getQueueMessageId(); try { - queueMessage.setType( DatabaseQueueMessage.Type.INFLIGHT ); - queueMessage.setShardId( null ); - queueMessage.setInflightAt( System.currentTimeMillis() ); - messageSerialization.writeMessage( queueMessage ); + + DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage( + queueMessage.getMessageId(), + DatabaseQueueMessage.Type.INFLIGHT, + queueName, + actorSystemFig.getRegionLocal(), + null, // let serialization select the shard + queueMessage.getQueuedAt(), + System.currentTimeMillis(), + qmid); + + messageSerialization.writeMessage( inflightMessage ); + + DatabaseQueueMessage retrieved = loadDatabaseQueueMessage( + queueName, qmid, DatabaseQueueMessage.Type.INFLIGHT ); + if ( retrieved == null ) { + logger.error("Failed ot write queue message id {} to inflight table", qmid); + return false; + } messageSerialization.deleteMessage( queueName, @@ -191,6 +234,7 @@ public class QueueActorHelper { return true; } + void queueRefresh( String queueName ) { Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time(); @@ -199,12 +243,20 @@ public class QueueActorHelper { if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) { + // TODO: need to track the starting shard + ShardIterator shardIterator = new ShardIterator( cassandraClient, queueName, actorSystemFig.getRegionLocal(), Shard.Type.DEFAULT, Optional.empty() ); UUID since = inMemoryQueue.getNewest( queueName ); +// if ( since != null ) { +// logger.debug( "Loading queue {} messages newer than {}", queueName, since.timestamp() ); +// } else { +// logger.debug( "Loading queue {} messages newer than [null]", queueName ); +// } + String region = actorSystemFig.getRegionLocal(); MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT, @@ -219,34 +271,34 @@ public class QueueActorHelper { count++; } - long runs = runCount.incrementAndGet(); - long readCount = totalRead.addAndGet( count ); - - if ( logger.isDebugEnabled() && runs % 100 == 0 ) { - - final DecimalFormat format = new DecimalFormat("##.###"); - final long nano = 1000000000; - Timer t = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME ); - - logger.debug("QueueRefresher for queue '{}' stats:\n" + - " Num runs={}\n" + - " Read count={}\n" + - " Mean={}\n" + - " One min rate={}\n" + - " Five min rate={}\n" + - " Snapshot mean={}\n" + - " Snapshot min={}\n" + - " Snapshot max={}", - queueName, - t.getCount(), - readCount, - format.format( t.getMeanRate() ), - format.format( t.getOneMinuteRate() ), - format.format( t.getFiveMinuteRate() ), - format.format( t.getSnapshot().getMean() / nano ), - format.format( (double) t.getSnapshot().getMin() / nano ), - format.format( (double) t.getSnapshot().getMax() / nano ) ); - } +// long runs = runCount.incrementAndGet(); +// long readCount = totalRead.addAndGet( count ); +// +// if ( logger.isDebugEnabled() && runs % 100 == 0 ) { +// +// final DecimalFormat format = new DecimalFormat("##.###"); +// final long nano = 1000000000; +// Timer t = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME ); +// +// logger.debug("QueueRefresher for queue '{}' stats:\n" + +// " Num runs={}\n" + +// " Read count={}\n" + +// " Mean={}\n" + +// " One min rate={}\n" + +// " Five min rate={}\n" + +// " Snapshot mean={}\n" + +// " Snapshot min={}\n" + +// " Snapshot max={}", +// queueName, +// t.getCount(), +// readCount, +// format.format( t.getMeanRate() ), +// format.format( t.getOneMinuteRate() ), +// format.format( t.getFiveMinuteRate() ), +// format.format( t.getSnapshot().getMean() / nano ), +// format.format( (double) t.getSnapshot().getMin() / nano ), +// format.format( (double) t.getSnapshot().getMax() / nano ) ); +// } if ( count > 0 ) { logger.debug( "Added {} in-memory for queue {}, new size = {}", http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java index 9e0a9d8..f77f31b 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java @@ -42,7 +42,6 @@ import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferL import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization; import org.apache.usergrid.persistence.queue.TestModule; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,8 +239,6 @@ public class QueueMessageManagerTest extends AbstractTest { Injector injector = getInjector(); - CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - injector.getInstance( App.class ); // init the INJECTOR ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java index ba3c0f8..5b42184 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java @@ -22,22 +22,18 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; -import com.google.inject.Inject; import com.google.inject.Injector; import org.apache.commons.lang.RandomStringUtils; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; -import org.apache.usergrid.persistence.qakka.QakkaFig; -import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; -import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; import org.apache.usergrid.persistence.qakka.AbstractTest; -import org.apache.usergrid.persistence.qakka.App; -import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.QakkaUtils; import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization; import org.junit.Assert; import org.junit.Test; @@ -57,14 +53,14 @@ public class QueueReaderTest extends AbstractTest { Injector injector = getInjector(); - QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class ); - ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); - ShardSerialization shardSerialization = getInjector().getInstance( ShardSerialization.class ); + QakkaFig qakkaFig = injector.getInstance( QakkaFig.class ); + ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class ); + ShardSerialization shardSerialization = injector.getInstance( ShardSerialization.class ); int numMessages = 200; // create queue messages, only first lot get queue message data - QueueMessageSerialization serialization = getInjector().getInstance( QueueMessageSerialization.class ); + QueueMessageSerialization serialization = injector.getInstance( QueueMessageSerialization.class ); String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); Shard newShard = new Shard( queueName, actorSystemFig.getRegionLocal(), @@ -88,21 +84,18 @@ public class QueueReaderTest extends AbstractTest { serialization.writeMessage( message ); } - InMemoryQueue inMemoryQueue = getInjector().getInstance( InMemoryQueue.class ); + InMemoryQueue inMemoryQueue = injector.getInstance( InMemoryQueue.class ); Assert.assertEquals( 0, inMemoryQueue.size( queueName ) ); // run the QueueRefresher to fill up the in-memory queue - ActorSystem system = ActorSystem.create("Test-" + queueName); - ActorRef queueReaderRef = system.actorOf( - Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ), "queueReader"); - QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false ); + QueueActorHelper helper = injector.getInstance( QueueActorHelper.class ); // need to wait for refresh to complete int maxRetries = 10; int retries = 0; while ( inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize() && retries++ < maxRetries ) { - queueReaderRef.tell( refreshRequest, null ); // tell sends message, returns immediately + helper.queueRefresh( queueName ); Thread.sleep(1000); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties index e1cbda4..eb45d2a 100644 --- a/stack/corepersistence/queue/src/test/resources/log4j.properties +++ b/stack/corepersistence/queue/src/test/resources/log4j.properties @@ -28,6 +28,6 @@ log4j.logger.org.glassfish=WARN #log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG #log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG -log4j.logger.org.apache.usergrid.persistence.qakka.distributed.actors=DEBUG -log4j.logger.org.apache.usergrid.persistence.queue=INFO +log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG +log4j.logger.org.apache.usergrid.persistence.queue=DEBUG log4j.logger.org.apache.usergrid.corepersistence.asyncevents=INFO
