Ensure correct keyspace names used
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fbce160a Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fbce160a Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fbce160a Branch: refs/heads/usergrid-1318-queue Commit: fbce160a1a87418ee6eb09c6183a9db8f35aec8c Parents: 202d5be Author: Dave Johnson <[email protected]> Authored: Wed Oct 5 17:43:19 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Wed Oct 5 17:43:19 2016 -0400 ---------------------------------------------------------------------- .../persistence/core/CassandraConfig.java | 28 +++ .../persistence/core/CassandraConfigImpl.java | 69 ++++++++ .../core/datastax/impl/DataStaxClusterImpl.java | 57 ++++--- .../core/astyanax/ColumnNameIteratorTest.java | 65 +++++++ .../MultiKeyColumnNameIteratorTest.java | 66 +++++++ .../astyanax/MultiRowColumnIteratorTest.java | 67 ++++++++ .../qakka/core/impl/QueueManagerImpl.java | 13 +- .../qakka/api/QueueResourceTest.java | 170 ++++++++++--------- 8 files changed, 421 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java index f45ad43..499561e 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java @@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.core; import com.netflix.astyanax.model.ConsistencyLevel; +import org.apache.cassandra.db.marshal.AbstractCompositeType; +import org.apache.log4j.lf5.viewer.categoryexplorer.CategoryPath; /** @@ -77,4 +79,30 @@ public interface CassandraConfig { String getApplicationKeyspace(); String getApplicationLocalKeyspace(); + + String getLocalDataCenter(); + + int getConnections(); + + int getTimeout(); + + int getPoolTimeout(); + + String getClusterName(); + + String getHosts(); + + String getVersion(); + + String getUsername(); + + String getPassword(); + + String getStrategy(); + + String getStrategyOptions(); + + String getStrategyLocal(); + + String getStrategyOptionsLocal(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java index 77f7228..9cdec95 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java @@ -26,6 +26,7 @@ import java.beans.PropertyChangeListener; import com.google.inject.Inject; import com.google.inject.Singleton; import com.netflix.astyanax.model.ConsistencyLevel; +import org.apache.log4j.lf5.viewer.categoryexplorer.CategoryPath; /** @@ -35,6 +36,7 @@ import com.netflix.astyanax.model.ConsistencyLevel; @Singleton public class CassandraConfigImpl implements CassandraConfig { + private CassandraFig cassandraFig; private ConsistencyLevel readCl; private ConsistencyLevel writeCl; @@ -54,6 +56,8 @@ public class CassandraConfigImpl implements CassandraConfig { @Inject public CassandraConfigImpl( final CassandraFig cassandraFig ) { + this.cassandraFig = cassandraFig; + this.readCl = ConsistencyLevel.valueOf( cassandraFig.getAstyanaxReadCL() ); this.writeCl = ConsistencyLevel.valueOf( cassandraFig.getAstyanaxWriteCL() ); @@ -154,4 +158,69 @@ public class CassandraConfigImpl implements CassandraConfig { return applicationLocalKeyspace; } + @Override + public String getLocalDataCenter() { + return cassandraFig.getLocalDataCenter(); + } + + @Override + public int getConnections() { + return cassandraFig.getConnections(); + } + + @Override + public int getTimeout() { + return cassandraFig.getTimeout(); + } + + @Override + public int getPoolTimeout() { + return cassandraFig.getPoolTimeout(); + } + + @Override + public String getClusterName() { + return cassandraFig.getClusterName(); + } + + @Override + public String getHosts() { + return cassandraFig.getHosts(); + } + + @Override + public String getVersion() { + return cassandraFig.getVersion(); + } + + @Override + public String getUsername() { + return cassandraFig.getUsername(); + } + + @Override + public String getPassword() { + return cassandraFig.getPassword(); + } + + @Override + public String getStrategy() { + return cassandraFig.getStrategyLocal(); + } + + @Override + public String getStrategyOptions() { + return cassandraFig.getStrategyOptions(); + } + + @Override + public String getStrategyLocal() { + return cassandraFig.getStrategyLocal(); + } + + @Override + public String getStrategyOptionsLocal() { + return cassandraFig.getStrategyOptionsLocal(); + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java index c8ddf3e..67f6123 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java @@ -23,6 +23,7 @@ import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.google.inject.Inject; import com.google.inject.Singleton; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.datastax.CQLUtils; import org.apache.usergrid.persistence.core.datastax.DataStaxCluster; @@ -36,15 +37,15 @@ public class DataStaxClusterImpl implements DataStaxCluster { private static final Logger logger = LoggerFactory.getLogger( DataStaxClusterImpl.class ); - private final CassandraFig cassandraFig; + private final CassandraConfig cassandraConfig; private Cluster cluster; private Session applicationSession; private Session queueMessageSession; private Session clusterSession; @Inject - public DataStaxClusterImpl(final CassandraFig cassandraFig ) throws Exception { - this.cassandraFig = cassandraFig; + public DataStaxClusterImpl(final CassandraConfig cassandraFig ) throws Exception { + this.cassandraConfig = cassandraFig; this.cluster = buildCluster(); // always initialize the keyspaces @@ -85,7 +86,7 @@ public class DataStaxClusterImpl implements DataStaxCluster { // always grab cluster from getCluster() in case it was prematurely closed if ( applicationSession == null || applicationSession.isClosed() ){ - applicationSession = getCluster().connect( CQLUtils.quote(cassandraFig.getApplicationKeyspace() ) ); + applicationSession = getCluster().connect( CQLUtils.quote( cassandraConfig.getApplicationKeyspace() ) ); } return applicationSession; } @@ -96,7 +97,7 @@ public class DataStaxClusterImpl implements DataStaxCluster { // always grab cluster from getCluster() in case it was prematurely closed if ( queueMessageSession == null || queueMessageSession.isClosed() ){ - queueMessageSession = getCluster().connect( CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace() ) ); + queueMessageSession = getCluster().connect( CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace() ) ); } return queueMessageSession; } @@ -110,7 +111,7 @@ public class DataStaxClusterImpl implements DataStaxCluster { public void createApplicationKeyspace() throws Exception { boolean exists = getClusterSession().getCluster().getMetadata() - .getKeyspace(CQLUtils.quote(cassandraFig.getApplicationKeyspace())) != null; + .getKeyspace(CQLUtils.quote( cassandraConfig.getApplicationKeyspace())) != null; if(exists){ return; @@ -118,14 +119,14 @@ public class DataStaxClusterImpl implements DataStaxCluster { final String createApplicationKeyspace = String.format( "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s", - CQLUtils.quote(cassandraFig.getApplicationKeyspace()), - CQLUtils.getFormattedReplication(cassandraFig.getStrategy(), cassandraFig.getStrategyOptions()) + CQLUtils.quote( cassandraConfig.getApplicationKeyspace()), + CQLUtils.getFormattedReplication( cassandraConfig.getStrategy(), cassandraConfig.getStrategyOptions()) ); getClusterSession().execute(createApplicationKeyspace); - logger.info("Created keyspace: {}", cassandraFig.getApplicationKeyspace()); + logger.info("Created keyspace: {}", cassandraConfig.getApplicationKeyspace()); } @@ -139,7 +140,7 @@ public class DataStaxClusterImpl implements DataStaxCluster { public void createApplicationLocalKeyspace() throws Exception { boolean exists = getClusterSession().getCluster().getMetadata() - .getKeyspace(CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace())) != null; + .getKeyspace(CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace())) != null; if (exists) { return; @@ -147,14 +148,14 @@ public class DataStaxClusterImpl implements DataStaxCluster { final String createQueueMessageKeyspace = String.format( "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s", - CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace()), - CQLUtils.getFormattedReplication(cassandraFig.getStrategyLocal(), cassandraFig.getStrategyOptionsLocal()) + CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace()), + CQLUtils.getFormattedReplication( cassandraConfig.getStrategyLocal(), cassandraConfig.getStrategyOptionsLocal()) ); getClusterSession().execute(createQueueMessageKeyspace); - logger.info("Created keyspace: {}", cassandraFig.getApplicationLocalKeyspace()); + logger.info("Created keyspace: {}", cassandraConfig.getApplicationLocalKeyspace()); } @@ -184,7 +185,7 @@ public class DataStaxClusterImpl implements DataStaxCluster { ConsistencyLevel defaultConsistencyLevel; try { - defaultConsistencyLevel = ConsistencyLevel.valueOf(cassandraFig.getReadCl()); + defaultConsistencyLevel = cassandraConfig.getDataStaxReadCl(); } catch (IllegalArgumentException e){ logger.error("Unable to parse provided consistency level in property: {}, defaulting to: {}", @@ -196,44 +197,44 @@ public class DataStaxClusterImpl implements DataStaxCluster { LoadBalancingPolicy loadBalancingPolicy; - if( !cassandraFig.getLocalDataCenter().isEmpty() ){ + if( !cassandraConfig.getLocalDataCenter().isEmpty() ){ loadBalancingPolicy = new DCAwareRoundRobinPolicy.Builder() - .withLocalDc( cassandraFig.getLocalDataCenter() ).build(); + .withLocalDc( cassandraConfig.getLocalDataCenter() ).build(); }else{ loadBalancingPolicy = new DCAwareRoundRobinPolicy.Builder().build(); } final PoolingOptions poolingOptions = new PoolingOptions() - .setCoreConnectionsPerHost(HostDistance.LOCAL, cassandraFig.getConnections() / 2) - .setMaxConnectionsPerHost(HostDistance.LOCAL, cassandraFig.getConnections()) - .setIdleTimeoutSeconds(cassandraFig.getTimeout() / 1000) - .setPoolTimeoutMillis(cassandraFig.getPoolTimeout()); + .setCoreConnectionsPerHost(HostDistance.LOCAL, cassandraConfig.getConnections() / 2) + .setMaxConnectionsPerHost(HostDistance.LOCAL, cassandraConfig.getConnections()) + .setIdleTimeoutSeconds( cassandraConfig.getTimeout() / 1000) + .setPoolTimeoutMillis( cassandraConfig.getPoolTimeout()); // purposely add a couple seconds to the driver's lower level socket timeouts vs. cassandra timeouts final SocketOptions socketOptions = new SocketOptions() - .setConnectTimeoutMillis(cassandraFig.getPoolTimeout() + 2000) - .setReadTimeoutMillis(cassandraFig.getTimeout() + 2000); + .setConnectTimeoutMillis( cassandraConfig.getPoolTimeout() + 2000) + .setReadTimeoutMillis( cassandraConfig.getTimeout() + 2000); final QueryOptions queryOptions = new QueryOptions() .setConsistencyLevel(defaultConsistencyLevel); Cluster.Builder datastaxCluster = Cluster.builder() - .withClusterName(cassandraFig.getClusterName()) - .addContactPoints(cassandraFig.getHosts().split(",")) + .withClusterName( cassandraConfig.getClusterName()) + .addContactPoints( cassandraConfig.getHosts().split(",")) .withMaxSchemaAgreementWaitSeconds(30) .withCompression(ProtocolOptions.Compression.LZ4) .withLoadBalancingPolicy(loadBalancingPolicy) .withPoolingOptions(poolingOptions) .withQueryOptions(queryOptions) .withSocketOptions(socketOptions) - .withProtocolVersion(getProtocolVersion(cassandraFig.getVersion())); + .withProtocolVersion(getProtocolVersion( cassandraConfig.getVersion())); // only add auth credentials if they were provided - if ( !cassandraFig.getUsername().isEmpty() && !cassandraFig.getPassword().isEmpty() ){ + if ( !cassandraConfig.getUsername().isEmpty() && !cassandraConfig.getPassword().isEmpty() ){ datastaxCluster.withCredentials( - cassandraFig.getUsername(), - cassandraFig.getPassword() + cassandraConfig.getUsername(), + cassandraConfig.getPassword() ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java index c45fdd1..00856a5 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java @@ -126,6 +126,71 @@ public class ColumnNameIteratorTest { return cassandraFig.getApplicationLocalKeyspace(); } + @Override + public String getLocalDataCenter() { + return cassandraFig.getLocalDataCenter(); + } + + @Override + public int getConnections() { + return cassandraFig.getConnections(); + } + + @Override + public int getTimeout() { + return cassandraFig.getTimeout(); + } + + @Override + public int getPoolTimeout() { + return cassandraFig.getPoolTimeout(); + } + + @Override + public String getClusterName() { + return cassandraFig.getClusterName(); + } + + @Override + public String getHosts() { + return cassandraFig.getHosts(); + } + + @Override + public String getVersion() { + return cassandraFig.getVersion(); + } + + @Override + public String getUsername() { + return cassandraFig.getUsername(); + } + + @Override + public String getPassword() { + return cassandraFig.getPassword(); + } + + @Override + public String getStrategy() { + return cassandraFig.getStrategy(); + } + + @Override + public String getStrategyOptions() { + return cassandraFig.getStrategyOptions(); + } + + @Override + public String getStrategyLocal() { + return cassandraFig.getStrategyLocal(); + } + + @Override + public String getStrategyOptionsLocal() { + return cassandraFig.getStrategyOptionsLocal(); + } + }; http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java index 5cdf0e1..b12ea6f 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java @@ -129,6 +129,72 @@ public class MultiKeyColumnNameIteratorTest { return cassandraFig.getApplicationLocalKeyspace(); } + + @Override + public String getLocalDataCenter() { + return cassandraFig.getLocalDataCenter(); + } + + @Override + public int getConnections() { + return cassandraFig.getConnections(); + } + + @Override + public int getTimeout() { + return cassandraFig.getTimeout(); + } + + @Override + public int getPoolTimeout() { + return cassandraFig.getPoolTimeout(); + } + + @Override + public String getClusterName() { + return cassandraFig.getClusterName(); + } + + @Override + public String getHosts() { + return cassandraFig.getHosts(); + } + + @Override + public String getVersion() { + return cassandraFig.getVersion(); + } + + @Override + public String getUsername() { + return cassandraFig.getUsername(); + } + + @Override + public String getPassword() { + return cassandraFig.getPassword(); + } + + @Override + public String getStrategy() { + return cassandraFig.getStrategy(); + } + + @Override + public String getStrategyOptions() { + return cassandraFig.getStrategyOptions(); + } + + @Override + public String getStrategyLocal() { + return cassandraFig.getStrategyLocal(); + } + + @Override + public String getStrategyOptionsLocal() { + return cassandraFig.getStrategyOptionsLocal(); + } + }; http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java index 6331941..e509c45 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java @@ -132,6 +132,73 @@ public class MultiRowColumnIteratorTest { public String getApplicationLocalKeyspace() { return cassandraFig.getApplicationLocalKeyspace(); } + + + @Override + public String getLocalDataCenter() { + return cassandraFig.getLocalDataCenter(); + } + + @Override + public int getConnections() { + return cassandraFig.getConnections(); + } + + @Override + public int getTimeout() { + return cassandraFig.getTimeout(); + } + + @Override + public int getPoolTimeout() { + return cassandraFig.getPoolTimeout(); + } + + @Override + public String getClusterName() { + return cassandraFig.getClusterName(); + } + + @Override + public String getHosts() { + return cassandraFig.getHosts(); + } + + @Override + public String getVersion() { + return cassandraFig.getVersion(); + } + + @Override + public String getUsername() { + return cassandraFig.getUsername(); + } + + @Override + public String getPassword() { + return cassandraFig.getPassword(); + } + + @Override + public String getStrategy() { + return cassandraFig.getStrategy(); + } + + @Override + public String getStrategyOptions() { + return cassandraFig.getStrategyOptions(); + } + + @Override + public String getStrategyLocal() { + return cassandraFig.getStrategyLocal(); + } + + @Override + public String getStrategyOptionsLocal() { + return cassandraFig.getStrategyOptionsLocal(); + } + }; http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java index a8139a1..d51fe2d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java @@ -81,14 +81,13 @@ public class QueueManagerImpl implements QueueManager { } } - for ( String region : regions ) { + Shard available = new Shard( queue.getName(), actorSystemFig.getRegionLocal(), + Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid()); + shardSerialization.createShard( available ); - Shard available = new Shard( queue.getName(), region, Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid()); - shardSerialization.createShard( available ); - - Shard inflight = new Shard( queue.getName(), region, Shard.Type.INFLIGHT, 1L, QakkaUtils.getTimeUuid()); - shardSerialization.createShard( inflight ); - } + Shard inflight = new Shard( queue.getName(), actorSystemFig.getRegionLocal(), + Shard.Type.INFLIGHT, 1L, QakkaUtils.getTimeUuid()); + shardSerialization.createShard( inflight ); // only write the existence of a queue to the database if its dependent initial shards have been written queueSerialization.writeQueue(queue.toDatabaseQueue()); http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java index d723e97..620e946 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java @@ -155,39 +155,43 @@ public class QueueResourceTest extends AbstractRestTest { }}; target( "queues" ).request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) ); - // send some messages + try { - ObjectMapper mapper = new ObjectMapper(); + // send some messages + + ObjectMapper mapper = new ObjectMapper(); - int numMessages = 100; - for (int i = 0; i < numMessages; i++) { + int numMessages = 100; + for (int i = 0; i < numMessages; i++) { - final int number = i; - Map<String, Object> messageMap = new HashMap<String, Object>() {{ - put( "message", "this is message #" + number ); - put( "valid", true ); - }}; - String body = mapper.writeValueAsString( messageMap ); + final int number = i; + Map<String, Object> messageMap = new HashMap<String, Object>() {{ + put( "message", "this is message #" + number ); + put( "valid", true ); + }}; + String body = mapper.writeValueAsString( messageMap ); - Response response; - if ( asJson ) { - response = target( "queues" ).path( queueName ).path( "messages" ) + Response response; + if (asJson) { + response = target( "queues" ).path( queueName ).path( "messages" ) .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ) ); - } else { - response = target( "queues" ).path( queueName ).path( "messages" ) + } else { + response = target( "queues" ).path( queueName ).path( "messages" ) .queryParam( "contentType", MediaType.APPLICATION_JSON ) .request().post( Entity.entity( body, MediaType.APPLICATION_OCTET_STREAM ) ); - } + } - Assert.assertEquals( 200, response.getStatus() ); - } + Assert.assertEquals( 200, response.getStatus() ); + } - // get all messages, checking for dups + // get all messages, checking for dups - checkJsonMessages( queueName, numMessages ); + checkJsonMessages( queueName, numMessages ); - Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete(); - Assert.assertEquals( 200, response.getStatus() ); + } finally { + Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete(); + Assert.assertEquals( 200, response.getStatus() ); + } } @@ -244,28 +248,32 @@ public class QueueResourceTest extends AbstractRestTest { }}; target( "queues" ).request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) ); - // send messages each with image/jpg payload + try { - InputStream is = getClass().getResourceAsStream("/qakka-duck.jpg"); - byte[] bytes = ByteStreams.toByteArray( is ); + // send messages each with image/jpg payload - int numMessages = 100; - for (int i = 0; i < numMessages; i++) { + InputStream is = getClass().getResourceAsStream( "/qakka-duck.jpg" ); + byte[] bytes = ByteStreams.toByteArray( is ); - Response response = target( "queues" ).path( queueName ).path( "messages" ) + int numMessages = 100; + for (int i = 0; i < numMessages; i++) { + + Response response = target( "queues" ).path( queueName ).path( "messages" ) .queryParam( "contentType", "image/jpg" ) .request() - .post( Entity.entity( bytes, MediaType.APPLICATION_OCTET_STREAM )); + .post( Entity.entity( bytes, MediaType.APPLICATION_OCTET_STREAM ) ); - Assert.assertEquals( 200, response.getStatus() ); - } + Assert.assertEquals( 200, response.getStatus() ); + } - // get all messages, checking for dups + // get all messages, checking for dups - checkBinaryMessages( queueName, numMessages ); + checkBinaryMessages( queueName, numMessages ); - Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete(); - Assert.assertEquals( 200, response.getStatus() ); + } finally { + Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete(); + Assert.assertEquals( 200, response.getStatus() ); + } } @@ -320,71 +328,75 @@ public class QueueResourceTest extends AbstractRestTest { Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }}; target("queues").request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE)); - // send some messages + try { - ObjectMapper mapper = new ObjectMapper(); + // send some messages - int numMessages = 100; - for ( int i=0; i<numMessages; i++ ) { + ObjectMapper mapper = new ObjectMapper(); - final int number = i; - Map<String, Object> messageMap = new HashMap<String, Object>() {{ - put("message", "this is message #" + number); - put("valid", true ); - }}; - String body = mapper.writeValueAsString( messageMap ); + int numMessages = 100; + for (int i = 0; i < numMessages; i++) { - Response response = target("queues").path( queueName ).path( "messages" ) - .request().post( Entity.entity( body, MediaType.APPLICATION_JSON )); + final int number = i; + Map<String, Object> messageMap = new HashMap<String, Object>() {{ + put( "message", "this is message #" + number ); + put( "valid", true ); + }}; + String body = mapper.writeValueAsString( messageMap ); - Assert.assertEquals( 200, response.getStatus() ); - } + Response response = target( "queues" ).path( queueName ).path( "messages" ) + .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ) ); - // get all messages, checking for dups + Assert.assertEquals( 200, response.getStatus() ); + } - Set<UUID> messageIds = checkJsonMessages( queueName, numMessages ); + // get all messages, checking for dups - // there should be no more messages available + Set<UUID> messageIds = checkJsonMessages( queueName, numMessages ); - Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get(); - ApiResponse apiResponse = response.readEntity( ApiResponse.class ); - Assert.assertNotNull( apiResponse.getQueueMessages() ); - Assert.assertTrue( apiResponse.getQueueMessages().isEmpty() ); + // there should be no more messages available + + Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get(); + ApiResponse apiResponse = response.readEntity( ApiResponse.class ); + Assert.assertNotNull( apiResponse.getQueueMessages() ); + Assert.assertTrue( apiResponse.getQueueMessages().isEmpty() ); - // ack half of the messages + // ack half of the messages - int count = 0; - Set<UUID> ackedIds = new HashSet<>(); - for ( UUID queueMessageId : messageIds ) { - response = target( "queues" ) + int count = 0; + Set<UUID> ackedIds = new HashSet<>(); + for (UUID queueMessageId : messageIds) { + response = target( "queues" ) .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete(); - Assert.assertEquals( 200, response.getStatus() ); - ackedIds.add( queueMessageId ); - if ( ++count >= numMessages/2 ) { - break; + Assert.assertEquals( 200, response.getStatus() ); + ackedIds.add( queueMessageId ); + if (++count >= numMessages / 2) { + break; + } } - } - messageIds.removeAll( ackedIds ); + messageIds.removeAll( ackedIds ); - // wait for remaining of the messages to timeout + // wait for remaining of the messages to timeout - QakkaFig qakkaFig = StartupListener.INJECTOR.getInstance( QakkaFig.class ); - Thread.sleep( 2*qakkaFig.getQueueTimeoutSeconds() * 1000 ); + QakkaFig qakkaFig = StartupListener.INJECTOR.getInstance( QakkaFig.class ); + Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds() * 1000 ); - // now, the remaining messages cannot be acked because they timed out + // now, the remaining messages cannot be acked because they timed out - for ( UUID queueMessageId : messageIds ) { - response = target( "queues" ) + for (UUID queueMessageId : messageIds) { + response = target( "queues" ) .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete(); - Assert.assertEquals( 400, response.getStatus() ); - } + Assert.assertEquals( 400, response.getStatus() ); + } - // and, those same messages should be available again in the queue + // and, those same messages should be available again in the queue - checkJsonMessages( queueName, numMessages/2 ); + checkJsonMessages( queueName, numMessages / 2 ); - response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete(); - Assert.assertEquals( 200, response.getStatus() ); + } finally { + Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete(); + Assert.assertEquals( 200, response.getStatus() ); + } }
