Switch Qakka to use two keyspaces: the original, replicated Applications
keyspace and a new, un-replicated Applications Local keyspace.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/483ca0f5
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/483ca0f5
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/483ca0f5

Branch: refs/heads/usergrid-1318-queue
Commit: 483ca0f5485891f2acc5d3b1a921d68c87d1d647
Parents: ee0dda4
Author: Dave Johnson <snoopd...@apache.org>
Authored: Fri Sep 16 07:48:09 2016 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Fri Sep 16 07:48:09 2016 -0400

----------------------------------------------------------------------
 .../main/resources/usergrid-default.properties  |  2 +
 .../usergrid/corepersistence/GuiceFactory.java  |  3 ++
 .../impl/ScopedCacheSerializationImpl.java      | 10 ++++-
 .../UniqueValueSerializationStrategyImpl.java   | 16 ++++---
 .../UniqueValueSerializationStrategyV1Impl.java | 46 +++++++++++++-------
 .../UniqueValueSerializationStrategyV2Impl.java | 34 ++++++++++-----
 .../persistence/core/CassandraConfig.java       |  3 ++
 .../persistence/core/CassandraConfigImpl.java   | 18 ++++++++
 .../usergrid/persistence/core/CassandraFig.java |  4 ++
 .../persistence/core/datastax/CQLUtils.java     |  6 ++-
 .../core/datastax/DataStaxCluster.java          |  4 ++
 .../core/datastax/TableDefinition.java          |  2 +
 .../core/datastax/impl/DataStaxClusterImpl.java | 42 ++++++++++++++++++
 .../core/datastax/impl/TableDefinitionImpl.java | 20 ++++++---
 .../impl/TableDefinitionStringImpl.java         |  9 +++-
 .../core/migration/schema/Migration.java        |  1 +
 .../migration/schema/MigrationManagerFig.java   |  4 ++
 .../migration/schema/MigrationManagerImpl.java  | 17 +++++---
 .../core/astyanax/ColumnNameIteratorTest.java   | 11 +++++
 .../MultiKeyColumnNameIteratorTest.java         | 11 +++++
 .../astyanax/MultiRowColumnIteratorTest.java    | 10 +++++
 .../persistence/core/datastax/CQLUtilsTest.java |  6 +--
 .../core/datastax/TableDefinitionTest.java      |  8 ++--
 .../core/guice/MigrationManagerRule.java        | 11 ++---
 .../map/impl/MapSerializationImpl.java          | 24 ++++++----
 stack/corepersistence/queue/pom.xml             |  3 ++
 .../usergrid/persistence/qakka/QakkaFig.java    |  2 +-
 .../persistence/qakka/core/CassandraClient.java |  5 ++-
 .../qakka/core/CassandraClientImpl.java         | 23 +++++++---
 .../MultiShardMessageIterator.java              |  2 +-
 .../impl/AuditLogSerializationImpl.java         | 12 +++--
 .../impl/MessageCounterSerializationImpl.java   | 12 +++--
 .../impl/QueueMessageSerializationImpl.java     | 28 ++++++++----
 .../queues/impl/QueueSerializationImpl.java     | 16 ++++---
 .../serialization/sharding/ShardIterator.java   |  2 +-
 .../impl/ShardCounterSerializationImpl.java     | 12 +++--
 .../sharding/impl/ShardSerializationImpl.java   | 19 +++++---
 .../impl/TransferLogSerializationImpl.java      | 16 ++++---
 .../persistence/qakka/AbstractTest.java         |  5 ++-
 .../persistence/qakka/KeyspaceDropper.java      | 20 ++++++---
 .../qakka/common/CassandraClientTest.java       |  4 +-
 .../qakka/core/QueueMessageManagerTest.java     | 14 +++---
 .../distributed/QueueActorServiceTest.java      |  2 -
 .../actors/QueueActorHelperTest.java            | 11 ++---
 .../distributed/actors/QueueReaderTest.java     |  1 -
 .../distributed/actors/QueueTimeouterTest.java  | 33 +++++++-------
 .../distributed/actors/ShardAllocatorTest.java  |  4 +-
 ...tiShardDatabaseQueueMessageIteratorTest.java | 23 +++++-----
 .../auditlogs/AuditLogSerializationTest.java    | 17 +++-----
 .../queues/DatabaseQueueSerializationTest.java  |  3 --
 .../sharding/ShardCounterSerializationTest.java | 17 ++++----
 .../sharding/ShardIteratorTest.java             | 20 +++++----
 .../sharding/ShardSerializationTest.java        | 15 ++++---
 .../sharding/ShardStrategyTest.java             |  7 ++-
 .../TransferLogSerializationTest.java           | 24 +++++-----
 .../queue/LegacyQueueManagerTest.java           |  5 ---
 .../queue/src/test/resources/qakka.properties   | 18 +++++---
 57 files changed, 477 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties 
b/stack/config/src/main/resources/usergrid-default.properties
index 34d46ad..5c7fca2 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -94,6 +94,8 @@ cassandra.keyspace.strategy=SimpleStrategy
 #
 cassandra.keyspace.replication=replication_factor:1
 
+cassandra.keyspace.local.replication=replication_factor:1
+
 # Tell Usergrid that Cassandra is not embedded.
 #
 cassandra.embedded=false

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
index a17ac48..b003c2f 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
@@ -115,6 +115,9 @@ public class GuiceFactory implements FactoryBean<Injector> {
             cpProps
                 .put( "collections.keyspace.strategy.class", 
getAndValidateProperty( "cassandra.keyspace.strategy" ) );
 
+            cpProps.put( "collections.keyspace.local.strategy.options",
+                getAndValidateProperty( "cassandra.keyspace.local.replication" 
) );
+
             cpProps.put( "collections.keyspace.strategy.options",
                 getAndValidateProperty( "cassandra.keyspace.replication" ) );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
 
b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
index 7e4adb0..e2574c1 100644
--- 
a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
+++ 
b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
@@ -280,8 +280,14 @@ public class ScopedCacheSerializationImpl<K,V> implements 
ScopedCacheSerializati
     public Collection<TableDefinition> getTables() {
 
         final TableDefinition scopedCache =
-            new TableDefinitionImpl( SCOPED_CACHE_TABLE, 
SCOPED_CACHE_PARTITION_KEYS, SCOPED_CACHE_COLUMN_KEYS,
-                SCOPED_CACHE_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, 
SCOPED_CACHE_CLUSTERING_ORDER);
+            new TableDefinitionImpl(
+                cassandraConfig.getApplicationKeyspace(),
+                SCOPED_CACHE_TABLE,
+                SCOPED_CACHE_PARTITION_KEYS,
+                SCOPED_CACHE_COLUMN_KEYS,
+                SCOPED_CACHE_COLUMNS,
+                TableDefinitionImpl.CacheOption.KEYS,
+                SCOPED_CACHE_CLUSTERING_ORDER);
 
         return Collections.singletonList(scopedCache);
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index 450c098..cf168cb 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -68,7 +68,7 @@ public abstract class 
UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
 
 
     private final SerializationFig serializationFig;
-    private final CassandraFig cassandraFig;
+    protected final CassandraFig cassandraFig;
 
     private final Session session;
     private final CassandraConfig cassandraConfig;
@@ -90,8 +90,8 @@ public abstract class 
UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
         this.session = session;
         this.cassandraConfig = cassandraConfig;
 
-        TABLE_UNIQUE_VALUES = getUniqueValuesTable().getTableName();
-        TABLE_UNIQUE_VALUES_LOG = getEntityUniqueLogTable().getTableName();
+        TABLE_UNIQUE_VALUES = getUniqueValuesTable( cassandraFig 
).getTableName();
+        TABLE_UNIQUE_VALUES_LOG = getEntityUniqueLogTable( cassandraFig 
).getTableName();
     }
 
     @Override
@@ -272,9 +272,11 @@ public abstract class 
UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
 
         for ( Field field : fields ) {
 
-            //log.info(Bytes.toHexString(getPartitionKey(applicationId, type, 
field.getTypeName().toString(), field.getName(), field.getValue())));
+            //log.info(Bytes.toHexString(getPartitionKey(applicationId, type,
+            // field.getTypeName().toString(), field.getName(), 
field.getValue())));
 
-            //partitionKeys.add(getPartitionKey(applicationId, type, 
field.getTypeName().toString(), field.getName(), field.getValue()));
+            //partitionKeys.add(getPartitionKey(applicationId, type,
+            // field.getTypeName().toString(), field.getName(), 
field.getValue()));
 
             final Clause inKey = QueryBuilder.in("key", 
getPartitionKey(applicationId, type,
                 field.getTypeName().toString(), field.getName(), 
field.getValue()) );
@@ -492,7 +494,7 @@ public abstract class 
UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
     /**
      * Get the CQL table definition for the unique values log table
      */
-    protected abstract TableDefinition getUniqueValuesTable();
+    protected abstract TableDefinition getUniqueValuesTable( CassandraFig 
cassandraFig );
 
 
     protected abstract List<Object> deserializePartitionKey(ByteBuffer bb);
@@ -514,7 +516,7 @@ public abstract class 
UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
     /**
      * Get the CQL table definition for the unique values log table
      */
-    protected abstract TableDefinition getEntityUniqueLogTable();
+    protected abstract TableDefinition getEntityUniqueLogTable( CassandraFig 
cassandraFig );
 
 
     public class AllUniqueFieldsIterator implements Iterable<UniqueValue>, 
Iterator<UniqueValue> {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
index 4ee5b70..ba3fcb0 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
@@ -46,9 +46,8 @@ import com.google.inject.Singleton;
  * V1 impl with unique value serialization strategy with the collection scope
  */
 @Singleton
-public class UniqueValueSerializationStrategyV1Impl  extends 
UniqueValueSerializationStrategyImpl<CollectionPrefixedKey<Field>, 
CollectionPrefixedKey<Id>> {
-
-
+public class UniqueValueSerializationStrategyV1Impl  extends
+    UniqueValueSerializationStrategyImpl<CollectionPrefixedKey<Field>, 
CollectionPrefixedKey<Id>> {
 
     private static final String UNIQUE_VALUES_TABLE = 
CQLUtils.quote("Unique_Values");
     private static final Collection<String> UNIQUE_VALUES_PARTITION_KEYS = 
Collections.singletonList("key");
@@ -58,6 +57,7 @@ public class UniqueValueSerializationStrategyV1Impl  extends 
UniqueValueSerializ
             put( "key", DataType.Name.BLOB );
             put( "column1", DataType.Name.CUSTOM );
             put( "value", DataType.Name.BLOB ); }};
+
     private static final Map<String, String> UNIQUE_VALUES_CLUSTERING_ORDER =
         new HashMap<String, String>(){{ put( "column1", "ASC" );}};
 
@@ -70,18 +70,14 @@ public class UniqueValueSerializationStrategyV1Impl  
extends UniqueValueSerializ
             put( "key", DataType.Name.BLOB );
             put( "column1", DataType.Name.CUSTOM );
             put( "value", DataType.Name.BLOB ); }};
+
     private static final Map<String, String> 
UNIQUE_VALUES_LOG_CLUSTERING_ORDER =
         new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
 
 
-    private final static TableDefinition uniqueValues =
-        new TableDefinitionImpl( UNIQUE_VALUES_TABLE, 
UNIQUE_VALUES_PARTITION_KEYS, UNIQUE_VALUES_COLUMN_KEYS,
-            UNIQUE_VALUES_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, 
UNIQUE_VALUES_CLUSTERING_ORDER);
-
-    private final static TableDefinition uniqueValuesLog =
-        new TableDefinitionImpl( UNIQUE_VALUES_LOG_TABLE, 
UNIQUE_VALUES_LOG_PARTITION_KEYS, UNIQUE_VALUES_LOG_COLUMN_KEYS,
-            UNIQUE_VALUES_LOG_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, 
UNIQUE_VALUES_LOG_CLUSTERING_ORDER);
+    private TableDefinition uniqueValues;
 
+    private TableDefinition uniqueValuesLog;
 
 
     /**
@@ -95,7 +91,9 @@ public class UniqueValueSerializationStrategyV1Impl  extends 
UniqueValueSerializ
                                                    final SerializationFig 
serializationFig,
                                                    final Session session,
                                                    final CassandraConfig 
cassandraConfig) {
+
         super( cassandraFig, serializationFig, session, cassandraConfig );
+
     }
 
 
@@ -109,8 +107,8 @@ public class UniqueValueSerializationStrategyV1Impl  
extends UniqueValueSerializ
     @Override
     public Collection<TableDefinition> getTables() {
 
-        final TableDefinition uniqueValues = getUniqueValuesTable();
-        final TableDefinition uniqueValuesLog = getEntityUniqueLogTable();
+        final TableDefinition uniqueValues = getUniqueValuesTable( 
cassandraFig );
+        final TableDefinition uniqueValuesLog = getEntityUniqueLogTable( 
cassandraFig );
 
         return Arrays.asList( uniqueValues, uniqueValuesLog );
 
@@ -119,15 +117,33 @@ public class UniqueValueSerializationStrategyV1Impl  
extends UniqueValueSerializ
 
 
     @Override
-    protected TableDefinition getUniqueValuesTable(){
+    protected TableDefinition getUniqueValuesTable( CassandraFig cassandraFig 
) {
+        if ( uniqueValues == null ) {
+
+            uniqueValues = new TableDefinitionImpl( 
cassandraFig.getApplicationKeyspace(),
+                UNIQUE_VALUES_TABLE,
+                UNIQUE_VALUES_PARTITION_KEYS,
+                UNIQUE_VALUES_COLUMN_KEYS,
+                UNIQUE_VALUES_COLUMNS,
+                TableDefinitionImpl.CacheOption.KEYS, 
UNIQUE_VALUES_CLUSTERING_ORDER);
 
+        }
         return uniqueValues;
     }
 
 
     @Override
-    protected TableDefinition getEntityUniqueLogTable(){
-
+    protected TableDefinition getEntityUniqueLogTable( CassandraFig 
cassandraFig ) {
+        if ( uniqueValuesLog == null ) {
+
+            uniqueValuesLog = new TableDefinitionImpl( 
cassandraFig.getApplicationKeyspace(),
+                UNIQUE_VALUES_LOG_TABLE,
+                UNIQUE_VALUES_LOG_PARTITION_KEYS,
+                UNIQUE_VALUES_LOG_COLUMN_KEYS,
+                UNIQUE_VALUES_LOG_COLUMNS,
+                TableDefinitionImpl.CacheOption.KEYS,
+                UNIQUE_VALUES_LOG_CLUSTERING_ORDER);
+        }
         return uniqueValuesLog;
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
index 522dad9..aeb1720 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
@@ -69,13 +69,9 @@ public class UniqueValueSerializationStrategyV2Impl  extends 
UniqueValueSerializ
     private static final Map<String, String> 
UNIQUE_VALUES_LOG_CLUSTERING_ORDER =
         new HashMap<String, String>(){{ put( "column1", "ASC" );}};
 
-    private final static TableDefinition uniqueValues =
-        new TableDefinitionImpl( UNIQUE_VALUES_TABLE, 
UNIQUE_VALUES_PARTITION_KEYS, UNIQUE_VALUES_COLUMN_KEYS,
-            UNIQUE_VALUES_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, 
UNIQUE_VALUES_CLUSTERING_ORDER);
+    private static TableDefinition uniqueValues;
 
-    private final static TableDefinition uniqueValuesLog =
-        new TableDefinitionImpl( UNIQUE_VALUES_LOG_TABLE, 
UNIQUE_VALUES_LOG_PARTITION_KEYS, UNIQUE_VALUES_LOG_COLUMN_KEYS,
-            UNIQUE_VALUES_LOG_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, 
UNIQUE_VALUES_LOG_CLUSTERING_ORDER);
+    private static TableDefinition uniqueValuesLog;
 
 
     /**
@@ -104,8 +100,8 @@ public class UniqueValueSerializationStrategyV2Impl  
extends UniqueValueSerializ
     @Override
     public Collection<TableDefinition> getTables() {
 
-        final TableDefinition uniqueValues = getUniqueValuesTable();
-        final TableDefinition uniqueValuesLog = getEntityUniqueLogTable();
+        final TableDefinition uniqueValues = getUniqueValuesTable( 
cassandraFig );
+        final TableDefinition uniqueValuesLog = getEntityUniqueLogTable( 
cassandraFig );
 
         return Arrays.asList( uniqueValues, uniqueValuesLog );
 
@@ -113,13 +109,31 @@ public class UniqueValueSerializationStrategyV2Impl  
extends UniqueValueSerializ
 
 
     @Override
-    protected TableDefinition getUniqueValuesTable(){
+    protected TableDefinition getUniqueValuesTable( CassandraFig cassandraFig 
) {
+        if ( uniqueValues == null ) {
+            uniqueValues = new TableDefinitionImpl( 
cassandraFig.getApplicationKeyspace(),
+                UNIQUE_VALUES_TABLE,
+                UNIQUE_VALUES_PARTITION_KEYS,
+                UNIQUE_VALUES_COLUMN_KEYS,
+                UNIQUE_VALUES_COLUMNS,
+                TableDefinitionImpl.CacheOption.KEYS,
+                UNIQUE_VALUES_CLUSTERING_ORDER);
+        }
         return uniqueValues;
     }
 
 
     @Override
-    protected TableDefinition getEntityUniqueLogTable(){
+    protected TableDefinition getEntityUniqueLogTable( CassandraFig 
cassandraFig ){
+        if ( uniqueValuesLog == null ) {
+            uniqueValuesLog = new TableDefinitionImpl(  
cassandraFig.getApplicationKeyspace(),
+                UNIQUE_VALUES_LOG_TABLE,
+                UNIQUE_VALUES_LOG_PARTITION_KEYS,
+                UNIQUE_VALUES_LOG_COLUMN_KEYS,
+                UNIQUE_VALUES_LOG_COLUMNS,
+                TableDefinitionImpl.CacheOption.KEYS,
+                UNIQUE_VALUES_LOG_CLUSTERING_ORDER);
+        }
         return uniqueValuesLog;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
index 595b65f..f45ad43 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
@@ -74,4 +74,7 @@ public interface CassandraConfig {
     int[] getShardSettings();
 
 
+    String getApplicationKeyspace();
+
+    String getApplicationLocalKeyspace();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
index e87ebb8..729f5b2 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
@@ -41,6 +41,9 @@ public class CassandraConfigImpl implements CassandraConfig {
     private int[] shardSettings;
     private ConsistencyLevel consistentCl;
 
+    private String applicationKeyspace;
+    private String applicationLocalKeyspace;
+
     // DataStax driver's CL
     private com.datastax.driver.core.ConsistencyLevel dataStaxReadCl;
     private com.datastax.driver.core.ConsistencyLevel dataStaxWriteCl;
@@ -65,6 +68,10 @@ public class CassandraConfigImpl implements CassandraConfig {
 
         this.dataStaxWriteCl = 
com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getWriteCl() );
 
+        this.applicationKeyspace = cassandraFig.getApplicationKeyspace();
+
+        this.applicationLocalKeyspace = 
cassandraFig.getApplicationLocalKeyspace();
+
         //add the listeners to update the values
         cassandraFig.addPropertyChangeListener( new PropertyChangeListener() {
             @Override
@@ -133,4 +140,15 @@ public class CassandraConfigImpl implements 
CassandraConfig {
 
       return settings;
     }
+
+    @Override
+    public String getApplicationKeyspace() {
+        return applicationKeyspace;
+    }
+
+    @Override
+    public String getApplicationLocalKeyspace() {
+        return applicationLocalKeyspace;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
index 2996465..90f4ae8 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
@@ -87,6 +87,10 @@ public interface CassandraFig extends GuicyFig {
     @Default( "Usergrid_Applications" )
     String getApplicationKeyspace();
 
+    @Key( "cassandra.keyspace.application_local" )
+    @Default( "Usergrid_Applications_Local" )
+    String getApplicationLocalKeyspace();
+
     @Key( "cassandra.port" )
     @Default( "9160" )
     int getThriftPort();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
index 082f2d5..ed5c9e8 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
@@ -45,7 +45,8 @@ public class CQLUtils {
     }
 
 
-    public static String getFormattedReplication(String strategy, String 
strategyOptions) throws JsonProcessingException {
+    public static String getFormattedReplication(
+        String strategy, String strategyOptions) throws 
JsonProcessingException {
 
         Map<String, String> replicationSettings = new HashMap<>();
         replicationSettings.put("class", strategy);
@@ -86,7 +87,8 @@ public class CQLUtils {
     }
 
 
-    public static String getCachingOptions(CassandraFig cassandraFig, 
TableDefinitionImpl.CacheOption cacheOption) throws JsonProcessingException {
+    public static String getCachingOptions(
+        CassandraFig cassandraFig, TableDefinitionImpl.CacheOption 
cacheOption) throws JsonProcessingException {
 
         // Cassandra 2.0 and below has a different CQL syntax for caching
         if( Double.parseDouble( cassandraFig.getVersion() ) <= 2.0 ){

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java
index ea76f92..5944b36 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java
@@ -30,8 +30,12 @@ public interface DataStaxCluster {
 
     Session getApplicationSession();
 
+    Session getApplicationLocalSession();
+
     void createApplicationKeyspace() throws Exception;
 
+    void createApplicationLocalKeyspace() throws Exception;
+
     void waitForSchemaAgreement();
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java
index 8178129..e1c5afb 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java
@@ -28,6 +28,8 @@ public interface TableDefinition {
         CREATE, UPDATE
     }
 
+    String getKeyspace();
+
     String getTableName();
 
     String getTableCQL( CassandraFig cassandraFig, ACTION tableAction ) throws 
Exception;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
index f926d1e..fe9803d 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
@@ -39,6 +39,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
     private final CassandraFig cassandraFig;
     private Cluster cluster;
     private Session applicationSession;
+    private Session queueMessageSession;
     private Session clusterSession;
 
     @Inject
@@ -90,6 +91,17 @@ public class DataStaxClusterImpl implements DataStaxCluster {
     }
 
 
+    @Override
+    public Session getApplicationLocalSession(){
+
+        // always grab cluster from getCluster() in case it was prematurely 
closed
+        if ( queueMessageSession == null || queueMessageSession.isClosed() ){
+            queueMessageSession = getCluster().connect( 
CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace() ) );
+        }
+        return queueMessageSession;
+    }
+
+
     /**
      * Execute CQL that will create the keyspace if it doesn't exist and alter 
it if it does.
      * @throws Exception
@@ -117,6 +129,36 @@ public class DataStaxClusterImpl implements 
DataStaxCluster {
 
     }
 
+
+
+    /**
+     * Execute CQL that will create the keyspace if it doesn't exist and alter 
it if it does.
+     * @throws Exception
+     */
+    @Override
+    public void createApplicationLocalKeyspace() throws Exception {
+
+        boolean exists = getClusterSession().getCluster().getMetadata()
+            
.getKeyspace(CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace())) != 
null;
+
+        if (exists) {
+            return;
+        }
+
+        final String createQueueMessageKeyspace = String.format(
+            "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s",
+            CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace()),
+            CQLUtils.getFormattedReplication(cassandraFig.getStrategy(), 
cassandraFig.getStrategyOptions())
+
+        );
+
+        getClusterSession().execute(createQueueMessageKeyspace);
+
+        logger.info("Created keyspace: {}", 
cassandraFig.getApplicationLocalKeyspace());
+
+    }
+
+
     /**
      * Wait until all Cassandra nodes agree on the schema.  Sleeps 100ms 
between checks.
      *

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionImpl.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionImpl.java
index a39c47e..f22c450 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionImpl.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionImpl.java
@@ -40,6 +40,7 @@ public class TableDefinitionImpl implements TableDefinition {
         ALL, KEYS, ROWS, NONE
     }
 
+    private final String keyspace;
     private final String tableName;
     private final Collection<String> partitionKeys;
     private final Collection<String> columnKeys;
@@ -71,16 +72,21 @@ public class TableDefinitionImpl implements TableDefinition 
{
     static String COMPOSITE_TYPE = 
"'org.apache.cassandra.db.marshal.DynamicCompositeType(a=>org.apache.cassandra.db.marshal.AsciiType,A=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.AsciiType),b=>org.apache.cassandra.db.marshal.BytesType,B=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.BytesType),i=>org.apache.cassandra.db.marshal.IntegerType,I=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.IntegerType),l=>org.apache.cassandra.db.marshal.LongType,L=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.LongType),s=>org.apache.cassandra.db.marshal.UTF8Type,S=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.UTF8Type),t=>org.apache.cassandra.db.marshal.TimeUUIDType,T=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimeUUIDType),u=>org.apache.cassandra.db.marshal.UUIDType,U=>org.apache.cassandra.db.marshal.Revers
 
edType(org.apache.cassandra.db.marshal.UUIDType),x=>org.apache.cassandra.db.marshal.LexicalUUIDType,X=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.LexicalUUIDType))'";
 
 
-    public TableDefinitionImpl(final String tableName, final 
Collection<String> partitionKeys,
-                               final Collection<String> columnKeys, final 
Map<String, DataType.Name> columns,
-                               final CacheOption cacheOption, final 
Map<String, String> clusteringOrder){
+    public TableDefinitionImpl(
+        final String keyspace,
+        final String tableName,
+        final Collection<String> partitionKeys,
+        final Collection<String> columnKeys,
+        final Map<String, DataType.Name> columns,
+        final CacheOption cacheOption,
+        final Map<String, String> clusteringOrder) {
 
         Preconditions.checkNotNull(tableName, "Table name cannot be null");
         Preconditions.checkNotNull(partitionKeys, "Primary Key(s) cannot be 
null");
         Preconditions.checkNotNull(columns, "Columns cannot be null");
         Preconditions.checkNotNull(cacheOption, "CacheOption cannot be null");
 
-
+        this.keyspace = keyspace;
         this.tableName = tableName;
         this.partitionKeys = partitionKeys;
         this.columnKeys = columnKeys;
@@ -97,9 +103,11 @@ public class TableDefinitionImpl implements TableDefinition 
{
         this.compression = new HashMap<>(1);
         compression.put("sstable_compression", "LZ4Compressor");
         this.gcGraceSeconds = "864000";
+    }
 
-
-
+    @Override
+    public String getKeyspace() {
+        return keyspace;
     }
 
     public String getTableName() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionStringImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionStringImpl.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionStringImpl.java
index 8e7d854..f4a58f8 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionStringImpl.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionStringImpl.java
@@ -25,17 +25,24 @@ import 
org.apache.usergrid.persistence.core.datastax.TableDefinition;
 
 public class TableDefinitionStringImpl implements TableDefinition {
 
+    private String keyspace;
     private String tableName;
     private String cql;
 
 
-    public TableDefinitionStringImpl( String tableName, String cql ) {
+    public TableDefinitionStringImpl( String keyspace, String tableName, 
String cql ) {
+        this.keyspace = keyspace;
         this.tableName = tableName;
         this.cql = cql;
     }
 
 
     @Override
+    public String getKeyspace() {
+        return keyspace;
+    }
+
+    @Override
     public String getTableName() {
         return tableName;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java
index b2ab031..2b3e7fa 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java
@@ -19,6 +19,7 @@
 package org.apache.usergrid.persistence.core.migration.schema;
 
 
+import org.apache.usergrid.persistence.core.CassandraFig;
 import 
org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
index d8f3d1f..be2866a 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
@@ -38,4 +38,8 @@ public interface MigrationManagerFig extends GuicyFig {
     @Default("replication_factor:1")
     String getStrategyOptions();
 
+    @Key( "collections.keyspace.strategy.options" )
+    @Default("replication_factor:1")
+    String getLocalStrategyOptions();
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
index f5ec8ac..9178bde 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
@@ -73,6 +73,8 @@ public class MigrationManagerImpl implements MigrationManager 
{
 
             dataStaxCluster.createApplicationKeyspace();
 
+            dataStaxCluster.createApplicationLocalKeyspace();
+
             for ( Migration migration : migrations ) {
 
                 final Collection<MultiTenantColumnFamilyDefinition> 
columnFamilies = migration.getColumnFamilies();
@@ -143,10 +145,10 @@ public class MigrationManagerImpl implements 
MigrationManager {
     private void createTable(TableDefinition tableDefinition ) throws 
Exception {
 
         KeyspaceMetadata keyspaceMetadata = 
dataStaxCluster.getClusterSession().getCluster().getMetadata()
-            
.getKeyspace(CQLUtils.quote(cassandraFig.getApplicationKeyspace()));
+            .getKeyspace(CQLUtils.quote( tableDefinition.getKeyspace() ) );
 
         boolean exists =  keyspaceMetadata != null
-            && keyspaceMetadata.getTable(tableDefinition.getTableName()) != 
null;
+            && keyspaceMetadata.getTable( tableDefinition.getTableName() ) != 
null;
 
         if( exists ){
             return;
@@ -156,10 +158,15 @@ public class MigrationManagerImpl implements 
MigrationManager {
         if (logger.isDebugEnabled()) {
             logger.debug(CQL);
         }
-        dataStaxCluster.getApplicationSession()
-            .execute(CQL);
 
-        logger.info("Created table: {}", tableDefinition.getTableName());
+        if ( tableDefinition.getKeyspace().equals( 
cassandraFig.getApplicationKeyspace() )) {
+            dataStaxCluster.getApplicationSession().execute( CQL );
+        } else {
+            dataStaxCluster.getApplicationLocalSession().execute( CQL );
+        }
+
+        logger.info("Created table: {} in keyspace {}",
+            tableDefinition.getTableName(), tableDefinition.getKeyspace());
 
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
index caa6294..c45fdd1 100644
--- 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
+++ 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
@@ -115,6 +115,17 @@ public class ColumnNameIteratorTest {
             public int[] getShardSettings() {
                 return new int[]{20};
             }
+
+            @Override
+            public String getApplicationKeyspace() {
+                return cassandraFig.getApplicationKeyspace();
+            }
+
+            @Override
+            public String getApplicationLocalKeyspace() {
+                return cassandraFig.getApplicationLocalKeyspace();
+            }
+
         };
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
index b31fa2f..5cdf0e1 100644
--- 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
+++ 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
@@ -118,6 +118,17 @@ public class MultiKeyColumnNameIteratorTest {
             public int[] getShardSettings() {
                 return new int[]{20};
             }
+
+            @Override
+            public String getApplicationKeyspace() {
+                return cassandraFig.getApplicationKeyspace();
+            }
+
+            @Override
+            public String getApplicationLocalKeyspace() {
+                return cassandraFig.getApplicationLocalKeyspace();
+            }
+
         };
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
index ea5359e..6331941 100644
--- 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
+++ 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -122,6 +122,16 @@ public class MultiRowColumnIteratorTest {
             public int[] getShardSettings() {
                 return new int[]{20};
             }
+
+            @Override
+            public String getApplicationKeyspace() {
+                return cassandraFig.getApplicationKeyspace();
+            }
+
+            @Override
+            public String getApplicationLocalKeyspace() {
+                return cassandraFig.getApplicationLocalKeyspace();
+            }
         };
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java
 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java
index 37311ba..8531bfd 100644
--- 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java
+++ 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java
@@ -70,7 +70,7 @@ public class CQLUtilsTest {
 
 
 
-        TableDefinitionImpl table1 = new TableDefinitionImpl(
+        TableDefinitionImpl table1 = new TableDefinitionImpl( 
cassandraFig.getApplicationKeyspace(),
             CQLUtils.quote("table1"),
             partitionKeys,
             columnKeys,
@@ -122,7 +122,7 @@ public class CQLUtilsTest {
 
 
 
-        TableDefinitionImpl table1 = new TableDefinitionImpl(
+        TableDefinitionImpl table1 = new TableDefinitionImpl( 
cassandraFig.getApplicationKeyspace(),
             CQLUtils.quote("table1"),
             partitionKeys,
             columnKeys,
@@ -165,7 +165,7 @@ public class CQLUtilsTest {
 
 
 
-        TableDefinitionImpl table1 = new TableDefinitionImpl(
+        TableDefinitionImpl table1 = new TableDefinitionImpl( 
cassandraFig.getApplicationKeyspace(),
             CQLUtils.quote("table1"),
             partitionKeys,
             columnKeys,

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java
 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java
index b5f98d9..ac9167b 100644
--- 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java
+++ 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java
@@ -33,7 +33,7 @@ public class TableDefinitionTest {
     public void testNullTableName(){
 
         try{
-            TableDefinitionImpl table1 = new TableDefinitionImpl(null, null, 
null, null, null, null);
+            TableDefinitionImpl table1 = new TableDefinitionImpl(null, null, 
null, null, null, null, null);
         } catch (NullPointerException npe){
             assertEquals("Table name cannot be null", npe.getMessage());
         }
@@ -45,7 +45,7 @@ public class TableDefinitionTest {
     public void testNullPrimaryKeys(){
 
         try{
-            TableDefinitionImpl table1 = new TableDefinitionImpl("table1", 
null, null, null, null, null);
+            TableDefinitionImpl table1 = new TableDefinitionImpl(null, 
"table1", null, null, null, null, null);
         } catch (NullPointerException npe){
             assertEquals("Primary Key(s) cannot be null", npe.getMessage());
         }
@@ -57,7 +57,7 @@ public class TableDefinitionTest {
     public void testNullColumns(){
 
         try{
-            TableDefinitionImpl table1 = new TableDefinitionImpl("table1",
+            TableDefinitionImpl table1 = new TableDefinitionImpl(null, 
"table1",
                 new ArrayList<>(), null, null, null, null);
         } catch (NullPointerException npe){
             assertEquals("Columns cannot be null", npe.getMessage());
@@ -70,7 +70,7 @@ public class TableDefinitionTest {
     public void testNullCacheOption(){
 
         try{
-            TableDefinitionImpl table1 = new TableDefinitionImpl("table1",
+            TableDefinitionImpl table1 = new TableDefinitionImpl(null, 
"table1",
                 new ArrayList<>(),
                 new ArrayList<>(), new HashMap<>(), null, null);
         } catch (NullPointerException npe){

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java
 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java
index 7e02a8e..4e5ac49 100644
--- 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java
+++ 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java
@@ -31,8 +31,6 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 
-/**
- */
 @Singleton
 public class MigrationManagerRule extends ExternalResource {
     private static final Logger logger = LoggerFactory.getLogger( 
MigrationManagerRule.class );
@@ -47,11 +45,10 @@ public class MigrationManagerRule extends ExternalResource {
         this.migrationManager = migrationManager;
 
         try {
-                   this.migrationManager.migrate();
-               }
-               catch ( MigrationException e ) {
-                   throw new RuntimeException(e);
-               }
+            this.migrationManager.migrate();
+        } catch ( MigrationException e ) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Inject

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
 
b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 6e50c75..ab6087d 100644
--- 
a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ 
b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -309,15 +309,21 @@ public class MapSerializationImpl implements 
MapSerialization {
     @Override
     public Collection<TableDefinition> getTables() {
 
-        final TableDefinition mapEntries =
-            new TableDefinitionImpl( MAP_ENTRIES_TABLE, 
MAP_ENTRIES_PARTITION_KEYS, MAP_ENTRIES_COLUMN_KEYS,
-                MAP_ENTRIES_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, 
MAP_ENTRIES_CLUSTERING_ORDER);
-
-        final TableDefinition mapKeys =
-            new TableDefinitionImpl( MAP_KEYS_TABLE, MAP_KEYS_PARTITION_KEYS, 
MAP_KEYS_COLUMN_KEYS,
-                MAP_KEYS_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, 
MAP_KEYS_CLUSTERING_ORDER);
-
-
+        final TableDefinition mapEntries = new TableDefinitionImpl( 
cassandraConfig.getApplicationKeyspace(),
+            MAP_ENTRIES_TABLE,
+            MAP_ENTRIES_PARTITION_KEYS,
+            MAP_ENTRIES_COLUMN_KEYS,
+            MAP_ENTRIES_COLUMNS,
+            TableDefinitionImpl.CacheOption.KEYS,
+            MAP_ENTRIES_CLUSTERING_ORDER);
+
+        final TableDefinition mapKeys = new TableDefinitionImpl( 
cassandraConfig.getApplicationKeyspace(),
+            MAP_KEYS_TABLE,
+            MAP_KEYS_PARTITION_KEYS,
+            MAP_KEYS_COLUMN_KEYS,
+            MAP_KEYS_COLUMNS,
+            TableDefinitionImpl.CacheOption.KEYS,
+            MAP_KEYS_CLUSTERING_ORDER);
 
         return Arrays.asList( mapEntries, mapKeys );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/pom.xml 
b/stack/corepersistence/queue/pom.xml
index 488d9f2..153ed4b 100644
--- a/stack/corepersistence/queue/pom.xml
+++ b/stack/corepersistence/queue/pom.xml
@@ -66,6 +66,9 @@
                     <configuration>
                         <forkCount>0</forkCount>
                         <threadCount>0</threadCount>
+                        <argLine>
+                        -Xms2G -Xmx4G 
-Dlog4j.configuration=file:${basedir}/src/test/resources/log4j.properties
+                        </argLine>
                     </configuration>
                 </plugin>
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 6f3df11..3b901b2 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -38,7 +38,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     String QUEUE_TIMEOUT_SECONDS                  = "queue.timeout.seconds";
 
-    String QUEUE_REFRESH_MILLISECONDS             = 
"queue.refresh.milliseconds";
+    String QUEUE_REFRESH_MILLISECONDS             = "queue.refresh.millis";
 
     String QUEUE_INMEMORY_SIZE                    = "queue.inmemory.size";
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java
index 9f40b51..d611feb 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java
@@ -25,5 +25,8 @@ import com.datastax.driver.core.Session;
  * Created by Dave Johnson (snoopd...@apache.org) on 9/9/16.
  */
 public interface CassandraClient {
-    Session getSession();
+
+    Session getApplicationSession();
+
+    Session getQueueMessageSession();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java
index ed665c2..d1ad442 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java
@@ -31,19 +31,32 @@ import org.slf4j.LoggerFactory;
 public class CassandraClientImpl implements CassandraClient {
     private static final Logger logger = LoggerFactory.getLogger( 
CassandraClientImpl.class );
 
-    private final Session session;
+    private final DataStaxCluster dataStaxCluster;
+    private Session applicationSession = null;
+    private Session queueMessageSession = null;
+
 
     @Inject
     public CassandraClientImpl( DataStaxCluster dataStaxCluster) {
-
         logger.info("Constructing Cassandra client");
+        this.dataStaxCluster = dataStaxCluster;
+    }
 
-        this.session = dataStaxCluster.getApplicationSession();
+
+    @Override
+    public Session getApplicationSession() {
+        if ( applicationSession == null ) {
+            applicationSession = dataStaxCluster.getApplicationSession();
+        }
+        return applicationSession;
     }
 
 
     @Override
-    public Session getSession() {
-        return session;
+    public Session getQueueMessageSession() {
+        if ( queueMessageSession == null ) {
+            queueMessageSession = dataStaxCluster.getApplicationLocalSession();
+        }
+        return queueMessageSession;
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
index 42557e6..6ec0774 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
@@ -143,7 +143,7 @@ public class MultiShardMessageIterator implements 
Iterator<DatabaseQueueMessage>
                     .limit(PAGE_SIZE);
         }
 
-        List<Row> rows = cassandraClient.getSession().execute(query).all();
+        List<Row> rows = 
cassandraClient.getQueueMessageSession().execute(query).all();
 
         if ( (rows == null || rows.size() == 0) && shardIterator.hasNext()) {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
index d9dbab6..ddbd345 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
@@ -24,6 +24,7 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.CassandraFig;
 import 
org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import 
org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -46,6 +47,7 @@ public class AuditLogSerializationImpl implements 
AuditLogSerialization {
     private static final Logger logger = LoggerFactory.getLogger( 
AuditLogSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
+    private final CassandraFig cassandraFig;
 
     public final static String TABLE_AUDIT_LOG   = "audit_log";
 
@@ -75,7 +77,8 @@ public class AuditLogSerializationImpl implements 
AuditLogSerialization {
 
 
     @Inject
-    public AuditLogSerializationImpl( CassandraClient cassandraClient ) {
+    public AuditLogSerializationImpl( CassandraFig cassandraFig, 
CassandraClient cassandraClient ) {
+        this.cassandraFig = cassandraFig;
         this.cassandraClient = cassandraClient;
     }
 
@@ -97,7 +100,7 @@ public class AuditLogSerializationImpl implements 
AuditLogSerialization {
                 .value(COLUMN_MESSAGE_ID, messageId )
                 .value(COLUMN_QUEUE_MESSAGE_ID, queueMessageId )
                 .value(COLUMN_TRANSFER_TIME, System.currentTimeMillis() );
-        cassandraClient.getSession().execute(insert);
+        cassandraClient.getApplicationSession().execute(insert);
     }
 
 
@@ -107,7 +110,7 @@ public class AuditLogSerializationImpl implements 
AuditLogSerialization {
         Statement query = QueryBuilder.select().all().from(TABLE_AUDIT_LOG)
             .where( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ) );
 
-        ResultSet rs = cassandraClient.getSession().execute( query );
+        ResultSet rs = cassandraClient.getApplicationSession().execute( query 
);
 
         final List<AuditLog> auditLogs = rs.all().stream().map( row ->
             new AuditLog(
@@ -143,6 +146,7 @@ public class AuditLogSerializationImpl implements 
AuditLogSerialization {
 
     @Override
     public Collection<TableDefinition> getTables() {
-        return Collections.singletonList( new TableDefinitionStringImpl( 
TABLE_AUDIT_LOG, CQL ) );
+        return Collections.singletonList(
+            new TableDefinitionStringImpl( 
cassandraFig.getApplicationKeyspace(), TABLE_AUDIT_LOG, CQL ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index 65ffc47..5206ec7 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -24,6 +24,7 @@ import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -50,6 +51,7 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
     private static final Logger logger = LoggerFactory.getLogger( MessageCounterSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
+    private final CassandraFig cassandraFig;
 
     final static String TABLE_SHARD_COUNTERS       = "counters";
     final static String COLUMN_QUEUE_NAME    = "queue_name";
@@ -93,7 +95,8 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
 
 
     @Inject
-    public MessageCounterSerializationImpl(QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+    public MessageCounterSerializationImpl( CassandraFig cassandraFig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+        this.cassandraFig = cassandraFig;
         this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
         this.cassandraClient = cassandraClient;
     }
@@ -166,7 +169,7 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
                 .and(   QueryBuilder.eq(   COLUMN_SHARD_TYPE, type.toString() ) )
                 .and(   QueryBuilder.eq(   COLUMN_SHARD_ID, shardId ) )
                 .with(  QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) );
-        cassandraClient.getSession().execute( update );
+        cassandraClient.getQueueMessageSession().execute( update );
     }
 
 
@@ -177,7 +180,7 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
                 .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) )
                 .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) );
 
-        ResultSet resultSet = cassandraClient.getSession().execute( query );
+        ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( query );
         List<Row> all = resultSet.all();
 
         if ( all.size() > 1 ) {
@@ -198,7 +201,8 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
 
     @Override
     public Collection<TableDefinition> getTables() {
-        return Collections.singletonList( new TableDefinitionStringImpl( TABLE_SHARD_COUNTERS, CQL ) );
+        return Collections.singletonList(
+            new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), TABLE_SHARD_COUNTERS, CQL ) );
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index f55b936..99ff783 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -28,6 +28,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -52,6 +53,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
     private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
+    private final CassandraFig cassandraFig;
 
     private final ActorSystemFig            actorSystemFig;
     private final ShardStrategy             shardStrategy;
@@ -107,11 +109,13 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
 
     @Inject
     public QueueMessageSerializationImpl(
+            CassandraFig              cassandraFig,
             ActorSystemFig            actorSystemFig,
             ShardStrategy             shardStrategy,
             ShardCounterSerialization shardCounterSerialization,
             CassandraClient           cassandraClient
         ) {
+        this.cassandraFig              = cassandraFig;
         this.actorSystemFig            = actorSystemFig;
         this.shardStrategy             = shardStrategy;
         this.shardCounterSerialization = shardCounterSerialization;
@@ -149,7 +153,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
                 .value( COLUMN_INFLIGHT_AT,      inflightAt )
                 .value( COLUMN_QUEUED_AT,        queuedAt);
 
-        cassandraClient.getSession().execute(insert);
+        cassandraClient.getQueueMessageSession().execute(insert);
 
         shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 );
 
@@ -191,7 +195,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
                 .and(shardIdClause)
                 .and(queueMessageIdClause);
 
-        Row row = cassandraClient.getSession().execute(select).one();
+        Row row = cassandraClient.getQueueMessageSession().execute(select).one();
 
         if (row == null) {
             return null;
@@ -240,7 +244,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
                 .and(shardIdClause)
                 .and(queueMessageIdClause);
 
-        ResultSet resultSet = cassandraClient.getSession().execute( delete );
+        ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( delete );
 
         String s = "s";
     }
@@ -253,7 +257,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
 
         Statement select = QueryBuilder.select().from( TABLE_MESSAGE_DATA).where(messageIdClause);
 
-        Row row = cassandraClient.getSession().execute(select).one();
+        Row row = cassandraClient.getApplicationSession().execute(select).one();
         if ( row == null ) {
             return null;
         }
@@ -273,7 +277,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
                 .value( COLUMN_MESSAGE_DATA, messageBody.getBlob())
                 .value( COLUMN_CONTENT_TYPE, messageBody.getContentType());
 
-        cassandraClient.getSession().execute(insert);
+        cassandraClient.getApplicationSession().execute(insert);
     }
 
 
@@ -285,7 +289,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
         Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA)
                 .where(messageIdClause);
 
-        cassandraClient.getSession().execute(delete);
+        cassandraClient.getApplicationSession().execute(delete);
     }
 
 
@@ -311,9 +315,15 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
     @Override
     public Collection<TableDefinition> getTables() {
         return Lists.newArrayList(
-                new TableDefinitionStringImpl( TABLE_MESSAGES_AVAILABLE, MESSAGES_AVAILABLE ),
-                new TableDefinitionStringImpl( TABLE_MESSAGES_INFLIGHT, MESSAGES_INFLIGHT ),
-                new TableDefinitionStringImpl( TABLE_MESSAGE_DATA, MESSAGE_DATA )
+
+            new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(),
+                TABLE_MESSAGES_AVAILABLE, MESSAGES_AVAILABLE ),
+
+            new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(),
+                TABLE_MESSAGES_INFLIGHT, MESSAGES_INFLIGHT ),
+
+            new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(),
+                TABLE_MESSAGE_DATA, MESSAGE_DATA )
         );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
index 932097a..07a201c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
@@ -26,6 +26,7 @@ import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.Clause;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -46,6 +47,7 @@ public class QueueSerializationImpl implements QueueSerialization {
     private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
+    private final CassandraFig cassandraFig;
 
     public final static String COLUMN_QUEUE_NAME = "queue_name";
     public final static String COLUMN_REGIONS = "regions";
@@ -72,7 +74,8 @@ public class QueueSerializationImpl implements QueueSerialization {
 
 
     @Inject
-    public QueueSerializationImpl( CassandraClient cassandraClient ) {
+    public QueueSerializationImpl( CassandraFig cassandraFig,  CassandraClient cassandraClient ) {
+        this.cassandraFig = cassandraFig;
         this.cassandraClient = cassandraClient;
     }
 
@@ -90,7 +93,7 @@ public class QueueSerializationImpl implements QueueSerialization {
                 .value(COLUMN_DEAD_LETTER_QUEUE, queue.getDeadLetterQueue());
 
 
-        cassandraClient.getSession().execute(insert);
+        cassandraClient.getApplicationSession().execute(insert);
 
     }
 
@@ -102,7 +105,7 @@ public class QueueSerializationImpl implements QueueSerialization {
         Statement query = QueryBuilder.select().all().from(TABLE_QUEUES)
                 .where(queueNameClause);
 
-        Row row = cassandraClient.getSession().execute(query).one();
+        Row row = cassandraClient.getApplicationSession().execute(query).one();
 
         if(row == null){
             return null;
@@ -129,14 +132,14 @@ public class QueueSerializationImpl implements QueueSerialization {
         Statement delete = QueryBuilder.delete().from(TABLE_QUEUES)
                 .where(queueNameClause);
 
-        cassandraClient.getSession().execute(delete);
+        cassandraClient.getApplicationSession().execute(delete);
     }
 
     @Override
     public List<String> getListOfQueues() {
 
         Statement select = QueryBuilder.select().all().from( TABLE_QUEUES );
-        ResultSet rs = cassandraClient.getSession().execute( select );
+        ResultSet rs = cassandraClient.getApplicationSession().execute( select );
 
         return rs.all().stream()
                 .map( row -> row.getString( COLUMN_QUEUE_NAME ))
@@ -151,7 +154,8 @@ public class QueueSerializationImpl implements QueueSerialization {
 
     @Override
     public Collection<TableDefinition> getTables() {
-        return Collections.singletonList( new TableDefinitionStringImpl( "queues", CQL ) );
+        return Collections.singletonList(
+            new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), "queues", CQL ) );
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
index 31e31ce..402d429 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
@@ -108,7 +108,7 @@ public class ShardIterator implements Iterator<Shard> {
                 .and(shardIdClause)
                 .limit(PAGE_SIZE);
 
-        List<Row> rows = cassandraClient.getSession().execute(query).all();
+        List<Row> rows = cassandraClient.getQueueMessageSession().execute(query).all();
 
 
         currentIterator = getIteratorFromRows(rows);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
index 9158412..f14d234 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
@@ -24,6 +24,7 @@ import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -50,6 +51,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
     private static final Logger logger = LoggerFactory.getLogger( ShardCounterSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
+    private final CassandraFig cassandraFig;
 
     final static String TABLE_COUNTERS       = "shard_counters";
     final static String COLUMN_QUEUE_NAME    = "queue_name";
@@ -89,7 +91,8 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
 
 
     @Inject
-    public ShardCounterSerializationImpl(QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+    public ShardCounterSerializationImpl( CassandraFig cassandraFig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+        this.cassandraFig = cassandraFig;
         this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
         this.cassandraClient = cassandraClient;
     }
@@ -162,7 +165,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
                 .and(   QueryBuilder.eq(   COLUMN_SHARD_TYPE, type.toString() ) )
                 .and(   QueryBuilder.eq(   COLUMN_SHARD_ID, shardId ) )
                 .with(  QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) );
-        cassandraClient.getSession().execute( update );
+        cassandraClient.getQueueMessageSession().execute( update );
     }
 
 
@@ -173,7 +176,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
                 .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) )
                 .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) );
 
-        ResultSet resultSet = cassandraClient.getSession().execute( query );
+        ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( query );
         List<Row> all = resultSet.all();
 
         if ( all.size() > 1 ) {
@@ -193,6 +196,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
 
     @Override
     public Collection<TableDefinition> getTables() {
-        return Collections.singletonList( new TableDefinitionStringImpl( "shard_counters", CQL ) );
+        return Collections.singletonList(
+            new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), "shard_counters", CQL ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
index 7b9fd8e..989622b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
@@ -26,6 +26,7 @@ import com.datastax.driver.core.querybuilder.Clause;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -45,6 +46,7 @@ public class ShardSerializationImpl implements ShardSerialization {
     private static final Logger logger = LoggerFactory.getLogger( ShardSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
+    private final CassandraFig cassandraFig;
 
     public final static String COLUMN_QUEUE_NAME = "queue_name";
     public final static String COLUMN_REGION = "region";
@@ -80,7 +82,8 @@ public class ShardSerializationImpl implements ShardSerialization {
 
 
     @Inject
-    public ShardSerializationImpl( CassandraClient cassandraClient ) {
+    public ShardSerializationImpl( CassandraFig cassandraFig,  CassandraClient cassandraClient ) {
+        this.cassandraFig = cassandraFig;
         this.cassandraClient = cassandraClient;
     }
 
@@ -93,7 +96,7 @@ public class ShardSerializationImpl implements ShardSerialization {
                 .value(COLUMN_ACTIVE, 1)
                 .value(COLUMN_POINTER, shard.getPointer());
 
-        cassandraClient.getSession().execute(insert);
+        cassandraClient.getQueueMessageSession().execute(insert);
 
     }
 
@@ -112,7 +115,7 @@ public class ShardSerializationImpl implements ShardSerialization {
                 .and(activeClause)
                 .and(shardIdClause);
 
-        Row row = cassandraClient.getSession().execute(select).one();
+        Row row = cassandraClient.getQueueMessageSession().execute(select).one();
 
         if (row == null){
             return null;
@@ -145,7 +148,7 @@ public class ShardSerializationImpl implements ShardSerialization {
                 .and(activeClause)
                 .and(shardIdClause);
 
-        cassandraClient.getSession().execute(delete);
+        cassandraClient.getQueueMessageSession().execute(delete);
 
     }
 
@@ -165,7 +168,7 @@ public class ShardSerializationImpl implements ShardSerialization {
                 .and(activeClause)
                 .and(shardIdClause);
 
-        cassandraClient.getSession().execute(update);
+        cassandraClient.getQueueMessageSession().execute(update);
 
     }
 
@@ -192,8 +195,10 @@ public class ShardSerializationImpl implements ShardSerialization {
     @Override
     public Collection<TableDefinition> getTables() {
         return Lists.newArrayList(
-                new TableDefinitionStringImpl( TABLE_SHARDS_MESSAGES_AVAILABLE, SHARDS_MESSAGES_AVAILABLE ),
-                new TableDefinitionStringImpl( TABLE_SHARDS_MESSAGES_INFLIGHT, SHARDS_MESSAGES_AVAILABLE_INFLIGHT )
+                new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(),
+                    TABLE_SHARDS_MESSAGES_AVAILABLE, SHARDS_MESSAGES_AVAILABLE ),
+                new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(),
+                    TABLE_SHARDS_MESSAGES_INFLIGHT, SHARDS_MESSAGES_AVAILABLE_INFLIGHT )
         );
     }
 

Reply via email to