When writing to queue, if in-memory queue is empty then refresh will be done.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/727ff1d6 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/727ff1d6 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/727ff1d6 Branch: refs/heads/master Commit: 727ff1d642fdaf3768ab2b1f0c07364854ec2e60 Parents: 9306f12 Author: Dave Johnson <[email protected]> Authored: Wed Sep 21 13:48:47 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Wed Sep 21 13:48:47 2016 -0400 ---------------------------------------------------------------------- .../org/apache/usergrid/persistence/qakka/QakkaFig.java | 2 +- .../persistence/qakka/core/impl/InMemoryQueue.java | 4 ++++ .../persistence/qakka/distributed/actors/QueueActor.java | 11 +++++++---- .../qakka/distributed/actors/QueueWriter.java | 3 +++ .../distributed/impl/DistributedQueueServiceImpl.java | 8 ++++++-- .../qakka/distributed/messages/QueueRefreshRequest.java | 8 +++++++- .../persistence/qakka/core/QueueMessageManagerTest.java | 1 + .../qakka/distributed/actors/QueueReaderTest.java | 2 +- 8 files changed, 30 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java index c3f4189..da47c98 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -94,7 +94,7 @@ public interface QakkaFig extends GuicyFig, Serializable { /** How often to refresh each queue's in-memory data */ @Key(QUEUE_REFRESH_MILLISECONDS) - @Default("500") + @Default("1000") int getQueueRefreshMilliseconds(); /** How many queue messages to keep in-memory */ http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java index 474ef5c..27de079 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java @@ -78,6 +78,10 @@ public class InMemoryQueue { return getQueue( queueName ).poll(); } + public DatabaseQueueMessage peek( String queueName ) { + return getQueue( queueName ).peek(); + } + public int size( String queueName ) { return getQueue( queueName ).size(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java index 3b50711..315975c 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java @@ -86,7 +86,7 @@ public class QueueActor extends UntypedActor { Duration.create( 0, TimeUnit.MILLISECONDS), Duration.create( qakkaFig.getQueueRefreshMilliseconds(), TimeUnit.MILLISECONDS), self(), - new QueueRefreshRequest( request.getQueueName() ), + new QueueRefreshRequest( request.getQueueName(), false ), getContext().dispatcher(), getSelf()); refreshSchedulersByQueueName.put( request.getQueueName(), scheduler ); @@ -121,9 +121,12 @@ public class QueueActor extends UntypedActor { QueueRefreshRequest request = (QueueRefreshRequest)message; if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) { - ActorRef readerRef = getContext().actorOf( Props.create( - QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader"); - queueReadersByQueueName.put( request.getQueueName(), readerRef ); + + if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) { + ActorRef readerRef = getContext().actorOf( Props.create( + QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader"); + queueReadersByQueueName.put( request.getQueueName(), readerRef ); + } } // hand-off to queue's reader http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java index 7166ef1..e54d916 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java @@ -25,6 +25,7 @@ import com.google.inject.Injector; import org.apache.usergrid.persistence.qakka.App; import org.apache.usergrid.persistence.qakka.MetricsService; import org.apache.usergrid.persistence.qakka.core.QakkaUtils; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest; import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse; import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog; @@ -44,6 +45,7 @@ public class QueueWriter extends UntypedActor { public enum WriteStatus { SUCCESS_XFERLOG_DELETED, SUCCESS_XFERLOG_NOTDELETED, ERROR }; + private final DistributedQueueService distributedQueueService; private final QueueMessageSerialization messageSerialization; private final TransferLogSerialization transferLogSerialization; private final AuditLogSerialization auditLogSerialization; @@ -61,6 +63,7 @@ public class QueueWriter extends UntypedActor { auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); metricsService = injector.getInstance( MetricsService.class ); + distributedQueueService = injector.getInstance( DistributedQueueService.class ); messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index 3d6a808..9d71c31 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -25,7 +25,6 @@ import akka.util.Timeout; import com.datastax.driver.core.exceptions.InvalidQueryException; import com.google.inject.Inject; import com.google.inject.Singleton; -import org.apache.log4j.net.SyslogAppender; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import org.apache.usergrid.persistence.actorsystem.ClientActor; import org.apache.usergrid.persistence.qakka.QakkaFig; @@ -114,7 +113,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { @Override public void refreshQueue(String queueName) { logger.info("{} Requesting refresh for queue: {}", this, queueName); - QueueRefreshRequest request = new QueueRefreshRequest( queueName ); + QueueRefreshRequest request = new QueueRefreshRequest( queueName, false ); ActorRef clientActor = actorSystemManager.getClientActor(); clientActor.tell( request, null ); } @@ -168,6 +167,11 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { if ( retries > 1 ) { logger.debug("SUCCESS after {} retries", retries ); } + + logger.debug("{} Requesting refresh if empty for queue: {}", this, queueName); + QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false ); + clientActor.tell( qrr, null ); + return qarm.getSendStatus(); } else { http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java index a81a6fd..b65ad3d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java @@ -24,10 +24,16 @@ import org.apache.commons.lang3.builder.ToStringBuilder; public class QueueRefreshRequest implements QakkaMessage { private final String queueName; + private final boolean onlyIfEmpty; - public QueueRefreshRequest(String queueName ) { + public QueueRefreshRequest( String queueName, boolean onlyIfEmpty ) { this.queueName = queueName; + this.onlyIfEmpty = onlyIfEmpty; + } + + public boolean isOnlyIfEmpty() { + return onlyIfEmpty; } public String getQueueName() { http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java index 3225a66..c10d1f5 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java @@ -175,6 +175,7 @@ public class QueueMessageManagerTest extends AbstractTest { if (inMemoryQueue.size( queueName ) == 40) { break; } + Thread.sleep( 500 ); } Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java index 0b8b795..19c1211 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java @@ -95,7 +95,7 @@ public class QueueReaderTest extends AbstractTest { ActorSystem system = ActorSystem.create("Test-" + queueName); ActorRef queueReaderRef = system.actorOf( Props.create( QueueRefresher.class, queueName ), "queueReader"); - QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName ); + QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false ); // need to wait for refresh to complete int maxRetries = 10;
