Complete message counter and add support for getQueueDepth() in QakkaQueueManager
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9306f12e Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9306f12e Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9306f12e Branch: refs/heads/master Commit: 9306f12eea8cd8f0ad0b2ec4751cd8a9b9ba5382 Parents: 8b79fb8 Author: Dave Johnson <[email protected]> Authored: Wed Sep 21 08:34:28 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Wed Sep 21 08:34:28 2016 -0400 ---------------------------------------------------------------------- .../usergrid/persistence/qakka/QakkaFig.java | 9 +- .../usergrid/persistence/qakka/QakkaModule.java | 29 ++-- .../qakka/core/QueueMessageManager.java | 4 + .../core/impl/QueueMessageManagerImpl.java | 13 +- .../qakka/distributed/actors/QueueActor.java | 14 +- .../distributed/actors/QueueTimeouter.java | 8 + .../qakka/distributed/actors/QueueWriter.java | 138 ++++++++-------- .../impl/DistributedQueueServiceImpl.java | 6 +- .../MessageCounterSerialization.java | 4 +- .../impl/MessageCounterSerializationImpl.java | 159 +++++++++++++------ .../queue/impl/QakkaQueueManager.java | 4 +- .../qakka/core/QueueMessageManagerTest.java | 2 + .../impl/MessageCounterSerializationTest.java | 90 +++++++++++ .../sharding/ShardCounterSerializationTest.java | 3 - .../queue/src/test/resources/qakka.properties | 1 + 15 files changed, 347 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java index c66001d..c3f4189 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -52,7 +52,9 @@ public interface QakkaFig extends GuicyFig, Serializable { String QUEUE_GET_TIMEOUT = "queue.get.timeout.seconds"; - String QUEUE_MAX_SHARD_COUNTER = "queue.max.inmemory.shard.counter"; + String QUEUE_MAX_SHARD_COUNTER = "queue.max.inmemory.max.shard.counter"; + + String QUEUE_MAX_MESSAGE_CHANGES = "queue.max.inmemory.max.message.changes"; String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = "queue.shard.allocation.check.frequency.millis"; @@ -125,6 +127,11 @@ public interface QakkaFig extends GuicyFig, Serializable { @Default("100") long getMaxInMemoryShardCounter(); + /** Once counter reaches this value, write it to permanent storage */ + @Key(QUEUE_MAX_MESSAGE_CHANGES) + @Default("100") + long getMaxInMemoryMessageCounter(); + /** How often to check whether new shard is needed for each queue */ @Key(QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY) @Default("5000") http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java index d1d8d7e..e3113e1 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java @@ -37,7 +37,9 @@ import org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterP import org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer; import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; import org.apache.usergrid.persistence.qakka.serialization.auditlog.impl.AuditLogSerializationImpl; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.MessageCounterSerializationImpl; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl; import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization; import org.apache.usergrid.persistence.qakka.serialization.queues.impl.QueueSerializationImpl; @@ -76,23 +78,24 @@ public class QakkaModule extends AbstractModule { bind( App.class ); - bind( CassandraClient.class ).to( CassandraClientImpl.class ); - bind( MetricsService.class ).to( App.class ); + bind( CassandraClient.class ).to( CassandraClientImpl.class ); + bind( MetricsService.class ).to( App.class ); - bind( QueueManager.class ).to( QueueManagerImpl.class ); - bind( QueueSerialization.class ).to( QueueSerializationImpl.class ); + bind( QueueManager.class ).to( QueueManagerImpl.class ); + bind( QueueSerialization.class ).to( QueueSerializationImpl.class ); - bind( QueueMessageManager.class ).to( QueueMessageManagerImpl.class ); - bind( QueueMessageSerialization.class ).to( QueueMessageSerializationImpl.class ); + bind( QueueMessageManager.class ).to( QueueMessageManagerImpl.class ); + bind( QueueMessageSerialization.class ).to( QueueMessageSerializationImpl.class ); - bind( ShardSerialization.class ).to( ShardSerializationImpl.class ); - bind( ShardStrategy.class ).to( ShardStrategyImpl.class ); + bind( ShardSerialization.class ).to( ShardSerializationImpl.class ); + bind( ShardStrategy.class ).to( ShardStrategyImpl.class ); - bind( ShardCounterSerialization.class ).to( ShardCounterSerializationImpl.class ); + bind( ShardCounterSerialization.class ).to( ShardCounterSerializationImpl.class ); + bind( MessageCounterSerialization.class ).to( MessageCounterSerializationImpl.class ); - bind( TransferLogSerialization.class ).to( TransferLogSerializationImpl.class ); - bind( AuditLogSerialization.class ).to( AuditLogSerializationImpl.class ); - bind( DistributedQueueService.class ).to( DistributedQueueServiceImpl.class ); + bind( TransferLogSerialization.class ).to( TransferLogSerializationImpl.class ); + bind( AuditLogSerialization.class ).to( AuditLogSerializationImpl.class ); + bind( DistributedQueueService.class ).to( DistributedQueueServiceImpl.class ); bind( QueueActorRouterProducer.class ); bind( QueueWriterRouterProducer.class ); @@ -110,6 +113,6 @@ public class QakkaModule extends AbstractModule { migrationBinder.addBinding().to( Key.get( ShardCounterSerialization.class ) ); migrationBinder.addBinding().to( Key.get( ShardSerialization.class ) ); migrationBinder.addBinding().to( Key.get( TransferLogSerialization.class ) ); - //migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) ); + migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java index 15203d8..b540fce 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java @@ -19,6 +19,8 @@ package org.apache.usergrid.persistence.qakka.core; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; + import java.nio.ByteBuffer; import java.util.List; import java.util.UUID; @@ -80,4 +82,6 @@ public interface QueueMessageManager { * Get message from messages available or messages inflight storage. */ QueueMessage getMessage(String queueName, UUID queueMessageId); + + long getQueueDepth(String queueName); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java index bcd0f58..691c1a6 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java @@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization; import org.slf4j.Logger; @@ -58,6 +59,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager { private final DistributedQueueService distributedQueueService; private final TransferLogSerialization transferLogSerialization; private final URIStrategy uriStrategy; + private final MessageCounterSerialization messageCounterSerialization; @Inject @@ -67,8 +69,8 @@ public class QueueMessageManagerImpl implements QueueMessageManager { QueueMessageSerialization queueMessageSerialization, DistributedQueueService distributedQueueService, TransferLogSerialization transferLogSerialization, - URIStrategy uriStrategy - ) { + URIStrategy uriStrategy, + MessageCounterSerialization messageCounterSerialization ) { this.actorSystemFig = actorSystemFig; this.queueManager = queueManager; @@ -76,6 +78,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager { this.distributedQueueService = distributedQueueService; this.transferLogSerialization = transferLogSerialization; this.uriStrategy = uriStrategy; + this.messageCounterSerialization = messageCounterSerialization; } @@ -296,4 +299,10 @@ public class QueueMessageManagerImpl implements QueueMessageManager { return queueMessage; } + + @Override + public long getQueueDepth(String queueName) { + return messageCounterSerialization.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ); + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java index 6fed13b..3b50711 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java @@ -32,6 +32,7 @@ import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; import org.apache.usergrid.persistence.qakka.distributed.messages.*; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; @@ -46,11 +47,13 @@ import java.util.concurrent.TimeUnit; public class QueueActor extends UntypedActor { private static final Logger logger = LoggerFactory.getLogger( QueueActor.class ); - private final QakkaFig qakkaFig; - private final InMemoryQueue inMemoryQueue; + private final QakkaFig qakkaFig; + private final InMemoryQueue inMemoryQueue; private final QueueActorHelper queueActorHelper; private final MetricsService metricsService; + private final MessageCounterSerialization messageCounterSerialization; + private final Map<String, Cancellable> refreshSchedulersByQueueName = new HashMap<>(); private final Map<String, Cancellable> timeoutSchedulersByQueueName = new HashMap<>(); private final Map<String, Cancellable> shardAllocationSchedulersByQueueName = new HashMap<>(); @@ -68,6 +71,8 @@ public class QueueActor extends UntypedActor { inMemoryQueue = injector.getInstance( InMemoryQueue.class ); queueActorHelper = injector.getInstance( QueueActorHelper.class ); metricsService = injector.getInstance( MetricsService.class ); + + messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class ); } @Override @@ -173,6 +178,11 @@ public class QueueActor extends UntypedActor { } } + messageCounterSerialization.decrementCounter( + queueGetRequest.getQueueName(), + DatabaseQueueMessage.Type.DEFAULT, + queueMessages.size()); + getSender().tell( new QueueGetResponse( DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java index fcd2161..7806d30 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java @@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRe import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; @@ -54,6 +55,8 @@ public class QueueTimeouter extends UntypedActor { private final QakkaFig qakkaFig; private final CassandraClient cassandraClient; + private final MessageCounterSerialization messageCounterSerialization; + public QueueTimeouter(String queueName ) { this.queueName = queueName; @@ -65,6 +68,8 @@ public class QueueTimeouter extends UntypedActor { qakkaFig = injector.getInstance( QakkaFig.class ); metricsService = injector.getInstance( MetricsService.class ); cassandraClient = injector.getInstance( CassandraClientImpl.class ); + + messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class ); } @@ -134,6 +139,9 @@ public class QueueTimeouter extends UntypedActor { if (count > 0) { logger.debug( "Timed out {} messages for queue {}", count, queueName ); + + messageCounterSerialization.decrementCounter( + queueName, DatabaseQueueMessage.Type.DEFAULT, count); } } finally { http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java index 8657370..7166ef1 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java @@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResp import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog; import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization; import org.slf4j.Logger; @@ -48,6 +49,8 @@ public class QueueWriter extends UntypedActor { private final AuditLogSerialization auditLogSerialization; private final MetricsService metricsService; + private final MessageCounterSerialization messageCounterSerialization; + public QueueWriter() { @@ -57,96 +60,101 @@ public class QueueWriter extends UntypedActor { transferLogSerialization = injector.getInstance( TransferLogSerialization.class ); auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); metricsService = injector.getInstance( MetricsService.class ); + + messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class ); } @Override public void onReceive(Object message) { - if (message instanceof QueueWriteRequest) { - - Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_WRITE ).time(); + if (message instanceof QueueWriteRequest) { - try { - QueueWriteRequest qa = (QueueWriteRequest) message; - - UUID queueMessageId = QakkaUtils.getTimeUuid(); + Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_WRITE ).time(); - // TODO: implement deliveryTime and expirationTime + try { + QueueWriteRequest qa = (QueueWriteRequest) message; - DatabaseQueueMessage dbqm = null; - long currentTime = System.currentTimeMillis(); + UUID queueMessageId = QakkaUtils.getTimeUuid(); - try { - dbqm = new DatabaseQueueMessage( - qa.getMessageId(), - DatabaseQueueMessage.Type.DEFAULT, - qa.getQueueName(), - qa.getDestRegion(), - null, - currentTime, - currentTime, - queueMessageId ); + // TODO: implement deliveryTime and expirationTime - messageSerialization.writeMessage( dbqm ); + DatabaseQueueMessage dbqm = null; + long currentTime = System.currentTimeMillis(); - //logger.debug("Wrote queue message id {} to queue name {}", - // dbqm.getQueueMessageId(), dbqm.getQueueName()); + try { + dbqm = new DatabaseQueueMessage( + qa.getMessageId(), + DatabaseQueueMessage.Type.DEFAULT, + qa.getQueueName(), + qa.getDestRegion(), + null, + currentTime, + currentTime, + queueMessageId ); - } catch (Throwable t) { - logger.debug("Error creating database queue message", t); + messageSerialization.writeMessage( dbqm ); - auditLogSerialization.recordAuditLog( - AuditLog.Action.SEND, - AuditLog.Status.ERROR, - qa.getQueueName(), - qa.getDestRegion(), - qa.getMessageId(), - dbqm.getMessageId() ); + messageCounterSerialization.incrementCounter( + qa.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1); - getSender().tell( new QueueWriteResponse( - QueueWriter.WriteStatus.ERROR ), getSender() ); + //logger.debug("Wrote queue message id {} to queue name {}", + // dbqm.getQueueMessageId(), dbqm.getQueueName()); - return; - } + } catch (Throwable t) { + logger.debug("Error creating database queue message", t); auditLogSerialization.recordAuditLog( AuditLog.Action.SEND, - AuditLog.Status.SUCCESS, + AuditLog.Status.ERROR, qa.getQueueName(), qa.getDestRegion(), qa.getMessageId(), - dbqm.getQueueMessageId() ); - - try { - transferLogSerialization.removeTransferLog( - qa.getQueueName(), - qa.getSourceRegion(), - qa.getDestRegion(), - qa.getMessageId() ); - - getSender().tell( new QueueWriteResponse( - QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() ); - - } catch (Throwable e) { - logger.debug( "Unable to delete transfer log for {} {} {} {}", - qa.getQueueName(), - qa.getSourceRegion(), - qa.getDestRegion(), - qa.getMessageId() ); - logger.debug("Error deleting transferlog", e); - - getSender().tell( new QueueWriteResponse( - QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() ); - } - - } finally { - timer.close(); + dbqm.getMessageId() ); + + getSender().tell( new QueueWriteResponse( + QueueWriter.WriteStatus.ERROR ), getSender() ); + + return; } - } else { - unhandled( message ); + auditLogSerialization.recordAuditLog( + AuditLog.Action.SEND, + AuditLog.Status.SUCCESS, + qa.getQueueName(), + qa.getDestRegion(), + qa.getMessageId(), + dbqm.getQueueMessageId() ); + + try { + transferLogSerialization.removeTransferLog( + qa.getQueueName(), + qa.getSourceRegion(), + qa.getDestRegion(), + qa.getMessageId() ); + + getSender().tell( new QueueWriteResponse( + QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() ); + + } catch (Throwable e) { + logger.debug( "Unable to delete transfer log for {} {} {} {}", + qa.getQueueName(), + qa.getSourceRegion(), + qa.getDestRegion(), + qa.getMessageId() ); + logger.debug("Error deleting transferlog", e); + + getSender().tell( new QueueWriteResponse( + QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() ); + } + + } finally { + timer.close(); } + } else { + unhandled( message ); + } + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index be20cde..3d6a808 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService import org.apache.usergrid.persistence.qakka.distributed.messages.*; import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; @@ -54,17 +55,20 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { private final ActorSystemManager actorSystemManager; private final QueueManager queueManager; private final QakkaFig qakkaFig; + private final MessageCounterSerialization messageCounterSerialization; @Inject public DistributedQueueServiceImpl( ActorSystemManager actorSystemManager, QueueManager queueManager, - QakkaFig qakkaFig ) { + QakkaFig qakkaFig, + MessageCounterSerialization messageCounterSerialization ) { this.actorSystemManager = actorSystemManager; this.queueManager = queueManager; this.qakkaFig = qakkaFig; + this.messageCounterSerialization = messageCounterSerialization; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java index cbbf11f..6c81863 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java @@ -21,11 +21,11 @@ package org.apache.usergrid.persistence.qakka.serialization.queuemessages; import org.apache.usergrid.persistence.core.migration.schema.Migration; -public interface MessageCounterSerialization extends Migration { +public interface MessageCounterSerialization extends Migration { void incrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment); - void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment); + void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long decrement); long getCounterValue(String name, DatabaseQueueMessage.Type type); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java index f198d05..0fdb47e 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java @@ -32,8 +32,8 @@ import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.CassandraClient; import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; -import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; -import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,49 +43,56 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @Singleton -public class MessageCounterSerializationImpl implements ShardCounterSerialization { +public class MessageCounterSerializationImpl implements MessageCounterSerialization { private static final Logger logger = LoggerFactory.getLogger( MessageCounterSerializationImpl.class ); private final CassandraClient cassandraClient; private final CassandraConfig cassandraConfig; - final static String TABLE_SHARD_COUNTERS = "counters"; - final static String COLUMN_QUEUE_NAME = "queue_name"; - final static String COLUMN_SHARD_ID = "shard_id"; - final static String COLUMN_COUNTER_VALUE = "counter_value"; - final static String COLUMN_SHARD_TYPE = "shard_type"; + final static String TABLE_MESSAGE_COUNTERS = "message_counters"; + final static String COLUMN_QUEUE_NAME = "queue_name"; + final static String COLUMN_COUNTER_VALUE = "counter_value"; + final static String COLUMN_MESSAGE_TYPE = "message_type"; // design note: counters based on DataStax example here: // https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html static final String CQL = - "CREATE TABLE IF NOT EXISTS shard_counters ( " + + "CREATE TABLE IF NOT EXISTS message_counters ( " + "counter_value counter, " + "queue_name varchar, " + - "shard_type varchar, " + - "shard_id bigint, " + - "PRIMARY KEY (queue_name, shard_type, shard_id) " + + "message_type varchar, " + + "PRIMARY KEY (queue_name, message_type) " + ");"; - final long maxInMemoryIncrement; + /** number of changes since last save to database */ + final AtomicInteger numChanges = new AtomicInteger( 0 ); + + final long maxChangesBeforeSave; class InMemoryCount { long baseCount; final AtomicLong increment = new AtomicLong( 0L ); + final AtomicLong decrement = new AtomicLong( 0L ); + InMemoryCount( long baseCount ) { this.baseCount = baseCount; } - public long value() { - return baseCount + increment.get(); - } public AtomicLong getIncrement() { return increment; } + public AtomicLong getDecrement() { + return decrement; + } + public long value() { + return baseCount + increment.get() - decrement.get(); + } void setBaseCount( long baseCount ) { this.baseCount = baseCount; } @@ -95,64 +102,89 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio @Inject - public MessageCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) { + public MessageCounterSerializationImpl( + CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) { + this.cassandraConfig = cassandraConfig; - this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter(); + this.maxChangesBeforeSave = qakkaFig.getMaxInMemoryMessageCounter(); this.cassandraClient = cassandraClient; } + private String buildKey( String queueName, DatabaseQueueMessage.Type type ) { + return queueName + "_" + type; + } + + @Override - public void incrementCounter(String queueName, Shard.Type type, long shardId, long increment ) { + public void incrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment ) { + + String key = buildKey( queueName, type ); - String key = queueName + type + shardId; synchronized ( inMemoryCounters ) { if ( inMemoryCounters.get( key ) == null ) { - Long value = retrieveCounterFromStorage( queueName, type, shardId ); + Long value = retrieveCounterFromStorage( queueName, type ); if ( value == null ) { - incrementCounterInStorage( queueName, type, shardId, 0L ); + incrementCounterInStorage( queueName, type, 0L ); inMemoryCounters.put( key, new InMemoryCount( 0L )); } else { inMemoryCounters.put( key, new InMemoryCount( value )); } - inMemoryCounters.get( key ).getIncrement().addAndGet( increment ); - return; } } InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + inMemoryCount.getIncrement().addAndGet( increment ); - synchronized ( inMemoryCount ) { - long totalIncrement = inMemoryCount.getIncrement().addAndGet( increment ); + saveIfNeeded( queueName, type ); + } - if (totalIncrement > maxInMemoryIncrement) { - incrementCounterInStorage( queueName, type, shardId, totalIncrement ); - inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type, shardId ) ); - inMemoryCount.getIncrement().set( 0L ); + + @Override + public void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long decrement) { + + String key = buildKey( queueName, type ); + + synchronized ( inMemoryCounters ) { + + if ( inMemoryCounters.get( key ) == null ) { + + Long value = retrieveCounterFromStorage( queueName, type ); + + if ( value == null ) { + decrementCounterInStorage( queueName, type, 0L ); + inMemoryCounters.put( key, new InMemoryCount( 0L )); + } else { + inMemoryCounters.put( key, new InMemoryCount( value )); + } } } + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + inMemoryCount.getDecrement().addAndGet( decrement ); + + saveIfNeeded( queueName, type ); } @Override - public long getCounterValue( String queueName, Shard.Type type, long shardId ) { + public long getCounterValue( String queueName, DatabaseQueueMessage.Type type ) { - String key = queueName + type + shardId; + String key = buildKey( queueName, type ); synchronized ( inMemoryCounters ) { if ( inMemoryCounters.get( key ) == null ) { - Long value = retrieveCounterFromStorage( queueName, type, shardId ); + Long value = retrieveCounterFromStorage( queueName, type ); if ( value == null ) { throw new NotFoundException( - MessageFormat.format( "No counter found for queue {0} type {1} shardId {2}", - queueName, type, shardId )); + MessageFormat.format( "No counter found for queue {0} type {1}", + queueName, type )); } else { inMemoryCounters.put( key, new InMemoryCount( value )); } @@ -162,30 +194,39 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio return inMemoryCounters.get( key ).value(); } - void incrementCounterInStorage( String queueName, Shard.Type type, long shardId, long increment ) { - Statement update = QueryBuilder.update( TABLE_SHARD_COUNTERS ) + void incrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long increment ) { + + Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS ) .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) ) - .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString() ) ) - .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) ) + .and( QueryBuilder.eq( COLUMN_MESSAGE_TYPE, type.toString() ) ) .with( QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) ); cassandraClient.getQueueMessageSession().execute( update ); } - Long retrieveCounterFromStorage( String queueName, Shard.Type type, long shardId ) { + void decrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long decrement ) { - Statement query = QueryBuilder.select().from( TABLE_SHARD_COUNTERS ) + Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS ) + .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) ) + .and( QueryBuilder.eq( COLUMN_MESSAGE_TYPE, type.toString() ) ) + .with( QueryBuilder.decr( COLUMN_COUNTER_VALUE, decrement ) ); + cassandraClient.getQueueMessageSession().execute( update ); + } + + + Long retrieveCounterFromStorage( String queueName, DatabaseQueueMessage.Type type ) { + + Statement query = QueryBuilder.select().from( TABLE_MESSAGE_COUNTERS ) .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) ) - .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) ) - .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) ); + .and( QueryBuilder.eq( COLUMN_MESSAGE_TYPE, type.toString()) ); ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( query ); List<Row> all = resultSet.all(); if ( all.size() > 1 ) { throw new QakkaRuntimeException( - "Multiple rows for counter " + queueName + " type " + type + " shardId " + shardId ); + "Multiple rows for counter " + queueName + " type " + type ); } if ( all.isEmpty() ) { return null; @@ -194,6 +235,32 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio } + private void saveIfNeeded( String queueName, DatabaseQueueMessage.Type type ) { + + String key = buildKey( queueName, type ); + + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + + synchronized ( inMemoryCount ) { + + if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) { + + long totalIncrement = inMemoryCount.getIncrement().get(); + incrementCounterInStorage( queueName, type, totalIncrement ); + + long totalDecrement = inMemoryCount.getDecrement().get(); + decrementCounterInStorage( queueName, type, totalDecrement ); + + inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) ); + inMemoryCount.getIncrement().set( 0L ); + inMemoryCount.getDecrement().set( 0L ); + + numChanges.set( 0 ); + } + } + } + + @Override public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() { return Collections.EMPTY_LIST; @@ -201,8 +268,8 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio @Override public Collection<TableDefinition> getTables() { - return Collections.singletonList( - new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), TABLE_SHARD_COUNTERS, CQL ) ); + return Collections.singletonList( new TableDefinitionStringImpl( + cassandraConfig.getApplicationLocalKeyspace(), TABLE_MESSAGE_COUNTERS, CQL ) ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java index 0eb609d..f3cae86 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java @@ -24,6 +24,7 @@ import com.google.inject.assistedinject.Assisted; import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.*; import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.apache.usergrid.persistence.queue.LegacyQueueFig; import org.apache.usergrid.persistence.queue.LegacyQueueManager; import org.apache.usergrid.persistence.queue.LegacyQueueMessage; @@ -139,10 +140,9 @@ public class QakkaQueueManager implements LegacyQueueManager { return messages; } - @Override public long getQueueDepth() { - return 0; + return queueMessageManager.getQueueDepth( scope.getName() ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java index 0413f81..3225a66 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java @@ -177,6 +177,8 @@ public class QueueMessageManagerTest extends AbstractTest { } } + Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) ); + // get all messages from queue List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java new file mode 100644 index 0000000..a4ea0f1 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl; + +import com.google.inject.Injector; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.fail; + + +/** + * Created by Dave Johnson ([email protected]) on 9/20/16. + */ +public class MessageCounterSerializationTest extends AbstractTest { + + @Test + public void testBasicOperation() { + + Injector injector = getInjector(); + MessageCounterSerialization mcs = injector.getInstance( MessageCounterSerialization.class ); + + String queueName = "mcst_queue_" + RandomStringUtils.randomAlphanumeric( 20 ); + + try { + mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ); + fail("Should have throw NotFoundException"); + } catch ( NotFoundException expected ) { + // pass + } + + for ( int i=0; i<10; i++ ) { + mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 ); + Assert.assertEquals( i+1, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + } + + mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 ); + Assert.assertEquals( 0, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + + mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 ); + Assert.assertEquals( 10, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + + mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 ); + Assert.assertEquals( 20, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + + mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 ); + Assert.assertEquals( 30, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + + mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 ); + Assert.assertEquals( 40, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + + mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 ); + Assert.assertEquals( 50, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + + mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 50 ); + Assert.assertEquals( 100, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + + mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 ); + Assert.assertEquals( 90, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + + mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 ); + Assert.assertEquals( 80, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + + mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 ); + Assert.assertEquals( 70, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java index f9c2951..8dc16bb 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java @@ -36,9 +36,6 @@ public class ShardCounterSerializationTest extends AbstractTest { @Test public void testBasicOperation() throws Exception { - CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - - ShardCounterSerialization scs = getInjector().getInstance( ShardCounterSerialization.class ); String queueName = "scst_queue_" + RandomStringUtils.randomAlphanumeric( 20 ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/resources/qakka.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties index aacc187..fb46f3d 100644 --- a/stack/corepersistence/queue/src/test/resources/qakka.properties +++ b/stack/corepersistence/queue/src/test/resources/qakka.properties @@ -44,6 +44,7 @@ queue.shard.allocation.check.frequency.millis=100 queue.shard.allocation.advance.time.millis=200 queue.max.inmemory.shard.counter = 100 +queue.max.inmemory.max.message.changes=3 queue.long.polling.time.millis=2000
