http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java index 5bb06fd..9ebb841 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java @@ -25,6 +25,7 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.inject.Inject; +import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -44,6 +45,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization { private static final Logger logger = LoggerFactory.getLogger( TransferLogSerializationImpl.class ); private final CassandraClient cassandraClient; + private final CassandraFig cassandraFig; public final static String TABLE_TRANSFER_LOG = "transfer_log"; @@ -65,7 +67,8 @@ public class TransferLogSerializationImpl implements TransferLogSerialization { @Inject - public TransferLogSerializationImpl( CassandraClient cassandraClient ) { + public TransferLogSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) { + this.cassandraFig = cassandraFig; this.cassandraClient = cassandraClient; } @@ -80,7 +83,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization { .value(COLUMN_DEST_REGION, dest ) .value(COLUMN_MESSAGE_ID, messageId ) .value(COLUMN_TRANSFER_TIME, System.currentTimeMillis() ); - cassandraClient.getSession().execute(insert); + cassandraClient.getApplicationSession().execute(insert); // logger.debug("Recorded transfer log for queue {} dest {} messageId {}", // queueName, dest, messageId); @@ -95,7 +98,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization { .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName )) .and( QueryBuilder.eq( COLUMN_DEST_REGION, dest )) .and( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId )); - ResultSet rs = cassandraClient.getSession().execute( query ); + ResultSet rs = cassandraClient.getApplicationSession().execute( query ); if ( rs.getAvailableWithoutFetching() == 0 ) { StringBuilder sb = new StringBuilder(); @@ -109,7 +112,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization { .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName )) .and( QueryBuilder.eq( COLUMN_DEST_REGION, dest )) .and( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId )); - cassandraClient.getSession().execute( deleteQuery ); + cassandraClient.getApplicationSession().execute( deleteQuery ); } @@ -123,7 +126,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization { query.setPagingState( pagingState ); } - ResultSet rs = cassandraClient.getSession().execute( query ); + ResultSet rs = cassandraClient.getApplicationSession().execute( query ); final PagingState newPagingState = rs.getExecutionInfo().getPagingState(); final List<TransferLog> transferLogs = new ArrayList<>(); @@ -160,7 +163,8 @@ public class TransferLogSerializationImpl implements TransferLogSerialization { @Override public Collection<TableDefinition> getTables() { - return Collections.singletonList( new TableDefinitionStringImpl( TABLE_TRANSFER_LOG, CQL ) ); + return Collections.singletonList( + new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), TABLE_TRANSFER_LOG, CQL ) ); }
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java index 4c3e480..6f1c744 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java @@ -29,6 +29,7 @@ import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -39,11 +40,13 @@ public class AbstractTest { protected static Injector sharedInjector; + AtomicBoolean migrated = new AtomicBoolean( false ); + static { new KeyspaceDropper(); } public AbstractTest() { - if ( getInjector() == null ) { + if ( !migrated.getAndSet( true ) ) { setInjector( Guice.createInjector( new TestModule() ) ); MigrationManager migrationManager = getInjector().getInstance( MigrationManager.class ); try { http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java index aa4dfd1..e220650 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java @@ -33,13 +33,13 @@ import java.util.Properties; * Created by Dave Johnson ([email protected]) on 9/9/16. */ public class KeyspaceDropper { - + private static final Logger logger = LoggerFactory.getLogger( AbstractTest.class ); - - static { dropTestKeyspace(); } - - public static void dropTestKeyspace() { + static { dropTestKeyspaces(); } + + + public static void dropTestKeyspaces() { String propsFileName = "qakka.properties"; @@ -50,10 +50,17 @@ public class KeyspaceDropper { throw new RuntimeException( "Unable to load " + propsFileName + " file!" ); } - String keyspace = (String)props.get("cassandra.keyspace.application"); + String keyspaceApp = (String)props.get("cassandra.keyspace.application"); + String keyspaceQueue = (String)props.get("cassandra.keyspace.queue-message"); String hosts[] = props.getProperty( "cassandra.hosts", "127.0.0.1" ).split(","); int port = Integer.parseInt( props.getProperty( "cassandra.port", "9042" )); + dropTestKeyspace( keyspaceApp, hosts, port ); + dropTestKeyspace( keyspaceQueue, hosts, port ); + } + + public static void dropTestKeyspace( String keyspace, String[] hosts, int port ) { + Cluster.Builder builder = Cluster.builder(); for ( String host : hosts ) { builder = builder.addContactPoint( host ).withPort( port ); @@ -67,4 +74,5 @@ public class KeyspaceDropper { logger.info("Dropping test keyspace: {}", keyspace); session.execute( "DROP KEYSPACE IF EXISTS " + keyspace ); } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java index 42423fa..e1f0c7e 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java @@ -30,13 +30,13 @@ import org.junit.Test; * Created by russo on 6/8/16. */ public class CassandraClientTest extends AbstractTest { - + @Test public void getClient(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - Session session = cassandraClient.getSession(); + Session session = cassandraClient.getApplicationSession(); session.getLoggedKeyspace(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java index 5a0feba..630c953 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java @@ -26,14 +26,13 @@ import com.google.inject.Injector; import net.jcip.annotations.NotThreadSafe; import org.apache.commons.lang.RandomStringUtils; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; -import org.apache.usergrid.persistence.qakka.QakkaFig; -import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; -import org.apache.usergrid.persistence.qakka.serialization.Result; import org.apache.usergrid.persistence.qakka.AbstractTest; import org.apache.usergrid.persistence.qakka.App; -import org.apache.usergrid.persistence.qakka.QakkaModule; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; +import org.apache.usergrid.persistence.qakka.serialization.Result; import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog; import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; @@ -42,8 +41,8 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMe import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLog; import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization; import org.apache.usergrid.persistence.queue.TestModule; -import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +73,6 @@ public class QueueMessageManagerTest extends AbstractTest { Injector injector = getInjector(); CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class ); @@ -136,12 +134,12 @@ public class QueueMessageManagerTest extends AbstractTest { @Test + @Ignore public void testQueueMessageTimeouts() throws Exception { Injector injector = getInjector(); CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); QakkaFig qakkaFig = injector.getInstance( QakkaFig.class ); @@ -225,12 +223,12 @@ public class QueueMessageManagerTest extends AbstractTest { @Test + @Ignore public void testGetWithMissingData() throws InterruptedException { Injector injector = getInjector(); CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); injector.getInstance( App.class ); // init the INJECTOR http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java index 182d5d6..a46c186 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java @@ -66,7 +66,6 @@ public class QueueActorServiceTest extends AbstractTest { Injector injector = getInjector(); CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class ); String region = actorSystemFig.getRegionLocal(); @@ -119,7 +118,6 @@ public class QueueActorServiceTest extends AbstractTest { Injector injector = getInjector(); CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class ); String region = actorSystemFig.getRegionLocal(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java index 77c11e4..3bf352f 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java @@ -21,26 +21,26 @@ package org.apache.usergrid.persistence.qakka.distributed.actors; import com.google.inject.Guice; import com.google.inject.Injector; +import net.jcip.annotations.NotThreadSafe; import org.apache.commons.lang.RandomStringUtils; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.qakka.AbstractTest; import org.apache.usergrid.persistence.qakka.App; -import org.apache.usergrid.persistence.qakka.QakkaModule; import org.apache.usergrid.persistence.qakka.core.*; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; import org.apache.usergrid.persistence.qakka.serialization.Result; import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog; import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; -import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; import org.apache.usergrid.persistence.queue.TestModule; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Test; import java.util.UUID; +@NotThreadSafe public class QueueActorHelperTest extends AbstractTest { @@ -54,7 +54,6 @@ public class QueueActorHelperTest extends AbstractTest { Injector injector = getInjector(); CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); injector.getInstance( App.class ); // init the INJECTOR @@ -102,7 +101,6 @@ public class QueueActorHelperTest extends AbstractTest { Injector injector = getInjector(); CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); injector.getInstance( App.class ); // init the INJECTOR @@ -136,7 +134,6 @@ public class QueueActorHelperTest extends AbstractTest { Injector injector = getInjector(); CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); injector.getInstance( App.class ); // init the INJECTOR @@ -208,7 +205,6 @@ public class QueueActorHelperTest extends AbstractTest { Injector injector = getInjector(); CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); injector.getInstance( App.class ); // init the INJECTOR @@ -272,7 +268,6 @@ public class QueueActorHelperTest extends AbstractTest { Injector injector = getInjector(); CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); injector.getInstance( App.class ); // init the INJECTOR http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java index b803f7e..0b8b795 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java @@ -53,7 +53,6 @@ public class QueueReaderTest extends AbstractTest { public void testBasicOperation() throws Exception { CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); getInjector().getInstance( App.class ); // init the INJECTOR http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java index 511b059..54f9d42 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java @@ -38,6 +38,7 @@ import org.apache.usergrid.persistence.qakka.core.QakkaUtils; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,12 +52,12 @@ import java.util.UUID; public class QueueTimeouterTest extends AbstractTest { private static final Logger logger = LoggerFactory.getLogger( QueueTimeouterTest.class ); - + @Test + @Ignore public void testBasicOperation() throws Exception { CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); getInjector().getInstance( App.class ); // init the INJECTOR @@ -64,12 +65,12 @@ public class QueueTimeouterTest extends AbstractTest { ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class ); QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class ); ShardSerialization shardSerialization = getInjector().getInstance( ShardSerialization.class ); - - // create records in inflight table, with some being old enough to time out + + // create records in inflight table, with some being old enough to time out int numInflight = 200; // number of messages to be put into timeout table int numTimedout = 75; // number of messages to be timedout - + long timeoutMs = qakkaFig.getQueueTimeoutSeconds()*1000; String queueName = "qtt_queue_" + RandomStringUtils.randomAlphanumeric( 20 ); @@ -83,12 +84,12 @@ public class QueueTimeouterTest extends AbstractTest { shardSerialization.createShard( newShard ); for ( int i=0; i<numInflight; i++ ) { - + long created = System.currentTimeMillis(); created = i < numTimedout ? created - timeoutMs: created + timeoutMs; UUID queueMessageId = QakkaUtils.getTimeUuid(); - + UUID messageId = QakkaUtils.getTimeUuid(); DatabaseQueueMessage message = new DatabaseQueueMessage( messageId, @@ -99,32 +100,32 @@ public class QueueTimeouterTest extends AbstractTest { created, created, queueMessageId ); - + qms.writeMessage( message ); } - List<DatabaseQueueMessage> inflightMessages = getDatabaseQueueMessages( + List<DatabaseQueueMessage> inflightMessages = getDatabaseQueueMessages( cassandraClient, queueName, actorSystemFig.getRegionLocal(), Shard.Type.INFLIGHT ); Assert.assertEquals( numInflight, inflightMessages.size() ); - + // run timeouter actor ActorSystem system = ActorSystem.create("Test-" + queueName); ActorRef timeouterRef = system.actorOf( Props.create( QueueTimeouter.class, queueName ), "timeouter"); QueueTimeoutRequest qtr = new QueueTimeoutRequest( queueName ); timeouterRef.tell( qtr, null ); // tell sends message, returns immediately - + Thread.sleep( timeoutMs ); // timed out messages should have been moved into available (DEFAULT) table - - List<DatabaseQueueMessage> queuedMessages = getDatabaseQueueMessages( + + List<DatabaseQueueMessage> queuedMessages = getDatabaseQueueMessages( cassandraClient, queueName, actorSystemFig.getRegionLocal(), Shard.Type.DEFAULT); Assert.assertEquals( numTimedout, queuedMessages.size() ); // and there should still be some messages in the INFLIGHT table - inflightMessages = getDatabaseQueueMessages( + inflightMessages = getDatabaseQueueMessages( cassandraClient, queueName, actorSystemFig.getRegionLocal(), Shard.Type.INFLIGHT ); Assert.assertEquals( numInflight - numTimedout, inflightMessages.size() ); @@ -132,13 +133,13 @@ public class QueueTimeouterTest extends AbstractTest { private List<DatabaseQueueMessage> getDatabaseQueueMessages( CassandraClient cassandraClient, String queueName, String region, Shard.Type type ) { - + ShardIterator shardIterator = new ShardIterator( cassandraClient, queueName, region, type, Optional.empty() ); DatabaseQueueMessage.Type dbqmType = Shard.Type.DEFAULT.equals( type ) ? DatabaseQueueMessage.Type.DEFAULT : DatabaseQueueMessage.Type.INFLIGHT; - + MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( cassandraClient, queueName, region, dbqmType, shardIterator, null); http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java index d486c80..ae62c89 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java @@ -42,6 +42,7 @@ import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService import org.apache.usergrid.persistence.queue.TestModule; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +67,6 @@ public class ShardAllocatorTest extends AbstractTest { Injector injector = getInjector(); CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); injector.getInstance( App.class ); // init the INJECTOR @@ -163,12 +163,12 @@ public class ShardAllocatorTest extends AbstractTest { @Test + @Ignore public void testBasicOperationWithMessages() throws InterruptedException { Injector injector = getInjector(); CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); injector.getInstance( App.class ); // init the INJECTOR http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java index 76e3279..2d8da6d 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java @@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.qakka.serialization; +import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; import org.apache.usergrid.persistence.qakka.AbstractTest; import org.apache.usergrid.persistence.qakka.core.CassandraClient; @@ -44,17 +45,17 @@ import static org.junit.Assert.assertEquals; public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest { private static final Logger logger = LoggerFactory.getLogger( MultiShardDatabaseQueueMessageIteratorTest.class ); - + @Test public void testIterator() throws InterruptedException { CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); + CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); - QueueMessageSerialization queueMessageSerialization = getInjector().getInstance( QueueMessageSerialization.class ); - + Shard shard1 = new Shard("test", "region", Shard.Type.DEFAULT, 1L, null); Shard shard2 = new Shard("test", "region", Shard.Type.DEFAULT, 2L, null); Shard shard3 = new Shard("test", "region", Shard.Type.DEFAULT, 3L, null); @@ -71,7 +72,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest { for(int i=0; i < numMessagesPerShard; i++){ queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(), - DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard1.getShardId(), + DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard1.getShardId(), System.currentTimeMillis(), null, null)); Thread.sleep(3); } @@ -79,7 +80,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest { for(int i=0; i < numMessagesPerShard; i++){ queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(), - DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard2.getShardId(), + DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard2.getShardId(), System.currentTimeMillis(), null, null)); Thread.sleep(3); } @@ -87,7 +88,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest { for(int i=0; i < numMessagesPerShard; i++){ queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(), - DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard3.getShardId(), + DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard3.getShardId(), System.currentTimeMillis(), null, null)); Thread.sleep(3); } @@ -95,7 +96,7 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest { for(int i=0; i < numMessagesPerShard; i++){ queueMessageSerialization.writeMessage( new DatabaseQueueMessage(QakkaUtils.getTimeUuid(), - DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard4.getShardId(), + DatabaseQueueMessage.Type.DEFAULT, "test", "region", shard4.getShardId(), System.currentTimeMillis(), null, null)); Thread.sleep(3); } @@ -103,12 +104,12 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest { ShardIterator shardIterator = new ShardIterator( cassandraClient, "test", "region", Shard.Type.DEFAULT, Optional.empty()); - MultiShardMessageIterator iterator = new MultiShardMessageIterator( + MultiShardMessageIterator iterator = new MultiShardMessageIterator( cassandraClient, "test", "region", DatabaseQueueMessage.Type.DEFAULT, shardIterator, null); - final AtomicInteger[] counts = { + final AtomicInteger[] counts = { new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0), new AtomicInteger(0) }; - + iterator.forEachRemaining(message -> { //logger.info("Shard ID: {}, DatabaseQueueMessage ID: {}", message.getShardId(), message.getMessageId()); counts[ (int)(message.getShardId() - 1) ] .incrementAndGet(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java index 072fd94..4d60772 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/auditlogs/AuditLogSerializationTest.java @@ -39,7 +39,6 @@ public class AuditLogSerializationTest extends AbstractTest { public void testRecordAuditLog() throws Exception { CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); AuditLogSerialization logSerialization = getInjector().getInstance( AuditLogSerialization.class ); @@ -48,10 +47,10 @@ public class AuditLogSerializationTest extends AbstractTest { String queueName = "alst_queue_" + RandomStringUtils.randomAlphanumeric( 15 ); String source = RandomStringUtils.randomAlphanumeric( 15 ); String dest = RandomStringUtils.randomAlphanumeric( 15 ); - - logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS, + + logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS, queueName, dest, messageId, UUIDGen.getTimeUUID() ); - + // get audit logs for that message Result<AuditLog> result = logSerialization.getAuditLogs( messageId ); Assert.assertEquals( 1, result.getEntities().size() ); @@ -61,8 +60,6 @@ public class AuditLogSerializationTest extends AbstractTest { public void testGetAuditLogs() throws Exception { CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); - AuditLogSerialization logSerialization = getInjector().getInstance( AuditLogSerialization.class ); @@ -73,14 +70,14 @@ public class AuditLogSerializationTest extends AbstractTest { String dest = RandomStringUtils.randomAlphanumeric( 15 ); int numLogs = 10; - + UUID queueMessageId1 = UUIDGen.getTimeUUID(); for ( int i=0; i<numLogs; i++ ) { logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS, queueName, dest, messageId, queueMessageId1 ); - Thread.sleep(5); + Thread.sleep(5); } - + UUID queueMessageId2 = UUIDGen.getTimeUUID(); for ( int i=0; i<numLogs; i++ ) { logSerialization.recordAuditLog( AuditLog.Action.GET, AuditLog.Status.SUCCESS, @@ -99,4 +96,4 @@ public class AuditLogSerializationTest extends AbstractTest { Result<AuditLog> result = logSerialization.getAuditLogs( messageId ); Assert.assertEquals( numLogs * 3, result.getEntities().size() ); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java index e50bae5..2100c80 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueueSerializationTest.java @@ -36,7 +36,6 @@ public class DatabaseQueueSerializationTest extends AbstractTest { public void writeQueue(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); QueueSerialization queueSerialization = getInjector().getInstance( QueueSerialization.class ); DatabaseQueue queue = new DatabaseQueue("test", "west", "west", 0L, 0, 0, "test_dlq"); @@ -51,7 +50,6 @@ public class DatabaseQueueSerializationTest extends AbstractTest { public void loadQueue(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); QueueSerialization queueSerialization = getInjector().getInstance( QueueSerialization.class ); DatabaseQueue queue = new DatabaseQueue("test1", "west", "west", 0L, 0, 0, "test_dlq"); @@ -68,7 +66,6 @@ public class DatabaseQueueSerializationTest extends AbstractTest { public void deleteQueue(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); QueueSerialization queueSerialization = getInjector().getInstance( QueueSerialization.class ); DatabaseQueue queue = new DatabaseQueue("test1", "west", "west", 0L, 0, 0, "test_dlq"); http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java index 3152025..f9c2951 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java @@ -32,34 +32,33 @@ import static org.junit.Assert.fail; public class ShardCounterSerializationTest extends AbstractTest { - + @Test public void testBasicOperation() throws Exception { CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); - ShardCounterSerialization scs = getInjector().getInstance( ShardCounterSerialization.class ); - + ShardCounterSerialization scs = getInjector().getInstance( ShardCounterSerialization.class ); + String queueName = "scst_queue_" + RandomStringUtils.randomAlphanumeric( 20 ); long shardId = 100L; - + try { scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ); fail("Should have throw NotFoundException"); } catch ( NotFoundException expected ) { - // pass + // pass } scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 10 ); Assert.assertEquals( 10, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) ); - + scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 50 ); Assert.assertEquals( 60, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) ); - + scs.incrementCounter( queueName, Shard.Type.DEFAULT, shardId, 150 ); Assert.assertEquals( 210, scs.getCounterValue( queueName, Shard.Type.DEFAULT, shardId ) ); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java index 572c897..fb0a46e 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java @@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.qakka.serialization.sharding; import org.apache.commons.lang.RandomStringUtils; +import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl; import org.apache.usergrid.persistence.qakka.AbstractTest; @@ -46,7 +47,8 @@ public class ShardIteratorTest extends AbstractTest { public void getActiveShards(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); + CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); Shard shard2 = new Shard("test", "region1", Shard.Type.DEFAULT, 200L, null); @@ -76,8 +78,9 @@ public class ShardIteratorTest extends AbstractTest { public void seekActiveShards(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); - + CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); + Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); Shard shard2 = new Shard("test", "region1", Shard.Type.DEFAULT, 200L, null); Shard shard3 = new Shard("test", "region1", Shard.Type.DEFAULT, 300L, null); @@ -107,21 +110,22 @@ public class ShardIteratorTest extends AbstractTest { public void shardIteratorOrdering() throws Exception { CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); + CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); int numShards = 10; String region = "default"; String queueName = "sit_queue_" + RandomStringUtils.randomAlphanumeric(20); - + for ( long i=0; i<numShards; i++) { UUID messageId = QakkaUtils.getTimeUuid(); Shard shard = new Shard( queueName, region, Shard.Type.DEFAULT, i+1, messageId ); shardSerialization.createShard( shard ); try { Thread.sleep(10); } catch (Exception intentionallyIgnored) {} } - + Iterator<Shard> shardIterator = new ShardIterator( - cassandraClient, queueName, region, Shard.Type.DEFAULT, Optional.empty()); + cassandraClient, queueName, region, Shard.Type.DEFAULT, Optional.empty()); int count = 0; Long prevTimestamp = null; @@ -133,7 +137,7 @@ public class ShardIteratorTest extends AbstractTest { prevTimestamp = shard.getPointer().timestamp(); count++; } - + Assert.assertEquals( numShards, count ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java index e1a541b..e67db28 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java @@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.qakka.serialization.sharding; +import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; import org.apache.usergrid.persistence.qakka.AbstractTest; import org.apache.usergrid.persistence.qakka.core.CassandraClient; @@ -46,7 +47,8 @@ public class ShardSerializationTest extends AbstractTest { public void writeNewShard(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); + CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); shardSerialization.createShard(shard1); @@ -56,7 +58,8 @@ public class ShardSerializationTest extends AbstractTest { public void deleteShard(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); + CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); @@ -72,7 +75,8 @@ public class ShardSerializationTest extends AbstractTest { public void loadNullShard(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); + CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); Shard shard1 = new Shard("junk", "region1", Shard.Type.DEFAULT, 100L, null); @@ -86,8 +90,9 @@ public class ShardSerializationTest extends AbstractTest { public void updatePointer(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient ); - + CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); + Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); shardSerialization.createShard(shard1); http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java index ea73abc..7338a42 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java @@ -41,7 +41,6 @@ public class ShardStrategyTest extends AbstractTest { public void testBasicOperation() { CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); ShardSerialization shardSer = getInjector().getInstance( ShardSerialization.class ); @@ -49,7 +48,7 @@ public class ShardStrategyTest extends AbstractTest { UUID messageIdToLocate = null; long selectedShardId = 4L; - + int numShards = 10; String region = "default"; String queueName = "sst_queue_" + RandomStringUtils.randomAlphanumeric(20); @@ -66,6 +65,6 @@ public class ShardStrategyTest extends AbstractTest { Shard selectedShard = shardStrategy.selectShard( queueName, region, Shard.Type.DEFAULT, messageIdToLocate ); Assert.assertEquals( selectedShardId, selectedShard.getShardId() ); - + } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java index 20b72b0..306dfee 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java @@ -42,16 +42,15 @@ public class TransferLogSerializationTest extends AbstractTest { public void recordTransferLog() throws Exception { TransferLogSerialization logSerialization = getInjector().getInstance( TransferLogSerialization.class ); - + CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); String queueName = "tlst_queue_" + RandomStringUtils.randomAlphanumeric( 15 ); String source = RandomStringUtils.randomAlphanumeric( 15 ); String dest = RandomStringUtils.randomAlphanumeric( 15 ); - + int numLogs = 100; - + for ( int i=0; i<numLogs; i++ ) { logSerialization.recordTransferLog( queueName, source, dest, UUIDGen.getTimeUUID()); } @@ -60,9 +59,9 @@ public class TransferLogSerializationTest extends AbstractTest { int fetchCount = 0; PagingState pagingState = null; while ( true ) { - + Result<TransferLog> all = logSerialization.getAllTransferLogs( pagingState, 10 ); - + // we only want entities for our queue List<TransferLog> logs = all.getEntities().stream() .filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() ); @@ -71,7 +70,7 @@ public class TransferLogSerializationTest extends AbstractTest { fetchCount++; if ( all.getPagingState() == null ) { break; - } + } pagingState = all.getPagingState(); } @@ -84,8 +83,7 @@ public class TransferLogSerializationTest extends AbstractTest { TransferLogSerialization logSerialization = getInjector().getInstance( TransferLogSerialization.class ); CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); - + String queueName = "tlst_queue_" + RandomStringUtils.randomAlphanumeric( 15 ); String source = RandomStringUtils.randomAlphanumeric( 15 ); String dest = RandomStringUtils.randomAlphanumeric( 15 ); @@ -101,16 +99,16 @@ public class TransferLogSerializationTest extends AbstractTest { Assert.assertEquals( 1, logs.size()); logSerialization.removeTransferLog( queueName, source, dest, messageId ); - + List<TransferLog> all = getTransferLogs( logSerialization ); logs = all.stream() .filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() ); Assert.assertEquals( 0, logs.size()); - + try { logSerialization.removeTransferLog( queueName, source, dest, messageId ); Assert.fail("Removing non-existent log should throw exception"); - + } catch ( QakkaException expected ) { // success! } @@ -130,4 +128,4 @@ public class TransferLogSerializationTest extends AbstractTest { return allLogs; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java index bc01b23..5800bba 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java @@ -25,11 +25,9 @@ import com.google.inject.Injector; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.qakka.AbstractTest; import org.apache.usergrid.persistence.qakka.App; -import org.apache.usergrid.persistence.qakka.QakkaModule; import org.apache.usergrid.persistence.qakka.core.CassandraClient; import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; -import org.apache.usergrid.persistence.queue.guice.QueueModule; import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl; import org.junit.Ignore; import org.junit.Test; @@ -60,7 +58,6 @@ public class LegacyQueueManagerTest extends AbstractTest { Injector myInjector = getInjector(); CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class ); String region = actorSystemFig.getRegionLocal(); @@ -99,7 +96,6 @@ public class LegacyQueueManagerTest extends AbstractTest { Injector myInjector = getInjector(); CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class ); String region = actorSystemFig.getRegionLocal(); @@ -142,7 +138,6 @@ public class LegacyQueueManagerTest extends AbstractTest { Injector myInjector = getInjector(); CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class ); - cassandraClient.getSession(); ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class ); String region = actorSystemFig.getRegionLocal(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/test/resources/qakka.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties index c62b0df..9140637 100644 --- a/stack/corepersistence/queue/src/test/resources/qakka.properties +++ b/stack/corepersistence/queue/src/test/resources/qakka.properties @@ -34,19 +34,25 @@ usergrid.cluster.seeds=us-east:localhost # Port used for cluster communications. usergrid.cluster.port=2551 -queue.sender.num.actors=10 -queue.writer.num.actors=10 -queue.num.actors=10 +queue.sender.num.actors=20 +queue.writer.num.actors=20 +queue.num.actors=20 # set shard size and times low for testing purposes queue.shard.max.size=500 -queue.shard.allocation.check.frequency.millis=100 -queue.shard.allocation.advance.time.millis=200 +queue.shard.allocation.check.frequency.millis=1000 +queue.shard.allocation.advance.time.millis=2000 +queue.refresh.millis=1000 queue.max.inmemory.shard.counter = 100 +cassandra.connections=10 +#cassandra.timeout=20000 + cassandra.hosts=localhost -cassandra.keyspace.application=qakka_test +cassandra.keyspace.application=qakka_test_application + +cassandra.keyspace.queue-message=qakka_test_queue_messages cassandra.keyspace-drop-and-create=true
