Support the elasticsearch.queue_impl=MULTIREGION setting in place of SNS; add more and better DEBUG logging
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2cd8ecb3 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2cd8ecb3 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2cd8ecb3 Branch: refs/heads/usergrid-1318-queue Commit: 2cd8ecb30fc5f97927ec000573daba2ba16e914f Parents: 00eb139 Author: Dave Johnson <[email protected]> Authored: Fri Sep 23 12:47:29 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Fri Sep 23 12:47:29 2016 -0400 ---------------------------------------------------------------------- .../asyncevents/AsyncIndexProvider.java | 46 ++++++++++++---- .../qakka/distributed/actors/QueueActor.java | 53 ++++++++++++++++-- .../distributed/actors/QueueRefresher.java | 56 +++++++++++++++----- .../distributed/actors/QueueTimeouter.java | 44 +++++++++++++-- .../impl/DistributedQueueServiceImpl.java | 12 +++-- .../queue/impl/QueueManagerFactoryImpl.java | 7 ++- .../queue/src/test/resources/log4j.properties | 11 ++-- tests/performance/results.txt | 1 + 8 files changed, 188 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index aac0e66..f4a9bd2 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -103,16 +103,44 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { final Implementations impl = 
Implementations.valueOf(value); switch (impl) { + case LOCAL: - AsyncEventServiceImpl eventService = new AsyncEventServiceImpl(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory, - entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler); + AsyncEventServiceImpl eventService = + new AsyncEventServiceImpl(scope -> new LocalQueueManager(), + indexProcessorFig, + indexProducer, + metricsFactory, + entityCollectionManagerFactory, + indexLocationStrategyFactory, + entityIndexFactory, + eventBuilder, + mapManagerFactory, + queueFig,rxTaskScheduler); eventService.MAX_TAKE = 1000; return eventService; + case SQS: - throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region"); + throw new IllegalArgumentException( + "Configuration value of SQS is no longer allowed. Use MULTIREGION instead."); + case SNS: - return new AsyncEventServiceImpl(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler ); + throw new IllegalArgumentException( + "Configuration value of SNS is no longer allowed. Use MULTIREGION instead. 
"); + + case MULTIREGION: + return new AsyncEventServiceImpl( + queueManagerFactory, + indexProcessorFig, + indexProducer, + metricsFactory, + entityCollectionManagerFactory, + indexLocationStrategyFactory, + entityIndexFactory, + eventBuilder, + mapManagerFactory, + queueFig, + rxTaskScheduler ); + default: throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed"); } @@ -135,12 +163,12 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { /** * Different implementations */ - public static enum Implementations { //TODO see about removing SNS and SQS and use AMZN? - michaelarusso + public static enum Implementations { TEST, LOCAL, - SQS, - SNS; - + SQS, // deprecated + SNS, // deprecated + MULTIREGION; // built-in Akka-based queue public String asString() { return toString(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java index 315975c..87342ad 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java @@ -37,11 +37,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.text.DecimalFormat; +import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; public class 
QueueActor extends UntypedActor { @@ -62,6 +61,10 @@ public class QueueActor extends UntypedActor { private final Map<String, ActorRef> queueTimeoutersByQueueName = new HashMap<>(); private final Map<String, ActorRef> shardAllocatorsByQueueName = new HashMap<>(); + private final AtomicLong runCount = new AtomicLong(0); + private final AtomicLong messageCount = new AtomicLong(0); + private final Set<String> queuesSeen = new HashSet<>(); + public QueueActor() { @@ -81,6 +84,8 @@ public class QueueActor extends UntypedActor { if ( message instanceof QueueInitRequest) { QueueInitRequest request = (QueueInitRequest)message; + queuesSeen.add( request.getQueueName() ); + if ( refreshSchedulersByQueueName.get( request.getQueueName() ) == null ) { Cancellable scheduler = getContext().system().scheduler().schedule( Duration.create( 0, TimeUnit.MILLISECONDS), @@ -120,6 +125,8 @@ public class QueueActor extends UntypedActor { } else if ( message instanceof QueueRefreshRequest ) { QueueRefreshRequest request = (QueueRefreshRequest)message; + queuesSeen.add( request.getQueueName() ); + if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) { if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) { @@ -135,6 +142,8 @@ public class QueueActor extends UntypedActor { } else if ( message instanceof QueueTimeoutRequest ) { QueueTimeoutRequest request = (QueueTimeoutRequest)message; + queuesSeen.add( request.getQueueName() ); + if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) { ActorRef readerRef = getContext().actorOf( Props.create( QueueTimeouter.class, request.getQueueName()), request.getQueueName() + "_timeouter"); @@ -148,6 +157,8 @@ public class QueueActor extends UntypedActor { } else if ( message instanceof ShardCheckRequest ) { ShardCheckRequest request = (ShardCheckRequest)message; + queuesSeen.add( request.getQueueName() ); + if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) { ActorRef 
readerRef = getContext().actorOf( Props.create( ShardAllocator.class, request.getQueueName()), request.getQueueName() + "_shard_allocator"); @@ -164,6 +175,8 @@ public class QueueActor extends UntypedActor { try { QueueGetRequest queueGetRequest = (QueueGetRequest) message; + queuesSeen.add( queueGetRequest.getQueueName() ); + Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>(); while (queueMessages.size() < queueGetRequest.getNumRequested()) { @@ -189,6 +202,36 @@ public class QueueActor extends UntypedActor { getSender().tell( new QueueGetResponse( DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() ); + long runs = runCount.incrementAndGet(); + long messagesReturned = messageCount.addAndGet( queueMessages.size() ); + + if ( logger.isDebugEnabled() && runs % 100 == 0 ) { + + final DecimalFormat format = new DecimalFormat("##.###"); + final long nano = 1000000000; + Timer t = metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET ); + + logger.debug("QueueActor get stats (queues {}):\n" + + " Num runs={}\n" + + " Messages={}\n" + + " Mean={}\n" + + " One min rate={}\n" + + " Five min rate={}\n" + + " Snapshot mean={}\n" + + " Snapshot min={}\n" + + " Snapshot max={}", + queuesSeen.toArray(), + t.getCount(), + messagesReturned, + format.format( t.getMeanRate() ), + format.format( t.getOneMinuteRate() ), + format.format( t.getFiveMinuteRate() ), + format.format( t.getSnapshot().getMean() / nano ), + format.format( (double) t.getSnapshot().getMin() / nano ), + format.format( (double) t.getSnapshot().getMax() / nano ) ); + } + + } finally { timer.close(); } @@ -201,6 +244,8 @@ public class QueueActor extends UntypedActor { QueueAckRequest queueAckRequest = (QueueAckRequest) message; + queuesSeen.add( queueAckRequest.getQueueName() ); + DistributedQueueService.Status status = queueActorHelper.ackQueueMessage( queueAckRequest.getQueueName(), queueAckRequest.getQueueMessageId() ); 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java index ebc328f..dbd5235 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java @@ -39,28 +39,31 @@ import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterato import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.text.DecimalFormat; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; public class QueueRefresher extends UntypedActor { private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class ); - private final String queueName; - - private final QueueMessageSerialization serialization; - private final InMemoryQueue inMemoryQueue; - private final QakkaFig qakkaFig; - private final ActorSystemFig actorSystemFig; - private final MetricsService metricsService; + private final String queueName; + private final InMemoryQueue inMemoryQueue; + private final QakkaFig qakkaFig; + private final ActorSystemFig actorSystemFig; + private final MetricsService metricsService; private final CassandraClient cassandraClient; + private final AtomicLong runCount = new AtomicLong(0); + private final AtomicLong totalRead = new AtomicLong(0); + + public QueueRefresher(String queueName ) { this.queueName = queueName; Injector injector = App.INJECTOR; - serialization = injector.getInstance( 
QueueMessageSerialization.class ); inMemoryQueue = injector.getInstance( InMemoryQueue.class ); qakkaFig = injector.getInstance( QakkaFig.class ); actorSystemFig = injector.getInstance( ActorSystemFig.class ); @@ -76,7 +79,7 @@ public class QueueRefresher extends UntypedActor { QueueRefreshRequest request = (QueueRefreshRequest) message; - logger.debug( "running for queue {}", queueName ); + //logger.debug( "running for queue {}", queueName ); if (!request.getQueueName().equals( queueName )) { throw new QakkaRuntimeException( @@ -109,10 +112,39 @@ public class QueueRefresher extends UntypedActor { count++; } - if ( count > 0 ) { - logger.debug( "Added {} in-memory for queue {}, new size = {}", - count, queueName, inMemoryQueue.size( queueName ) ); + long runs = runCount.incrementAndGet(); + long readCount = totalRead.addAndGet( count ); + + if ( logger.isDebugEnabled() && runs % 100 == 0 ) { + + final DecimalFormat format = new DecimalFormat("##.###"); + final long nano = 1000000000; + Timer t = metricsService.getMetricRegistry().timer(MetricsService.REFRESH_TIME ); + + logger.debug("QueueRefresher for queue '{}' stats:\n" + + " Num runs={}\n" + + " Read count={}\n" + + " Mean={}\n" + + " One min rate={}\n" + + " Five min rate={}\n" + + " Snapshot mean={}\n" + + " Snapshot min={}\n" + + " Snapshot max={}", + queueName, + t.getCount(), + readCount, + format.format( t.getMeanRate() ), + format.format( t.getOneMinuteRate() ), + format.format( t.getFiveMinuteRate() ), + format.format( t.getSnapshot().getMean() / nano ), + format.format( (double) t.getSnapshot().getMin() / nano ), + format.format( (double) t.getSnapshot().getMax() / nano ) ); } + +// if ( count > 0 ) { +// logger.debug( "Added {} in-memory for queue {}, new size = {}", +// count, queueName, inMemoryQueue.size( queueName ) ); +// } } } finally { 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java index 7806d30..b47aac6 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java @@ -40,8 +40,10 @@ import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterato import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.text.DecimalFormat; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; public class QueueTimeouter extends UntypedActor { @@ -54,9 +56,11 @@ public class QueueTimeouter extends UntypedActor { private final ActorSystemFig actorSystemFig; private final QakkaFig qakkaFig; private final CassandraClient cassandraClient; - private final MessageCounterSerialization messageCounterSerialization; + private final AtomicLong runCount = new AtomicLong(0); + private final AtomicLong totalTimedout = new AtomicLong(0); + public QueueTimeouter(String queueName ) { this.queueName = queueName; @@ -137,13 +141,43 @@ public class QueueTimeouter extends UntypedActor { } } - if (count > 0) { - logger.debug( "Timed out {} messages for queue {}", count, queueName ); - messageCounterSerialization.decrementCounter( - queueName, DatabaseQueueMessage.Type.DEFAULT, count); + long runs = runCount.incrementAndGet(); + long timeoutCount = totalTimedout.addAndGet( count ); + + if ( logger.isDebugEnabled() && runs % 
100 == 0 ) { + + final DecimalFormat format = new DecimalFormat("##.###"); + final long nano = 1000000000; + Timer t = metricsService.getMetricRegistry().timer(MetricsService.TIMEOUT_TIME ); + + logger.debug("QueueTimeouter for queue '{}' stats:\n" + + " Num runs={}\n" + + " Timeout count={}\n" + + " Mean={}\n" + + " One min rate={}\n" + + " Five min rate={}\n" + + " Snapshot mean={}\n" + + " Snapshot min={}\n" + + " Snapshot max={}", + queueName, + t.getCount(), + timeoutCount, + format.format( t.getMeanRate() ), + format.format( t.getOneMinuteRate() ), + format.format( t.getFiveMinuteRate() ), + format.format( t.getSnapshot().getMean() / nano ), + format.format( (double) t.getSnapshot().getMin() / nano ), + format.format( (double) t.getSnapshot().getMax() / nano ) ); } +// if (count > 0) { +// logger.debug( "Timed out {} messages for queue {}", count, queueName ); +// +// messageCounterSerialization.decrementCounter( +// queueName, DatabaseQueueMessage.Type.DEFAULT, count); +// } + } finally { timer.close(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index 9d71c31..bcb6b79 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -39,10 +39,7 @@ import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; -import 
java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.concurrent.TimeUnit; @@ -168,7 +165,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { logger.debug("SUCCESS after {} retries", retries ); } - logger.debug("{} Requesting refresh if empty for queue: {}", this, queueName); + // send refresh-queue-if-empty message QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false ); clientActor.tell( qrr, null ); @@ -221,6 +218,11 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist"); } + if ( actorSystemManager.getClientActor() == null || !actorSystemManager.isReady() ) { + logger.error("Akka Actor System is not ready yet for requests."); + return Collections.EMPTY_LIST; + } + int maxRetries = qakkaFig.getMaxGetRetries(); int retries = 0; http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java index e1bf239..4a41dda 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java @@ -53,15 +53,18 @@ public class QueueManagerFactoryImpl implements LegacyQueueManagerFactory { if ( queueFig.overrideQueueForDefault() ){ LegacyQueueManager manager = defaultManager.get( scope.getName() ); - logger.info("Using Queue Implemention: {}", 
manager.getClass().getSimpleName()); if ( manager == null ) { + logger.info("Using LocalQueueManager for scope {}", scope.getName() ); manager = new LocalQueueManager(); defaultManager.put( scope.getName(), manager ); } return manager; } else { - return queuemanagerInternalFactory.getQueueManager(scope); + LegacyQueueManager queueManager = queuemanagerInternalFactory.getQueueManager( scope ); + logger.info("Using queue manager {} for scope {}", + queueManager.getClass().getSimpleName(), scope.getName() ); + return queueManager; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties index 9e14f29..b207ea3 100644 --- a/stack/corepersistence/queue/src/test/resources/log4j.properties +++ b/stack/corepersistence/queue/src/test/resources/log4j.properties @@ -21,12 +21,13 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) %c{1} - %m%n -log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG -log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG -log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG - log4j.logger.org.apache.cassandra=WARN log4j.logger.org.glassfish=WARN -log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG +#log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG +#log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG +#log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG +log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG +log4j.logger.org.apache.usergrid.persistence.queue=DEBUG +log4j.logger.org.apache.usergrid.corepersistence.asyncevents=DEBUG 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/tests/performance/results.txt ---------------------------------------------------------------------- diff --git a/tests/performance/results.txt b/tests/performance/results.txt new file mode 100644 index 0000000..fb63637 --- /dev/null +++ b/tests/performance/results.txt @@ -0,0 +1 @@ +collection,name,uuid,modified,status,error,lastStatus
