http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
new file mode 100644
index 0000000..c154067
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.QakkaModule;
+import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import 
org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLog;
+import 
org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+
+/**
+ * Integration tests for {@link QueueMessageManager}.
+ *
+ * Each test obtains a Cassandra session, starts the Qakka {@link App} on "localhost" using the
+ * port returned by {@code getNextAkkaPort()}, and works against a newly created queue whose
+ * name carries a random alphanumeric suffix.
+ */
+public class QueueMessageManagerTest extends AbstractTest {
+    private static final Logger logger = LoggerFactory.getLogger( QueueMessageManagerTest.class );
+
+    // TODO: test that multiple threads pulling from same queue will never pop same item
+
+    /** Injector created on first call to {@link #getInjector()}. */
+    protected Injector myInjector = null;
+
+    /**
+     * Returns the Guice injector for this test, creating it from a new {@link QakkaModule}
+     * the first time it is requested.
+     */
+    @Override
+    protected Injector getInjector() {
+        if ( myInjector == null ) {
+            myInjector = Guice.createInjector( new QakkaModule() );
+        }
+        return myInjector;
+    }
+
+
+    /**
+     * Sends a single JSON message, reads it back, checks its stored body and content type,
+     * checks that no transfer-log entry exists for the queue, acks the message and then checks
+     * that it can no longer be loaded as DEFAULT or INFLIGHT and that three audit-log entries
+     * were recorded for it.
+     */
+    @Test
+    public void testBasicOperation() throws Exception {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        DistributedQueueService distributedQueueService =
+                getInjector().getInstance( DistributedQueueService.class );
+        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+
+        String region = actorSystemFig.getRegionLocal();
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        // create queue and send one message to it
+        String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15);
+        QueueManager queueManager = getInjector().getInstance( QueueManager.class );
+        QueueMessageManager qmm = getInjector().getInstance( QueueMessageManager.class );
+        queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
+        String jsonData = "{}";
+        qmm.sendMessages( queueName, Collections.singletonList(region), null, null,
+                "application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED) );
+
+        // give the refresh a moment to complete before reading
+        distributedQueueService.refresh();
+        Thread.sleep(1000);
+
+        // get message from the queue
+        List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
+        Assert.assertEquals( 1, messages.size() );
+        QueueMessage message = messages.get(0);
+
+        // test that queue message data is present and correct
+        QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class );
+        DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() );
+        Assert.assertNotNull( data );
+        Assert.assertEquals( "application/json", data.getContentType() );
+        String jsonDataReturned = new String( data.getBlob().array(), Charset.forName("UTF-8") );
+        Assert.assertEquals( jsonData, jsonDataReturned );
+
+        // test that transfer log is empty for our queue
+        TransferLogSerialization tlogs = getInjector().getInstance( TransferLogSerialization.class );
+        Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 );
+        List<TransferLog> logs = all.getEntities().stream()
+                .filter( log -> log.getQueueName().equals( queueName ) )
+                .collect( Collectors.toList() );
+        Assert.assertTrue( logs.isEmpty() );
+
+        // ack the message
+        qmm.ackMessage( queueName, message.getQueueMessageId() );
+
+        // test that message is no longer stored in non-replicated keyspace
+
+        Assert.assertNull( qms.loadMessage( queueName, region, null,
+                DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() ));
+
+        Assert.assertNull( qms.loadMessage( queueName, region, null,
+                DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() ));
+
+        // test that audit log entry was written
+        AuditLogSerialization auditLogSerialization =
+                getInjector().getInstance( AuditLogSerialization.class );
+        Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+        Assert.assertEquals( 3, auditLogs.getEntities().size() );
+    }
+
+
+    /**
+     * Sends 40 messages, fetches them all, acks the first half, waits for the configured queue
+     * timeout to pass, triggers timeout processing, and then expects every ack of the remaining
+     * half to fail with a {@link QakkaRuntimeException}.
+     */
+    @Test
+    public void testQueueMessageTimeouts() throws Exception {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        DistributedQueueService distributedQueueService =
+                getInjector().getInstance( DistributedQueueService.class );
+        QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class );
+        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+
+        String region = actorSystemFig.getRegionLocal();
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        // create some number of queue messages
+
+        QueueManager queueManager = getInjector().getInstance( QueueManager.class );
+        QueueMessageManager qmm = getInjector().getInstance( QueueMessageManager.class );
+        String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15);
+        queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
+
+        int numMessages = 40;
+
+        for ( int i=0; i<numMessages; i++ ) {
+            qmm.sendMessages(
+                    queueName,
+                    Collections.singletonList( region ),
+                    null, // delay
+                    null, // expiration
+                    "application/json",
+                    DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
+        }
+
+        distributedQueueService.refresh();
+        Thread.sleep(1000);
+
+        // get all messages from queue
+
+        List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );
+        Assert.assertEquals( numMessages, messages.size() );
+
+        // ack the first half of the messages; the second half stays un-acked.
+        // Splitting by index avoids relying on QueueMessage.equals() or on the
+        // returned list being modifiable.
+
+        int half = numMessages / 2;
+        for ( QueueMessage queueMessage : messages.subList( 0, half ) ) {
+            qmm.ackMessage( queueName, queueMessage.getQueueMessageId() );
+        }
+        List<QueueMessage> unacked = messages.subList( half, messages.size() );
+
+        // wait for twice timeout period (computed as long so the multiplication cannot overflow int)
+
+        long timeoutMillis = qakkaFig.getQueueTimeoutSeconds() * 1000L;
+        Thread.sleep( 2 * timeoutMillis );
+
+        distributedQueueService.processTimeouts();
+
+        Thread.sleep( timeoutMillis );
+
+        // attempt to ack other half of messages
+
+        for ( QueueMessage message : unacked ) {
+            try {
+                qmm.ackMessage( queueName, message.getQueueMessageId() );
+                Assert.fail("Message should have timed out by now");
+
+            } catch ( QakkaRuntimeException expected ) {
+                // the ack is supposed to be rejected; keep on going...
+            }
+        }
+    }
+
+
+    /**
+     * Writes 100 queue messages directly through {@link QueueMessageSerialization}, storing
+     * message data only for every other one, and then fetches messages one at a time until
+     * half of them (the number that have data) have been returned.
+     */
+    @Test
+    public void testGetWithMissingData() throws InterruptedException {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        getInjector().getInstance( App.class ); // init the INJECTOR
+
+        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+        DistributedQueueService qas   = getInjector().getInstance( DistributedQueueService.class );
+        QueueManager qm               = getInjector().getInstance( QueueManager.class );
+        QueueMessageManager qmm       = getInjector().getInstance( QueueMessageManager.class );
+        QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class );
+
+        String region = actorSystemFig.getRegionLocal();
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        // create queue messages, every other one with missing data
+
+        int numMessages = 100;
+        String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        qm.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
+
+        for ( int i=0; i<numMessages; i++ ) {
+
+            final UUID messageId = QakkaUtils.getTimeUuid();
+
+            if ( i % 2 == 0 ) { // only even-numbered messages get a stored body
+                final String data = "my test data";
+                final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
+                        DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
+                qms.writeMessageData( messageId, messageBody );
+            }
+
+            UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+            DatabaseQueueMessage message = new DatabaseQueueMessage(
+                    messageId,
+                    DatabaseQueueMessage.Type.DEFAULT,
+                    queueName,
+                    region,
+                    null,
+                    System.currentTimeMillis(),
+                    null,
+                    queueMessageId);
+            qms.writeMessage( message );
+        }
+
+        qas.refresh();
+        Thread.sleep(1000);
+
+        // Count each returned message exactly once. The previous version also incremented
+        // the counter inside the log statement, so the loop stopped after roughly a quarter
+        // of the messages instead of the intended half.
+        int count = 0;
+        while ( count < numMessages / 2 ) {
+            List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
+            Assert.assertFalse( messages.isEmpty() );
+            count += messages.size();
+            logger.debug("Got {} messages", count);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
new file mode 100644
index 0000000..829ba27
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.QakkaModule;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.core.Queue;
+import org.apache.usergrid.persistence.qakka.core.QueueManager;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.UUID;
+
+
+/**
+ * Integration tests that talk to {@link DistributedQueueService} directly: message bodies are
+ * written through {@link QueueMessageSerialization}, messages are sent to the local region and
+ * then fetched back.
+ */
+public class QueueActorServiceTest extends AbstractTest {
+    private static final Logger logger = LoggerFactory.getLogger( QueueActorServiceTest.class );
+
+    protected Injector myInjector = null;
+
+    /** Hands out the injector, building it from a new {@link QakkaModule} on first use. */
+    @Override
+    protected Injector getInjector() {
+        if ( myInjector != null ) {
+            return myInjector;
+        }
+        myInjector = Guice.createInjector( new QakkaModule() );
+        return myInjector;
+    }
+
+
+    /** Stores {@code text} as the "text/plain" body of the message with the given id. */
+    private void writeTextBody( QueueMessageSerialization serialization, UUID messageId, String text ) {
+        final DatabaseQueueMessageBody body = new DatabaseQueueMessageBody(
+                DataType.serializeValue( text, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
+        serialization.writeMessageData( messageId, body );
+    }
+
+
+    /**
+     * Sends one message to the local region, fetches one message back and checks that the
+     * stored body decodes to the text that was written.
+     */
+    @Test
+    public void testBasicOperation() throws Exception {
+
+        CassandraClient client = getInjector().getInstance( CassandraClientImpl.class );
+        client.getSession();
+
+        ActorSystemFig fig = getInjector().getInstance( ActorSystemFig.class );
+        String localRegion = fig.getRegionLocal();
+
+        App qakkaApp = getInjector().getInstance( App.class );
+        qakkaApp.start( "localhost", getNextAkkaPort(), localRegion );
+
+        DistributedQueueService queueService = getInjector().getInstance( DistributedQueueService.class );
+        QueueMessageSerialization messageStore = getInjector().getInstance( QueueMessageSerialization.class );
+
+        String queueName = "testqueue_" + UUID.randomUUID();
+        QueueManager queues = getInjector().getInstance( QueueManager.class );
+        queues.createQueue( new Queue( queueName, "test-type", localRegion, localRegion, 0L, 5, 10, null ));
+
+        // one message in ...
+        final String expected = "my test data";
+        UUID id = UUIDGen.getTimeUUID();
+        writeTextBody( messageStore, id, expected );
+
+        queueService.sendMessageToRegion( queueName, localRegion, localRegion, id, null, null);
+
+        queueService.refresh();
+        Thread.sleep(1000);
+
+        // ... one message out
+        Collection<DatabaseQueueMessage> fetched = queueService.getNextMessages( queueName, 1 );
+        Assert.assertEquals( 1, fetched.size() );
+
+        DatabaseQueueMessage first = fetched.iterator().next();
+        ByteBuffer payload = messageStore.loadMessageData( first.getMessageId() ).getBlob();
+        String actual = new String( payload.array(), "UTF-8");
+
+        Assert.assertEquals( expected, actual );
+
+    }
+
+
+    /**
+     * Sends 100 messages, waits (with retries) until the in-memory queue reports all of them,
+     * then drains the queue in four batches of 25, checking the remaining size after each batch.
+     */
+    @Test
+    public void testGetMultipleQueueMessages() throws InterruptedException {
+
+        CassandraClient client = getInjector().getInstance( CassandraClientImpl.class );
+        client.getSession();
+
+        ActorSystemFig fig = getInjector().getInstance( ActorSystemFig.class );
+        String localRegion = fig.getRegionLocal();
+
+        App qakkaApp = getInjector().getInstance( App.class );
+        qakkaApp.start("localhost", getNextAkkaPort(), localRegion);
+
+        DistributedQueueService queueService = getInjector().getInstance( DistributedQueueService.class );
+        QueueMessageSerialization messageStore = getInjector().getInstance( QueueMessageSerialization.class );
+        InMemoryQueue memoryQueue = getInjector().getInstance( InMemoryQueue.class );
+
+        String queueName = "testqueue_" + UUID.randomUUID();
+        QueueManager queues = getInjector().getInstance( QueueManager.class );
+        queues.createQueue( new Queue( queueName, "test-type", localRegion, localRegion, 0L, 5, 10, null ));
+
+        final int total = 100;
+        final int batchSize = 25;
+
+        for ( int sent = 0; sent < total; sent++ ) {
+            UUID id = UUIDGen.getTimeUUID();
+            writeTextBody( messageStore, id, "my test data" );
+            queueService.sendMessageToRegion( queueName, localRegion, localRegion, id, null, null);
+        }
+
+        // refresh and poll until every message has reached the in-memory queue, or give up
+        final int maxRetries = 15;
+        for ( int attempt = 0; attempt < maxRetries; attempt++ ) {
+            queueService.refresh();
+            Thread.sleep( 3000 );
+            if ( memoryQueue.size( queueName ) == total ) {
+                break;
+            }
+        }
+
+        Assert.assertEquals( total, memoryQueue.size( queueName ) );
+
+        // drain in batches: 75, 50, 25 and finally 0 messages must remain
+        for ( int expectedLeft = total - batchSize; expectedLeft >= 0; expectedLeft -= batchSize ) {
+            Assert.assertEquals( batchSize, queueService.getNextMessages( queueName, batchSize ).size() );
+            Assert.assertEquals( expectedLeft, memoryQueue.size( queueName ) );
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
new file mode 100644
index 0000000..9e4128e
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.QakkaModule;
+import org.apache.usergrid.persistence.qakka.core.*;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import 
org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+
+/**
+ * Integration tests for {@link QueueActorHelper}: loading queue messages, moving a message
+ * in-flight and acking a message, each against a newly created queue with a random name.
+ */
+public class QueueActorHelperTest extends AbstractTest {
+
+    /** Injector created on first call to {@link #getInjector()}. */
+    protected Injector myInjector = null;
+
+    /**
+     * Returns the Guice injector for this test, creating it from a new {@link QakkaModule}
+     * the first time it is requested.
+     */
+    @Override
+    protected Injector getInjector() {
+        if ( myInjector == null ) {
+            myInjector = Guice.createInjector( new QakkaModule() );
+        }
+        return myInjector;
+    }
+
+
+    /**
+     * Builds a {@link DatabaseQueueMessage} of the given type with a new time-UUID as message
+     * id and the supplied queue message id, writes it through {@code qms} and returns it.
+     */
+    private DatabaseQueueMessage writeMessage(
+            QueueMessageSerialization qms,
+            DatabaseQueueMessage.Type type,
+            String queueName,
+            String region,
+            UUID queueMessageId ) {
+
+        DatabaseQueueMessage message = new DatabaseQueueMessage(
+                QakkaUtils.getTimeUuid(),
+                type,
+                queueName,
+                region,
+                null,
+                System.currentTimeMillis(),
+                null,
+                queueMessageId);
+        qms.writeMessage( message );
+        return message;
+    }
+
+
+    /** A message written as DEFAULT must be loadable through the helper. */
+    @Test
+    public void loadDatabaseQueueMessage() throws Exception {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        getInjector().getInstance( App.class ); // init the INJECTOR
+
+        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+        QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class );
+        QueueManager queueManager     = getInjector().getInstance( QueueManager.class );
+
+        String region = actorSystemFig.getRegionLocal();
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        queueManager.createQueue( new Queue( queueName ) );
+
+        UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+        // write message
+
+        DatabaseQueueMessage message = writeMessage(
+                qms, DatabaseQueueMessage.Type.DEFAULT, queueName, region, queueMessageId );
+
+        // load message
+
+        QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class );
+        DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
+                queueName, message.getQueueMessageId(), message.getType() );
+
+        Assert.assertNotNull( queueMessage );
+    }
+
+
+    /** Loading a queue message id that was never written must return null. */
+    @Test
+    public void loadDatabaseQueueMessageNotFound() throws Exception {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+
+        getInjector().getInstance( App.class ); // init the INJECTOR
+        QueueManager queueManager = getInjector().getInstance( QueueManager.class );
+
+        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+        String region = actorSystemFig.getRegionLocal();
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        queueManager.createQueue( new Queue( queueName ) );
+
+        // don't write any message
+
+        // load message
+
+        QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class );
+        DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
+                queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT );
+
+        Assert.assertNull( queueMessage );
+    }
+
+
+    /**
+     * After {@code putInflight} the message must no longer load as DEFAULT, must load as
+     * INFLIGHT, and exactly one audit-log entry (SUCCESS / GET) must exist for it.
+     */
+    @Test
+    public void putInflight() throws Exception {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+
+        getInjector().getInstance( App.class ); // init the INJECTOR
+
+        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+        QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class );
+        QueueManager queueManager     = getInjector().getInstance( QueueManager.class );
+
+        String region = actorSystemFig.getRegionLocal();
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        // write message to messages_available table
+
+        UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+        String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        queueManager.createQueue( new Queue( queueName ) );
+
+        DatabaseQueueMessage message = writeMessage(
+                qms, DatabaseQueueMessage.Type.DEFAULT, queueName, region, queueMessageId );
+
+        // put message inflight
+
+        QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class );
+        helper.putInflight( queueName, message );
+
+        // message must be gone from messages_available table
+
+        Assert.assertNull( qms.loadMessage(
+                queueName,
+                region,
+                null,
+                DatabaseQueueMessage.Type.DEFAULT,
+                message.getQueueMessageId() ) );
+
+        // message must be present in messages_inflight table
+
+        Assert.assertNotNull( qms.loadMessage(
+                queueName,
+                region,
+                null,
+                DatabaseQueueMessage.Type.INFLIGHT,
+                message.getQueueMessageId() ) );
+
+        // there must be an audit log record of the successful get operation
+
+        AuditLogSerialization auditLogSerialization =
+                getInjector().getInstance( AuditLogSerialization.class );
+        Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+        Assert.assertEquals( 1, auditLogs.getEntities().size() );
+        Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus()  );
+        Assert.assertEquals( AuditLog.Action.GET,     auditLogs.getEntities().get(0).getAction()  );
+    }
+
+
+    /**
+     * After acking an INFLIGHT message, that same message must not load as INFLIGHT or as
+     * DEFAULT, and exactly one audit-log entry (SUCCESS / ACK) must exist for it.
+     */
+    @Test
+    public void ackQueueMessage() throws Exception {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+
+        getInjector().getInstance( App.class ); // init the INJECTOR
+
+        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+        QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class );
+        QueueManager queueManager     = getInjector().getInstance( QueueManager.class );
+
+        String region = actorSystemFig.getRegionLocal();
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+        String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        queueManager.createQueue( new Queue( queueName ) );
+
+        // write message to messages_inflight table
+
+        DatabaseQueueMessage message = writeMessage(
+                qms, DatabaseQueueMessage.Type.INFLIGHT, queueName, region, queueMessageId );
+
+        // ack message
+
+        QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class );
+        helper.ackQueueMessage( queueName, message.getQueueMessageId() );
+
+        // Look up the id of the message that was actually acked. The previous version
+        // queried a freshly generated UUID, which can never exist, so these checks
+        // passed no matter what the ack did.
+
+        // message must be gone from messages_inflight table
+
+        Assert.assertNull( helper.loadDatabaseQueueMessage(
+                queueName, message.getQueueMessageId(), DatabaseQueueMessage.Type.INFLIGHT ));
+
+        // message must not be present in messages_available table
+
+        Assert.assertNull( helper.loadDatabaseQueueMessage(
+                queueName, message.getQueueMessageId(), DatabaseQueueMessage.Type.DEFAULT ));
+
+        // there should be an audit log record of the successful ack operation
+
+        AuditLogSerialization auditLogSerialization =
+                getInjector().getInstance( AuditLogSerialization.class );
+        Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+        Assert.assertEquals( 1, auditLogs.getEntities().size() );
+        Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus()  );
+        Assert.assertEquals( AuditLog.Action.ACK,     auditLogs.getEntities().get(0).getAction()  );
+    }
+
+
+    /** Acking a queue message id that was never written must return BAD_REQUEST. */
+    @Test
+    public void ackQueueMessageNotFound() throws Exception {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+
+        getInjector().getInstance( App.class ); // init the INJECTOR
+        QueueManager queueManager     = getInjector().getInstance( QueueManager.class );
+        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+
+        String region = actorSystemFig.getRegionLocal();
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        queueManager.createQueue( new Queue( queueName ) );
+
+        // don't write message, just make up some bogus IDs
+
+        UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+        // ack message must fail
+
+        QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class );
+        Assert.assertEquals( DistributedQueueService.Status.BAD_REQUEST,
+                helper.ackQueueMessage( queueName, queueMessageId ));
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
new file mode 100644
index 0000000..5f0be53
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import 
org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+
+/**
+ * Exercises the {@link QueueRefresher} actor: messages written through the serialization
+ * layer are expected to appear in the {@link InMemoryQueue} after a
+ * {@link QueueRefreshRequest} has been processed.
+ */
+public class QueueReaderTest extends AbstractTest {
+    private static final Logger logger = LoggerFactory.getLogger( QueueReaderTest.class );
+
+
+    /**
+     * Writes a batch of DEFAULT messages for a fresh queue, sends one refresh request to a
+     * QueueRefresher actor, and polls until the in-memory queue holds every written message.
+     */
+    @Test
+    public void testBasicOperation() throws Exception {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        getInjector().getInstance( App.class ); // init the INJECTOR
+
+        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
+        ShardSerialization shardSerialization = getInjector().getInstance( ShardSerialization.class );
+
+        int numMessages = 200;
+
+        // write the queue message records directly through the serialization layer
+
+        QueueMessageSerialization serialization = getInjector().getInstance( QueueMessageSerialization.class );
+        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+
+        Shard newShard = new Shard( queueName, actorSystemFig.getRegionLocal(),
+                Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid());
+        shardSerialization.createShard( newShard );
+
+        for ( int i=0; i<numMessages; i++ ) {
+
+            UUID messageId = QakkaUtils.getTimeUuid();
+            UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+            DatabaseQueueMessage message = new DatabaseQueueMessage(
+                    messageId,
+                    DatabaseQueueMessage.Type.DEFAULT,
+                    queueName,
+                    actorSystemFig.getRegionLocal(),
+                    null,
+                    System.currentTimeMillis(),
+                    null,
+                    queueMessageId);
+            serialization.writeMessage( message );
+        }
+
+        // nothing has asked for a refresh yet, so the in-memory queue must still be empty
+        InMemoryQueue inMemoryQueue = getInjector().getInstance( InMemoryQueue.class );
+        Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
+
+        // run the QueueRefresher to fill up the in-memory queue
+
+        ActorSystem system = ActorSystem.create("Test-" + queueName);
+        ActorRef queueReaderRef = system.actorOf( Props.create( QueueRefresher.class, queueName ), "queueReader");
+        QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName );
+        queueReaderRef.tell( refreshRequest, ActorRef.noSender() ); // tell sends message, returns immediately
+
+        // Wait for the refresh to complete. Poll for the same count that is asserted below,
+        // so the loop ends as soon as the expected state is reached and never gives up on a
+        // smaller, unrelated threshold.
+        int maxRetries = 10;
+        int retries = 0;
+        while ( inMemoryQueue.size( queueName ) < numMessages && retries++ < maxRetries ) {
+            Thread.sleep(1000);
+        }
+
+        Assert.assertEquals( numMessages, inMemoryQueue.size( queueName ) );
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
new file mode 100644
index 0000000..511b059
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import 
org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRequest;
+import 
org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+
+/**
+ * Checks that the {@link QueueTimeouter} actor moves timed-out INFLIGHT messages into the
+ * DEFAULT (available) table and leaves the remaining messages in the INFLIGHT table.
+ */
+public class QueueTimeouterTest extends AbstractTest {
+    private static final Logger logger = LoggerFactory.getLogger( QueueTimeouterTest.class );
+
+
+    @Test
+    public void testBasicOperation() throws Exception {
+
+        CassandraClient client = getInjector().getInstance( CassandraClientImpl.class );
+        client.getSession();
+
+        getInjector().getInstance( App.class ); // init the INJECTOR
+
+        QakkaFig fig                        = getInjector().getInstance( QakkaFig.class );
+        ActorSystemFig systemFig            = getInjector().getInstance( ActorSystemFig.class );
+        QueueMessageSerialization messages  = getInjector().getInstance( QueueMessageSerialization.class );
+        ShardSerialization shards           = getInjector().getInstance( ShardSerialization.class );
+
+        // create records in inflight table, with some being old enough to time out
+
+        int numInflight = 200; // number of messages to be put into timeout table
+        int numTimedout = 75;  // number of messages to be timedout
+
+        long timeoutMs = fig.getQueueTimeoutSeconds()*1000;
+
+        String queueName = "qtt_queue_" + RandomStringUtils.randomAlphanumeric( 20 );
+        String region = systemFig.getRegionLocal();
+
+        // one shard per table the test touches: INFLIGHT first, then DEFAULT
+        Shard inflightShard = new Shard( queueName, region, Shard.Type.INFLIGHT, 1L, QakkaUtils.getTimeUuid());
+        shards.createShard( inflightShard );
+
+        Shard defaultShard = new Shard( queueName, region, Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid());
+        shards.createShard( defaultShard );
+
+        for ( int n = 0; n < numInflight; n++ ) {
+
+            // the first numTimedout messages are back-dated by one timeout period,
+            // the rest are dated one timeout period into the future
+            long created = System.currentTimeMillis();
+            if ( n < numTimedout ) {
+                created = created - timeoutMs;
+            } else {
+                created = created + timeoutMs;
+            }
+
+            UUID queueMessageId = QakkaUtils.getTimeUuid();
+            UUID messageId = QakkaUtils.getTimeUuid();
+
+            messages.writeMessage( new DatabaseQueueMessage(
+                    messageId,
+                    DatabaseQueueMessage.Type.INFLIGHT,
+                    queueName,
+                    region,
+                    null,
+                    created,
+                    created,
+                    queueMessageId ) );
+        }
+
+        List<DatabaseQueueMessage> inflightBefore =
+                getDatabaseQueueMessages( client, queueName, region, Shard.Type.INFLIGHT );
+        Assert.assertEquals( numInflight, inflightBefore.size() );
+
+        // run timeouter actor
+
+        ActorSystem system = ActorSystem.create("Test-" + queueName);
+        ActorRef timeouterRef = system.actorOf( Props.create( QueueTimeouter.class, queueName ), "timeouter");
+        timeouterRef.tell( new QueueTimeoutRequest( queueName ), ActorRef.noSender() ); // returns immediately
+
+        Thread.sleep( timeoutMs );
+
+        // timed out messages should have been moved into available (DEFAULT) table
+
+        List<DatabaseQueueMessage> available =
+                getDatabaseQueueMessages( client, queueName, region, Shard.Type.DEFAULT );
+        Assert.assertEquals( numTimedout, available.size() );
+
+        // and there should still be some messages in the INFLIGHT table
+
+        List<DatabaseQueueMessage> inflightAfter =
+                getDatabaseQueueMessages( client, queueName, region, Shard.Type.INFLIGHT );
+        Assert.assertEquals( numInflight - numTimedout, inflightAfter.size() );
+
+    }
+
+    /**
+     * Reads up to 2000 messages of the given shard type for a queue and region by iterating
+     * over all of its shards.
+     *
+     * @param cassandraClient client handed to the shard and message iterators
+     * @param queueName       queue whose messages are read
+     * @param region          region whose messages are read
+     * @param type            shard type; DEFAULT reads DEFAULT messages, any other type reads INFLIGHT
+     * @return the messages found, at most 2000 of them
+     */
+    private List<DatabaseQueueMessage> getDatabaseQueueMessages(
+            CassandraClient cassandraClient, String queueName, String region, Shard.Type type ) {
+
+        ShardIterator shardIterator = new ShardIterator(
+                cassandraClient, queueName, region, type, Optional.empty() );
+
+        DatabaseQueueMessage.Type messageType;
+        if ( Shard.Type.DEFAULT.equals( type ) ) {
+            messageType = DatabaseQueueMessage.Type.DEFAULT;
+        } else {
+            messageType = DatabaseQueueMessage.Type.INFLIGHT;
+        }
+
+        MultiShardMessageIterator messageIterator = new MultiShardMessageIterator(
+                cassandraClient, queueName, region, messageType, shardIterator, null);
+
+        List<DatabaseQueueMessage> found = new ArrayList<>(2000);
+        while ( messageIterator.hasNext() && found.size() < 2000 ) {
+            found.add( messageIterator.next() );
+        }
+        return found;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
new file mode 100644
index 0000000..3dbd980
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.distributed.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.QakkaModule;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.*;
+import 
org.apache.usergrid.persistence.qakka.distributed.messages.ShardCheckRequest;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Optional;
+
+
+/**
+ * Tests for the {@link ShardAllocator} actor. The tests expect a new shard to be allocated
+ * only after the counter of the newest shard has been raised to the configured maximum
+ * shard size.
+ */
+public class ShardAllocatorTest extends AbstractTest {
+    // log under this test's own class name so its output is attributed correctly
+    private static final Logger logger = LoggerFactory.getLogger( ShardAllocatorTest.class );
+
+
+    /** Injector private to this test instance, created lazily by {@link #getInjector()}. */
+    protected Injector myInjector = null;
+
+    /**
+     * Returns an injector built from a fresh {@link QakkaModule}, creating it on first use,
+     * instead of the injector supplied by the base class.
+     */
+    @Override
+    protected Injector getInjector() {
+        if ( myInjector == null ) {
+            myInjector = Guice.createInjector( new QakkaModule() );
+        }
+        return myInjector;
+    }
+
+
+    /**
+     * Creates four shards with counters at the maximum, except the last one at 80% of it,
+     * and verifies that the allocator adds a shard only after the last counter is topped up.
+     */
+    @Test
+    public void testBasicOperation() throws InterruptedException {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        getInjector().getInstance( App.class ); // init the INJECTOR
+
+        ShardSerialization        shardSer        = getInjector().getInstance( ShardSerialization.class );
+        QakkaFig                  qakkaFig        = getInjector().getInstance( QakkaFig.class );
+        ActorSystemFig            actorSystemFig  = getInjector().getInstance( ActorSystemFig.class );
+        ShardCounterSerialization shardCounterSer = getInjector().getInstance( ShardCounterSerialization.class );
+
+        String rando = RandomStringUtils.randomAlphanumeric( 20 );
+
+        String queueName = "queue_" + rando;
+        String region = actorSystemFig.getRegionLocal();
+
+        // Create a set of shards, each with max count
+
+        Shard lastShard = null;
+
+        int numShards = 4;
+        long maxPerShard = qakkaFig.getMaxShardSize();
+
+        for ( long shardId = 1; shardId < numShards + 1; shardId++ ) {
+
+            Shard shard = new Shard( queueName, region, Shard.Type.DEFAULT, shardId, QakkaUtils.getTimeUuid());
+            shardSer.createShard( shard );
+
+            if ( shardId != numShards ) {
+                shardCounterSer.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, maxPerShard );
+
+            } else {
+                // Create last shard with 20% less than max
+                shardCounterSer.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, (long)(0.8 * maxPerShard) );
+                lastShard = shard;
+            }
+
+            Thread.sleep( 10 );
+        }
+
+        Assert.assertEquals( numShards, countShards(
+                cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ));
+
+        // Run shard allocator actor by sending message to it
+
+        ActorSystem system = ActorSystem.create("Test-" + queueName);
+        ActorRef shardAllocRef = system.actorOf( Props.create( ShardAllocator.class, queueName ), "shardallocator");
+
+        ShardCheckRequest checkRequest = new ShardCheckRequest( queueName );
+        shardAllocRef.tell( checkRequest, ActorRef.noSender() ); // tell sends message, returns immediately
+        Thread.sleep(1000);
+
+        // Test that no new shards created
+
+        Assert.assertEquals( numShards, countShards(
+                cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ));
+
+        // Increment last shard by 20% of max
+
+        shardCounterSer.incrementCounter(
+                queueName, Shard.Type.DEFAULT, lastShard.getShardId(), (long)(0.2 * maxPerShard) );
+
+        // Run shard allocator again
+
+        shardAllocRef.tell( checkRequest, ActorRef.noSender() ); // tell sends message, returns immediately
+        Thread.sleep(1000);
+
+        // Test that, this time, a new shard was created
+
+        Assert.assertEquals( numShards + 1, countShards(
+                cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ));
+    }
+
+
+    /**
+     * Counts the shards of the given type for a queue and region, logging each shard's
+     * counter value at debug level along the way.
+     *
+     * @param cassandraClient client handed to the shard iterator
+     * @param scs             used to look up each shard's counter value for the log line
+     * @param queueName       queue whose shards are counted
+     * @param region          region whose shards are counted
+     * @param type            shard type to iterate over
+     * @return number of shards returned by the iterator
+     */
+    int countShards(
+            CassandraClient cassandraClient,
+            ShardCounterSerialization scs,
+            String queueName,
+            String region,
+            Shard.Type type ) {
+
+        ShardIterator shardIterator =
+                new ShardIterator( cassandraClient, queueName, region, type, Optional.empty() );
+
+        int shardCount = 0;
+        while ( shardIterator.hasNext() ) {
+            Shard s = shardIterator.next();
+            shardCount++;
+            long counterValue = scs.getCounterValue( s.getQueueName(), type, s.getShardId() );
+            logger.debug("Shard {} {} is #{} has count={}", type, s.getShardId(), shardCount, counterValue );
+
+        }
+
+        return shardCount;
+    }
+
+
+    /**
+     * Sends 4000 messages through the queue message manager of a started app and checks
+     * the number of DEFAULT shards that exist after a refresh.
+     */
+    @Test
+    public void testBasicOperationWithMessages() throws InterruptedException {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        getInjector().getInstance( App.class ); // init the INJECTOR
+
+        ActorSystemFig      actorSystemFig        = getInjector().getInstance( ActorSystemFig.class );
+        QueueManager        queueManager          = getInjector().getInstance( QueueManager.class );
+        QueueMessageManager queueMessageManager   = getInjector().getInstance( QueueMessageManager.class );
+        DistributedQueueService distributedQueueService =
+                getInjector().getInstance( DistributedQueueService.class );
+        ShardCounterSerialization shardCounterSer = getInjector().getInstance( ShardCounterSerialization.class );
+
+
+        String region = actorSystemFig.getRegionLocal();
+        App app = getInjector().getInstance( App.class );
+        app.start( "localhost", getNextAkkaPort(), region );
+
+        String rando = RandomStringUtils.randomAlphanumeric( 20 );
+        String queueName = "queue_" + rando;
+
+        queueManager.createQueue( new Queue( queueName ));
+
+        // Create 4000 messages
+
+        int numMessages = 4000;
+
+        for ( int i=0; i<numMessages; i++ ) {
+            queueMessageManager.sendMessages(
+                    queueName,
+                    Collections.singletonList( region ),
+                    null, // delay
+                    null, // expiration
+                    "application/json",
+                    DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
+        }
+
+        distributedQueueService.refresh();
+        Thread.sleep(3000);
+
+        // Test that 8 shards were created
+        // NOTE(review): the expected count of 8 presumably follows from the configured
+        // max shard size in the test properties -- confirm if that setting changes
+
+        Assert.assertEquals( 8,
+                countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
new file mode 100644
index 0000000..76e3279
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization;
+
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+import 
org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that a {@link MultiShardMessageIterator} walks every shard of a queue and
+ * returns all of the messages written to each of them.
+ *
+ * Created by russo on 6/9/16.
+ */
+public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
+    private static final Logger logger = LoggerFactory.getLogger( MultiShardDatabaseQueueMessageIteratorTest.class );
+
+    /** Queue name shared by the shards, the messages and the iterators in this test. */
+    private static final String QUEUE_NAME = "test";
+
+    /** Region shared by the shards, the messages and the iterators in this test. */
+    private static final String REGION = "region";
+
+
+    /**
+     * Creates four DEFAULT shards, writes the same number of messages to each one, then
+     * iterates across all shards and checks the per-shard message counts.
+     */
+    @Test
+    public void testIterator() throws InterruptedException {
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
+
+        QueueMessageSerialization queueMessageSerialization =
+                getInjector().getInstance( QueueMessageSerialization.class );
+
+        final int numShards = 4;
+        final int numMessagesPerShard = 50;
+
+        // shard IDs are 1-based: 1..numShards
+        Shard[] shards = new Shard[ numShards ];
+        for ( int i = 0; i < numShards; i++ ) {
+            shards[i] = new Shard( QUEUE_NAME, REGION, Shard.Type.DEFAULT, (long)( i + 1 ), null );
+            shardSerialization.createShard( shards[i] );
+        }
+
+        // just do these separately to space out the time UUIDs per shard
+        for ( Shard shard : shards ) {
+            writeMessages( queueMessageSerialization, shard, numMessagesPerShard );
+        }
+
+        ShardIterator shardIterator = new ShardIterator(
+                cassandraClient, QUEUE_NAME, REGION, Shard.Type.DEFAULT, Optional.empty());
+        MultiShardMessageIterator iterator = new MultiShardMessageIterator(
+                cassandraClient, QUEUE_NAME, REGION, DatabaseQueueMessage.Type.DEFAULT, shardIterator, null);
+
+        // one counter per shard, indexed by (shard ID - 1)
+        final AtomicInteger[] counts = new AtomicInteger[ numShards ];
+        for ( int i = 0; i < numShards; i++ ) {
+            counts[i] = new AtomicInteger(0);
+        }
+
+        iterator.forEachRemaining(message -> {
+            //logger.info("Shard ID: {}, DatabaseQueueMessage ID: {}", message.getShardId(), message.getMessageId());
+            counts[ (int)(message.getShardId() - 1) ].incrementAndGet();
+        });
+
+        for ( int i = 0; i < numShards; i++ ) {
+            logger.info("Total Count {}: {}", i + 1, counts[i].get());
+        }
+
+        for ( AtomicInteger count : counts ) {
+            assertEquals(numMessagesPerShard, count.get());
+        }
+    }
+
+
+    /**
+     * Writes {@code count} DEFAULT messages for the test queue and region into the given
+     * shard, pausing 3 ms after each write.
+     *
+     * @param serialization used to write each message
+     * @param shard         shard whose ID is stamped on every message
+     * @param count         number of messages to write
+     * @throws InterruptedException if the pause between writes is interrupted
+     */
+    private void writeMessages( QueueMessageSerialization serialization, Shard shard, int count )
+            throws InterruptedException {
+
+        for ( int i = 0; i < count; i++ ) {
+            serialization.writeMessage( new DatabaseQueueMessage( QakkaUtils.getTimeUuid(),
+                    DatabaseQueueMessage.Type.DEFAULT, QUEUE_NAME, REGION, shard.getShardId(),
+                    System.currentTimeMillis(), null, null ) );
+            Thread.sleep(3);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java
new file mode 100644
index 0000000..072fd94
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.auditlogs;
+
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import 
org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+
+public class AuditLogSerializationTest extends AbstractTest {
+
+    @Test
+    public void testRecordAuditLog() throws Exception {
+
+        CassandraClient cassandraClient = getInjector().getInstance( 
CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        AuditLogSerialization logSerialization = getInjector().getInstance( 
AuditLogSerialization.class );
+
+        // record a single audit log for a message
+        UUID messageId = UUIDGen.getTimeUUID();
+        String queueName = "alst_queue_" + 
RandomStringUtils.randomAlphanumeric( 15 );
+        String source = RandomStringUtils.randomAlphanumeric( 15 );
+        String dest = RandomStringUtils.randomAlphanumeric( 15 );
+        
+        logSerialization.recordAuditLog( AuditLog.Action.GET, 
AuditLog.Status.SUCCESS, 
+            queueName, dest, messageId, UUIDGen.getTimeUUID() );
+           
+        // get audit logs for that message
+        Result<AuditLog> result = logSerialization.getAuditLogs( messageId );
+        Assert.assertEquals( 1, result.getEntities().size() );
+    }
+
+    @Test
+    public void testGetAuditLogs() throws Exception {
+
+        CassandraClient cassandraClient = getInjector().getInstance( 
CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+
+        AuditLogSerialization logSerialization = getInjector().getInstance( 
AuditLogSerialization.class );
+
+        // record some audit logs for a message
+        UUID messageId = UUIDGen.getTimeUUID();
+        String queueName = "alst_queue_" + 
RandomStringUtils.randomAlphanumeric( 15 );
+        String source = RandomStringUtils.randomAlphanumeric( 15 );
+        String dest = RandomStringUtils.randomAlphanumeric( 15 );
+
+        int numLogs = 10;
+        
+        UUID queueMessageId1 = UUIDGen.getTimeUUID();
+        for ( int i=0; i<numLogs; i++ ) {
+            logSerialization.recordAuditLog( AuditLog.Action.GET, 
AuditLog.Status.SUCCESS,
+                    queueName, dest, messageId, queueMessageId1 );
+            Thread.sleep(5); 
+        }
+        
+        UUID queueMessageId2 = UUIDGen.getTimeUUID();
+        for ( int i=0; i<numLogs; i++ ) {
+            logSerialization.recordAuditLog( AuditLog.Action.GET, 
AuditLog.Status.SUCCESS,
+                    queueName, dest, messageId, queueMessageId2 );
+            Thread.sleep(5);
+        }
+
+        UUID queueMessageId3 = UUIDGen.getTimeUUID();
+        for ( int i=0; i<numLogs; i++ ) {
+            logSerialization.recordAuditLog( AuditLog.Action.GET, 
AuditLog.Status.SUCCESS,
+                    queueName, dest, messageId, queueMessageId3 );
+            Thread.sleep(5);
+        }
+
+        // verify that we have 3 x numLogs audit logs for the messageId
+        Result<AuditLog> result = logSerialization.getAuditLogs( messageId );
+        Assert.assertEquals( numLogs * 3, result.getEntities().size() );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
new file mode 100644
index 0000000..4ea6de3
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.junit.Test;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+public class DatabaseQueueMessageSerializationTest extends AbstractTest {
+
+
+    static class ThingToSave implements Serializable {
+        String value;
+    }
+
+
+    @Test
+    public void writeNewMessage(){
+
+        QueueMessageSerialization queueMessageSerialization =
+                getInjector().getInstance( QueueMessageSerialization.class );
+
+        Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, 
null);
+
+        DatabaseQueueMessage message1 = new 
DatabaseQueueMessage(QakkaUtils.getTimeUuid(),
+                DatabaseQueueMessage.Type.DEFAULT, "test", "region1",
+                shard1.getShardId(), System.currentTimeMillis(), null, null);
+
+        UUID queueMessageId = queueMessageSerialization.writeMessage(message1);
+    }
+
+    @Test
+    public void deleteMessage(){
+
+        QueueMessageSerialization queueMessageSerialization =
+                getInjector().getInstance( QueueMessageSerialization.class );
+
+        Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, 
null);
+
+        UUID messageId = QakkaUtils.getTimeUuid();
+        String queueName = "dqmst_queue_" + 
RandomStringUtils.randomAlphanumeric( 20 );
+
+        DatabaseQueueMessage message = new DatabaseQueueMessage(
+                messageId,
+                DatabaseQueueMessage.Type.DEFAULT,
+                queueName,
+                "dummy_region",
+                shard1.getShardId(),
+                System.currentTimeMillis(),
+                null, null );
+
+        UUID queueMessageId = queueMessageSerialization.writeMessage( message 
);
+
+        queueMessageSerialization.deleteMessage(
+            queueName,
+            "dummy_region",
+            shard1.getShardId(),
+            DatabaseQueueMessage.Type.DEFAULT,
+            queueMessageId );
+
+        assertNull( queueMessageSerialization.loadMessage(
+            queueName,
+            "dummy_region",
+            shard1.getShardId(),
+            DatabaseQueueMessage.Type.DEFAULT,
+            queueMessageId
+        ));
+    }
+
+
+    @Test
+    public void loadNullMessage(){
+
+        QueueMessageSerialization queueMessageSerialization =
+                getInjector().getInstance( QueueMessageSerialization.class );
+
+        Shard shard1 = new Shard("junk", "region1", Shard.Type.DEFAULT, 100L, 
null);
+
+        assertNull( queueMessageSerialization.loadMessage(
+                RandomStringUtils.randomAlphanumeric( 20 ),
+                "dummy_region",
+                shard1.getShardId(),
+                DatabaseQueueMessage.Type.DEFAULT,
+                null
+        ));
+    }
+
+
+    @Test
+    public void writeNewMessageData(){
+
+        QueueMessageSerialization queueMessageSerialization =
+                getInjector().getInstance( QueueMessageSerialization.class );
+
+        Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, 
null);
+
+        UUID messageId = QakkaUtils.getTimeUuid();
+
+        final String data = "my test data";
+
+        final DatabaseQueueMessageBody messageBody = new 
DatabaseQueueMessageBody(
+                DataType.serializeValue(data, 
ProtocolVersion.NEWEST_SUPPORTED), "text/plain");
+
+        queueMessageSerialization.writeMessageData(messageId, messageBody);
+
+        final DatabaseQueueMessageBody returnedData = 
queueMessageSerialization.loadMessageData( messageId );
+    }
+
+
+    @Test
+    public void loadMessageData() throws Exception {
+
+        QueueMessageSerialization queueMessageSerialization =
+                getInjector().getInstance( QueueMessageSerialization.class );
+
+        Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, 
null);
+
+        UUID messageId = QakkaUtils.getTimeUuid();
+
+        final String data = "my test data";
+
+        final DatabaseQueueMessageBody messageBody = new 
DatabaseQueueMessageBody( DataType.serializeValue(data,
+                ProtocolVersion.NEWEST_SUPPORTED), "text/plain");
+
+        queueMessageSerialization.writeMessageData(messageId, messageBody);
+
+        final DatabaseQueueMessageBody returnedBody = 
queueMessageSerialization.loadMessageData( messageId );
+        String returnedData = new String( returnedBody.getBlob().array(), 
"UTF-8");
+
+        assertEquals(data, returnedData);
+    }
+
+
+    @Test
+    public void loadMessageObjectData() throws Exception {
+
+        QueueMessageSerialization queueMessageSerialization =
+            getInjector().getInstance( QueueMessageSerialization.class );
+
+        Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, 
null);
+
+        UUID messageId = QakkaUtils.getTimeUuid();
+
+        final String data = "my test data";
+
+        final DatabaseQueueMessageBody messageBody = new 
DatabaseQueueMessageBody( DataType.serializeValue(data,
+            ProtocolVersion.NEWEST_SUPPORTED), "text/plain");
+
+        queueMessageSerialization.writeMessageData(messageId, messageBody);
+
+        final DatabaseQueueMessageBody returnedBody = 
queueMessageSerialization.loadMessageData( messageId );
+        String returnedData = new String( returnedBody.getBlob().array(), 
"UTF-8");
+
+        assertEquals(data, returnedData);
+    }
+
+
+
+
+    @Test
+    public void deleteMessageData() throws UnsupportedEncodingException {
+
+        QueueMessageSerialization queueMessageSerialization =
+                getInjector().getInstance( QueueMessageSerialization.class );
+
+        Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, 
null);
+
+        UUID messageId = QakkaUtils.getTimeUuid();
+
+        final String data = "my test data";
+
+        final DatabaseQueueMessageBody messageBody = new 
DatabaseQueueMessageBody( DataType.serializeValue(data,
+                ProtocolVersion.NEWEST_SUPPORTED), "text/plain");
+
+        queueMessageSerialization.writeMessageData(messageId, messageBody);
+
+        final DatabaseQueueMessageBody returnedBody = 
queueMessageSerialization.loadMessageData( messageId );
+        final String returnedData = new String( 
returnedBody.getBlob().array(), "UTF-8");
+
+        assertEquals(data, returnedData);
+
+        queueMessageSerialization.deleteMessageData(messageId);
+
+        assertNull(queueMessageSerialization.loadMessageData( messageId ));
+
+
+    }
+
+
+    /**
+     * Persist to blob using Java serialization.
+     */
+    @Test
+    public void persistJavaObjectData() throws Exception {
+
+        QueueMessageSerialization queueMessageSerialization =
+            getInjector().getInstance( QueueMessageSerialization.class );
+
+        // serialize Java object to byte buffer
+
+        final ThingToSave data = new ThingToSave();
+        data.value = "my test data";
+
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(bos);
+        oos.writeObject(data);
+        oos.flush();
+        oos.close();
+        ByteBuffer byteBuffer = ByteBuffer.wrap( bos.toByteArray() );
+
+        // write to Cassandra
+
+        final DatabaseQueueMessageBody messageBody = new 
DatabaseQueueMessageBody(
+            byteBuffer,"application/octet-stream");
+
+        UUID messageId = QakkaUtils.getTimeUuid();
+        queueMessageSerialization.writeMessageData(messageId, messageBody);
+
+        // load from Cassandra
+
+        final DatabaseQueueMessageBody returnedBody = 
queueMessageSerialization.loadMessageData( messageId );
+
+        // deserialize byte buffer
+
+        ByteBuffer messageData = returnedBody.getBlob();
+        ByteArrayInputStream bais = new ByteArrayInputStream( 
messageData.array() );
+
+        // throws exception -> java.io.StreamCorruptedException: invalid stream header: 00000000
+        ObjectInputStream ios = new ObjectInputStream( bais );
+        ThingToSave returnedData = (ThingToSave)ios.readObject();
+
+        assertEquals( data.value, returnedData.value );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java
new file mode 100644
index 0000000..4690a1a
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queues;
+
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Created by russo on 6/9/16.
+ */
+public class DatabaseQueueSerializationTest extends AbstractTest {
+
+    @Test
+    public void writeQueue(){
+
+        CassandraClient cassandraClient = getInjector().getInstance( 
CassandraClientImpl.class );
+        cassandraClient.getSession();
+        QueueSerialization queueSerialization = getInjector().getInstance( 
QueueSerialization.class );
+
+        DatabaseQueue queue = new DatabaseQueue("test", "west", "west", 0L, 0, 
0, "test_dlq");
+
+        queueSerialization.writeQueue(queue);
+
+    }
+
+    @Test
+    public void loadQueue(){
+
+        CassandraClient cassandraClient = getInjector().getInstance( 
CassandraClientImpl.class );
+        cassandraClient.getSession();
+        QueueSerialization queueSerialization = getInjector().getInstance( 
QueueSerialization.class );
+        
+        DatabaseQueue queue = new DatabaseQueue("test1", "west", "west", 0L, 
0, 0, "test_dlq");
+
+        queueSerialization.writeQueue(queue);
+        DatabaseQueue returnedQueue = queueSerialization.getQueue("test1");
+
+        assertEquals(queue, returnedQueue);
+
+    }
+
+    @Test
+    public void deleteQueue(){
+
+        CassandraClient cassandraClient = getInjector().getInstance( 
CassandraClientImpl.class );
+        cassandraClient.getSession();
+        QueueSerialization queueSerialization = getInjector().getInstance( 
QueueSerialization.class );
+        
+        DatabaseQueue queue = new DatabaseQueue("test1", "west", "west", 0L, 
0, 0, "test_dlq");
+
+        queueSerialization.writeQueue(queue);
+        DatabaseQueue returnedQueue = queueSerialization.getQueue("test1");
+
+        assertEquals(queue, returnedQueue);
+
+        queueSerialization.deleteQueue(queue.getName());
+
+        assertNull(queueSerialization.getQueue("test1"));
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
new file mode 100644
index 0000000..3152025
--- /dev/null
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+
+public class ShardCounterSerializationTest extends AbstractTest {
+
+    
+    @Test
+    public void testBasicOperation() throws Exception {
+
+        CassandraClient cassandraClient = getInjector().getInstance( 
CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+
+        ShardCounterSerialization scs = getInjector().getInstance( 
ShardCounterSerialization.class ); 
+       
+        String queueName = "scst_queue_" + 
RandomStringUtils.randomAlphanumeric( 20 );
+        long shardId = 100L;
+       
+        try {
+            scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId );
+            fail("Should have throw NotFoundException");
+        } catch ( NotFoundException expected ) {
+            // pass 
+        }
+
+        scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 10 );
+        Assert.assertEquals( 10, scs.getCounterValue( queueName, 
Shard.Type.DEFAULT, shardId ) );
+        
+        scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 50 );
+        Assert.assertEquals( 60, scs.getCounterValue( queueName, 
Shard.Type.DEFAULT, shardId ) );
+        
+        scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 150 );
+        Assert.assertEquals( 210, scs.getCounterValue( queueName, 
Shard.Type.DEFAULT, shardId ) );
+    }
+
+}
\ No newline at end of file

Reply via email to