http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java new file mode 100644 index 0000000..c154067 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.core; + +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.ProtocolVersion; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.serialization.Result; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.App; +import org.apache.usergrid.persistence.qakka.QakkaModule; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; +import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLog; +import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + + +public class QueueMessageManagerTest extends AbstractTest { + private static final Logger logger = LoggerFactory.getLogger( QueueMessageManagerTest.class ); + + // TODO: test that multiple threads pulling from same queue will never pop same item + + protected Injector myInjector = null; + + @Override + protected Injector getInjector() { + if ( myInjector == null ) { + myInjector = Guice.createInjector( new QakkaModule() ); + } + return myInjector; + } + + + @Test + public void testBasicOperation() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + DistributedQueueService distributedQueueService = getInjector().getInstance( DistributedQueueService.class ); + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + + String region = actorSystemFig.getRegionLocal(); + App app = getInjector().getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + + // create queue and send one message to it + String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15); + QueueManager queueManager = getInjector().getInstance( QueueManager.class ); + QueueMessageManager qmm = getInjector().getInstance( QueueMessageManager.class ); + queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null )); + String jsonData = "{}"; + qmm.sendMessages( queueName, Collections.singletonList(region), null, null, + "application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED) ); + + distributedQueueService.refresh(); + Thread.sleep(1000); + + // get message from the queue + List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 ); + Assert.assertEquals( 1, messages.size() ); + QueueMessage message = messages.get(0); + + // test that queue message data is present and correct + QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class ); + DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() ); + Assert.assertNotNull( data ); + Assert.assertEquals( "application/json", data.getContentType() ); + String jsonDataReturned = new String( data.getBlob().array(), Charset.forName("UTF-8") ); + Assert.assertEquals( jsonData, jsonDataReturned ); + + // test that transfer log is empty for our queue + TransferLogSerialization tlogs = getInjector().getInstance( TransferLogSerialization.class ); + Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 ); + List<TransferLog> logs = all.getEntities().stream() + .filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() ); + Assert.assertTrue( logs.isEmpty() ); + + // ack the message + qmm.ackMessage( queueName, message.getQueueMessageId() ); + + // test that message is no longer stored in non-replicated keyspace + + Assert.assertNull( qms.loadMessage( queueName, region, null, + DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() )); + + Assert.assertNull( qms.loadMessage( queueName, region, null, + DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() )); + + // test that audit log entry was written + AuditLogSerialization auditLogSerialization = getInjector().getInstance( AuditLogSerialization.class ); + Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() ); + Assert.assertEquals( 3, auditLogs.getEntities().size() ); + } + + + @Test + public void testQueueMessageTimeouts() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + DistributedQueueService distributedQueueService = getInjector().getInstance( DistributedQueueService.class ); + QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class ); + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + + String region = actorSystemFig.getRegionLocal(); + App app = getInjector().getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + + // create some number of queue messages + + QueueManager queueManager = getInjector().getInstance( QueueManager.class ); + QueueMessageManager qmm = getInjector().getInstance( QueueMessageManager.class ); + String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15); + queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null )); + + int numMessages = 40; + + for ( int i=0; i<numMessages; i++ ) { + qmm.sendMessages( + queueName, + Collections.singletonList( region ), + null, // delay + null, // expiration + "application/json", + DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) ); + } + + distributedQueueService.refresh(); + Thread.sleep(1000); + + // get all messages from queue + + List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages ); + Assert.assertEquals( numMessages, messages.size() ); + + // ack half of the messages + + List<QueueMessage> remove = new ArrayList<>(); + + for ( int i=0; i<numMessages/2; i++ ) { + QueueMessage queueMessage = messages.get( i ); + qmm.ackMessage( queueName, queueMessage.getQueueMessageId() ); + remove.add( queueMessage ); + } + + for ( QueueMessage message : remove ) { + messages.remove( message ); + } + + // wait for twice timeout period + + Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds()*1000 ); + + distributedQueueService.processTimeouts(); + + Thread.sleep( qakkaFig.getQueueTimeoutSeconds()*1000 ); + + // attempt to ack other half of messages + + for ( QueueMessage message : messages ) { + try { + qmm.ackMessage( queueName, message.getQueueMessageId() ); + Assert.fail("Message should have timed out by now"); + + } catch ( QakkaRuntimeException expected ) { + // keep on going... + } + } + } + + + @Test + public void testGetWithMissingData() throws InterruptedException { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + getInjector().getInstance( App.class ); // init the INJECTOR + + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + DistributedQueueService qas = getInjector().getInstance( DistributedQueueService.class ); + QueueManager qm = getInjector().getInstance( QueueManager.class ); + QueueMessageManager qmm = getInjector().getInstance( QueueMessageManager.class ); + QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class ); + + String region = actorSystemFig.getRegionLocal(); + App app = getInjector().getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + + // create queue messages, every other one with missing data + + int numMessages = 100; + String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + qm.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null )); + + for ( int i=0; i<numMessages; i++ ) { + + final UUID messageId = QakkaUtils.getTimeUuid(); + + if ( i % 2 == 0 ) { // every other it + final String data = "my test data"; + final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( + DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" ); + qms.writeMessageData( messageId, messageBody ); + } + + UUID queueMessageId = QakkaUtils.getTimeUuid(); + + DatabaseQueueMessage message = new DatabaseQueueMessage( + messageId, + DatabaseQueueMessage.Type.DEFAULT, + queueName, + actorSystemFig.getRegionLocal(), + null, + System.currentTimeMillis(), + null, + queueMessageId); + qms.writeMessage( message ); + } + + qas.refresh(); + Thread.sleep(1000); + + int count = 0; + while ( count < numMessages / 2 ) { + List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 ); + Assert.assertTrue( !messages.isEmpty() ); + count += messages.size(); + logger.debug("Got {} messages", ++count); + } + } + +}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java new file mode 100644 index 0000000..829ba27 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed; + +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.ProtocolVersion; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.App; +import org.apache.usergrid.persistence.qakka.QakkaModule; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.core.Queue; +import org.apache.usergrid.persistence.qakka.core.QueueManager; +import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.UUID; + + +public class QueueActorServiceTest extends AbstractTest { + private static final Logger logger = LoggerFactory.getLogger( QueueActorServiceTest.class ); + + protected Injector myInjector = null; + + @Override + protected Injector getInjector() { + if ( myInjector == null ) { + myInjector = Guice.createInjector( new QakkaModule() ); + } + return myInjector; + } + + + @Test + public void testBasicOperation() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + String region = actorSystemFig.getRegionLocal(); + + App app = getInjector().getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + + DistributedQueueService distributedQueueService = getInjector().getInstance( DistributedQueueService.class ); + QueueMessageSerialization serialization = getInjector().getInstance( QueueMessageSerialization.class ); + + String queueName = "testqueue_" + UUID.randomUUID(); + QueueManager queueManager = getInjector().getInstance( QueueManager.class ); + queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null )); + + // send 1 queue message, get back one queue message + UUID messageId = UUIDGen.getTimeUUID(); + + final String data = "my test data"; + final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( + DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" ); + serialization.writeMessageData( messageId, messageBody ); + + distributedQueueService.sendMessageToRegion( + queueName, region, region, messageId, null, null); + + distributedQueueService.refresh(); + Thread.sleep(1000); + + Collection<DatabaseQueueMessage> qmReturned = distributedQueueService.getNextMessages( queueName, 1 ); + Assert.assertEquals( 1, qmReturned.size() ); + + DatabaseQueueMessage dqm = qmReturned.iterator().next(); + DatabaseQueueMessageBody dqmb = serialization.loadMessageData( dqm.getMessageId() ); + ByteBuffer blob = dqmb.getBlob(); + + String returnedData = new String( blob.array(), "UTF-8"); +// ByteArrayInputStream bais = new ByteArrayInputStream( blob.array() ); +// ObjectInputStream ios = new ObjectInputStream( bais ); +// String returnedData = (String)ios.readObject(); + + Assert.assertEquals( data, returnedData ); + + } + + + @Test + public void testGetMultipleQueueMessages() throws InterruptedException { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + String region = actorSystemFig.getRegionLocal(); + + App app = getInjector().getInstance( App.class ); + app.start("localhost", getNextAkkaPort(), region); + + DistributedQueueService distributedQueueService = getInjector().getInstance( DistributedQueueService.class ); + QueueMessageSerialization serialization = getInjector().getInstance( QueueMessageSerialization.class ); + InMemoryQueue inMemoryQueue = getInjector().getInstance( InMemoryQueue.class ); + + String queueName = "testqueue_" + UUID.randomUUID(); + QueueManager queueManager = getInjector().getInstance( QueueManager.class ); + queueManager.createQueue( + new Queue( queueName, "test-type", region, region, 0L, 5, 10, null )); + + for ( int i=0; i<100; i++ ) { + + UUID messageId = UUIDGen.getTimeUUID(); + + final String data = "my test data"; + final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( + DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" ); + serialization.writeMessageData( messageId, messageBody ); + + distributedQueueService.sendMessageToRegion( + queueName, region, region, messageId , null, null); + } + + int maxRetries = 15; + int retries = 0; + while ( retries++ < maxRetries ) { + distributedQueueService.refresh(); + Thread.sleep( 3000 ); + if (inMemoryQueue.size( queueName ) == 100) { + break; + } + } + + Assert.assertEquals( 100, inMemoryQueue.size( queueName ) ); + + Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); + Assert.assertEquals( 75, inMemoryQueue.size( queueName ) ); + + Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); + Assert.assertEquals( 50, inMemoryQueue.size( queueName ) ); + + Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); + Assert.assertEquals( 25, inMemoryQueue.size( queueName ) ); + + Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); + Assert.assertEquals( 0, inMemoryQueue.size( queueName ) ); + + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java new file mode 100644 index 0000000..9e4128e --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.actors; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.App; +import org.apache.usergrid.persistence.qakka.QakkaModule; +import org.apache.usergrid.persistence.qakka.core.*; +import org.apache.usergrid.persistence.qakka.serialization.Result; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.junit.Assert; +import org.junit.Test; + +import java.util.UUID; + + +public class QueueActorHelperTest extends AbstractTest { + + protected Injector myInjector = null; + + @Override + protected Injector getInjector() { + if ( myInjector == null ) { + myInjector = Guice.createInjector( new QakkaModule() ); + } + return myInjector; + } + + + @Test + public void loadDatabaseQueueMessage() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + getInjector().getInstance( App.class ); // init the INJECTOR + + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class ); + QueueManager queueManager = getInjector().getInstance( QueueManager.class ); + + String region = actorSystemFig.getRegionLocal(); + App app = getInjector().getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + + String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + queueManager.createQueue( new Queue( queueName ) ); + + UUID queueMessageId = QakkaUtils.getTimeUuid(); + + // write message + + DatabaseQueueMessage message = new DatabaseQueueMessage( + QakkaUtils.getTimeUuid(), + DatabaseQueueMessage.Type.DEFAULT, + queueName, + actorSystemFig.getRegionLocal(), + null, + System.currentTimeMillis(), + null, + queueMessageId); + qms.writeMessage( message ); + + // load message + + QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class ); + DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage( + queueName, message.getQueueMessageId(), message.getType() ); + + Assert.assertNotNull( queueMessage ); + } + + + @Test + public void loadDatabaseQueueMessageNotFound() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + + getInjector().getInstance( App.class ); // init the INJECTOR + QueueManager queueManager = getInjector().getInstance( QueueManager.class ); + + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + String region = actorSystemFig.getRegionLocal(); + App app = getInjector().getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + + String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + queueManager.createQueue( new Queue( queueName ) ); + + // don't write any message + + // load message + + QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class ); + DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage( + queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT ); + + Assert.assertNull( queueMessage ); + } + + + @Test + public void putInflight() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + + getInjector().getInstance( App.class ); // init the INJECTOR + + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class ); + QueueManager queueManager = getInjector().getInstance( QueueManager.class ); + + String region = actorSystemFig.getRegionLocal(); + App app = getInjector().getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + + // write message to messages_available table + + UUID queueMessageId = QakkaUtils.getTimeUuid(); + + String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + queueManager.createQueue( new Queue( queueName ) ); + + DatabaseQueueMessage message = new DatabaseQueueMessage( + QakkaUtils.getTimeUuid(), + DatabaseQueueMessage.Type.DEFAULT, + queueName, + actorSystemFig.getRegionLocal(), + null, + System.currentTimeMillis(), + null, + queueMessageId); + qms.writeMessage( message ); + + // put message inflight + + QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class ); + helper.putInflight( queueName, message ); + + // message must be gone from messages_available table + + Assert.assertNull( qms.loadMessage( + queueName, + actorSystemFig.getRegionLocal(), + null, + DatabaseQueueMessage.Type.DEFAULT, + message.getQueueMessageId() ) ); + + // message must be present in messages_inflight table + + Assert.assertNotNull( qms.loadMessage( + queueName, + actorSystemFig.getRegionLocal(), + null, + DatabaseQueueMessage.Type.INFLIGHT, + message.getQueueMessageId() ) ); + + // there must be an audit log record of the successful get operation + + AuditLogSerialization auditLogSerialization = getInjector().getInstance( AuditLogSerialization.class ); + Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() ); + Assert.assertEquals( 1, auditLogs.getEntities().size() ); + Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus() ); + Assert.assertEquals( AuditLog.Action.GET, auditLogs.getEntities().get(0).getAction() ); + } + + + @Test + public void ackQueueMessage() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + + getInjector().getInstance( App.class ); // init the INJECTOR + + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class ); + QueueManager queueManager = getInjector().getInstance( QueueManager.class ); + + String region = actorSystemFig.getRegionLocal(); + App app = getInjector().getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + + UUID queueMessageId = QakkaUtils.getTimeUuid(); + + String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + queueManager.createQueue( new Queue( queueName ) ); + + // write message to messages_inflight table + + DatabaseQueueMessage message = new DatabaseQueueMessage( + QakkaUtils.getTimeUuid(), + DatabaseQueueMessage.Type.INFLIGHT, + queueName, + actorSystemFig.getRegionLocal(), + null, + System.currentTimeMillis(), + null, + queueMessageId); + qms.writeMessage( message ); + + // ack message + + QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class ); + helper.ackQueueMessage( queueName, message.getQueueMessageId() ); + + // message must be gone from messages_available table + + Assert.assertNull( helper.loadDatabaseQueueMessage( + queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.INFLIGHT )); + + // message must be gone from messages_inflight table + + Assert.assertNull( helper.loadDatabaseQueueMessage( + queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT )); + + // there should be an audit log record of the successful ack operation + + AuditLogSerialization auditLogSerialization = getInjector().getInstance( AuditLogSerialization.class ); + Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() ); + Assert.assertEquals( 1, auditLogs.getEntities().size() ); + Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus() ); + Assert.assertEquals( AuditLog.Action.ACK, auditLogs.getEntities().get(0).getAction() ); + } + + + @Test + public void ackQueueMessageNotFound() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + + getInjector().getInstance( App.class ); // init the INJECTOR + QueueManager queueManager = getInjector().getInstance( QueueManager.class ); + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + + String region = actorSystemFig.getRegionLocal(); + App app = getInjector().getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + + String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + queueManager.createQueue( new Queue( queueName ) ); + + // don't write message, just make up some bogus IDs + + UUID queueMessageId = QakkaUtils.getTimeUuid(); + + // ack message must fail + + QueueActorHelper helper = getInjector().getInstance( QueueActorHelper.class ); + Assert.assertEquals( DistributedQueueService.Status.BAD_REQUEST, helper.ackQueueMessage( queueName, queueMessageId )); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java new file mode 100644 index 0000000..5f0be53 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.actors; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.App; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.QakkaUtils; +import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; +import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; + + +public class QueueReaderTest extends AbstractTest { + private static final Logger logger = LoggerFactory.getLogger( QueueReaderTest.class ); + + + + @Test + public void testBasicOperation() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + + getInjector().getInstance( App.class ); // init the INJECTOR + + QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class ); + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + ShardSerialization shardSerialization = getInjector().getInstance( ShardSerialization.class ); + + int numMessages = 200; + // create queue messages, only first lot get queue message data + + QueueMessageSerialization serialization = getInjector().getInstance( QueueMessageSerialization.class ); + String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 ); + + Shard newShard = new Shard( queueName, actorSystemFig.getRegionLocal(), + Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid()); + shardSerialization.createShard( newShard ); + + for ( int i=0; i<numMessages; i++ ) { + + UUID messageId = QakkaUtils.getTimeUuid(); + UUID queueMessageId = QakkaUtils.getTimeUuid(); + + DatabaseQueueMessage message = new DatabaseQueueMessage( + messageId, + DatabaseQueueMessage.Type.DEFAULT, + queueName, + actorSystemFig.getRegionLocal(), + null, + System.currentTimeMillis(), + null, + queueMessageId); + serialization.writeMessage( message ); + } + + InMemoryQueue inMemoryQueue = getInjector().getInstance( InMemoryQueue.class ); + Assert.assertEquals( 0, inMemoryQueue.size( queueName ) ); + + // run the QueueRefresher to fill up the in-memory queue + + ActorSystem system = ActorSystem.create("Test-" + queueName); + ActorRef queueReaderRef = system.actorOf( Props.create( QueueRefresher.class, queueName ), "queueReader"); + QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName ); + queueReaderRef.tell( refreshRequest, null ); // tell sends message, returns immediately + + // need to wait for refresh to complete + int maxRetries = 10; + int retries = 0; + while ( inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize() && retries++ < maxRetries ) { + Thread.sleep(1000); + } + + Assert.assertEquals( numMessages, inMemoryQueue.size( queueName ) ); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java new file mode 100644 index 0000000..511b059 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.actors; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.App; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRequest; +import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization; +import org.apache.usergrid.persistence.qakka.core.QakkaUtils; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + + +public class QueueTimeouterTest extends AbstractTest { + private static final Logger logger = LoggerFactory.getLogger( QueueTimeouterTest.class ); + + + @Test + public void testBasicOperation() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + getInjector().getInstance( App.class ); // init the INJECTOR + + QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class ); + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class ); + ShardSerialization shardSerialization = getInjector().getInstance( ShardSerialization.class ); + + // create records in inflight table, with some being old enough to time out + + int numInflight = 200; // number of messages to be put into timeout table + int numTimedout = 75; // number of messages to be timedout + + long timeoutMs = qakkaFig.getQueueTimeoutSeconds()*1000; + + String queueName = "qtt_queue_" + RandomStringUtils.randomAlphanumeric( 20 ); + + Shard newShard = new Shard( queueName, actorSystemFig.getRegionLocal(), + Shard.Type.INFLIGHT, 1L, QakkaUtils.getTimeUuid()); + shardSerialization.createShard( newShard ); + + newShard = new Shard( queueName, actorSystemFig.getRegionLocal(), + Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid()); + shardSerialization.createShard( newShard ); + + for ( int i=0; i<numInflight; i++ ) { + + long created = System.currentTimeMillis(); + created = i < numTimedout ? created - timeoutMs: created + timeoutMs; + + UUID queueMessageId = QakkaUtils.getTimeUuid(); + + UUID messageId = QakkaUtils.getTimeUuid(); + DatabaseQueueMessage message = new DatabaseQueueMessage( + messageId, + DatabaseQueueMessage.Type.INFLIGHT, + queueName, + actorSystemFig.getRegionLocal(), + null, + created, + created, + queueMessageId ); + + qms.writeMessage( message ); + } + + List<DatabaseQueueMessage> inflightMessages = getDatabaseQueueMessages( + cassandraClient, queueName, actorSystemFig.getRegionLocal(), Shard.Type.INFLIGHT ); + Assert.assertEquals( numInflight, inflightMessages.size() ); + + // run timeouter actor + + ActorSystem system = ActorSystem.create("Test-" + queueName); + ActorRef timeouterRef = system.actorOf( Props.create( QueueTimeouter.class, queueName ), "timeouter"); + QueueTimeoutRequest qtr = new QueueTimeoutRequest( queueName ); + timeouterRef.tell( qtr, null ); // tell sends message, returns immediately + + Thread.sleep( timeoutMs ); + + // timed out messages should have been moved into available (DEFAULT) table + + List<DatabaseQueueMessage> queuedMessages = getDatabaseQueueMessages( + cassandraClient, queueName, actorSystemFig.getRegionLocal(), Shard.Type.DEFAULT); + Assert.assertEquals( numTimedout, queuedMessages.size() ); + + // and there should still be some messages in the INFLIGHT table + + inflightMessages = getDatabaseQueueMessages( + cassandraClient, queueName, actorSystemFig.getRegionLocal(), Shard.Type.INFLIGHT ); + Assert.assertEquals( numInflight - numTimedout, inflightMessages.size() ); + + } + + private List<DatabaseQueueMessage> getDatabaseQueueMessages( + CassandraClient cassandraClient, String queueName, String region, Shard.Type type ) { + + ShardIterator shardIterator = new ShardIterator( + cassandraClient, queueName, region, type, Optional.empty() ); + + DatabaseQueueMessage.Type dbqmType = Shard.Type.DEFAULT.equals( type ) ? + DatabaseQueueMessage.Type.DEFAULT : DatabaseQueueMessage.Type.INFLIGHT; + + MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( + cassandraClient, queueName, region, dbqmType, shardIterator, null); + + List<DatabaseQueueMessage> inflightMessages = new ArrayList<>(2000); + while ( multiShardIterator.hasNext() && inflightMessages.size() < 2000 ) { + DatabaseQueueMessage message = multiShardIterator.next(); + inflightMessages.add( message ); + } + return inflightMessages; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java new file mode 100644 index 0000000..3dbd980 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.actors; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.ProtocolVersion; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.App; +import org.apache.usergrid.persistence.qakka.QakkaModule; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.core.*; +import org.apache.usergrid.persistence.qakka.distributed.messages.ShardCheckRequest; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Optional; + + +public class ShardAllocatorTest extends AbstractTest { + private static final Logger logger = LoggerFactory.getLogger( QueueReaderTest.class ); + + + protected Injector myInjector = null; + + @Override + protected Injector getInjector() { + if ( myInjector == null ) { + myInjector = Guice.createInjector( new QakkaModule() ); + } + return myInjector; + } + + + @Test + public void testBasicOperation() throws InterruptedException { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + getInjector().getInstance( App.class ); // init the INJECTOR + + ShardSerialization shardSer = getInjector().getInstance( ShardSerialization.class ); + QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class ); + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + ShardCounterSerialization shardCounterSer = getInjector().getInstance( ShardCounterSerialization.class ); + + String rando = RandomStringUtils.randomAlphanumeric( 20 ); + + String queueName = "queue_" + rando; + String region = actorSystemFig.getRegionLocal(); + + // Create a set of shards, each with max count + + Shard lastShard = null; + + int numShards = 4; + long maxPerShard = qakkaFig.getMaxShardSize(); + + for ( long shardId = 1; shardId < numShards + 1; shardId++ ) { + + Shard shard = new Shard( queueName, region, Shard.Type.DEFAULT, shardId, QakkaUtils.getTimeUuid()); + shardSer.createShard( shard ); + + if ( shardId != numShards ) { + shardCounterSer.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, maxPerShard ); + + } else { + // Create last shard with %20 less than max + shardCounterSer.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, (long)(0.8 * maxPerShard) ); + lastShard = shard; + } + + Thread.sleep( 10 ); + } + + Assert.assertEquals( numShards, countShards( + cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT )); + + // Run shard allocator actor by sending message to it + + ActorSystem system = ActorSystem.create("Test-" + queueName); + ActorRef shardAllocRef = system.actorOf( Props.create( ShardAllocator.class, queueName ), "shardallocator"); + + ShardCheckRequest checkRequest = new ShardCheckRequest( queueName ); + shardAllocRef.tell( checkRequest, null ); // tell sends message, returns immediately + Thread.sleep(1000); + + // Test that no new shards created + + Assert.assertEquals( numShards, countShards( + cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT )); + + // Increment last shard by 20% of max + + shardCounterSer.incrementCounter( + queueName, Shard.Type.DEFAULT, lastShard.getShardId(), (long)(0.2 * maxPerShard) ); + + // Run shard allocator again + + shardAllocRef.tell( checkRequest, null ); // tell sends message, returns immediately + Thread.sleep(1000); + + // Test that, this time, a new shard was created + + Assert.assertEquals( numShards + 1, countShards( + cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT )); + } + + + int countShards( + CassandraClient cassandraClient, + ShardCounterSerialization scs, + String queueName, + String region, + Shard.Type type ) { + + ShardIterator shardIterator = + new ShardIterator( cassandraClient, queueName, region, type, Optional.empty() ); + + int shardCount = 0; + while ( shardIterator.hasNext() ) { + Shard s = shardIterator.next(); + shardCount++; + long counterValue = scs.getCounterValue( s.getQueueName(), type, s.getShardId() ); + logger.debug("Shard {} {} is #{} has count={}", type, s.getShardId(), shardCount, counterValue ); + + } + + return shardCount; + } + + + @Test + public void testBasicOperationWithMessages() throws InterruptedException { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + getInjector().getInstance( App.class ); // init the INJECTOR + + ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); + QueueManager queueManager = getInjector().getInstance( QueueManager.class ); + QueueMessageManager queueMessageManager = getInjector().getInstance( QueueMessageManager.class ); + DistributedQueueService distributedQueueService = getInjector().getInstance( DistributedQueueService.class ); + ShardCounterSerialization shardCounterSer = getInjector().getInstance( ShardCounterSerialization.class ); + + + String region = actorSystemFig.getRegionLocal(); + App app = getInjector().getInstance( App.class ); + app.start( "localhost", getNextAkkaPort(), region ); + + String rando = RandomStringUtils.randomAlphanumeric( 20 ); + String queueName = "queue_" + rando; + + queueManager.createQueue( new Queue( queueName )); + + // Create 4000 messages + + int numMessages = 4000; + + for ( int i=0; i<numMessages; i++ ) { + queueMessageManager.sendMessages( + queueName, + Collections.singletonList( region ), + null, // delay + null, // expiration + "application/json", + DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) ); + } + + distributedQueueService.refresh(); + Thread.sleep(3000); + + // Test that 8 shards were created + + Assert.assertEquals( 8, + countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT )); + + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java new file mode 100644 index 0000000..76e3279 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization; + +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.QakkaUtils; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** + * Created by russo on 6/9/16. + */ +public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest { + private static final Logger logger = LoggerFactory.getLogger( MultiShardDatabaseQueueMessageIteratorTest.class ); + + + @Test + public void testIterator() throws InterruptedException { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); + + QueueMessageSerialization queueMessageSerialization = + getInjector().getInstance( QueueMessageSerialization.class ); + + Shard shard1 = new Shard("test", "region", Shard.Type.DEFAULT, 1L, null); + Shard shard2 = new Shard("test", "region", Shard.Type.DEFAULT, 2L, null); + Shard shard3 = new Shard("test", "region", Shard.Type.DEFAULT, 3L, null); + Shard shard4 = new Shard("test", "region", Shard.Type.DEFAULT, 4L, null); + + shardSerialization.createShard(shard1); + shardSerialization.createShard(shard2); + shardSerialization.createShard(shard3); + shardSerialization.createShard(shard4); + + final int numMessagesPerShard = 50; + + // just do these separately to space out the time UUIDs per shard + for(int i=0; i < numMessagesPerShard; i++){ + + queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(), + DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard1.getShardId(), + System.currentTimeMillis(), null, null)); + Thread.sleep(3); + } + + for(int i=0; i < numMessagesPerShard; i++){ + + queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(), + DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard2.getShardId(), + System.currentTimeMillis(), null, null)); + Thread.sleep(3); + } + + for(int i=0; i < numMessagesPerShard; i++){ + + queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(), + DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard3.getShardId(), + System.currentTimeMillis(), null, null)); + Thread.sleep(3); + } + + for(int i=0; i < numMessagesPerShard; i++){ + + queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(), + DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard4.getShardId(), + System.currentTimeMillis(), null, null)); + Thread.sleep(3); + } + + + ShardIterator shardIterator = new ShardIterator( + cassandraClient, "test", "region", Shard.Type.DEFAULT, Optional.empty()); + MultiShardMessageIterator iterator = new MultiShardMessageIterator( + cassandraClient, "test", "region", DatabaseQueueMessage.Type.DEFAULT, shardIterator, null); + + final AtomicInteger[] counts = { + new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0) }; + + iterator.forEachRemaining(message -> { + //logger.info("Shard ID: {}, DatabaseQueueMessage ID: {}", message.getShardId(), message.getMessageId()); + counts[ (int)(message.getShardId() - 1) ] .incrementAndGet(); + }); + + logger.info("Total Count 1: {}", counts[0].get()); + logger.info("Total Count 2: {}", counts[1].get()); + logger.info("Total Count 3: {}", counts[2].get()); + logger.info("Total Count 4: {}", counts[3].get()); + + assertEquals(numMessagesPerShard, counts[0].get()); + assertEquals(numMessagesPerShard, counts[1].get()); + assertEquals(numMessagesPerShard, counts[2].get()); + assertEquals(numMessagesPerShard, counts[3].get()); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java new file mode 100644 index 0000000..072fd94 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.auditlogs; + +import org.apache.cassandra.utils.UUIDGen; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.serialization.Result; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; +import org.junit.Assert; +import org.junit.Test; + +import java.util.UUID; + + +public class AuditLogSerializationTest extends AbstractTest { + + @Test + public void testRecordAuditLog() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + AuditLogSerialization logSerialization = getInjector().getInstance( AuditLogSerialization.class ); + + // record some audit logs for a message + UUID messageId = UUIDGen.getTimeUUID(); + String queueName = "alst_queue_" + RandomStringUtils.randomAlphanumeric( 15 ); + String source = RandomStringUtils.randomAlphanumeric( 15 ); + String dest = RandomStringUtils.randomAlphanumeric( 15 ); + + logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS, + queueName, dest, messageId, UUIDGen.getTimeUUID() ); + + // get audit logs for that message + Result<AuditLog> result = logSerialization.getAuditLogs( messageId ); + Assert.assertEquals( 1, result.getEntities().size() ); + } + + @Test + public void testGetAuditLogs() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + + AuditLogSerialization logSerialization = getInjector().getInstance( AuditLogSerialization.class ); + + // record some audit logs for a message + UUID messageId = UUIDGen.getTimeUUID(); + String queueName = "alst_queue_" + RandomStringUtils.randomAlphanumeric( 15 ); + String source = RandomStringUtils.randomAlphanumeric( 15 ); + String dest = RandomStringUtils.randomAlphanumeric( 15 ); + + int numLogs = 10; + + UUID queueMessageId1 = UUIDGen.getTimeUUID(); + for ( int i=0; i<numLogs; i++ ) { + logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS, + queueName, dest, messageId, queueMessageId1 ); + Thread.sleep(5); + } + + UUID queueMessageId2 = UUIDGen.getTimeUUID(); + for ( int i=0; i<numLogs; i++ ) { + logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS, + queueName, dest, messageId, queueMessageId2 ); + Thread.sleep(5); + } + + UUID queueMessageId3 = UUIDGen.getTimeUUID(); + for ( int i=0; i<numLogs; i++ ) { + logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS, + queueName, dest, messageId, queueMessageId3 ); + Thread.sleep(5); + } + + // test that we have 3 X number of logs for the messageId + Result<AuditLog> result = logSerialization.getAuditLogs( messageId ); + Assert.assertEquals( numLogs * 3, result.getEntities().size() ); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java new file mode 100644 index 0000000..4ea6de3 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.queuemessages; + +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.ProtocolVersion; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.core.QakkaUtils; +import org.junit.Test; + +import java.io.*; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + + +public class DatabaseQueueMessageSerializationTest extends AbstractTest { + + + static class ThingToSave implements Serializable { + String value; + } + + + @Test + public void writeNewMessage(){ + + QueueMessageSerialization queueMessageSerialization = + getInjector().getInstance( QueueMessageSerialization.class ); + + Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); + + DatabaseQueueMessage message1 = new DatabaseQueueMessage(QakkaUtils.getTimeUuid(), + DatabaseQueueMessage.Type.DEFAULT, "test", "region1", + shard1.getShardId(), System.currentTimeMillis(), null, null); + + UUID queueMessageId = queueMessageSerialization.writeMessage(message1); + } + + @Test + public void deleteMessage(){ + + QueueMessageSerialization queueMessageSerialization = + getInjector().getInstance( QueueMessageSerialization.class ); + + Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); + + UUID messageId = QakkaUtils.getTimeUuid(); + String queueName = "dqmst_queue_" + RandomStringUtils.randomAlphanumeric( 20 ); + + DatabaseQueueMessage message = new DatabaseQueueMessage( + messageId, + DatabaseQueueMessage.Type.DEFAULT, + queueName, + "dummy_region", + shard1.getShardId(), + System.currentTimeMillis(), + null, null ); + + UUID queueMessageId = queueMessageSerialization.writeMessage( message ); + + queueMessageSerialization.deleteMessage( + queueName, + "dummy_region", + shard1.getShardId(), + DatabaseQueueMessage.Type.DEFAULT, + queueMessageId ); + + assertNull( queueMessageSerialization.loadMessage( + queueName, + "dummy_region", + shard1.getShardId(), + DatabaseQueueMessage.Type.DEFAULT, + queueMessageId + )); + } + + + @Test + public void loadNullMessage(){ + + QueueMessageSerialization queueMessageSerialization = + getInjector().getInstance( QueueMessageSerialization.class ); + + Shard shard1 = new Shard("junk", "region1", Shard.Type.DEFAULT, 100L, null); + + assertNull( queueMessageSerialization.loadMessage( + RandomStringUtils.randomAlphanumeric( 20 ), + "dummy_region", + shard1.getShardId(), + DatabaseQueueMessage.Type.DEFAULT, + null + )); + } + + + @Test + public void writeNewMessageData(){ + + QueueMessageSerialization queueMessageSerialization = + getInjector().getInstance( QueueMessageSerialization.class ); + + Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); + + UUID messageId = QakkaUtils.getTimeUuid(); + + final String data = "my test data"; + + final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( + DataType.serializeValue(data, ProtocolVersion.NEWEST_SUPPORTED), "text/plain"); + + queueMessageSerialization.writeMessageData(messageId, messageBody); + + final DatabaseQueueMessageBody returnedData = queueMessageSerialization.loadMessageData( messageId ); + } + + + @Test + public void loadMessageData() throws Exception { + + QueueMessageSerialization queueMessageSerialization = + getInjector().getInstance( QueueMessageSerialization.class ); + + Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); + + UUID messageId = QakkaUtils.getTimeUuid(); + + final String data = "my test data"; + + final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( DataType.serializeValue(data, + ProtocolVersion.NEWEST_SUPPORTED), "text/plain"); + + queueMessageSerialization.writeMessageData(messageId, messageBody); + + final DatabaseQueueMessageBody returnedBody = queueMessageSerialization.loadMessageData( messageId ); + String returnedData = new String( returnedBody.getBlob().array(), "UTF-8"); + + assertEquals(data, returnedData); + } + + + @Test + public void loadMessageObjectData() throws Exception { + + QueueMessageSerialization queueMessageSerialization = + getInjector().getInstance( QueueMessageSerialization.class ); + + Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); + + UUID messageId = QakkaUtils.getTimeUuid(); + + final String data = "my test data"; + + final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( DataType.serializeValue(data, + ProtocolVersion.NEWEST_SUPPORTED), "text/plain"); + + queueMessageSerialization.writeMessageData(messageId, messageBody); + + final DatabaseQueueMessageBody returnedBody = queueMessageSerialization.loadMessageData( messageId ); + String returnedData = new String( returnedBody.getBlob().array(), "UTF-8"); + + assertEquals(data, returnedData); + } + + + + + @Test + public void deleteMessageData() throws UnsupportedEncodingException { + + QueueMessageSerialization queueMessageSerialization = + getInjector().getInstance( QueueMessageSerialization.class ); + + Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); + + UUID messageId = QakkaUtils.getTimeUuid(); + + final String data = "my test data"; + + final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( DataType.serializeValue(data, + ProtocolVersion.NEWEST_SUPPORTED), "text/plain"); + + queueMessageSerialization.writeMessageData(messageId, messageBody); + + final DatabaseQueueMessageBody returnedBody = queueMessageSerialization.loadMessageData( messageId ); + final String returnedData = new String( returnedBody.getBlob().array(), "UTF-8"); + + assertEquals(data, returnedData); + + queueMessageSerialization.deleteMessageData(messageId); + + assertNull(queueMessageSerialization.loadMessageData( messageId )); + + + } + + + /** + * Persist to blob using Java serialization. + */ + @Test + public void persistJavaObjectData() throws Exception { + + QueueMessageSerialization queueMessageSerialization = + getInjector().getInstance( QueueMessageSerialization.class ); + + // serialize Java object to byte buffer + + final ThingToSave data = new ThingToSave(); + data.value = "my test data"; + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(data); + oos.flush(); + oos.close(); + ByteBuffer byteBuffer = ByteBuffer.wrap( bos.toByteArray() ); + + // write to Cassandra + + final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( + byteBuffer,"application/octet-stream"); + + UUID messageId = QakkaUtils.getTimeUuid(); + queueMessageSerialization.writeMessageData(messageId, messageBody); + + // load from Cassandra + + final DatabaseQueueMessageBody returnedBody = queueMessageSerialization.loadMessageData( messageId ); + + // deserialize byte buffer + + ByteBuffer messageData = returnedBody.getBlob(); + ByteArrayInputStream bais = new ByteArrayInputStream( messageData.array() ); + + // throws exception -> java.io.StreamCorruptedException: invalid stream header: 00000000 + ObjectInputStream ios = new ObjectInputStream( bais ); + ThingToSave returnedData = (ThingToSave)ios.readObject(); + + assertEquals( data.value, returnedData.value ); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java new file mode 100644 index 0000000..4690a1a --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.queues; + +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * Created by russo on 6/9/16. + */ +public class DatabaseQueueSerializationTest extends AbstractTest { + + @Test + public void writeQueue(){ + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + QueueSerialization queueSerialization = getInjector().getInstance( QueueSerialization.class ); + + DatabaseQueue queue = new DatabaseQueue("test", "west", "west", 0L, 0, 0, "test_dlq"); + + queueSerialization.writeQueue(queue); + + } + + @Test + public void loadQueue(){ + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + QueueSerialization queueSerialization = getInjector().getInstance( QueueSerialization.class ); + + DatabaseQueue queue = new DatabaseQueue("test1", "west", "west", 0L, 0, 0, "test_dlq"); + + queueSerialization.writeQueue(queue); + DatabaseQueue returnedQueue = queueSerialization.getQueue("test1"); + + assertEquals(queue, returnedQueue); + + } + + @Test + public void deleteQueue(){ + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + QueueSerialization queueSerialization = getInjector().getInstance( QueueSerialization.class ); + + DatabaseQueue queue = new DatabaseQueue("test1", "west", "west", 0L, 0, 0, "test_dlq"); + + queueSerialization.writeQueue(queue); + DatabaseQueue returnedQueue = queueSerialization.getQueue("test1"); + + assertEquals(queue, returnedQueue); + + queueSerialization.deleteQueue(queue.getName()); + + assertNull(queueSerialization.getQueue("test1")); + + } + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java new file mode 100644 index 0000000..3152025 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.sharding; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.AbstractTest; +import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.fail; + + +public class ShardCounterSerializationTest extends AbstractTest { + + + @Test + public void testBasicOperation() throws Exception { + + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + cassandraClient.getSession(); + + + ShardCounterSerialization scs = getInjector().getInstance( ShardCounterSerialization.class ); + + String queueName = "scst_queue_" + RandomStringUtils.randomAlphanumeric( 20 ); + long shardId = 100L; + + try { + scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ); + fail("Should have throw NotFoundException"); + } catch ( NotFoundException expected ) { + // pass + } + + scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 10 ); + Assert.assertEquals( 10, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) ); + + scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 50 ); + Assert.assertEquals( 60, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) ); + + scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 150 ); + Assert.assertEquals( 210, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) ); + } + +} \ No newline at end of file