Repository: usergrid Updated Branches: refs/heads/usergrid-1318-queue a90a0bbd2 -> fbce160a1
Ensure that injector is not used in Akka constructors to avoid "cannot serialize injector" errors Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fe08fa72 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fe08fa72 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fe08fa72 Branch: refs/heads/usergrid-1318-queue Commit: fe08fa72f8e807565141bac871796479f28aa3f1 Parents: a90a0bb Author: Dave Johnson <[email protected]> Authored: Wed Oct 5 17:38:32 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Wed Oct 5 17:38:32 2016 -0400 ---------------------------------------------------------------------- .../qakka/distributed/actors/QueueActor.java | 75 +++++++------------- .../distributed/actors/QueueActorRouter.java | 2 +- .../distributed/actors/QueueRefresher.java | 1 - .../qakka/distributed/actors/QueueSender.java | 34 +++++---- .../distributed/actors/QueueSenderRouter.java | 2 +- .../distributed/actors/QueueTimeouter.java | 66 +++++------------ .../qakka/distributed/actors/QueueWriter.java | 30 ++++---- .../distributed/actors/QueueWriterRouter.java | 2 +- .../distributed/actors/ShardAllocator.java | 35 ++++++--- .../impl/DistributedQueueServiceImpl.java | 13 ++-- .../impl/QueueActorRouterProducer.java | 2 +- .../impl/QueueSenderRouterProducer.java | 2 +- .../impl/QueueWriterRouterProducer.java | 2 +- .../impl/QueueMessageSerializationImpl.java | 18 +++-- 14 files changed, 132 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java index 64f12d4..9ce38ef 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java @@ -52,7 +52,6 @@ public class QueueActor extends UntypedActor { private final InMemoryQueue inMemoryQueue; private final QueueActorHelper queueActorHelper; private final MetricsService metricsService; - private final MessageCounterSerialization messageCounterSerialization; private final Map<String, Cancellable> refreshSchedulersByQueueName = new HashMap<>(); @@ -63,24 +62,32 @@ public class QueueActor extends UntypedActor { private final Map<String, ActorRef> queueTimeoutersByQueueName = new HashMap<>(); private final Map<String, ActorRef> shardAllocatorsByQueueName = new HashMap<>(); - private final AtomicLong runCount = new AtomicLong(0); - private final AtomicLong messageCount = new AtomicLong(0); private final Set<String> queuesSeen = new HashSet<>(); - private final Injector injector; + //private final Injector injector; @Inject - public QueueActor( Injector injector ) { - - this.injector = injector; - - qakkaFig = injector.getInstance( QakkaFig.class ); - inMemoryQueue = injector.getInstance( InMemoryQueue.class ); - queueActorHelper = injector.getInstance( QueueActorHelper.class ); - metricsService = injector.getInstance( MetricsService.class ); - - messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class ); + public QueueActor( + //Injector injector, + QakkaFig qakkaFig, + InMemoryQueue inMemoryQueue, + QueueActorHelper queueActorHelper, + MetricsService metricsService, + MessageCounterSerialization messageCounterSerialization + ) { + //this.injector = injector; + this.qakkaFig = qakkaFig; + this.inMemoryQueue = inMemoryQueue; + this.queueActorHelper = queueActorHelper; + this.metricsService = metricsService; + this.messageCounterSerialization = messageCounterSerialization; + +// qakkaFig = injector.getInstance( QakkaFig.class ); +// inMemoryQueue = injector.getInstance( InMemoryQueue.class ); +// queueActorHelper = injector.getInstance( QueueActorHelper.class ); +// metricsService = injector.getInstance( MetricsService.class ); +// messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class ); } @Override @@ -138,7 +145,7 @@ public class QueueActor extends UntypedActor { if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) { ActorRef readerRef = getContext().actorOf( - Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ), + Props.create( GuiceActorProducer.class, QueueRefresher.class ), request.getQueueName() + "_reader"); queueReadersByQueueName.put( request.getQueueName(), readerRef ); } @@ -154,7 +161,7 @@ public class QueueActor extends UntypedActor { if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) { ActorRef readerRef = getContext().actorOf( - Props.create( GuiceActorProducer.class, injector, QueueTimeouter.class), + Props.create( GuiceActorProducer.class, QueueTimeouter.class), request.getQueueName() + "_timeouter"); queueTimeoutersByQueueName.put( request.getQueueName(), readerRef ); } @@ -169,7 +176,7 @@ public class QueueActor extends UntypedActor { if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) { ActorRef readerRef = getContext().actorOf( - Props.create( GuiceActorProducer.class, injector, ShardAllocator.class), + Props.create( GuiceActorProducer.class, ShardAllocator.class), request.getQueueName() + "_shard_allocator"); shardAllocatorsByQueueName.put( request.getQueueName(), readerRef ); } @@ -191,43 +198,9 @@ public class QueueActor extends UntypedActor { Collection<DatabaseQueueMessage> messages = queueActorHelper.getMessages( queueName, numRequested); - messageCounterSerialization.decrementCounter( - queueName, - DatabaseQueueMessage.Type.DEFAULT, - messages.size()); - getSender().tell( new QueueGetResponse( DistributedQueueService.Status.SUCCESS, messages ), getSender() ); -// long runs = runCount.incrementAndGet(); -// long messagesReturned = messageCount.addAndGet( queueMessages.size() ); -// -// if ( logger.isDebugEnabled() && runs % 100 == 0 ) { -// -// final DecimalFormat format = new DecimalFormat("##.###"); -// final long nano = 1000000000; -// Timer t = metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET ); -// -// logger.debug("QueueActor get stats (queues {}):\n" + -// " Num runs={}\n" + -// " Messages={}\n" + -// " Mean={}\n" + -// " One min rate={}\n" + -// " Five min rate={}\n" + -// " Snapshot mean={}\n" + -// " Snapshot min={}\n" + -// " Snapshot max={}", -// queuesSeen.toArray(), -// t.getCount(), -// messagesReturned, -// format.format( t.getMeanRate() ), -// format.format( t.getOneMinuteRate() ), -// format.format( t.getFiveMinuteRate() ), -// format.format( t.getSnapshot().getMean() / nano ), -// format.format( (double) t.getSnapshot().getMin() / nano ), -// format.format( (double) t.getSnapshot().getMax() / nano ) ); -// } - } finally { timer.close(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java index 9257a0d..c40a3d9 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java @@ -42,7 +42,7 @@ public class QueueActorRouter extends UntypedActor { public QueueActorRouter( Injector injector ) { this.routerRef = getContext().actorOf( FromConfig.getInstance().props( - Props.create(GuiceActorProducer.class, injector, QueueActor.class)), "router"); + Props.create(GuiceActorProducer.class, QueueActor.class)), "router"); } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java index 2f70088..ae9969c 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java @@ -43,7 +43,6 @@ public class QueueRefresher extends UntypedActor { if ( message instanceof QueueRefreshRequest ) { QueueRefreshRequest request = (QueueRefreshRequest) message; - logger.debug( "running for queue {}", request.getQueueName() ); String queueName = request.getQueueName(); helper.queueRefresh( queueName ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java index 739e1c4..3dc695e 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java @@ -61,19 +61,32 @@ public class QueueSender extends UntypedActor { private final TransferLogSerialization transferLogSerialization; private final AuditLogSerialization auditLogSerialization; private final ActorSystemFig actorSystemFig; - private final QakkaFig qakkaFig; + private final QakkaFig qakkaFig; private final MetricsService metricsService; @Inject - public QueueSender( Injector injector ) { - - actorSystemManager = injector.getInstance( ActorSystemManager.class ); - transferLogSerialization = injector.getInstance( TransferLogSerialization.class ); - auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); - actorSystemFig = injector.getInstance( ActorSystemFig.class ); - qakkaFig = injector.getInstance( QakkaFig.class ); - metricsService = injector.getInstance( MetricsService.class ); + public QueueSender( + ActorSystemManager actorSystemManager, + TransferLogSerialization transferLogSerialization, + AuditLogSerialization auditLogSerialization, + ActorSystemFig actorSystemFig, + QakkaFig qakkaFig, + MetricsService metricsService + ) { + this.actorSystemManager = actorSystemManager; + this.transferLogSerialization = transferLogSerialization; + this.auditLogSerialization = auditLogSerialization; + this.actorSystemFig = actorSystemFig; + this.qakkaFig = qakkaFig; + this.metricsService = metricsService; + +// actorSystemManager = injector.getInstance( ActorSystemManager.class ); +// transferLogSerialization = injector.getInstance( TransferLogSerialization.class ); +// auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); +// actorSystemFig = injector.getInstance( ActorSystemFig.class ); +// qakkaFig = injector.getInstance( QakkaFig.class ); +// metricsService = injector.getInstance( MetricsService.class ); } @Override @@ -195,9 +208,6 @@ public class QueueSender extends UntypedActor { } else if ( writeStatus != null && writeStatus.equals( QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ) ) { - //logger.debug( "Delivery Success, now removing transfer log: {}, {}, {}, {}", - // new Object[]{queueName, actorSystemFig.getRegionLocal(), region, messageId} ); - // queue actor failed to clean up transfer log try { transferLogSerialization.removeTransferLog( http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java index 92d0785..a205d71 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java @@ -41,7 +41,7 @@ public class QueueSenderRouter extends UntypedActor { public QueueSenderRouter( Injector injector ) { this.router = getContext().actorOf( FromConfig.getInstance().props( - Props.create( GuiceActorProducer.class, injector, QueueSender.class )), "router"); + Props.create( GuiceActorProducer.class, QueueSender.class )), "router"); } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java index 9b11277..33f1dd9 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java @@ -53,24 +53,29 @@ public class QueueTimeouter extends UntypedActor { private final QueueMessageSerialization messageSerialization; private final MetricsService metricsService; private final ActorSystemFig actorSystemFig; - private final QakkaFig qakkaFig; + private final QakkaFig qakkaFig; private final CassandraClient cassandraClient; - private final MessageCounterSerialization messageCounterSerialization; - - private final AtomicLong runCount = new AtomicLong(0); - private final AtomicLong totalTimedout = new AtomicLong(0); @Inject - public QueueTimeouter( Injector injector) { - - messageSerialization = injector.getInstance( QueueMessageSerialization.class ); - actorSystemFig = injector.getInstance( ActorSystemFig.class ); - qakkaFig = injector.getInstance( QakkaFig.class ); - metricsService = injector.getInstance( MetricsService.class ); - cassandraClient = injector.getInstance( CassandraClientImpl.class ); - - messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class ); + public QueueTimeouter( + QueueMessageSerialization messageSerialization, + MetricsService metricsService, + ActorSystemFig actorSystemFig, + QakkaFig qakkaFig, + CassandraClient cassandraClient + ) { + this.messageSerialization = messageSerialization; + this.metricsService = metricsService; + this.actorSystemFig = actorSystemFig; + this.qakkaFig = qakkaFig; + this.cassandraClient = cassandraClient; + +// messageSerialization = injector.getInstance( QueueMessageSerialization.class ); +// actorSystemFig = injector.getInstance( ActorSystemFig.class ); +// qakkaFig = injector.getInstance( QakkaFig.class ); +// metricsService = injector.getInstance( MetricsService.class ); +// cassandraClient = injector.getInstance( CassandraClientImpl.class ); } @@ -135,41 +140,8 @@ public class QueueTimeouter extends UntypedActor { } } - - long runs = runCount.incrementAndGet(); - long timeoutCount = totalTimedout.addAndGet( count ); - - if ( logger.isDebugEnabled() && runs % 100 == 0 ) { - - final DecimalFormat format = new DecimalFormat("##.###"); - final long nano = 1000000000; - Timer t = metricsService.getMetricRegistry().timer(MetricsService.TIMEOUT_TIME ); - - logger.debug("QueueTimeouter for queue '{}' stats:\n" + - " Num runs={}\n" + - " Timeout count={}\n" + - " Mean={}\n" + - " One min rate={}\n" + - " Five min rate={}\n" + - " Snapshot mean={}\n" + - " Snapshot min={}\n" + - " Snapshot max={}", - queueName, - t.getCount(), - timeoutCount, - format.format( t.getMeanRate() ), - format.format( t.getOneMinuteRate() ), - format.format( t.getFiveMinuteRate() ), - format.format( t.getSnapshot().getMean() / nano ), - format.format( (double) t.getSnapshot().getMin() / nano ), - format.format( (double) t.getSnapshot().getMax() / nano ) ); - } - if (count > 0) { logger.debug( "Timed out {} messages for queue {}", count, queueName ); - - messageCounterSerialization.decrementCounter( - queueName, DatabaseQueueMessage.Type.DEFAULT, count); } } finally { http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java index 273f0b2..e014d59 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java @@ -46,25 +46,28 @@ public class QueueWriter extends UntypedActor { public enum WriteStatus { SUCCESS_XFERLOG_DELETED, SUCCESS_XFERLOG_NOTDELETED, ERROR }; - private final DistributedQueueService distributedQueueService; private final QueueMessageSerialization messageSerialization; private final TransferLogSerialization transferLogSerialization; private final AuditLogSerialization auditLogSerialization; private final MetricsService metricsService; - private final MessageCounterSerialization messageCounterSerialization; - @Inject - public QueueWriter( Injector injector ) { - - messageSerialization = injector.getInstance( QueueMessageSerialization.class ); - transferLogSerialization = injector.getInstance( TransferLogSerialization.class ); - auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); - metricsService = injector.getInstance( MetricsService.class ); - - distributedQueueService = injector.getInstance( DistributedQueueService.class ); - messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class ); + public QueueWriter( + QueueMessageSerialization messageSerialization, + TransferLogSerialization transferLogSerialization, + AuditLogSerialization auditLogSerialization, + MetricsService metricsService + ) { + this.messageSerialization = messageSerialization; + this.transferLogSerialization = transferLogSerialization; + this.auditLogSerialization = auditLogSerialization; + this.metricsService = metricsService; + +// messageSerialization = injector.getInstance( QueueMessageSerialization.class ); +// transferLogSerialization = injector.getInstance( TransferLogSerialization.class ); +// auditLogSerialization = injector.getInstance( AuditLogSerialization.class ); +// metricsService = injector.getInstance( MetricsService.class ); } @Override @@ -97,9 +100,6 @@ public class QueueWriter extends UntypedActor { messageSerialization.writeMessage( dbqm ); - messageCounterSerialization.incrementCounter( - qa.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1); - //logger.debug("Wrote queue message id {} to queue name {}", // dbqm.getQueueMessageId(), dbqm.getQueueName()); http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java index f0540af..0e3e981 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java @@ -40,7 +40,7 @@ public class QueueWriterRouter extends UntypedActor { public QueueWriterRouter( Injector injector ) { this.router = getContext().actorOf( FromConfig.getInstance().props( - Props.create( GuiceActorProducer.class, injector, QueueWriter.class )), "router"); + Props.create( GuiceActorProducer.class, QueueWriter.class )), "router"); } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java index 65c3370..46dc0ed 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java @@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.qakka.MetricsService; import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.CassandraClient; import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.core.QakkaUtils; import org.apache.usergrid.persistence.qakka.distributed.messages.ShardCheckRequest; import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; @@ -59,14 +60,27 @@ public class ShardAllocator extends UntypedActor { @Inject - public ShardAllocator( Injector injector ) { - - this.qakkaFig = injector.getInstance( QakkaFig.class ); - this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class ); - this.shardSerialization = injector.getInstance( ShardSerializationImpl.class ); - this.actorSystemFig = injector.getInstance( ActorSystemFig.class ); - this.metricsService = injector.getInstance( MetricsService.class ); - this.cassandraClient = injector.getInstance( CassandraClientImpl.class ); + public ShardAllocator( + QakkaFig qakkaFig, + ActorSystemFig actorSystemFig, + ShardSerialization shardSerialization, + ShardCounterSerialization shardCounterSerialization, + MetricsService metricsService, + CassandraClient cassandraClient + ) { + this.qakkaFig = qakkaFig; + this.actorSystemFig = actorSystemFig; + this.shardSerialization = shardSerialization; + this.shardCounterSerialization = shardCounterSerialization; + this.metricsService = metricsService; + this.cassandraClient = cassandraClient; + +// this.qakkaFig = injector.getInstance( QakkaFig.class ); +// this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class ); +// this.shardSerialization = injector.getInstance( ShardSerializationImpl.class ); +// this.actorSystemFig = injector.getInstance( ActorSystemFig.class ); +// this.metricsService = injector.getInstance( MetricsService.class ); +// this.cassandraClient = injector.getInstance( CassandraClientImpl.class ); } @@ -106,7 +120,10 @@ public class ShardAllocator extends UntypedActor { } if (shard == null) { - logger.warn( "No shard found for {}, {}, {}", queueName, region, type ); + shard = new Shard( queueName, actorSystemFig.getRegionLocal(), + type, 1L, QakkaUtils.getTimeUuid()); + shardSerialization.createShard( shard ); + return; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java index af71247..9063242 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java @@ -23,10 +23,13 @@ import akka.actor.ActorRef; import akka.pattern.Patterns; import akka.util.Timeout; import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.google.inject.Guice; import com.google.inject.Inject; +import com.google.inject.Injector; import com.google.inject.Singleton; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import org.apache.usergrid.persistence.actorsystem.ClientActor; +import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer; import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.QueueManager; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; @@ -53,23 +56,21 @@ public class DistributedQueueServiceImpl implements DistributedQueueService { private final ActorSystemManager actorSystemManager; private final QueueManager queueManager; private final QakkaFig qakkaFig; - private final MessageCounterSerialization messageCounterSerialization; @Inject public DistributedQueueServiceImpl( + Injector injector, ActorSystemManager actorSystemManager, QueueManager queueManager, - QakkaFig qakkaFig, - MessageCounterSerialization messageCounterSerialization ) { + QakkaFig qakkaFig + ) { this.actorSystemManager = actorSystemManager; this.queueManager = queueManager; this.qakkaFig = qakkaFig; - this.messageCounterSerialization = messageCounterSerialization; - - + GuiceActorProducer.INJECTOR = injector; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java index 8c6adda..d74936b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java @@ -73,7 +73,7 @@ public class QueueActorRouterProducer implements RouterProducer { ClusterSingletonManagerSettings.create( system ).withRole( "io" ); system.actorOf( ClusterSingletonManager.props( - Props.create( GuiceActorProducer.class, injector, QueueActorRouter.class ), + Props.create( GuiceActorProducer.class, QueueActorRouter.class ), PoisonPill.getInstance(), settings ), "queueActorRouter" ); ClusterSingletonProxySettings proxySettings = http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java index 885a559..4fa9048 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java @@ -73,7 +73,7 @@ public class QueueSenderRouterProducer implements RouterProducer { ClusterSingletonManagerSettings.create( system ).withRole( "io" ); system.actorOf( ClusterSingletonManager.props( - Props.create( GuiceActorProducer.class, injector, QueueSenderRouter.class ), + Props.create( GuiceActorProducer.class, QueueSenderRouter.class ), PoisonPill.getInstance(), settings ), "queueSenderRouter" ); ClusterSingletonProxySettings proxySettings = http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java index c8b5567..006f1a7 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java @@ -73,7 +73,7 @@ public class QueueWriterRouterProducer implements RouterProducer { ClusterSingletonManagerSettings.create( system ).withRole( "io" ); system.actorOf( ClusterSingletonManager.props( - Props.create( GuiceActorProducer.class, injector, QueueWriterRouter.class ), + Props.create( GuiceActorProducer.class, QueueWriterRouter.class ), PoisonPill.getInstance(), settings ), "queueWriterRouter" ); ClusterSingletonProxySettings proxySettings = http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java index 02862c4..d9a2543 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java @@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.qakka.core.CassandraClient; import org.apache.usergrid.persistence.qakka.core.QakkaUtils; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization; @@ -59,6 +60,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization private final ActorSystemFig actorSystemFig; private final ShardStrategy shardStrategy; private final ShardCounterSerialization shardCounterSerialization; + private final MessageCounterSerialization messageCounterSerialization; public final static String COLUMN_QUEUE_NAME = "queue_name"; public final static String COLUMN_REGION = "region"; @@ -114,14 +116,16 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization ActorSystemFig actorSystemFig, ShardStrategy shardStrategy, ShardCounterSerialization shardCounterSerialization, + MessageCounterSerialization messageCounterSerialization, CassandraClient cassandraClient, QakkaFig qakkaFig ) { - this.cassandraConfig = cassandraConfig; - this.actorSystemFig = actorSystemFig; - this.shardStrategy = shardStrategy; - this.shardCounterSerialization = shardCounterSerialization; - this.cassandraClient = cassandraClient; + this.cassandraConfig = cassandraConfig; + this.actorSystemFig = actorSystemFig; + this.shardStrategy = shardStrategy; + this.shardCounterSerialization = shardCounterSerialization; + this.messageCounterSerialization = messageCounterSerialization; + this.cassandraClient = cassandraClient; this.maxTtl = qakkaFig.getMaxTtlSeconds(); } @@ -162,6 +166,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 ); + messageCounterSerialization.incrementCounter( message.getQueueName(), message.getType(), 1L ); + return queueMessageId; } @@ -250,6 +256,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization .and(queueMessageIdClause); cassandraClient.getQueueMessageSession().execute( delete ); + + messageCounterSerialization.decrementCounter( queueName, type, 1L ); }
