Ensure that the injector is not used in Akka actor constructors, to avoid
"cannot serialize injector" errors


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fe08fa72
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fe08fa72
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fe08fa72

Branch: refs/heads/master
Commit: fe08fa72f8e807565141bac871796479f28aa3f1
Parents: a90a0bb
Author: Dave Johnson <[email protected]>
Authored: Wed Oct 5 17:38:32 2016 -0400
Committer: Dave Johnson <[email protected]>
Committed: Wed Oct 5 17:38:32 2016 -0400

----------------------------------------------------------------------
 .../qakka/distributed/actors/QueueActor.java    | 75 +++++++-------------
 .../distributed/actors/QueueActorRouter.java    |  2 +-
 .../distributed/actors/QueueRefresher.java      |  1 -
 .../qakka/distributed/actors/QueueSender.java   | 34 +++++----
 .../distributed/actors/QueueSenderRouter.java   |  2 +-
 .../distributed/actors/QueueTimeouter.java      | 66 +++++------------
 .../qakka/distributed/actors/QueueWriter.java   | 30 ++++----
 .../distributed/actors/QueueWriterRouter.java   |  2 +-
 .../distributed/actors/ShardAllocator.java      | 35 ++++++---
 .../impl/DistributedQueueServiceImpl.java       | 13 ++--
 .../impl/QueueActorRouterProducer.java          |  2 +-
 .../impl/QueueSenderRouterProducer.java         |  2 +-
 .../impl/QueueWriterRouterProducer.java         |  2 +-
 .../impl/QueueMessageSerializationImpl.java     | 18 +++--
 14 files changed, 132 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 64f12d4..9ce38ef 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -52,7 +52,6 @@ public class QueueActor extends UntypedActor {
     private final InMemoryQueue    inMemoryQueue;
     private final QueueActorHelper queueActorHelper;
     private final MetricsService   metricsService;
-
     private final MessageCounterSerialization messageCounterSerialization;
 
     private final Map<String, Cancellable> refreshSchedulersByQueueName = new 
HashMap<>();
@@ -63,24 +62,32 @@ public class QueueActor extends UntypedActor {
     private final Map<String, ActorRef> queueTimeoutersByQueueName = new 
HashMap<>();
     private final Map<String, ActorRef> shardAllocatorsByQueueName = new 
HashMap<>();
 
-    private final AtomicLong runCount = new AtomicLong(0);
-    private final AtomicLong messageCount = new AtomicLong(0);
     private final Set<String> queuesSeen = new HashSet<>();
 
-    private final Injector injector;
+    //private final Injector injector;
 
 
     @Inject
-    public QueueActor( Injector injector ) {
-
-        this.injector = injector;
-
-        qakkaFig         = injector.getInstance( QakkaFig.class );
-        inMemoryQueue    = injector.getInstance( InMemoryQueue.class );
-        queueActorHelper = injector.getInstance( QueueActorHelper.class );
-        metricsService   = injector.getInstance( MetricsService.class );
-
-        messageCounterSerialization = injector.getInstance( 
MessageCounterSerialization.class );
+    public QueueActor(
+        //Injector         injector,
+        QakkaFig         qakkaFig,
+        InMemoryQueue    inMemoryQueue,
+        QueueActorHelper queueActorHelper,
+        MetricsService   metricsService,
+        MessageCounterSerialization messageCounterSerialization
+    ) {
+        //this.injector = injector;
+        this.qakkaFig = qakkaFig;
+        this.inMemoryQueue = inMemoryQueue;
+        this.queueActorHelper = queueActorHelper;
+        this.metricsService = metricsService;
+        this.messageCounterSerialization = messageCounterSerialization;
+
+//        qakkaFig         = injector.getInstance( QakkaFig.class );
+//        inMemoryQueue    = injector.getInstance( InMemoryQueue.class );
+//        queueActorHelper = injector.getInstance( QueueActorHelper.class );
+//        metricsService   = injector.getInstance( MetricsService.class );
+//        messageCounterSerialization = injector.getInstance( 
MessageCounterSerialization.class );
     }
 
     @Override
@@ -138,7 +145,7 @@ public class QueueActor extends UntypedActor {
 
                 if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( 
request.getQueueName()) == null ) {
                     ActorRef readerRef = getContext().actorOf(
-                        Props.create( GuiceActorProducer.class, injector, 
QueueRefresher.class ),
+                        Props.create( GuiceActorProducer.class, 
QueueRefresher.class ),
                         request.getQueueName() + "_reader");
                     queueReadersByQueueName.put( request.getQueueName(), 
readerRef );
                 }
@@ -154,7 +161,7 @@ public class QueueActor extends UntypedActor {
 
             if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == 
null ) {
                 ActorRef readerRef = getContext().actorOf(
-                    Props.create( GuiceActorProducer.class, injector, 
QueueTimeouter.class),
+                    Props.create( GuiceActorProducer.class, 
QueueTimeouter.class),
                     request.getQueueName() + "_timeouter");
                 queueTimeoutersByQueueName.put( request.getQueueName(), 
readerRef );
             }
@@ -169,7 +176,7 @@ public class QueueActor extends UntypedActor {
 
             if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == 
null ) {
                 ActorRef readerRef = getContext().actorOf(
-                    Props.create( GuiceActorProducer.class, injector, 
ShardAllocator.class),
+                    Props.create( GuiceActorProducer.class, 
ShardAllocator.class),
                     request.getQueueName() + "_shard_allocator");
                 shardAllocatorsByQueueName.put( request.getQueueName(), 
readerRef );
             }
@@ -191,43 +198,9 @@ public class QueueActor extends UntypedActor {
 
                 Collection<DatabaseQueueMessage> messages = 
queueActorHelper.getMessages( queueName, numRequested);
 
-                messageCounterSerialization.decrementCounter(
-                    queueName,
-                    DatabaseQueueMessage.Type.DEFAULT,
-                    messages.size());
-
                 getSender().tell( new QueueGetResponse(
                         DistributedQueueService.Status.SUCCESS, messages ), 
getSender() );
 
-//                long runs = runCount.incrementAndGet();
-//                long messagesReturned = messageCount.addAndGet( 
queueMessages.size() );
-//
-//                if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-//
-//                    final DecimalFormat format = new DecimalFormat("##.###");
-//                    final long nano = 1000000000;
-//                    Timer t = 
metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET );
-//
-//                    logger.debug("QueueActor get stats (queues {}):\n" +
-//                            "   Num runs={}\n" +
-//                            "   Messages={}\n" +
-//                            "   Mean={}\n" +
-//                            "   One min rate={}\n" +
-//                            "   Five min rate={}\n" +
-//                            "   Snapshot mean={}\n" +
-//                            "   Snapshot min={}\n" +
-//                            "   Snapshot max={}",
-//                        queuesSeen.toArray(),
-//                        t.getCount(),
-//                        messagesReturned,
-//                        format.format( t.getMeanRate() ),
-//                        format.format( t.getOneMinuteRate() ),
-//                        format.format( t.getFiveMinuteRate() ),
-//                        format.format( t.getSnapshot().getMean() / nano ),
-//                        format.format( (double) t.getSnapshot().getMin() / 
nano ),
-//                        format.format( (double) t.getSnapshot().getMax() / 
nano ) );
-//                }
-
             } finally {
                 timer.close();
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index 9257a0d..c40a3d9 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -42,7 +42,7 @@ public class QueueActorRouter extends UntypedActor {
     public QueueActorRouter( Injector injector ) {
 
         this.routerRef = getContext().actorOf( FromConfig.getInstance().props(
-            Props.create(GuiceActorProducer.class, injector, 
QueueActor.class)), "router");
+            Props.create(GuiceActorProducer.class, QueueActor.class)), 
"router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index 2f70088..ae9969c 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -43,7 +43,6 @@ public class QueueRefresher extends UntypedActor {
         if ( message instanceof QueueRefreshRequest ) {
 
             QueueRefreshRequest request = (QueueRefreshRequest) message;
-            logger.debug( "running for queue {}", request.getQueueName() );
             String queueName = request.getQueueName();
             helper.queueRefresh( queueName );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index 739e1c4..3dc695e 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -61,19 +61,32 @@ public class QueueSender extends UntypedActor {
     private final TransferLogSerialization  transferLogSerialization;
     private final AuditLogSerialization     auditLogSerialization;
     private final ActorSystemFig            actorSystemFig;
-    private final QakkaFig qakkaFig;
+    private final QakkaFig                  qakkaFig;
     private final MetricsService            metricsService;
 
 
     @Inject
-    public QueueSender( Injector injector ) {
-
-        actorSystemManager       = injector.getInstance( 
ActorSystemManager.class );
-        transferLogSerialization = injector.getInstance( 
TransferLogSerialization.class );
-        auditLogSerialization    = injector.getInstance( 
AuditLogSerialization.class );
-        actorSystemFig           = injector.getInstance( ActorSystemFig.class 
);
-        qakkaFig                 = injector.getInstance( QakkaFig.class );
-        metricsService           = injector.getInstance( MetricsService.class 
);
+    public QueueSender(
+        ActorSystemManager        actorSystemManager,
+        TransferLogSerialization  transferLogSerialization,
+        AuditLogSerialization     auditLogSerialization,
+        ActorSystemFig            actorSystemFig,
+        QakkaFig                  qakkaFig,
+        MetricsService            metricsService
+    ) {
+        this.actorSystemManager = actorSystemManager;
+        this.transferLogSerialization = transferLogSerialization;
+        this.auditLogSerialization = auditLogSerialization;
+        this.actorSystemFig = actorSystemFig;
+        this.qakkaFig = qakkaFig;
+        this.metricsService = metricsService;
+
+//        actorSystemManager       = injector.getInstance( 
ActorSystemManager.class );
+//        transferLogSerialization = injector.getInstance( 
TransferLogSerialization.class );
+//        auditLogSerialization    = injector.getInstance( 
AuditLogSerialization.class );
+//        actorSystemFig           = injector.getInstance( 
ActorSystemFig.class );
+//        qakkaFig                 = injector.getInstance( QakkaFig.class );
+//        metricsService           = injector.getInstance( 
MetricsService.class );
     }
 
     @Override
@@ -195,9 +208,6 @@ public class QueueSender extends UntypedActor {
         } else if ( writeStatus != null
                 && writeStatus.equals( 
QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ) ) {
 
-            //logger.debug( "Delivery Success, now removing transfer log: {}, 
{}, {}, {}",
-            //        new Object[]{queueName, actorSystemFig.getRegionLocal(), 
region, messageId} );
-
             // queue actor failed to clean up transfer log
             try {
                 transferLogSerialization.removeTransferLog(

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
index 92d0785..a205d71 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
@@ -41,7 +41,7 @@ public class QueueSenderRouter extends UntypedActor {
     public QueueSenderRouter( Injector injector ) {
 
         this.router = getContext().actorOf( FromConfig.getInstance().props(
-            Props.create( GuiceActorProducer.class, injector, 
QueueSender.class )), "router");
+            Props.create( GuiceActorProducer.class, QueueSender.class )), 
"router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index 9b11277..33f1dd9 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -53,24 +53,29 @@ public class QueueTimeouter extends UntypedActor {
     private final QueueMessageSerialization messageSerialization;
     private final MetricsService            metricsService;
     private final ActorSystemFig            actorSystemFig;
-    private final QakkaFig qakkaFig;
+    private final QakkaFig                  qakkaFig;
     private final CassandraClient           cassandraClient;
-    private final MessageCounterSerialization messageCounterSerialization;
-
-    private final AtomicLong runCount = new AtomicLong(0);
-    private final AtomicLong totalTimedout = new AtomicLong(0);
 
 
     @Inject
-    public QueueTimeouter( Injector injector) {
-
-        messageSerialization = injector.getInstance( 
QueueMessageSerialization.class );
-        actorSystemFig       = injector.getInstance( ActorSystemFig.class );
-        qakkaFig             = injector.getInstance( QakkaFig.class );
-        metricsService       = injector.getInstance( MetricsService.class );
-        cassandraClient      = injector.getInstance( CassandraClientImpl.class 
);
-
-        messageCounterSerialization = injector.getInstance( 
MessageCounterSerialization.class );
+    public QueueTimeouter(
+        QueueMessageSerialization messageSerialization,
+        MetricsService            metricsService,
+        ActorSystemFig            actorSystemFig,
+        QakkaFig                  qakkaFig,
+        CassandraClient           cassandraClient
+    ) {
+        this.messageSerialization = messageSerialization;
+        this.metricsService = metricsService;
+        this.actorSystemFig = actorSystemFig;
+        this.qakkaFig = qakkaFig;
+        this.cassandraClient = cassandraClient;
+
+//        messageSerialization = injector.getInstance( 
QueueMessageSerialization.class );
+//        actorSystemFig       = injector.getInstance( ActorSystemFig.class );
+//        qakkaFig             = injector.getInstance( QakkaFig.class );
+//        metricsService       = injector.getInstance( MetricsService.class );
+//        cassandraClient      = injector.getInstance( 
CassandraClientImpl.class );
     }
 
 
@@ -135,41 +140,8 @@ public class QueueTimeouter extends UntypedActor {
                     }
                 }
 
-
-                long runs = runCount.incrementAndGet();
-                long timeoutCount = totalTimedout.addAndGet( count );
-
-                if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-
-                    final DecimalFormat format = new DecimalFormat("##.###");
-                    final long nano = 1000000000;
-                    Timer t = 
metricsService.getMetricRegistry().timer(MetricsService.TIMEOUT_TIME );
-
-                    logger.debug("QueueTimeouter for queue '{}' stats:\n" +
-                            "   Num runs={}\n" +
-                            "   Timeout count={}\n" +
-                            "   Mean={}\n" +
-                            "   One min rate={}\n" +
-                            "   Five min rate={}\n" +
-                            "   Snapshot mean={}\n" +
-                            "   Snapshot min={}\n" +
-                            "   Snapshot max={}",
-                        queueName,
-                        t.getCount(),
-                        timeoutCount,
-                        format.format( t.getMeanRate() ),
-                        format.format( t.getOneMinuteRate() ),
-                        format.format( t.getFiveMinuteRate() ),
-                        format.format( t.getSnapshot().getMean() / nano ),
-                        format.format( (double) t.getSnapshot().getMin() / 
nano ),
-                        format.format( (double) t.getSnapshot().getMax() / 
nano ) );
-                }
-
                 if (count > 0) {
                     logger.debug( "Timed out {} messages for queue {}", count, 
queueName );
-
-                    messageCounterSerialization.decrementCounter(
-                        queueName, DatabaseQueueMessage.Type.DEFAULT, count);
                 }
 
             } finally {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index 273f0b2..e014d59 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -46,25 +46,28 @@ public class QueueWriter extends UntypedActor {
 
     public enum WriteStatus { SUCCESS_XFERLOG_DELETED, 
SUCCESS_XFERLOG_NOTDELETED, ERROR };
 
-    private final DistributedQueueService   distributedQueueService;
     private final QueueMessageSerialization messageSerialization;
     private final TransferLogSerialization  transferLogSerialization;
     private final AuditLogSerialization     auditLogSerialization;
     private final MetricsService            metricsService;
 
-    private final MessageCounterSerialization messageCounterSerialization;
-
 
     @Inject
-    public QueueWriter( Injector injector ) {
-
-        messageSerialization     = injector.getInstance( 
QueueMessageSerialization.class );
-        transferLogSerialization = injector.getInstance( 
TransferLogSerialization.class );
-        auditLogSerialization    = injector.getInstance( 
AuditLogSerialization.class );
-        metricsService           = injector.getInstance( MetricsService.class 
);
-
-        distributedQueueService     = injector.getInstance( 
DistributedQueueService.class );
-        messageCounterSerialization = injector.getInstance( 
MessageCounterSerialization.class );
+    public QueueWriter(
+        QueueMessageSerialization messageSerialization,
+        TransferLogSerialization  transferLogSerialization,
+        AuditLogSerialization     auditLogSerialization,
+        MetricsService            metricsService
+    ) {
+        this.messageSerialization = messageSerialization;
+        this.transferLogSerialization = transferLogSerialization;
+        this.auditLogSerialization = auditLogSerialization;
+        this.metricsService = metricsService;
+
+//        messageSerialization     = injector.getInstance( 
QueueMessageSerialization.class );
+//        transferLogSerialization = injector.getInstance( 
TransferLogSerialization.class );
+//        auditLogSerialization    = injector.getInstance( 
AuditLogSerialization.class );
+//        metricsService           = injector.getInstance( 
MetricsService.class );
     }
 
     @Override
@@ -97,9 +100,6 @@ public class QueueWriter extends UntypedActor {
 
                     messageSerialization.writeMessage( dbqm );
 
-                    messageCounterSerialization.incrementCounter(
-                        qa.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 
1);
-
                     //logger.debug("Wrote queue message id {} to queue name 
{}",
                     //        dbqm.getQueueMessageId(), dbqm.getQueueName());
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
index f0540af..0e3e981 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
@@ -40,7 +40,7 @@ public class QueueWriterRouter extends UntypedActor {
     public QueueWriterRouter( Injector injector ) {
 
         this.router = getContext().actorOf( FromConfig.getInstance().props(
-            Props.create( GuiceActorProducer.class, injector, 
QueueWriter.class )), "router");
+            Props.create( GuiceActorProducer.class, QueueWriter.class )), 
"router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 65c3370..46dc0ed 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
 import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import 
org.apache.usergrid.persistence.qakka.distributed.messages.ShardCheckRequest;
 import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
@@ -59,14 +60,27 @@ public class ShardAllocator extends UntypedActor {
 
 
     @Inject
-    public ShardAllocator( Injector injector ) {
-
-        this.qakkaFig                  = injector.getInstance( QakkaFig.class 
);
-        this.shardCounterSerialization = injector.getInstance( 
ShardCounterSerializationImpl.class );
-        this.shardSerialization        = injector.getInstance( 
ShardSerializationImpl.class );
-        this.actorSystemFig            = injector.getInstance( 
ActorSystemFig.class );
-        this.metricsService            = injector.getInstance( 
MetricsService.class );
-        this.cassandraClient           = injector.getInstance( 
CassandraClientImpl.class );
+    public ShardAllocator(
+        QakkaFig qakkaFig,
+        ActorSystemFig            actorSystemFig,
+        ShardSerialization        shardSerialization,
+        ShardCounterSerialization shardCounterSerialization,
+        MetricsService            metricsService,
+        CassandraClient           cassandraClient
+    ) {
+        this.qakkaFig = qakkaFig;
+        this.actorSystemFig = actorSystemFig;
+        this.shardSerialization = shardSerialization;
+        this.shardCounterSerialization = shardCounterSerialization;
+        this.metricsService = metricsService;
+        this.cassandraClient = cassandraClient;
+
+//        this.qakkaFig                  = injector.getInstance( 
QakkaFig.class );
+//        this.shardCounterSerialization = injector.getInstance( 
ShardCounterSerializationImpl.class );
+//        this.shardSerialization        = injector.getInstance( 
ShardSerializationImpl.class );
+//        this.actorSystemFig            = injector.getInstance( 
ActorSystemFig.class );
+//        this.metricsService            = injector.getInstance( 
MetricsService.class );
+//        this.cassandraClient           = injector.getInstance( 
CassandraClientImpl.class );
     }
 
 
@@ -106,7 +120,10 @@ public class ShardAllocator extends UntypedActor {
             }
 
             if (shard == null) {
-                logger.warn( "No shard found for {}, {}, {}", queueName, 
region, type );
+                shard = new Shard( queueName, actorSystemFig.getRegionLocal(),
+                    type, 1L, QakkaUtils.getTimeUuid());
+                shardSerialization.createShard( shard );
+
                 return;
             }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index af71247..9063242 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -23,10 +23,13 @@ import akka.actor.ActorRef;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.google.inject.Guice;
 import com.google.inject.Inject;
+import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.actorsystem.ClientActor;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.QueueManager;
 import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
@@ -53,23 +56,21 @@ public class DistributedQueueServiceImpl implements 
DistributedQueueService {
     private final ActorSystemManager actorSystemManager;
     private final QueueManager queueManager;
     private final QakkaFig qakkaFig;
-    private final MessageCounterSerialization messageCounterSerialization;
 
 
     @Inject
     public DistributedQueueServiceImpl(
+            Injector injector,
             ActorSystemManager actorSystemManager,
             QueueManager queueManager,
-            QakkaFig qakkaFig,
-            MessageCounterSerialization messageCounterSerialization ) {
+            QakkaFig qakkaFig
+            ) {
 
         this.actorSystemManager = actorSystemManager;
         this.queueManager = queueManager;
         this.qakkaFig = qakkaFig;
-        this.messageCounterSerialization = messageCounterSerialization;
-
-
 
+        GuiceActorProducer.INJECTOR = injector;
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
index 8c6adda..d74936b 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
@@ -73,7 +73,7 @@ public class QueueActorRouterProducer implements RouterProducer {
                 ClusterSingletonManagerSettings.create( system ).withRole( "io" );
 
         system.actorOf( ClusterSingletonManager.props(
-                Props.create( GuiceActorProducer.class, injector, QueueActorRouter.class ),
+                Props.create( GuiceActorProducer.class, QueueActorRouter.class ),
                 PoisonPill.getInstance(), settings ), "queueActorRouter" );
 
         ClusterSingletonProxySettings proxySettings =

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
index 885a559..4fa9048 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
@@ -73,7 +73,7 @@ public class QueueSenderRouterProducer implements RouterProducer {
                 ClusterSingletonManagerSettings.create( system ).withRole( "io" );
 
         system.actorOf( ClusterSingletonManager.props(
-                Props.create( GuiceActorProducer.class, injector, QueueSenderRouter.class ),
+                Props.create( GuiceActorProducer.class, QueueSenderRouter.class ),
                 PoisonPill.getInstance(), settings ), "queueSenderRouter" );
 
         ClusterSingletonProxySettings proxySettings =

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
index c8b5567..006f1a7 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
@@ -73,7 +73,7 @@ public class QueueWriterRouterProducer implements RouterProducer {
                 ClusterSingletonManagerSettings.create( system ).withRole( "io" );
 
         system.actorOf( ClusterSingletonManager.props(
-                Props.create( GuiceActorProducer.class, injector, QueueWriterRouter.class ),
+                Props.create( GuiceActorProducer.class, QueueWriterRouter.class ),
                 PoisonPill.getInstance(), settings ), "queueWriterRouter" );
 
         ClusterSingletonProxySettings proxySettings =

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index 02862c4..d9a2543 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.qakka.core.CassandraClient;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
@@ -59,6 +60,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
     private final ActorSystemFig            actorSystemFig;
     private final ShardStrategy             shardStrategy;
     private final ShardCounterSerialization shardCounterSerialization;
+    private final MessageCounterSerialization messageCounterSerialization;
 
     public final static String COLUMN_QUEUE_NAME       = "queue_name";
     public final static String COLUMN_REGION           = "region";
@@ -114,14 +116,16 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
             ActorSystemFig            actorSystemFig,
             ShardStrategy             shardStrategy,
             ShardCounterSerialization shardCounterSerialization,
+            MessageCounterSerialization messageCounterSerialization,
             CassandraClient           cassandraClient,
             QakkaFig                  qakkaFig
         ) {
-        this.cassandraConfig              = cassandraConfig;
-        this.actorSystemFig            = actorSystemFig;
-        this.shardStrategy             = shardStrategy;
-        this.shardCounterSerialization = shardCounterSerialization;
-        this.cassandraClient = cassandraClient;
+        this.cassandraConfig             = cassandraConfig;
+        this.actorSystemFig              = actorSystemFig;
+        this.shardStrategy               = shardStrategy;
+        this.shardCounterSerialization   = shardCounterSerialization;
+        this.messageCounterSerialization = messageCounterSerialization;
+        this.cassandraClient             = cassandraClient;
 
         this.maxTtl = qakkaFig.getMaxTtlSeconds();
     }
@@ -162,6 +166,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
 
         shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 );
 
+        messageCounterSerialization.incrementCounter( message.getQueueName(), message.getType(), 1L );
+
         return queueMessageId;
     }
 
@@ -250,6 +256,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
                 .and(queueMessageIdClause);
 
         cassandraClient.getQueueMessageSession().execute( delete );
+
+        messageCounterSerialization.decrementCounter( queueName, type, 1L );
     }
 
 

Reply via email to