Initial integration of Qakka into Usergrid Queue module, and implementation of 
Qakka-based LegacyQueueManager implementation.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3075dce1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3075dce1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3075dce1

Branch: refs/heads/usergrid-1318-queue
Commit: 3075dce1631cc7f225ddb34aa08fb16a4a8f486a
Parents: 9016fd2
Author: Dave Johnson <snoopd...@apache.org>
Authored: Tue Sep 13 13:44:42 2016 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Tue Sep 13 13:44:42 2016 -0400

----------------------------------------------------------------------
 stack/core/pom.xml                              |   2 +-
 .../usergrid/persistence/entities/Receipt.java  |  33 +-
 .../usergrid/corepersistence/index/RxTest.java  |  23 +-
 stack/corepersistence/queryindex/pom.xml        |   1 +
 .../persistence/index/guice/IndexModule.java    |   1 -
 stack/corepersistence/queue/pom.xml             | 156 +++++++
 .../apache/usergrid/persistence/qakka/App.java  |  94 ++++
 .../persistence/qakka/MetricsService.java       |  39 ++
 .../usergrid/persistence/qakka/QakkaFig.java    | 131 ++++++
 .../usergrid/persistence/qakka/QakkaModule.java | 120 +++++
 .../persistence/qakka/api/ApiResponse.java      |  68 +++
 .../persistence/qakka/api/QueueResource.java    | 443 +++++++++++++++++++
 .../persistence/qakka/api/StatusResource.java   |  82 ++++
 .../persistence/qakka/api/URIStrategy.java      |  31 ++
 .../qakka/api/impl/BadRequestMapper.java        |  43 ++
 .../qakka/api/impl/GuiceFeature.java            |  45 ++
 .../qakka/api/impl/JacksonProvider.java         |  44 ++
 .../qakka/api/impl/JerseyResourceConfig.java    |  31 ++
 .../qakka/api/impl/NotFoundMapper.java          |  43 ++
 .../qakka/api/impl/StartupListener.java         |  70 +++
 .../qakka/api/impl/URIStrategyLocalhost.java    |  49 ++
 .../persistence/qakka/core/CassandraClient.java |  29 ++
 .../qakka/core/CassandraClientImpl.java         |  49 ++
 .../persistence/qakka/core/QakkaUtils.java      |  44 ++
 .../usergrid/persistence/qakka/core/Queue.java  | 147 ++++++
 .../persistence/qakka/core/QueueManager.java    |  35 ++
 .../persistence/qakka/core/QueueMessage.java    | 186 ++++++++
 .../qakka/core/QueueMessageManager.java         |  83 ++++
 .../persistence/qakka/core/QueueType.java       |  26 ++
 .../persistence/qakka/core/Regions.java         |  84 ++++
 .../qakka/core/impl/InMemoryQueue.java          |  84 ++++
 .../qakka/core/impl/QueueManagerImpl.java       | 125 ++++++
 .../core/impl/QueueMessageManagerImpl.java      | 299 +++++++++++++
 .../distributed/DistributedQueueService.java    |  60 +++
 .../qakka/distributed/actors/QueueActor.java    | 207 +++++++++
 .../distributed/actors/QueueActorHelper.java    | 167 +++++++
 .../distributed/actors/QueueActorRouter.java    |  95 ++++
 .../distributed/actors/QueueRefresher.java      | 124 ++++++
 .../qakka/distributed/actors/QueueSender.java   | 220 +++++++++
 .../distributed/actors/QueueSenderRouter.java   |  53 +++
 .../distributed/actors/QueueTimeouter.java      | 147 ++++++
 .../qakka/distributed/actors/QueueWriter.java   | 152 +++++++
 .../distributed/actors/QueueWriterRouter.java   |  53 +++
 .../distributed/actors/ShardAllocator.java      | 153 +++++++
 .../impl/DistributedQueueServiceImpl.java       | 296 +++++++++++++
 .../impl/QueueActorRouterProducer.java          | 141 ++++++
 .../impl/QueueSenderRouterProducer.java         | 134 ++++++
 .../impl/QueueWriterRouterProducer.java         | 134 ++++++
 .../distributed/messages/QakkaMessage.java      |  28 ++
 .../distributed/messages/QueueAckRequest.java   |  52 +++
 .../distributed/messages/QueueAckResponse.java  |  59 +++
 .../distributed/messages/QueueGetRequest.java   |  49 ++
 .../distributed/messages/QueueGetResponse.java  |  63 +++
 .../distributed/messages/QueueInitRequest.java  |  43 ++
 .../messages/QueueRefreshRequest.java           |  43 ++
 .../distributed/messages/QueueSendRequest.java  |  84 ++++
 .../distributed/messages/QueueSendResponse.java |  43 ++
 .../messages/QueueTimeoutRequest.java           |  43 ++
 .../distributed/messages/QueueWriteRequest.java |  84 ++++
 .../messages/QueueWriteResponse.java            |  43 ++
 .../distributed/messages/ShardCheckRequest.java |  43 ++
 .../qakka/exceptions/BadRequestException.java   |  32 ++
 .../qakka/exceptions/NotFoundException.java     |  32 ++
 .../qakka/exceptions/QakkaException.java        |  32 ++
 .../qakka/exceptions/QakkaRuntimeException.java |  32 ++
 .../MultiShardMessageIterator.java              | 181 ++++++++
 .../persistence/qakka/serialization/Result.java |  34 ++
 .../qakka/serialization/auditlog/AuditLog.java  | 101 +++++
 .../auditlog/AuditLogSerialization.java         |  45 ++
 .../impl/AuditLogSerializationImpl.java         | 148 +++++++
 .../queuemessages/DatabaseQueueMessage.java     | 155 +++++++
 .../queuemessages/DatabaseQueueMessageBody.java |  52 +++
 .../MessageCounterSerialization.java            |  31 ++
 .../QueueMessageSerialization.java              |  54 +++
 .../impl/MessageCounterSerializationImpl.java   | 204 +++++++++
 .../impl/QueueMessageSerializationImpl.java     | 320 ++++++++++++++
 .../serialization/queues/DatabaseQueue.java     | 114 +++++
 .../queues/QueueSerialization.java              |  36 ++
 .../queues/impl/QueueSerializationImpl.java     | 157 +++++++
 .../qakka/serialization/sharding/Shard.java     | 111 +++++
 .../sharding/ShardCounterSerialization.java     |  29 ++
 .../serialization/sharding/ShardIterator.java   | 142 ++++++
 .../sharding/ShardSerialization.java            |  35 ++
 .../serialization/sharding/ShardStrategy.java   |  35 ++
 .../sharding/impl/PlaceholderShardStrategy.java |  44 ++
 .../impl/ShardCounterSerializationImpl.java     | 198 +++++++++
 .../sharding/impl/ShardSerializationImpl.java   | 200 +++++++++
 .../sharding/impl/ShardStrategyImpl.java        |  71 +++
 .../serialization/transferlog/TransferLog.java  |  84 ++++
 .../transferlog/TransferLogSerialization.java   |  62 +++
 .../impl/TransferLogSerializationImpl.java      | 165 +++++++
 .../persistence/queue/guice/QueueModule.java    |  20 +-
 .../queue/impl/QakkaQueueManager.java           | 178 ++++++++
 .../queue/impl/QueueManagerFactoryImpl.java     |   4 +-
 .../queue/src/main/webapp/WEB-INF/web.xml       |  51 +++
 .../persistence/qakka/AbstractTest.java         |  79 ++++
 .../persistence/qakka/KeyspaceDropper.java      |  70 +++
 .../persistence/qakka/api/AbstractRestTest.java |  64 +++
 .../persistence/qakka/api/PerformanceTest.java  | 148 +++++++
 .../qakka/api/QueueResourceTest.java            | 418 +++++++++++++++++
 .../qakka/common/CassandraClientTest.java       |  46 ++
 .../qakka/core/QueueMessageManagerTest.java     | 272 ++++++++++++
 .../distributed/QueueActorServiceTest.java      | 175 ++++++++
 .../actors/QueueActorHelperTest.java            | 284 ++++++++++++
 .../distributed/actors/QueueReaderTest.java     | 111 +++++
 .../distributed/actors/QueueTimeouterTest.java  | 152 +++++++
 .../distributed/actors/ShardAllocatorTest.java  | 212 +++++++++
 ...tiShardDatabaseQueueMessageIteratorTest.java | 127 ++++++
 .../auditlogs/AuditLogSerializationTest.java    | 102 +++++
 .../DatabaseQueueMessageSerializationTest.java  | 267 +++++++++++
 .../queues/DatabaseQueueSerializationTest.java  |  85 ++++
 .../sharding/ShardCounterSerializationTest.java |  65 +++
 .../sharding/ShardIteratorTest.java             | 139 ++++++
 .../sharding/ShardSerializationTest.java        | 106 +++++
 .../sharding/ShardStrategyTest.java             |  71 +++
 .../qakka/serialization/sharding/ShardTest.java |  53 +++
 .../TransferLogSerializationTest.java           | 133 ++++++
 .../queue/LegacyQueueManagerTest.java           | 117 +++--
 .../queue/guice/TestQueueModule.java            |   2 -
 .../queue/src/test/resources/cassandra.yaml     |  53 +++
 .../queue/src/test/resources/log4j.properties   |  29 ++
 .../queue/src/test/resources/qakka-duck.jpg     | Bin 0 -> 11188 bytes
 .../queue/src/test/resources/qakka.properties   |  50 +++
 stack/pom.xml                                   |   6 +-
 .../notifications/apns/APNsAdapter.java         |  28 +-
 .../services/notifications/gcm/GCMAdapter.java  |  11 +-
 126 files changed, 12116 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 9f2dc88..7ee3b06 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -40,7 +40,6 @@
         </plugins>
     </reporting>
 
-
     <build>
 
         <resources>
@@ -429,6 +428,7 @@
             <groupId>org.apache.usergrid</groupId>
             <artifactId>queue</artifactId>
             <version>${project.version}</version>
+            <classifier>classes</classifier>
             <type>jar</type>
         </dependency>
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
 
b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
index 1e145ac..e7c90b5 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
@@ -16,14 +16,15 @@
  */
 package org.apache.usergrid.persistence.entities;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.usergrid.persistence.TypedEntity;
+import org.apache.usergrid.persistence.annotations.EntityProperty;
 
 import javax.xml.bind.annotation.XmlRootElement;
 import java.util.HashMap;
 import java.util.UUID;
-import org.apache.usergrid.persistence.annotations.EntityProperty;
-import org.mortbay.util.ajax.JSON;
 
 @XmlRootElement
 public class Receipt extends TypedEntity {
@@ -31,6 +32,11 @@ public class Receipt extends TypedEntity {
     public static final String ENTITY_TYPE = "receipt";
     public static final String NOTIFICATION_CONNECTION = "notification";
 
+    private static ObjectMapper objectMapper = new ObjectMapper();
+    static private final TypeReference<HashMap> hashMapTypeRef = new 
TypeReference<HashMap>() {};
+
+
+
     /** device id **/
     @EntityProperty
     protected UUID deviceId;
@@ -65,27 +71,28 @@ public class Receipt extends TypedEntity {
     public Receipt() {
     }
 
-    public Receipt(UUID notificationUUID, String notifierId, Object 
payload,UUID deviceId) {
+    public Receipt(UUID notificationUUID, String notifierId, Object payload, 
UUID deviceId) {
         this.notificationUUID = notificationUUID;
         this.notifierId = notifierId;
         HashMap receiptPayload;
-        if(! (payload instanceof HashMap) ){
-            if(payload instanceof String){
+
+        if (!(payload instanceof HashMap)) {
+            if (payload instanceof String) {
                 try {
-                    receiptPayload = (HashMap) JSON.parse((String) payload);
-                }catch (Exception e){
+                    receiptPayload = (HashMap) objectMapper.readValue( 
(String)payload, hashMapTypeRef );
+                } catch (Exception e) {
                     receiptPayload = new HashMap<>();
-                    receiptPayload.put("payload", payload);
+                    receiptPayload.put( "payload", payload );
                 }
-            }else {
+            } else {
                 receiptPayload = new HashMap<>();
-                receiptPayload.put("payload", payload);
+                receiptPayload.put( "payload", payload );
             }
-        }else{
-            receiptPayload = (HashMap)payload;
+        } else {
+            receiptPayload = (HashMap) payload;
         }
         this.payload = receiptPayload;
-        this.setDeviceId(deviceId);
+        this.setDeviceId( deviceId );
     }
 
     @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
index f7d52d6..d4fd33d 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
@@ -20,18 +20,8 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.apache.avro.generic.GenericData;
 import org.apache.usergrid.ExperimentalTest;
-import org.junit.Ignore;
 import org.junit.Test;
-
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +30,13 @@ import rx.Subscription;
 import rx.observables.ConnectableObservable;
 import rx.schedulers.Schedulers;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -61,8 +58,10 @@ public class RxTest {
         final CountDownLatch latch = new CountDownLatch( count+1 );
 
         final Subscription connectedObservable =
-            Observable.range( 0, count ).doOnNext( integer -> 
latch.countDown() ).doOnCompleted( () -> latch.countDown() ).subscribeOn( 
Schedulers.io() )
-                      .subscribe();
+            Observable.range( 0, count )
+                .doOnNext( integer -> latch.countDown() )
+                .doOnCompleted( () -> latch.countDown() ).subscribeOn( 
Schedulers.io() )
+                .subscribe();
 
 
         final boolean completed = latch.await( 3, TimeUnit.SECONDS );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml 
b/stack/corepersistence/queryindex/pom.xml
index 9b4eca4..a3f293d 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -97,6 +97,7 @@
                   <groupId>${project.parent.groupId}</groupId>
                   <artifactId>queue</artifactId>
                   <version>${project.version}</version>
+                    <classifier>classes</classifier>
                   <type>jar</type>
               </dependency>
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index b828934..47399c7 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -23,7 +23,6 @@ import com.google.inject.TypeLiteral;
 import com.google.inject.multibindings.Multibinder;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.index.*;
 import com.google.inject.AbstractModule;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/pom.xml 
b/stack/corepersistence/queue/pom.xml
index c81e71c..c74d49c 100644
--- a/stack/corepersistence/queue/pom.xml
+++ b/stack/corepersistence/queue/pom.xml
@@ -21,6 +21,7 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0";
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
     <parent>
         <artifactId>persistence</artifactId>
         <groupId>org.apache.usergrid</groupId>
@@ -30,6 +31,48 @@
     <modelVersion>4.0.0</modelVersion>
     <artifactId>queue</artifactId>
     <name>Usergrid Queue</name>
+    <packaging>war</packaging>
+
+    <properties>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <servlet.version>3.0.1</servlet.version>
+        <jersey.version>2.23.1</jersey.version>
+        <guice-bridge.version>2.4.0</guice-bridge.version>
+    </properties>
+
+
+    <build>
+
+        <finalName>queue-${project.version}</finalName>
+
+        <pluginManagement>
+            <plugins>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-war-plugin</artifactId>
+                <version>2.6</version>
+                <configuration>
+                    <archiveClasses>true</archiveClasses>
+                    <attachClasses>true</attachClasses>
+                </configuration>
+            </plugin>
+
+            </plugins>
+        </pluginManagement>
+
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+                <filtering>true</filtering>
+                <includes>
+                    <include>**/**</include>
+                </includes>
+            </testResource>
+        </testResources>
+
+    </build>
 
     <dependencies>
 
@@ -90,6 +133,119 @@
             <version>4.2</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.usergrid</groupId>
+            <artifactId>actorsystem</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.usergrid</groupId>
+            <artifactId>common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>cassandra-all</artifactId>
+                    <groupId>org.apache.cassandra</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!--
+                <dependency>
+                    <groupId>com.datastax.cassandra</groupId>
+                    <artifactId>cassandra-driver-core</artifactId>
+                    <version>2.1.9</version>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>com.google.guava</groupId>
+                            <artifactId>guava</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <version>1.7.13</version>
+                </dependency>
+
+                <dependency>
+                    <groupId>com.fasterxml.uuid</groupId>
+                    <artifactId>java-uuid-generator</artifactId>
+                    <version>3.1.4</version>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-lang3</artifactId>
+                    <version>3.3.2</version>
+                </dependency>
+        -->
+
+        <!-- Java EE & Jersey -->
+
+        <dependency>
+            <groupId>javax.servlet</groupId>
+            <artifactId>javax.servlet-api</artifactId>
+            <version>${servlet.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.media</groupId>
+            <artifactId>jersey-media-json-jackson</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-client</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId>
+            <artifactId>jersey-container-servlet</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+
+        <!-- added for Guice Jersey integration -->
+        <dependency>
+            <groupId>org.glassfish.hk2</groupId>
+            <artifactId>guice-bridge</artifactId>
+            <version>${guice-bridge.version}</version>
+        </dependency>
+
+        <!-- added for Guice Jersey integration -->
+        <dependency>
+            <groupId>com.google.inject.extensions</groupId>
+            <artifactId>guice-servlet</artifactId>
+            <version>${guice.version}</version>
+        </dependency>
+
+        <!-- added to enable logging from within Jersey -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <version>1.7.5</version>
+        </dependency>
+
+        <!-- Testing -->
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.jersey.test-framework.providers</groupId>
+            <artifactId>jersey-test-framework-provider-jetty</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+
     </dependencies>
 
     <!--

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
new file mode 100644
index 0000000..9d9c972
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+import 
org.apache.usergrid.persistence.core.migration.schema.MigrationException;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
+import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Akka queueing application
+ */
+public class App implements MetricsService {
+    private static final Logger logger = LoggerFactory.getLogger( App.class );
+
+    // TODO: can we avoid this kludge with better Akka-Guice integration?
+    static public Injector INJECTOR;
+
+    private final ActorSystemFig          actorSystemFig;
+    private final ActorSystemManager      actorSystemManager;
+    private final DistributedQueueService distributedQueueService;
+    private final MetricRegistry          metrics = new MetricRegistry();
+
+
+    @Inject
+    public App(
+            Injector                  injector,
+            ActorSystemFig            actorSystemFig,
+            ActorSystemManager        actorSystemManager,
+            DistributedQueueService   distributedQueueService,
+            MigrationManager          migrationManager) {
+
+        this.INJECTOR = injector;
+        this.actorSystemFig = actorSystemFig;
+        this.actorSystemManager = actorSystemManager;
+        this.distributedQueueService = distributedQueueService;
+
+        try {
+            migrationManager.migrate();
+        } catch (MigrationException e) {
+            throw new QakkaRuntimeException( "Error running migration", e );
+        }
+    }
+
+    /**
+     * Init Akka ActorSystems and wait for request actors to init.
+     */
+    public void start() {
+        start(
+            actorSystemFig.getHostname(),
+            Integer.parseInt(actorSystemFig.getPort()), // TODO: make port an 
int in Actor System module
+            actorSystemFig.getRegionLocal());
+    }
+
+    /**
+     * For testing purposes only; does not wait for request actors to init.
+     */
+    public void start( String h, Integer p, String r ) {
+        actorSystemManager.start( h, p, r );
+        actorSystemManager.waitForClientActor();
+        distributedQueueService.init();
+    }
+
+
+    @Override
+    public MetricRegistry getMetricRegistry() {
+        return metrics;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java
new file mode 100644
index 0000000..378ba0d
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka;
+
+import com.codahale.metrics.MetricRegistry;
+
+
+public interface MetricsService {
+
+    String SEND_TIME_TOTAL  = 
"org.apache.usergrid.persistence.qakka.send.time.total";
+    String SEND_TIME_SEND   = 
"org.apache.usergrid.persistence.qakka.send.time.send";
+    String SEND_TIME_WRITE  = 
"org.apache.usergrid.persistence.qakka.send.time.write";
+    String GET_TIME_TOTAL   = 
"org.apache.usergrid.persistence.qakka.get.time.total";
+    String GET_TIME_GET     = 
"org.apache.usergrid.persistence.qakka.get.time.get";
+    String ACK_TIME_TOTAL   = 
"org.apache.usergrid.persistence.qakka.ack.time.total";
+    String ACK_TIME_ACK     = 
"org.apache.usergrid.persistence.qakka.ack.time.ack";
+    String TIMEOUT_TIME     = 
"org.apache.usergrid.persistence.qakka.timeout.time";
+    String REFRESH_TIME     = 
"org.apache.usergrid.persistence.qakka.timeout.time";
+    String ALLOCATE_TIME    = 
"org.apache.usergrid.persistence.qakka.allocate.time";
+
+    MetricRegistry getMetricRegistry();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
new file mode 100644
index 0000000..6f3df11
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka;
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+import java.io.Serializable;
+
+
+@FigSingleton
+public interface QakkaFig extends GuicyFig, Serializable {
+
+    String QUEUE_NUM_ACTORS                       = "queue.num.actors";
+
+    String QUEUE_SENDER_NUM_ACTORS                = "queue.sender.num.actors";
+
+    String QUEUE_WRITER_NUM_ACTORS                = "queue.writer.num.actors";
+
+    String QUEUE_TIMEOUT_SECONDS                  = "queue.timeout.seconds";
+
+    String QUEUE_REFRESH_MILLISECONDS             = 
"queue.refresh.milliseconds";
+
+    String QUEUE_INMEMORY_SIZE                    = "queue.inmemory.size";
+
+    String QUEUE_SEND_MAX_RETRIES                 = "queue.send.max.retries";
+
+    String QUEUE_SEND_TIMEOUT                     = 
"queue.send.timeout.seconds";
+
+    String QUEUE_GET_MAX_RETRIES                  = "queue.get.max.retries";
+
+    String QUEUE_GET_TIMEOUT                      = 
"queue.get.timeout.seconds";
+
+    String QUEUE_MAX_SHARD_COUNTER                = 
"queue.max.inmemory.shard.counter";
+
+    String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = 
"queue.shard.allocation.check.frequency.millis";
+
+    String QUEUE_SHARD_ALLOCATION_ADVANCE_TIME    = 
"queue.shard.allocation.advance.time.millis";
+
+    String QUEUE_SHARD_MAX_SIZE                   = "queue.shard.max.size";
+
+
+    /** Queue senders send to queue writers */
+    @Key(QUEUE_SENDER_NUM_ACTORS)
+    @Default("200")
+    int getNumQueueSenderActors();
+
+    /** Queue writers write to Cassandra */
+    @Key(QUEUE_WRITER_NUM_ACTORS)
+    @Default("500")
+    int getNumQueueWriterActors();
+
+    /** Queue actors handle get, ack and manage scheduled timeout and refersh 
tasks */
+    @Key(QUEUE_NUM_ACTORS)
+    @Default("500")
+    int getNumQueueActors();
+
+    /** Time for queue messages to timeout, if not set per queue */
+    @Key(QUEUE_TIMEOUT_SECONDS)
+    @Default("10")
+    int getQueueTimeoutSeconds();
+
+    /** How often to refresh each queue's in-memory data */
+    @Key(QUEUE_REFRESH_MILLISECONDS)
+    @Default("500")
+    int getQueueRefreshMilliseconds();
+
+    /** How many queue messages to keep in-memory */
+    @Key(QUEUE_INMEMORY_SIZE)
+    @Default("1000")
+    int getQueueInMemorySize();
+
+    /** Max number of times to retry call to queue actor for queue get 
operation */
+    @Key(QUEUE_GET_MAX_RETRIES)
+    @Default("5")
+    int getMaxGetRetries();
+
+    /** How long to wait for response from queue actor before timing out and 
trying again */
+    @Key(QUEUE_GET_TIMEOUT)
+    @Default("2")
+    int getGetTimeoutSeconds();
+
+    /** Max number of times to retry call to queue writer for queue send 
operation */
+    @Key(QUEUE_SEND_MAX_RETRIES)
+    @Default("5")
+    int getMaxSendRetries();
+
+    /** How long to wait for response from queue writer before timing out and 
trying again */
+    @Key(QUEUE_SEND_TIMEOUT)
+    @Default("2")
+    int getSendTimeoutSeconds();
+
+    /** Once counter reaches this value, write it to permanent storage */
+    @Key(QUEUE_MAX_SHARD_COUNTER)
+    @Default("100")
+    long getMaxInMemoryShardCounter();
+
+    /** How often to check whether new shard is needed for each queue */
+    @Key(QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY)
+    @Default("5000")
+    long getShardAllocationCheckFrequencyMillis();
+
+    /** New shards are created in advance of the time they will be used */
+    @Key(QUEUE_SHARD_ALLOCATION_ADVANCE_TIME)
+    @Default("5000")
+    long getShardAllocationAdvanceTimeMillis();
+
+    /** Max size to allow for a shard */
+    @Key(QUEUE_SHARD_MAX_SIZE)
+    @Default("400000")
+    long getMaxShardSize();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
new file mode 100644
index 0000000..b7c977c
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Key;
+import com.google.inject.multibindings.Multibinder;
+import com.netflix.config.ConfigurationManager;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemModule;
+import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+import org.apache.usergrid.persistence.qakka.api.URIStrategy;
+import org.apache.usergrid.persistence.qakka.api.impl.URIStrategyLocalhost;
+import org.apache.usergrid.persistence.qakka.core.*;
+import org.apache.usergrid.persistence.qakka.core.impl.QueueManagerImpl;
+import org.apache.usergrid.persistence.qakka.core.impl.QueueMessageManagerImpl;
+import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import 
org.apache.usergrid.persistence.qakka.distributed.actors.QueueActorHelper;
+import 
org.apache.usergrid.persistence.qakka.distributed.impl.DistributedQueueServiceImpl;
+import 
org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer;
+import 
org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterProducer;
+import 
org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer;
+import 
org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.auditlog.impl.AuditLogSerializationImpl;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl;
+import 
org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.queues.impl.QueueSerializationImpl;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardCounterSerializationImpl;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardStrategyImpl;
+import 
org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.transferlog.impl.TransferLogSerializationImpl;
+import org.safehaus.guicyfig.GuicyFigModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+
+public class QakkaModule extends AbstractModule {
+    private static final Logger logger = LoggerFactory.getLogger( 
QakkaModule.class );
+
+    static {
+        try {
+            // TODO: reconcile with usergrid props
+            // load properties from one properties file using Netflix Archaius 
so that GuicyFig will see them
+            ConfigurationManager.loadCascadedPropertiesFromResources( "qakka" 
);
+        } catch (IOException e) {
+            throw new RuntimeException( "Cannot qakka.properties file", e );
+        }
+    }
+
+    @Override
+    protected void configure() {
+
+        install( new CommonModule() );
+        install( new ActorSystemModule() );
+        install( new GuicyFigModule( QakkaFig.class ) );
+
+        bind( App.class );
+
+        bind( CassandraClient.class ).to(           CassandraClientImpl.class 
);
+        bind( MetricsService.class ).to(            App.class );
+
+        bind( QueueManager.class ).to(              QueueManagerImpl.class );
+        bind( QueueSerialization.class ).to(        
QueueSerializationImpl.class );
+
+        bind( QueueMessageManager.class ).to(       
QueueMessageManagerImpl.class );
+        bind( QueueMessageSerialization.class ).to( 
QueueMessageSerializationImpl.class );
+
+        bind( ShardSerialization.class ).to(        
ShardSerializationImpl.class );
+        bind( ShardStrategy.class ).to(             ShardStrategyImpl.class );
+
+        bind( ShardCounterSerialization.class ).to( 
ShardCounterSerializationImpl.class );
+
+        bind( TransferLogSerialization.class ).to(  
TransferLogSerializationImpl.class );
+        bind( AuditLogSerialization.class ).to(     
AuditLogSerializationImpl.class );
+        bind( DistributedQueueService.class ).to(   
DistributedQueueServiceImpl.class );
+
+        bind( QueueActorRouterProducer.class );
+        bind( QueueWriterRouterProducer.class );
+        bind( QueueSenderRouterProducer.class );
+        bind( QueueActorHelper.class );
+
+        bind( Regions.class );
+        bind( URIStrategy.class ).to( URIStrategyLocalhost.class );
+
+        Multibinder<Migration> migrationBinder = Multibinder.newSetBinder( 
binder(), Migration.class );
+
+        migrationBinder.addBinding().to( Key.get( AuditLogSerialization.class 
) );
+        //migrationBinder.addBinding().to( Key.get( 
MessageCounterSerialization.class ) );
+        migrationBinder.addBinding().to( Key.get( 
QueueMessageSerialization.class ) );
+        migrationBinder.addBinding().to( Key.get( QueueSerialization.class ) );
+        migrationBinder.addBinding().to( Key.get( 
ShardCounterSerialization.class ) );
+        migrationBinder.addBinding().to( Key.get( ShardSerialization.class ) );
+        migrationBinder.addBinding().to( Key.get( 
TransferLogSerialization.class ) );
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/ApiResponse.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/ApiResponse.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/ApiResponse.java
new file mode 100644
index 0000000..c2c0910
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/ApiResponse.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.api;
+
+import org.apache.usergrid.persistence.qakka.core.Queue;
+import org.apache.usergrid.persistence.qakka.core.QueueMessage;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Collection;
+
+
+@XmlRootElement
+public class ApiResponse {
+
+    private String message;
+    private Integer count;
+    private Collection<Queue> queues;
+    private Collection<QueueMessage> queueMessages;
+
+    public Collection<Queue> getQueues() {
+        return queues;
+    }
+
+    public void setQueues(Collection<Queue> queues) {
+        this.queues = queues;
+    }
+
+    public Collection<QueueMessage> getQueueMessages() {
+        return queueMessages;
+    }
+
+    public void setQueueMessages(Collection<QueueMessage> queueMessages) {
+        this.queueMessages = queueMessages;
+    }
+
+    public Integer getCount() {
+        return count;
+    }
+
+    public void setCount(Integer count) {
+        this.count = count;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
new file mode 100644
index 0000000..f82661c
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.api;
+
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.core.*;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+
+@Path("queues")
+public class QueueResource {
+    private static final Logger logger = LoggerFactory.getLogger( 
QueueResource.class );
+
+    private final QueueManager queueManager;
+    private final QueueMessageManager queueMessageManager;
+    private final MetricsService            metricsService;
+    private final URIStrategy               uriStrategy;
+    private final Regions regions;
+    private final ShardCounterSerialization shardCounterSerialization;
+
+
+    @Inject
+    public QueueResource(
+            QueueManager              queueManager,
+            QueueMessageManager       queueMessageManager,
+            MetricsService            metricsService,
+            URIStrategy               uriStrategy,
+            Regions                   regions,
+            ShardCounterSerialization shardCounterSerialization ) {
+
+        this.queueManager              = queueManager;
+        this.queueMessageManager       = queueMessageManager;
+        this.metricsService            = metricsService;
+        this.uriStrategy               = uriStrategy;
+        this.regions                   = regions;
+        this.shardCounterSerialization = shardCounterSerialization;
+    }
+
+
+    @POST
+    @Consumes({MediaType.APPLICATION_JSON})
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response createQueue(Queue queue ) throws Exception {
+
+        Preconditions.checkArgument(queue != null, "Queue configuration is 
required");
+        
Preconditions.checkArgument(!QakkaUtils.isNullOrEmpty(queue.getName()), "Queue 
name is required");
+
+        queueManager.createQueue(queue);
+
+        ApiResponse apiResponse = new ApiResponse();
+        apiResponse.setQueues( Collections.singletonList(queue) );
+        return Response.created( uriStrategy.queueURI( queue.getName() 
)).entity(apiResponse).build();
+    }
+
+
+    @PUT
+    @Path( "{queueName}/config" )
+    @Consumes({MediaType.APPLICATION_JSON})
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response updateQueueConfig( @PathParam("queueName") String 
queueName, Queue queue) {
+
+        Preconditions.checkArgument(!QakkaUtils.isNullOrEmpty(queueName), 
"Queue name is required");
+        Preconditions.checkArgument(queue != null, "Queue configuration is 
required");
+
+        queue.setName(queueName);
+        queueManager.updateQueueConfig(queue);
+
+        ApiResponse apiResponse = new ApiResponse();
+        apiResponse.setQueues( Collections.singletonList(queue) );
+        return Response.ok().entity(apiResponse).build();
+    }
+
+
+    @DELETE
+    @Path( "{queueName}" )
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response deleteQueue( @PathParam("queueName") String queueName,
+                                 @QueryParam("confirm") @DefaultValue("false") 
Boolean confirmedParam) {
+
+        Preconditions.checkArgument(!QakkaUtils.isNullOrEmpty(queueName), 
"Queue name is required");
+        Preconditions.checkArgument(confirmedParam != null, "Confirm parameter 
is required");
+
+        ApiResponse apiResponse = new ApiResponse();
+
+        if ( confirmedParam ) {
+            queueManager.deleteQueue( queueName );
+            return Response.ok().entity( apiResponse ).build();
+        }
+
+        apiResponse.setMessage( "confirm parameter must be true" );
+        return Response.status( Response.Status.BAD_REQUEST ).entity( 
apiResponse ).build();
+    }
+
+
+    @GET
+    @Path( "{queueName}/config" )
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response getQueueConfig( @PathParam("queueName") String queueName) {
+
+        Preconditions.checkArgument(!QakkaUtils.isNullOrEmpty(queueName), 
"Queue name is required");
+
+        ApiResponse apiResponse = new ApiResponse();
+        Queue queue = queueManager.getQueueConfig( queueName );
+        if ( queue != null ) {
+            apiResponse.setQueues( Collections.singletonList(queue) );
+            return Response.ok().entity(apiResponse).build();
+        }
+        return Response.status( Response.Status.NOT_FOUND ).build();
+    }
+
+
+    @GET
+    @Produces({MediaType.APPLICATION_JSON})
+    public List<String> getListOfQueues() {
+
+        // TODO: create design to handle large number of queues, e.g. paging 
and/or hierarchy of queues
+
+        // TODO: create design to support multi-tenant usage, authentication, 
etc.
+
+        return queueManager.getListOfQueues();
+    }
+
+
+    @GET
+    @Path( "{queueName}/stats" )
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response getQueueStats( @PathParam("queueName") String queueName) 
throws Exception {
+        // TODO: implement GET /queues/{queueName}/stats
+        throw new UnsupportedOperationException();
+    }
+
+
+    Long convertDelayParameter(String delayParam) {
+        Long delayMs = 0L;
+        if (!QakkaUtils.isNullOrEmpty(delayParam)) {
+            switch (delayParam.toUpperCase()) {
+                case "NONE":
+                case "":
+                    delayMs = 0L;
+                    break;
+                default:
+                    try {
+                        delayMs = Long.parseLong(delayParam);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException("Invalid delay 
parameter");
+                    }
+                    break;
+            }
+        }
+        return delayMs;
+    }
+
+    Long convertExpirationParameter(String expirationParam) throws 
IllegalArgumentException {
+        Long expirationSecs = null;
+        if (!QakkaUtils.isNullOrEmpty(expirationParam)) {
+            switch (expirationParam.toUpperCase()) {
+                case "NEVER":
+                case "":
+                    expirationSecs = null;
+                    break;
+                default:
+                    try {
+                        expirationSecs = Long.parseLong(expirationParam);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException("Invalid expiration 
parameter");
+                    }
+                    break;
+            }
+        }
+        return expirationSecs;
+    }
+
+
+    /**
+     * Send a queue message with a JSON payload.
+     *
+     * @param queueName         Name of queue to target (queue must exist)
+     * @param regionsParam      Comma-separated list of regions to send to
+     * @param delayParam        Delay (ms) before sending message (not yet 
supported)
+     * @param expirationParam   Time (ms) after which message will expire (not 
yet supported)
+     * @param messageBody       JSON payload in string form
+     */
+    @POST
+    @Path( "{queueName}/messages" )
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response sendMessageJson(
+            @PathParam("queueName")                     String queueName,
+            @QueryParam("regions" )   @DefaultValue("") String regionsParam,
+            @QueryParam("delay")      @DefaultValue("") String delayParam,
+            @QueryParam("expiration") @DefaultValue("") String expirationParam,
+                                                        String messageBody) 
throws Exception {
+
+        return sendMessage( queueName, regionsParam, delayParam, 
expirationParam,
+                MediaType.APPLICATION_JSON, ByteBuffer.wrap( 
messageBody.getBytes() ) );
+    }
+
+
+    /**
+     * Send a queue message with a binary data payload.
+     *
+     * @param queueName         Name of queue to target (queue must exist)
+     * @param regionsParam      Comma-separated list of regions to send to
+     * @param delayParam        Delay (ms) before sending message (not yet 
supported)
+     * @param expirationParam   Time (ms) after which message will expire (not 
yet supported)
+     * @param actualContentType Content type of messageBody data (if not 
application/octet-stream)
+     * @param messageBody       Binary data that is the payload of the queue 
message
+     */
+    @POST
+    @Path( "{queueName}/messages" )
+    @Consumes(MediaType.APPLICATION_OCTET_STREAM)
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response sendMessageBinary(
+            @PathParam("queueName")                     String queueName,
+            @QueryParam("regions" )   @DefaultValue("") String regionsParam,
+            @QueryParam("delay")      @DefaultValue("") String delayParam,
+            @QueryParam("expiration") @DefaultValue("") String expirationParam,
+            @QueryParam("contentType")                  String 
actualContentType,
+                                                        byte[] messageBody) 
throws Exception {
+
+        String contentType = actualContentType != null ? actualContentType : 
MediaType.APPLICATION_OCTET_STREAM;
+
+        return sendMessage( queueName, regionsParam, delayParam, 
expirationParam,
+                contentType, ByteBuffer.wrap( messageBody ) );
+    }
+
+
+    private Response sendMessage( String queueName,
+                                   String regionsParam,
+                                   String delayParam,
+                                   String expirationParam,
+                                   String contentType,
+                                   ByteBuffer byteBuffer) {
+
+        Timer.Context timer = metricsService.getMetricRegistry().timer( 
MetricsService.SEND_TIME_TOTAL ).time();
+        try {
+
+            Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName 
), "Queue name is required" );
+
+            // if regions, delay or expiration are empty string, would get the 
defaults from the queue
+            if (regionsParam.equals( "" )) {
+                regionsParam = Regions.LOCAL;
+            }
+
+            Long delayMs = convertDelayParameter( delayParam );
+
+            Long expirationSecs = convertExpirationParameter( expirationParam 
);
+
+            List<String> regionList = regions.getRegions( regionsParam );
+
+            queueMessageManager.sendMessages( queueName, regionList, delayMs, 
expirationSecs,
+                    contentType, byteBuffer );
+
+            ApiResponse apiResponse = new ApiResponse();
+            apiResponse.setCount( 1 );
+            return Response.ok().entity( apiResponse ).build();
+
+        } finally {
+            timer.close();
+        }
+    }
+
+
+    @GET
+    @Path( "{queueName}/messages" )
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response getNextMessages( @PathParam("queueName") String queueName,
+                                     @QueryParam("count") @DefaultValue("1") 
String countParam) throws Exception {
+
+        Timer.Context timer = metricsService.getMetricRegistry().timer( 
MetricsService.GET_TIME_TOTAL ).time();
+        try {
+
+            Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName 
), "Queue name is required" );
+
+            int count = 1;
+            try {
+                count = Integer.parseInt( countParam );
+            } catch (Exception e) {
+                throw new IllegalArgumentException( "Invalid count parameter" 
);
+            }
+            if (count <= 0) {
+                // invalid count
+                throw new IllegalArgumentException( "Count must be >= 1" );
+            }
+
+            List<QueueMessage> messages = queueMessageManager.getNextMessages( 
queueName, count );
+
+            ApiResponse apiResponse = new ApiResponse();
+
+            if (messages != null && !messages.isEmpty()) {
+                apiResponse.setQueueMessages( messages );
+
+            } else { // always return queueMessages field
+                apiResponse.setQueueMessages( Collections.EMPTY_LIST );
+            }
+            apiResponse.setCount( apiResponse.getQueueMessages().size() );
+            return Response.ok().entity( apiResponse ).build();
+
+        } finally {
+            timer.close();
+        }
+    }
+
+
+    @DELETE
+    @Path( "{queueName}/messages/{queueMessageId}" )
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response ackMessage( @PathParam("queueName") String queueName,
+                                @PathParam("queueMessageId") String 
queueMessageId) throws Exception {
+
+        Timer.Context timer = metricsService.getMetricRegistry().timer( 
MetricsService.ACK_TIME_TOTAL ).time();
+        try {
+
+            Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName 
), "Queue name is required" );
+
+            UUID messageUuid;
+            try {
+                messageUuid = UUID.fromString( queueMessageId );
+            } catch (Exception e) {
+                throw new IllegalArgumentException( "Invalid queue message 
UUID" );
+            }
+            queueMessageManager.ackMessage( queueName, messageUuid );
+
+            ApiResponse apiResponse = new ApiResponse();
+            return Response.ok().entity( apiResponse ).build();
+
+        } finally {
+            timer.close();
+        }
+    }
+
+
+    @GET
+    @Path( "{queueName}/data/{queueMessageId}" )
+    public Response getMessageData(
+            @PathParam("queueName") String queueName,
+            @PathParam("queueMessageId") String queueMessageIdParam ) {
+
+        Preconditions.checkArgument(!QakkaUtils.isNullOrEmpty(queueName), 
"Queue name is required");
+
+        UUID queueMessageId;
+        try {
+            queueMessageId = UUID.fromString(queueMessageIdParam);
+        }
+        catch (Exception e) {
+            throw new IllegalArgumentException("Invalid queue message UUID");
+        }
+
+        QueueMessage message = queueMessageManager.getMessage( queueName, 
queueMessageId );
+        if ( message == null ) {
+            throw new NotFoundException(
+                    "Message not found for queueName: " + queueName + " queue 
message id: " + queueMessageId );
+        }
+
+        ByteBuffer messageData = queueMessageManager.getMessageData( 
message.getMessageId() );
+        if ( messageData == null ) {
+            throw new NotFoundException( "Message data not found queueName: " 
+ queueName
+                    + " queue message id: " + queueMessageId + " message id: " 
+ message.getMessageId() );
+        }
+
+        ByteBufferBackedInputStream input = new ByteBufferBackedInputStream( 
messageData );
+
+        StreamingOutput stream = output -> {
+            try {
+                ByteStreams.copy(input, output);
+            } catch (Exception e) {
+                throw new WebApplicationException(e);
+            }
+        };
+
+        return Response.ok( stream ).header( "Content-Type", 
message.getContentType() ).build();
+    }
+
+
+//    @PUT
+//    @Path( "{queueName}/messages/{queueMessageId}" )
+//    @Produces({MediaType.APPLICATION_JSON})
+//    public Response requeueMessage( @PathParam("queueName") String queueName,
+//                                    @PathParam("queueMessageId") String 
queueMessageIdParam,
+//                                    @QueryParam("delay") @DefaultValue("") 
String delayParam) throws Exception {
+//
+//        Preconditions.checkArgument(!QakkaUtils.isNullOrEmpty(queueName), 
"Queue name is required");
+//
+//        UUID queueMessageId;
+//        try {
+//            queueMessageId = UUID.fromString(queueMessageIdParam);
+//        }
+//        catch (Exception e) {
+//            throw new IllegalArgumentException("Invalid message UUID");
+//        }
+//        Long delayMs = convertDelayParameter(delayParam);
+//
+//        queueMessageManager.requeueMessage(queueName, queueMessageId, 
delayMs);
+//
+//        ApiResponse apiResponse = new ApiResponse();
+//        return Response.ok().entity(apiResponse).build();
+//    }
+//
+//
+//    @DELETE
+//    @Path( "{queueName}/messages" )
+//    @Produces({MediaType.APPLICATION_JSON})
+//    public Response clearMessages( @PathParam("queueName") String queueName,
+//                                   @QueryParam("confirm") 
@DefaultValue("false") Boolean confirmed) throws Exception {
+//
+//        // TODO: implement DELETE /queues/{queueName}/messages"
+//        throw new UnsupportedOperationException();
+//    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/StatusResource.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/StatusResource.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/StatusResource.java
new file mode 100644
index 0000000..1d1c836
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/StatusResource.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.api;
+
+import com.codahale.metrics.Timer;
+import com.google.inject.servlet.RequestScoped;
+import org.apache.usergrid.persistence.qakka.App;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.DecimalFormat;
+import java.util.HashMap;
+import java.util.SortedSet;
+
+
+@Path("status")
+@RequestScoped
+public class StatusResource {
+    private static final Logger logger = LoggerFactory.getLogger( 
StatusResource.class );
+
+    private App app;
+
+    @Inject
+    public StatusResource( App app ) {
+        this.app = app;
+    }
+
+    @GET
+    @Produces( MediaType.APPLICATION_JSON )
+    public Object status() {
+
+        final DecimalFormat format = new DecimalFormat("##.###");
+        final long nano = 1000000000;
+
+        return new HashMap<String, Object>() {{
+            put( "name", "Qakka" );
+            try {
+                put( "host", InetAddress.getLocalHost().getHostName() );
+            } catch (UnknownHostException e) {
+                put( "host", "unknown" );
+            }
+            SortedSet<String> names = app.getMetricRegistry().getNames();
+            for (String name : names) {
+                Timer t = app.getMetricRegistry().timer( name );
+                put( name, new HashMap<String, Object>() {{
+                    put( "count", ""            + t.getCount() );
+                    put( "mean_rate", ""        + format.format( 
t.getMeanRate() ) );
+                    put( "one_minute_rate", ""  + format.format( 
t.getOneMinuteRate() ) );
+                    put( "five_minute_rate", "" + format.format( 
t.getFiveMinuteRate() ) );
+                    put( "mean", ""             + format.format( 
t.getSnapshot().getMean() / nano ) );
+                    put( "min", ""              + format.format( (double) 
t.getSnapshot().getMin() / nano ) );
+                    put( "max", ""              + format.format( (double) 
t.getSnapshot().getMax() / nano ) );
+                }} );
+            }
+        }};
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/URIStrategy.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/URIStrategy.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/URIStrategy.java
new file mode 100644
index 0000000..2f0b65c
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/URIStrategy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka.api;
+
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.UUID;
+
+public interface URIStrategy {
+
+    URI queueURI(String queueName) throws URISyntaxException;
+
+    URI queueMessageDataURI(String queueName, UUID queueMessageId) throws 
URISyntaxException;
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/BadRequestMapper.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/BadRequestMapper.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/BadRequestMapper.java
new file mode 100644
index 0000000..88d6394
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/BadRequestMapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.api.impl;
+
+import org.apache.usergrid.persistence.qakka.api.ApiResponse;
+import org.apache.usergrid.persistence.qakka.exceptions.BadRequestException;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+
+@Provider
+public class BadRequestMapper implements ExceptionMapper<BadRequestException> {
+
+    public Response toResponse( BadRequestException ex ) {
+
+        ApiResponse apiResponse = new ApiResponse();
+        apiResponse.setMessage( ex.getMessage() );
+
+        return Response.status(400).entity( apiResponse ).type( 
MediaType.APPLICATION_JSON ).build();
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/GuiceFeature.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/GuiceFeature.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/GuiceFeature.java
new file mode 100644
index 0000000..a3485d0
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/GuiceFeature.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka.api.impl;
+
+import org.glassfish.hk2.api.ServiceLocator;
+import org.glassfish.jersey.ServiceLocatorProvider;
+import org.jvnet.hk2.guice.bridge.api.GuiceBridge;
+import org.jvnet.hk2.guice.bridge.api.GuiceIntoHK2Bridge;
+
+import javax.ws.rs.core.Feature;
+import javax.ws.rs.core.FeatureContext;
+import javax.ws.rs.ext.Provider;
+
+
+@Provider
+public class GuiceFeature implements Feature {
+
+    @Override
+    public boolean configure(FeatureContext context) {
+
+        ServiceLocator serviceLocator = 
ServiceLocatorProvider.getServiceLocator( context );
+        GuiceBridge.getGuiceBridge().initializeGuiceBridge( serviceLocator );
+
+        GuiceIntoHK2Bridge guiceBridge = serviceLocator.getService( 
GuiceIntoHK2Bridge.class );
+        guiceBridge.bridgeGuiceInjector( StartupListener.INJECTOR );
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JacksonProvider.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JacksonProvider.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JacksonProvider.java
new file mode 100644
index 0000000..b1400ca
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JacksonProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.api.impl;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+import javax.ws.rs.ext.ContextResolver;
+import javax.ws.rs.ext.Provider;
+
+
+@Provider
+public class JacksonProvider implements ContextResolver<ObjectMapper> {
+    final ObjectMapper mapper;
+
+    public JacksonProvider() {
+        mapper = new ObjectMapper();
+        mapper.enable( SerializationFeature.INDENT_OUTPUT );
+        mapper.setSerializationInclusion( JsonInclude.Include.NON_NULL );
+    }
+
+    @Override
+    public ObjectMapper getContext(final Class<?> type) {
+         return mapper;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JerseyResourceConfig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JerseyResourceConfig.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JerseyResourceConfig.java
new file mode 100644
index 0000000..86cfe05
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JerseyResourceConfig.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka.api.impl;
+
+import org.glassfish.jersey.server.ResourceConfig;
+
+
+public class JerseyResourceConfig extends ResourceConfig {
+
+    public JerseyResourceConfig() {
+        packages( "org.apache.usergrid.persistence.qakka.api" );
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/NotFoundMapper.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/NotFoundMapper.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/NotFoundMapper.java
new file mode 100644
index 0000000..2725766
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/NotFoundMapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.api.impl;
+
+import org.apache.usergrid.persistence.qakka.api.ApiResponse;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+
+@Provider
+public class NotFoundMapper implements ExceptionMapper<NotFoundException> {
+
+    public Response toResponse( NotFoundException ex ) {
+
+        ApiResponse apiResponse = new ApiResponse();
+        apiResponse.setMessage( ex.getMessage() );
+
+        return Response.status(404).entity( apiResponse ).type( 
MediaType.APPLICATION_JSON ).build();
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/StartupListener.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/StartupListener.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/StartupListener.java
new file mode 100644
index 0000000..d97363a
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/StartupListener.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.api.impl;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.servlet.GuiceServletContextListener;
+import com.google.inject.servlet.ServletModule;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.QakkaModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+
+import java.util.logging.LogManager;
+
+
+public class StartupListener extends GuiceServletContextListener {
+    private static final Logger logger = LoggerFactory.getLogger( 
StartupListener.class );
+
+    public static App APP = null;
+
+    public static Injector INJECTOR = null;
+
+    static {
+
+        try {
+            LogManager.getLogManager().reset();
+            SLF4JBridgeHandler.install();
+
+            INJECTOR = Guice.createInjector( new ServletModule() {
+                @Override
+                protected void configureServlets() {
+                    install( new QakkaModule() );
+                }
+            } );
+
+            APP = INJECTOR.getInstance( App.class );
+
+            APP.start();
+
+        } catch ( Throwable t ) {
+            logger.error("Error static initializing StartupListener class", t);
+        }
+
+    }
+
+    @Override
+    protected Injector getInjector() {
+        return INJECTOR;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/URIStrategyLocalhost.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/URIStrategyLocalhost.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/URIStrategyLocalhost.java
new file mode 100644
index 0000000..6f8dbb2
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/URIStrategyLocalhost.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka.api.impl;
+
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.api.URIStrategy;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.UUID;
+
+
+/** TODO: implement a "real" URI strategy */
+public class URIStrategyLocalhost implements URIStrategy {
+
+    final private String hostname;
+
+    @Inject
+    public URIStrategyLocalhost( ActorSystemFig actorSystemFig ) {
+        this.hostname = actorSystemFig.getHostname();
+    }
+
+    @Override
+    public URI queueURI(String queueName) throws URISyntaxException {
+        return new URI("http://"; + hostname + ":8080/api/queues/" + queueName);
+    }
+
+    @Override
+    public URI queueMessageDataURI(String queueName, UUID queueMessageId) 
throws URISyntaxException {
+        return new URI("http://"; + hostname + ":8080/api/queues/" + queueName 
+ "/data/" + queueMessageId );
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java
new file mode 100644
index 0000000..9f40b51
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import com.datastax.driver.core.Session;
+
+/**
+ * Created by Dave Johnson (snoopd...@apache.org) on 9/9/16.
+ */
+public interface CassandraClient {
+    Session getSession();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java
new file mode 100644
index 0000000..ed665c2
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Singleton
+public class CassandraClientImpl implements CassandraClient {
+    private static final Logger logger = LoggerFactory.getLogger( 
CassandraClientImpl.class );
+
+    private final Session session;
+
+    @Inject
+    public CassandraClientImpl( DataStaxCluster dataStaxCluster) {
+
+        logger.info("Constructing Cassandra client");
+
+        this.session = dataStaxCluster.getApplicationSession();
+    }
+
+
+    @Override
+    public Session getSession() {
+        return session;
+    }
+}

Reply via email to