Ensure correct keyspace names used

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fbce160a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fbce160a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fbce160a

Branch: refs/heads/usergrid-1318-queue
Commit: fbce160a1a87418ee6eb09c6183a9db8f35aec8c
Parents: 202d5be
Author: Dave Johnson <[email protected]>
Authored: Wed Oct 5 17:43:19 2016 -0400
Committer: Dave Johnson <[email protected]>
Committed: Wed Oct 5 17:43:19 2016 -0400

----------------------------------------------------------------------
 .../persistence/core/CassandraConfig.java       |  28 +++
 .../persistence/core/CassandraConfigImpl.java   |  69 ++++++++
 .../core/datastax/impl/DataStaxClusterImpl.java |  57 ++++---
 .../core/astyanax/ColumnNameIteratorTest.java   |  65 +++++++
 .../MultiKeyColumnNameIteratorTest.java         |  66 +++++++
 .../astyanax/MultiRowColumnIteratorTest.java    |  67 ++++++++
 .../qakka/core/impl/QueueManagerImpl.java       |  13 +-
 .../qakka/api/QueueResourceTest.java            | 170 ++++++++++---------
 8 files changed, 421 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
index f45ad43..499561e 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.core;
 
 
 import com.netflix.astyanax.model.ConsistencyLevel;
+import org.apache.cassandra.db.marshal.AbstractCompositeType;
+import org.apache.log4j.lf5.viewer.categoryexplorer.CategoryPath;
 
 
 /**
@@ -77,4 +79,30 @@ public interface CassandraConfig {
     String getApplicationKeyspace();
 
     String getApplicationLocalKeyspace();
+
+    String getLocalDataCenter();
+
+    int getConnections();
+
+    int getTimeout();
+
+    int getPoolTimeout();
+
+    String getClusterName();
+
+    String getHosts();
+
+    String getVersion();
+
+    String getUsername();
+
+    String getPassword();
+
+    String getStrategy();
+
+    String getStrategyOptions();
+
+    String getStrategyLocal();
+
+    String getStrategyOptionsLocal();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
index 77f7228..9cdec95 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
@@ -26,6 +26,7 @@ import java.beans.PropertyChangeListener;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.model.ConsistencyLevel;
+import org.apache.log4j.lf5.viewer.categoryexplorer.CategoryPath;
 
 
 /**
@@ -35,6 +36,7 @@ import com.netflix.astyanax.model.ConsistencyLevel;
 @Singleton
 public class CassandraConfigImpl implements CassandraConfig {
 
+    private CassandraFig cassandraFig;
 
     private ConsistencyLevel readCl;
     private ConsistencyLevel writeCl;
@@ -54,6 +56,8 @@ public class CassandraConfigImpl implements CassandraConfig {
     @Inject
     public CassandraConfigImpl( final CassandraFig cassandraFig ) {
 
+        this.cassandraFig = cassandraFig;
+
         this.readCl = ConsistencyLevel.valueOf( 
cassandraFig.getAstyanaxReadCL() );
 
         this.writeCl = ConsistencyLevel.valueOf( 
cassandraFig.getAstyanaxWriteCL() );
@@ -154,4 +158,69 @@ public class CassandraConfigImpl implements 
CassandraConfig {
         return applicationLocalKeyspace;
     }
 
+    @Override
+    public String getLocalDataCenter() {
+        return cassandraFig.getLocalDataCenter();
+    }
+
+    @Override
+    public int getConnections() {
+        return cassandraFig.getConnections();
+    }
+
+    @Override
+    public int getTimeout() {
+        return cassandraFig.getTimeout();
+    }
+
+    @Override
+    public int getPoolTimeout() {
+        return cassandraFig.getPoolTimeout();
+    }
+
+    @Override
+    public String getClusterName() {
+        return cassandraFig.getClusterName();
+    }
+
+    @Override
+    public String getHosts() {
+        return cassandraFig.getHosts();
+    }
+
+    @Override
+    public String getVersion() {
+        return cassandraFig.getVersion();
+    }
+
+    @Override
+    public String getUsername() {
+        return cassandraFig.getUsername();
+    }
+
+    @Override
+    public String getPassword() {
+        return cassandraFig.getPassword();
+    }
+
+    @Override
+    public String getStrategy() {
+        return cassandraFig.getStrategy(); // application keyspace strategy; the local one is getStrategyLocal()
+    }
+
+    @Override
+    public String getStrategyOptions() {
+        return cassandraFig.getStrategyOptions();
+    }
+
+    @Override
+    public String getStrategyLocal() {
+        return cassandraFig.getStrategyLocal();
+    }
+
+    @Override
+    public String getStrategyOptionsLocal() {
+        return cassandraFig.getStrategyOptionsLocal();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
index c8ddf3e..67f6123 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
@@ -23,6 +23,7 @@ import 
com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
 import com.datastax.driver.core.policies.LoadBalancingPolicy;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.CassandraFig;
 import org.apache.usergrid.persistence.core.datastax.CQLUtils;
 import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
@@ -36,15 +37,15 @@ public class DataStaxClusterImpl implements DataStaxCluster 
{
     private static final Logger logger = LoggerFactory.getLogger( 
DataStaxClusterImpl.class );
 
 
-    private final CassandraFig cassandraFig;
+    private final CassandraConfig cassandraConfig;
     private Cluster cluster;
     private Session applicationSession;
     private Session queueMessageSession;
     private Session clusterSession;
 
     @Inject
-    public DataStaxClusterImpl(final CassandraFig cassandraFig ) throws 
Exception {
-        this.cassandraFig = cassandraFig;
+    public DataStaxClusterImpl(final CassandraConfig cassandraFig ) throws 
Exception {
+        this.cassandraConfig = cassandraFig;
         this.cluster = buildCluster();
 
         // always initialize the keyspaces
@@ -85,7 +86,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
 
         // always grab cluster from getCluster() in case it was prematurely 
closed
         if ( applicationSession == null || applicationSession.isClosed() ){
-            applicationSession = getCluster().connect( 
CQLUtils.quote(cassandraFig.getApplicationKeyspace() ) );
+            applicationSession = getCluster().connect( CQLUtils.quote( 
cassandraConfig.getApplicationKeyspace() ) );
         }
         return applicationSession;
     }
@@ -96,7 +97,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
 
         // always grab cluster from getCluster() in case it was prematurely 
closed
         if ( queueMessageSession == null || queueMessageSession.isClosed() ){
-            queueMessageSession = getCluster().connect( 
CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace() ) );
+            queueMessageSession = getCluster().connect( CQLUtils.quote( 
cassandraConfig.getApplicationLocalKeyspace() ) );
         }
         return queueMessageSession;
     }
@@ -110,7 +111,7 @@ public class DataStaxClusterImpl implements DataStaxCluster 
{
     public void createApplicationKeyspace() throws Exception {
 
         boolean exists = getClusterSession().getCluster().getMetadata()
-            
.getKeyspace(CQLUtils.quote(cassandraFig.getApplicationKeyspace())) != null;
+            .getKeyspace(CQLUtils.quote( 
cassandraConfig.getApplicationKeyspace())) != null;
 
         if(exists){
             return;
@@ -118,14 +119,14 @@ public class DataStaxClusterImpl implements 
DataStaxCluster {
 
         final String createApplicationKeyspace = String.format(
             "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s",
-            CQLUtils.quote(cassandraFig.getApplicationKeyspace()),
-            CQLUtils.getFormattedReplication(cassandraFig.getStrategy(), 
cassandraFig.getStrategyOptions())
+            CQLUtils.quote( cassandraConfig.getApplicationKeyspace()),
+            CQLUtils.getFormattedReplication( cassandraConfig.getStrategy(), 
cassandraConfig.getStrategyOptions())
 
         );
 
         getClusterSession().execute(createApplicationKeyspace);
 
-        logger.info("Created keyspace: {}", 
cassandraFig.getApplicationKeyspace());
+        logger.info("Created keyspace: {}", 
cassandraConfig.getApplicationKeyspace());
 
     }
 
@@ -139,7 +140,7 @@ public class DataStaxClusterImpl implements DataStaxCluster 
{
     public void createApplicationLocalKeyspace() throws Exception {
 
         boolean exists = getClusterSession().getCluster().getMetadata()
-            
.getKeyspace(CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace())) != 
null;
+            .getKeyspace(CQLUtils.quote( 
cassandraConfig.getApplicationLocalKeyspace())) != null;
 
         if (exists) {
             return;
@@ -147,14 +148,14 @@ public class DataStaxClusterImpl implements 
DataStaxCluster {
 
         final String createQueueMessageKeyspace = String.format(
             "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s",
-            CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace()),
-            CQLUtils.getFormattedReplication(cassandraFig.getStrategyLocal(), 
cassandraFig.getStrategyOptionsLocal())
+            CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace()),
+            CQLUtils.getFormattedReplication( 
cassandraConfig.getStrategyLocal(), cassandraConfig.getStrategyOptionsLocal())
 
         );
 
         getClusterSession().execute(createQueueMessageKeyspace);
 
-        logger.info("Created keyspace: {}", 
cassandraFig.getApplicationLocalKeyspace());
+        logger.info("Created keyspace: {}", 
cassandraConfig.getApplicationLocalKeyspace());
 
     }
 
@@ -184,7 +185,7 @@ public class DataStaxClusterImpl implements DataStaxCluster 
{
 
         ConsistencyLevel defaultConsistencyLevel;
         try {
-            defaultConsistencyLevel = 
ConsistencyLevel.valueOf(cassandraFig.getReadCl());
+            defaultConsistencyLevel = cassandraConfig.getDataStaxReadCl();
         } catch (IllegalArgumentException e){
 
             logger.error("Unable to parse provided consistency level in 
property: {}, defaulting to: {}",
@@ -196,44 +197,44 @@ public class DataStaxClusterImpl implements 
DataStaxCluster {
 
 
         LoadBalancingPolicy loadBalancingPolicy;
-        if( !cassandraFig.getLocalDataCenter().isEmpty() ){
+        if( !cassandraConfig.getLocalDataCenter().isEmpty() ){
 
             loadBalancingPolicy = new DCAwareRoundRobinPolicy.Builder()
-                .withLocalDc( cassandraFig.getLocalDataCenter() ).build();
+                .withLocalDc( cassandraConfig.getLocalDataCenter() ).build();
         }else{
             loadBalancingPolicy = new 
DCAwareRoundRobinPolicy.Builder().build();
         }
 
         final PoolingOptions poolingOptions = new PoolingOptions()
-            .setCoreConnectionsPerHost(HostDistance.LOCAL, 
cassandraFig.getConnections() / 2)
-            .setMaxConnectionsPerHost(HostDistance.LOCAL, 
cassandraFig.getConnections())
-            .setIdleTimeoutSeconds(cassandraFig.getTimeout() / 1000)
-            .setPoolTimeoutMillis(cassandraFig.getPoolTimeout());
+            .setCoreConnectionsPerHost(HostDistance.LOCAL, 
cassandraConfig.getConnections() / 2)
+            .setMaxConnectionsPerHost(HostDistance.LOCAL, 
cassandraConfig.getConnections())
+            .setIdleTimeoutSeconds( cassandraConfig.getTimeout() / 1000)
+            .setPoolTimeoutMillis( cassandraConfig.getPoolTimeout());
 
         // purposely add a couple seconds to the driver's lower level socket 
timeouts vs. cassandra timeouts
         final SocketOptions socketOptions = new SocketOptions()
-            .setConnectTimeoutMillis(cassandraFig.getPoolTimeout() + 2000)
-            .setReadTimeoutMillis(cassandraFig.getTimeout() + 2000);
+            .setConnectTimeoutMillis( cassandraConfig.getPoolTimeout() + 2000)
+            .setReadTimeoutMillis( cassandraConfig.getTimeout() + 2000);
 
         final QueryOptions queryOptions = new QueryOptions()
             .setConsistencyLevel(defaultConsistencyLevel);
 
         Cluster.Builder datastaxCluster = Cluster.builder()
-            .withClusterName(cassandraFig.getClusterName())
-            .addContactPoints(cassandraFig.getHosts().split(","))
+            .withClusterName( cassandraConfig.getClusterName())
+            .addContactPoints( cassandraConfig.getHosts().split(","))
             .withMaxSchemaAgreementWaitSeconds(30)
             .withCompression(ProtocolOptions.Compression.LZ4)
             .withLoadBalancingPolicy(loadBalancingPolicy)
             .withPoolingOptions(poolingOptions)
             .withQueryOptions(queryOptions)
             .withSocketOptions(socketOptions)
-            
.withProtocolVersion(getProtocolVersion(cassandraFig.getVersion()));
+            .withProtocolVersion(getProtocolVersion( 
cassandraConfig.getVersion()));
 
         // only add auth credentials if they were provided
-        if ( !cassandraFig.getUsername().isEmpty() && 
!cassandraFig.getPassword().isEmpty() ){
+        if ( !cassandraConfig.getUsername().isEmpty() && 
!cassandraConfig.getPassword().isEmpty() ){
             datastaxCluster.withCredentials(
-                cassandraFig.getUsername(),
-                cassandraFig.getPassword()
+                cassandraConfig.getUsername(),
+                cassandraConfig.getPassword()
             );
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
index c45fdd1..00856a5 100644
--- 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
+++ 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
@@ -126,6 +126,71 @@ public class ColumnNameIteratorTest {
                 return cassandraFig.getApplicationLocalKeyspace();
             }
 
+            @Override
+            public String getLocalDataCenter() {
+                return cassandraFig.getLocalDataCenter();
+            }
+
+            @Override
+            public int getConnections() {
+                return cassandraFig.getConnections();
+            }
+
+            @Override
+            public int getTimeout() {
+                return cassandraFig.getTimeout();
+            }
+
+            @Override
+            public int getPoolTimeout() {
+                return cassandraFig.getPoolTimeout();
+            }
+
+            @Override
+            public String getClusterName() {
+                return cassandraFig.getClusterName();
+            }
+
+            @Override
+            public String getHosts() {
+                return cassandraFig.getHosts();
+            }
+
+            @Override
+            public String getVersion() {
+                return cassandraFig.getVersion();
+            }
+
+            @Override
+            public String getUsername() {
+                return cassandraFig.getUsername();
+            }
+
+            @Override
+            public String getPassword() {
+                return cassandraFig.getPassword();
+            }
+
+            @Override
+            public String getStrategy() {
+                return cassandraFig.getStrategy();
+            }
+
+            @Override
+            public String getStrategyOptions() {
+                return cassandraFig.getStrategyOptions();
+            }
+
+            @Override
+            public String getStrategyLocal() {
+                return cassandraFig.getStrategyLocal();
+            }
+
+            @Override
+            public String getStrategyOptionsLocal() {
+                return cassandraFig.getStrategyOptionsLocal();
+            }
+
         };
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
index 5cdf0e1..b12ea6f 100644
--- 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
+++ 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
@@ -129,6 +129,72 @@ public class MultiKeyColumnNameIteratorTest {
                 return cassandraFig.getApplicationLocalKeyspace();
             }
 
+
+            @Override
+            public String getLocalDataCenter() {
+                return cassandraFig.getLocalDataCenter();
+            }
+
+            @Override
+            public int getConnections() {
+                return cassandraFig.getConnections();
+            }
+
+            @Override
+            public int getTimeout() {
+                return cassandraFig.getTimeout();
+            }
+
+            @Override
+            public int getPoolTimeout() {
+                return cassandraFig.getPoolTimeout();
+            }
+
+            @Override
+            public String getClusterName() {
+                return cassandraFig.getClusterName();
+            }
+
+            @Override
+            public String getHosts() {
+                return cassandraFig.getHosts();
+            }
+
+            @Override
+            public String getVersion() {
+                return cassandraFig.getVersion();
+            }
+
+            @Override
+            public String getUsername() {
+                return cassandraFig.getUsername();
+            }
+
+            @Override
+            public String getPassword() {
+                return cassandraFig.getPassword();
+            }
+
+            @Override
+            public String getStrategy() {
+                return cassandraFig.getStrategy();
+            }
+
+            @Override
+            public String getStrategyOptions() {
+                return cassandraFig.getStrategyOptions();
+            }
+
+            @Override
+            public String getStrategyLocal() {
+                return cassandraFig.getStrategyLocal();
+            }
+
+            @Override
+            public String getStrategyOptionsLocal() {
+                return cassandraFig.getStrategyOptionsLocal();
+            }
+
         };
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
index 6331941..e509c45 100644
--- 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
+++ 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -132,6 +132,73 @@ public class MultiRowColumnIteratorTest {
             public String getApplicationLocalKeyspace() {
                 return cassandraFig.getApplicationLocalKeyspace();
             }
+
+
+            @Override
+            public String getLocalDataCenter() {
+                return cassandraFig.getLocalDataCenter();
+            }
+
+            @Override
+            public int getConnections() {
+                return cassandraFig.getConnections();
+            }
+
+            @Override
+            public int getTimeout() {
+                return cassandraFig.getTimeout();
+            }
+
+            @Override
+            public int getPoolTimeout() {
+                return cassandraFig.getPoolTimeout();
+            }
+
+            @Override
+            public String getClusterName() {
+                return cassandraFig.getClusterName();
+            }
+
+            @Override
+            public String getHosts() {
+                return cassandraFig.getHosts();
+            }
+
+            @Override
+            public String getVersion() {
+                return cassandraFig.getVersion();
+            }
+
+            @Override
+            public String getUsername() {
+                return cassandraFig.getUsername();
+            }
+
+            @Override
+            public String getPassword() {
+                return cassandraFig.getPassword();
+            }
+
+            @Override
+            public String getStrategy() {
+                return cassandraFig.getStrategy();
+            }
+
+            @Override
+            public String getStrategyOptions() {
+                return cassandraFig.getStrategyOptions();
+            }
+
+            @Override
+            public String getStrategyLocal() {
+                return cassandraFig.getStrategyLocal();
+            }
+
+            @Override
+            public String getStrategyOptionsLocal() {
+                return cassandraFig.getStrategyOptionsLocal();
+            }
+
         };
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
index a8139a1..d51fe2d 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
@@ -81,14 +81,13 @@ public class QueueManagerImpl implements QueueManager {
             }
         }
 
-        for ( String region : regions ) {
+        Shard available = new Shard( queue.getName(), 
actorSystemFig.getRegionLocal(),
+            Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid());
+        shardSerialization.createShard( available );
 
-            Shard available = new Shard( queue.getName(), region, 
Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid());
-            shardSerialization.createShard( available );
-
-            Shard inflight = new Shard( queue.getName(), region, 
Shard.Type.INFLIGHT, 1L, QakkaUtils.getTimeUuid());
-            shardSerialization.createShard( inflight );
-        }
+        Shard inflight = new Shard( queue.getName(), 
actorSystemFig.getRegionLocal(),
+            Shard.Type.INFLIGHT, 1L, QakkaUtils.getTimeUuid());
+        shardSerialization.createShard( inflight );
 
         // only write the existence of a queue to the database if its 
dependent initial shards have been written
         queueSerialization.writeQueue(queue.toDatabaseQueue());

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
index d723e97..620e946 100644
--- 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
@@ -155,39 +155,43 @@ public class QueueResourceTest extends AbstractRestTest {
         }};
         target( "queues" ).request().post( Entity.entity( queueMap, 
MediaType.APPLICATION_JSON_TYPE ) );
 
-        // send some messages
+        try {
 
-        ObjectMapper mapper = new ObjectMapper();
+            // send some messages
+
+            ObjectMapper mapper = new ObjectMapper();
 
-        int numMessages = 100;
-        for (int i = 0; i < numMessages; i++) {
+            int numMessages = 100;
+            for (int i = 0; i < numMessages; i++) {
 
-            final int number = i;
-            Map<String, Object> messageMap = new HashMap<String, Object>() {{
-                put( "message", "this is message #" + number );
-                put( "valid", true );
-            }};
-            String body = mapper.writeValueAsString( messageMap );
+                final int number = i;
+                Map<String, Object> messageMap = new HashMap<String, Object>() 
{{
+                    put( "message", "this is message #" + number );
+                    put( "valid", true );
+                }};
+                String body = mapper.writeValueAsString( messageMap );
 
-            Response response;
-            if ( asJson ) {
-                response = target( "queues" ).path( queueName ).path( 
"messages" )
+                Response response;
+                if (asJson) {
+                    response = target( "queues" ).path( queueName ).path( 
"messages" )
                         .request().post( Entity.entity( body, 
MediaType.APPLICATION_JSON ) );
-            } else {
-                response = target( "queues" ).path( queueName ).path( 
"messages" )
+                } else {
+                    response = target( "queues" ).path( queueName ).path( 
"messages" )
                         .queryParam( "contentType", MediaType.APPLICATION_JSON 
)
                         .request().post( Entity.entity( body, 
MediaType.APPLICATION_OCTET_STREAM ) );
-            }
+                }
 
-            Assert.assertEquals( 200, response.getStatus() );
-        }
+                Assert.assertEquals( 200, response.getStatus() );
+            }
 
-        // get all messages, checking for dups
+            // get all messages, checking for dups
 
-        checkJsonMessages( queueName, numMessages );
+            checkJsonMessages( queueName, numMessages );
 
-        Response response = target( "queues" ).path( queueName ).queryParam( 
"confirm", true ).request().delete();
-        Assert.assertEquals( 200, response.getStatus() );
+        } finally {
+            Response response = target( "queues" ).path( queueName 
).queryParam( "confirm", true ).request().delete();
+            Assert.assertEquals( 200, response.getStatus() );
+        }
     }
 
 
@@ -244,28 +248,32 @@ public class QueueResourceTest extends AbstractRestTest {
         }};
         target( "queues" ).request().post( Entity.entity( queueMap, 
MediaType.APPLICATION_JSON_TYPE ) );
 
-        // send messages each with image/jpg payload
+        try {
 
-        InputStream is = getClass().getResourceAsStream("/qakka-duck.jpg");
-        byte[] bytes = ByteStreams.toByteArray( is );
+            // send messages each with image/jpg payload
 
-        int numMessages = 100;
-        for (int i = 0; i < numMessages; i++) {
+            InputStream is = getClass().getResourceAsStream( "/qakka-duck.jpg" 
);
+            byte[] bytes = ByteStreams.toByteArray( is );
 
-            Response response = target( "queues" ).path( queueName ).path( 
"messages" )
+            int numMessages = 100;
+            for (int i = 0; i < numMessages; i++) {
+
+                Response response = target( "queues" ).path( queueName ).path( 
"messages" )
                     .queryParam( "contentType", "image/jpg" )
                     .request()
-                    .post( Entity.entity( bytes, 
MediaType.APPLICATION_OCTET_STREAM ));
+                    .post( Entity.entity( bytes, 
MediaType.APPLICATION_OCTET_STREAM ) );
 
-            Assert.assertEquals( 200, response.getStatus() );
-        }
+                Assert.assertEquals( 200, response.getStatus() );
+            }
 
-        // get all messages, checking for dups
+            // get all messages, checking for dups
 
-        checkBinaryMessages( queueName, numMessages );
+            checkBinaryMessages( queueName, numMessages );
 
-        Response response = target( "queues" ).path( queueName ).queryParam( 
"confirm", true ).request().delete();
-        Assert.assertEquals( 200, response.getStatus() );
+        } finally {
+            Response response = target( "queues" ).path( queueName 
).queryParam( "confirm", true ).request().delete();
+            Assert.assertEquals( 200, response.getStatus() );
+        }
     }
 
 
@@ -320,71 +328,75 @@ public class QueueResourceTest extends AbstractRestTest {
         Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }};
         target("queues").request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE));
 
-        // send some messages
+        try {
 
-        ObjectMapper mapper = new ObjectMapper();
+            // send some messages
 
-        int numMessages = 100;
-        for ( int i=0; i<numMessages; i++ ) {
+            ObjectMapper mapper = new ObjectMapper();
 
-            final int number = i;
-            Map<String, Object> messageMap = new HashMap<String, Object>() {{
-                put("message", "this is message #" + number);
-                put("valid", true );
-            }};
-            String body = mapper.writeValueAsString( messageMap );
+            int numMessages = 100;
+            for (int i = 0; i < numMessages; i++) {
 
-            Response response = target("queues").path( queueName ).path( "messages" )
-                    .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ));
+                final int number = i;
+                Map<String, Object> messageMap = new HashMap<String, Object>() {{
+                    put( "message", "this is message #" + number );
+                    put( "valid", true );
+                }};
+                String body = mapper.writeValueAsString( messageMap );
 
-            Assert.assertEquals( 200, response.getStatus() );
-        }
+                Response response = target( "queues" ).path( queueName ).path( "messages" )
+                    .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ) );
 
-        // get all messages, checking for dups
+                Assert.assertEquals( 200, response.getStatus() );
+            }
 
-        Set<UUID> messageIds = checkJsonMessages( queueName, numMessages );
+            // get all messages, checking for dups
 
-        // there should be no more messages available
+            Set<UUID> messageIds = checkJsonMessages( queueName, numMessages );
 
-        Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get();
-        ApiResponse apiResponse = response.readEntity( ApiResponse.class );
-        Assert.assertNotNull( apiResponse.getQueueMessages() );
-        Assert.assertTrue( apiResponse.getQueueMessages().isEmpty() );
+            // there should be no more messages available
+
+            Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get();
+            ApiResponse apiResponse = response.readEntity( ApiResponse.class );
+            Assert.assertNotNull( apiResponse.getQueueMessages() );
+            Assert.assertTrue( apiResponse.getQueueMessages().isEmpty() );
 
-        // ack half of the messages
+            // ack half of the messages
 
-        int count = 0;
-        Set<UUID> ackedIds = new HashSet<>();
-        for ( UUID queueMessageId : messageIds ) {
-            response = target( "queues" )
+            int count = 0;
+            Set<UUID> ackedIds = new HashSet<>();
+            for (UUID queueMessageId : messageIds) {
+                response = target( "queues" )
                     .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete();
-            Assert.assertEquals( 200, response.getStatus() );
-            ackedIds.add( queueMessageId );
-            if ( ++count >= numMessages/2 ) {
-                break;
+                Assert.assertEquals( 200, response.getStatus() );
+                ackedIds.add( queueMessageId );
+                if (++count >= numMessages / 2) {
+                    break;
+                }
             }
-        }
-        messageIds.removeAll( ackedIds );
+            messageIds.removeAll( ackedIds );
 
-        // wait for remaining of the messages to timeout
+            // wait for remaining of the messages to timeout
 
-        QakkaFig qakkaFig = StartupListener.INJECTOR.getInstance( QakkaFig.class );
-        Thread.sleep( 2*qakkaFig.getQueueTimeoutSeconds() * 1000 );
+            QakkaFig qakkaFig = StartupListener.INJECTOR.getInstance( QakkaFig.class );
+            Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds() * 1000 );
 
-        // now, the remaining messages cannot be acked because they timed out
+            // now, the remaining messages cannot be acked because they timed out
 
-        for ( UUID queueMessageId : messageIds ) {
-            response = target( "queues" )
+            for (UUID queueMessageId : messageIds) {
+                response = target( "queues" )
                     .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete();
-            Assert.assertEquals( 400, response.getStatus() );
-        }
+                Assert.assertEquals( 400, response.getStatus() );
+            }
 
-        // and, those same messages should be available again in the queue
+            // and, those same messages should be available again in the queue
 
-        checkJsonMessages( queueName, numMessages/2 );
+            checkJsonMessages( queueName, numMessages / 2 );
 
-        response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
-        Assert.assertEquals( 200, response.getStatus() );
+        } finally {
+            Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+            Assert.assertEquals( 200, response.getStatus() );
+        }
     }
 
 

Reply via email to