Switch Qakka to use two keyspaces: the original replicated Applications keyspace and a new un-replicated Applications Local keyspace
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/483ca0f5 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/483ca0f5 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/483ca0f5 Branch: refs/heads/usergrid-1318-queue Commit: 483ca0f5485891f2acc5d3b1a921d68c87d1d647 Parents: ee0dda4 Author: Dave Johnson <snoopd...@apache.org> Authored: Fri Sep 16 07:48:09 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Fri Sep 16 07:48:09 2016 -0400 ---------------------------------------------------------------------- .../main/resources/usergrid-default.properties | 2 + .../usergrid/corepersistence/GuiceFactory.java | 3 ++ .../impl/ScopedCacheSerializationImpl.java | 10 ++++- .../UniqueValueSerializationStrategyImpl.java | 16 ++++--- .../UniqueValueSerializationStrategyV1Impl.java | 46 +++++++++++++------- .../UniqueValueSerializationStrategyV2Impl.java | 34 ++++++++++----- .../persistence/core/CassandraConfig.java | 3 ++ .../persistence/core/CassandraConfigImpl.java | 18 ++++++++ .../usergrid/persistence/core/CassandraFig.java | 4 ++ .../persistence/core/datastax/CQLUtils.java | 6 ++- .../core/datastax/DataStaxCluster.java | 4 ++ .../core/datastax/TableDefinition.java | 2 + .../core/datastax/impl/DataStaxClusterImpl.java | 42 ++++++++++++++++++ .../core/datastax/impl/TableDefinitionImpl.java | 20 ++++++--- .../impl/TableDefinitionStringImpl.java | 9 +++- .../core/migration/schema/Migration.java | 1 + .../migration/schema/MigrationManagerFig.java | 4 ++ .../migration/schema/MigrationManagerImpl.java | 17 +++++--- .../core/astyanax/ColumnNameIteratorTest.java | 11 +++++ .../MultiKeyColumnNameIteratorTest.java | 11 +++++ .../astyanax/MultiRowColumnIteratorTest.java | 10 +++++ .../persistence/core/datastax/CQLUtilsTest.java | 6 +-- .../core/datastax/TableDefinitionTest.java | 8 ++-- .../core/guice/MigrationManagerRule.java | 11 ++--- .../map/impl/MapSerializationImpl.java | 
24 ++++++---- stack/corepersistence/queue/pom.xml | 3 ++ .../usergrid/persistence/qakka/QakkaFig.java | 2 +- .../persistence/qakka/core/CassandraClient.java | 5 ++- .../qakka/core/CassandraClientImpl.java | 23 +++++++--- .../MultiShardMessageIterator.java | 2 +- .../impl/AuditLogSerializationImpl.java | 12 +++-- .../impl/MessageCounterSerializationImpl.java | 12 +++-- .../impl/QueueMessageSerializationImpl.java | 28 ++++++++---- .../queues/impl/QueueSerializationImpl.java | 16 ++++--- .../serialization/sharding/ShardIterator.java | 2 +- .../impl/ShardCounterSerializationImpl.java | 12 +++-- .../sharding/impl/ShardSerializationImpl.java | 19 +++++--- .../impl/TransferLogSerializationImpl.java | 16 ++++--- .../persistence/qakka/AbstractTest.java | 5 ++- .../persistence/qakka/KeyspaceDropper.java | 20 ++++++--- .../qakka/common/CassandraClientTest.java | 4 +- .../qakka/core/QueueMessageManagerTest.java | 14 +++--- .../distributed/QueueActorServiceTest.java | 2 - .../actors/QueueActorHelperTest.java | 11 ++--- .../distributed/actors/QueueReaderTest.java | 1 - .../distributed/actors/QueueTimeouterTest.java | 33 +++++++------- .../distributed/actors/ShardAllocatorTest.java | 4 +- ...tiShardDatabaseQueueMessageIteratorTest.java | 23 +++++----- .../auditlogs/AuditLogSerializationTest.java | 17 +++----- .../queues/DatabaseQueueSerializationTest.java | 3 -- .../sharding/ShardCounterSerializationTest.java | 17 ++++---- .../sharding/ShardIteratorTest.java | 20 +++++---- .../sharding/ShardSerializationTest.java | 15 ++++--- .../sharding/ShardStrategyTest.java | 7 ++- .../TransferLogSerializationTest.java | 24 +++++----- .../queue/LegacyQueueManagerTest.java | 5 --- .../queue/src/test/resources/qakka.properties | 18 +++++--- 57 files changed, 477 insertions(+), 240 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/config/src/main/resources/usergrid-default.properties 
---------------------------------------------------------------------- diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties index 34d46ad..5c7fca2 100644 --- a/stack/config/src/main/resources/usergrid-default.properties +++ b/stack/config/src/main/resources/usergrid-default.properties @@ -94,6 +94,8 @@ cassandra.keyspace.strategy=SimpleStrategy # cassandra.keyspace.replication=replication_factor:1 +cassandra.keyspace.local.replication=replication_factor:1 + # Tell Usergrid that Cassandra is not embedded. # cassandra.embedded=false http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java index a17ac48..b003c2f 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java @@ -115,6 +115,9 @@ public class GuiceFactory implements FactoryBean<Injector> { cpProps .put( "collections.keyspace.strategy.class", getAndValidateProperty( "cassandra.keyspace.strategy" ) ); + cpProps.put( "collections.keyspace.local.strategy.options", + getAndValidateProperty( "cassandra.keyspace.local.replication" ) ); + cpProps.put( "collections.keyspace.strategy.options", getAndValidateProperty( "cassandra.keyspace.replication" ) ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java index 7e4adb0..e2574c1 100644 --- a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java +++ b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java @@ -280,8 +280,14 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati public Collection<TableDefinition> getTables() { final TableDefinition scopedCache = - new TableDefinitionImpl( SCOPED_CACHE_TABLE, SCOPED_CACHE_PARTITION_KEYS, SCOPED_CACHE_COLUMN_KEYS, - SCOPED_CACHE_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, SCOPED_CACHE_CLUSTERING_ORDER); + new TableDefinitionImpl( + cassandraConfig.getApplicationKeyspace(), + SCOPED_CACHE_TABLE, + SCOPED_CACHE_PARTITION_KEYS, + SCOPED_CACHE_COLUMN_KEYS, + SCOPED_CACHE_COLUMNS, + TableDefinitionImpl.CacheOption.KEYS, + SCOPED_CACHE_CLUSTERING_ORDER); return Collections.singletonList(scopedCache); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java index 450c098..cf168cb 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java +++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java @@ -68,7 +68,7 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> private final SerializationFig serializationFig; - private final CassandraFig cassandraFig; + protected final CassandraFig cassandraFig; private final Session session; private final CassandraConfig cassandraConfig; @@ -90,8 +90,8 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> this.session = session; this.cassandraConfig = cassandraConfig; - TABLE_UNIQUE_VALUES = getUniqueValuesTable().getTableName(); - TABLE_UNIQUE_VALUES_LOG = getEntityUniqueLogTable().getTableName(); + TABLE_UNIQUE_VALUES = getUniqueValuesTable( cassandraFig ).getTableName(); + TABLE_UNIQUE_VALUES_LOG = getEntityUniqueLogTable( cassandraFig ).getTableName(); } @Override @@ -272,9 +272,11 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> for ( Field field : fields ) { - //log.info(Bytes.toHexString(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue()))); + //log.info(Bytes.toHexString(getPartitionKey(applicationId, type, + // field.getTypeName().toString(), field.getName(), field.getValue()))); - //partitionKeys.add(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue())); + //partitionKeys.add(getPartitionKey(applicationId, type, + // field.getTypeName().toString(), field.getName(), field.getValue())); final Clause inKey = QueryBuilder.in("key", getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue()) ); @@ -492,7 +494,7 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> /** * Get the CQL table definition for the unique values log table */ - protected abstract TableDefinition getUniqueValuesTable(); + 
protected abstract TableDefinition getUniqueValuesTable( CassandraFig cassandraFig ); protected abstract List<Object> deserializePartitionKey(ByteBuffer bb); @@ -514,7 +516,7 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> /** * Get the CQL table definition for the unique values log table */ - protected abstract TableDefinition getEntityUniqueLogTable(); + protected abstract TableDefinition getEntityUniqueLogTable( CassandraFig cassandraFig ); public class AllUniqueFieldsIterator implements Iterable<UniqueValue>, Iterator<UniqueValue> { http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java index 4ee5b70..ba3fcb0 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java @@ -46,9 +46,8 @@ import com.google.inject.Singleton; * V1 impl with unique value serialization strategy with the collection scope */ @Singleton -public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializationStrategyImpl<CollectionPrefixedKey<Field>, CollectionPrefixedKey<Id>> { - - +public class UniqueValueSerializationStrategyV1Impl extends + UniqueValueSerializationStrategyImpl<CollectionPrefixedKey<Field>, CollectionPrefixedKey<Id>> { 
private static final String UNIQUE_VALUES_TABLE = CQLUtils.quote("Unique_Values"); private static final Collection<String> UNIQUE_VALUES_PARTITION_KEYS = Collections.singletonList("key"); @@ -58,6 +57,7 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ put( "key", DataType.Name.BLOB ); put( "column1", DataType.Name.CUSTOM ); put( "value", DataType.Name.BLOB ); }}; + private static final Map<String, String> UNIQUE_VALUES_CLUSTERING_ORDER = new HashMap<String, String>(){{ put( "column1", "ASC" );}}; @@ -70,18 +70,14 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ put( "key", DataType.Name.BLOB ); put( "column1", DataType.Name.CUSTOM ); put( "value", DataType.Name.BLOB ); }}; + private static final Map<String, String> UNIQUE_VALUES_LOG_CLUSTERING_ORDER = new HashMap<String, String>(){{ put( "column1", "ASC" ); }}; - private final static TableDefinition uniqueValues = - new TableDefinitionImpl( UNIQUE_VALUES_TABLE, UNIQUE_VALUES_PARTITION_KEYS, UNIQUE_VALUES_COLUMN_KEYS, - UNIQUE_VALUES_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, UNIQUE_VALUES_CLUSTERING_ORDER); - - private final static TableDefinition uniqueValuesLog = - new TableDefinitionImpl( UNIQUE_VALUES_LOG_TABLE, UNIQUE_VALUES_LOG_PARTITION_KEYS, UNIQUE_VALUES_LOG_COLUMN_KEYS, - UNIQUE_VALUES_LOG_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, UNIQUE_VALUES_LOG_CLUSTERING_ORDER); + private TableDefinition uniqueValues; + private TableDefinition uniqueValuesLog; /** @@ -95,7 +91,9 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ final SerializationFig serializationFig, final Session session, final CassandraConfig cassandraConfig) { + super( cassandraFig, serializationFig, session, cassandraConfig ); + } @@ -109,8 +107,8 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ @Override public Collection<TableDefinition> getTables() { - final TableDefinition uniqueValues = 
getUniqueValuesTable(); - final TableDefinition uniqueValuesLog = getEntityUniqueLogTable(); + final TableDefinition uniqueValues = getUniqueValuesTable( cassandraFig ); + final TableDefinition uniqueValuesLog = getEntityUniqueLogTable( cassandraFig ); return Arrays.asList( uniqueValues, uniqueValuesLog ); @@ -119,15 +117,33 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ @Override - protected TableDefinition getUniqueValuesTable(){ + protected TableDefinition getUniqueValuesTable( CassandraFig cassandraFig ) { + if ( uniqueValues == null ) { + + uniqueValues = new TableDefinitionImpl( cassandraFig.getApplicationKeyspace(), + UNIQUE_VALUES_TABLE, + UNIQUE_VALUES_PARTITION_KEYS, + UNIQUE_VALUES_COLUMN_KEYS, + UNIQUE_VALUES_COLUMNS, + TableDefinitionImpl.CacheOption.KEYS, UNIQUE_VALUES_CLUSTERING_ORDER); + } return uniqueValues; } @Override - protected TableDefinition getEntityUniqueLogTable(){ - + protected TableDefinition getEntityUniqueLogTable( CassandraFig cassandraFig ) { + if ( uniqueValuesLog == null ) { + + uniqueValuesLog = new TableDefinitionImpl( cassandraFig.getApplicationKeyspace(), + UNIQUE_VALUES_LOG_TABLE, + UNIQUE_VALUES_LOG_PARTITION_KEYS, + UNIQUE_VALUES_LOG_COLUMN_KEYS, + UNIQUE_VALUES_LOG_COLUMNS, + TableDefinitionImpl.CacheOption.KEYS, + UNIQUE_VALUES_LOG_CLUSTERING_ORDER); + } return uniqueValuesLog; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java index 
522dad9..aeb1720 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java @@ -69,13 +69,9 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ private static final Map<String, String> UNIQUE_VALUES_LOG_CLUSTERING_ORDER = new HashMap<String, String>(){{ put( "column1", "ASC" );}}; - private final static TableDefinition uniqueValues = - new TableDefinitionImpl( UNIQUE_VALUES_TABLE, UNIQUE_VALUES_PARTITION_KEYS, UNIQUE_VALUES_COLUMN_KEYS, - UNIQUE_VALUES_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, UNIQUE_VALUES_CLUSTERING_ORDER); + private static TableDefinition uniqueValues; - private final static TableDefinition uniqueValuesLog = - new TableDefinitionImpl( UNIQUE_VALUES_LOG_TABLE, UNIQUE_VALUES_LOG_PARTITION_KEYS, UNIQUE_VALUES_LOG_COLUMN_KEYS, - UNIQUE_VALUES_LOG_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, UNIQUE_VALUES_LOG_CLUSTERING_ORDER); + private static TableDefinition uniqueValuesLog; /** @@ -104,8 +100,8 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ @Override public Collection<TableDefinition> getTables() { - final TableDefinition uniqueValues = getUniqueValuesTable(); - final TableDefinition uniqueValuesLog = getEntityUniqueLogTable(); + final TableDefinition uniqueValues = getUniqueValuesTable( cassandraFig ); + final TableDefinition uniqueValuesLog = getEntityUniqueLogTable( cassandraFig ); return Arrays.asList( uniqueValues, uniqueValuesLog ); @@ -113,13 +109,31 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ @Override - protected TableDefinition getUniqueValuesTable(){ + protected TableDefinition getUniqueValuesTable( CassandraFig cassandraFig ) { + if ( uniqueValues == null ) { + 
uniqueValues = new TableDefinitionImpl( cassandraFig.getApplicationKeyspace(), + UNIQUE_VALUES_TABLE, + UNIQUE_VALUES_PARTITION_KEYS, + UNIQUE_VALUES_COLUMN_KEYS, + UNIQUE_VALUES_COLUMNS, + TableDefinitionImpl.CacheOption.KEYS, + UNIQUE_VALUES_CLUSTERING_ORDER); + } return uniqueValues; } @Override - protected TableDefinition getEntityUniqueLogTable(){ + protected TableDefinition getEntityUniqueLogTable( CassandraFig cassandraFig ){ + if ( uniqueValuesLog == null ) { + uniqueValuesLog = new TableDefinitionImpl( cassandraFig.getApplicationKeyspace(), + UNIQUE_VALUES_LOG_TABLE, + UNIQUE_VALUES_LOG_PARTITION_KEYS, + UNIQUE_VALUES_LOG_COLUMN_KEYS, + UNIQUE_VALUES_LOG_COLUMNS, + TableDefinitionImpl.CacheOption.KEYS, + UNIQUE_VALUES_LOG_CLUSTERING_ORDER); + } return uniqueValuesLog; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java index 595b65f..f45ad43 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java @@ -74,4 +74,7 @@ public interface CassandraConfig { int[] getShardSettings(); + String getApplicationKeyspace(); + + String getApplicationLocalKeyspace(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java index e87ebb8..729f5b2 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java @@ -41,6 +41,9 @@ public class CassandraConfigImpl implements CassandraConfig { private int[] shardSettings; private ConsistencyLevel consistentCl; + private String applicationKeyspace; + private String applicationLocalKeyspace; + // DataStax driver's CL private com.datastax.driver.core.ConsistencyLevel dataStaxReadCl; private com.datastax.driver.core.ConsistencyLevel dataStaxWriteCl; @@ -65,6 +68,10 @@ public class CassandraConfigImpl implements CassandraConfig { this.dataStaxWriteCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getWriteCl() ); + this.applicationKeyspace = cassandraFig.getApplicationKeyspace(); + + this.applicationLocalKeyspace = cassandraFig.getApplicationLocalKeyspace(); + //add the listeners to update the values cassandraFig.addPropertyChangeListener( new PropertyChangeListener() { @Override @@ -133,4 +140,15 @@ public class CassandraConfigImpl implements CassandraConfig { return settings; } + + @Override + public String getApplicationKeyspace() { + return applicationKeyspace; + } + + @Override + public String getApplicationLocalKeyspace() { + return applicationLocalKeyspace; + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java index 2996465..90f4ae8 100644 --- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java @@ -87,6 +87,10 @@ public interface CassandraFig extends GuicyFig { @Default( "Usergrid_Applications" ) String getApplicationKeyspace(); + @Key( "cassandra.keyspace.application_local" ) + @Default( "Usergrid_Applications_Local" ) + String getApplicationLocalKeyspace(); + @Key( "cassandra.port" ) @Default( "9160" ) int getThriftPort(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java index 082f2d5..ed5c9e8 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java @@ -45,7 +45,8 @@ public class CQLUtils { } - public static String getFormattedReplication(String strategy, String strategyOptions) throws JsonProcessingException { + public static String getFormattedReplication( + String strategy, String strategyOptions) throws JsonProcessingException { Map<String, String> replicationSettings = new HashMap<>(); replicationSettings.put("class", strategy); @@ -86,7 +87,8 @@ public class CQLUtils { } - public static String getCachingOptions(CassandraFig cassandraFig, TableDefinitionImpl.CacheOption cacheOption) throws JsonProcessingException { + public static String getCachingOptions( + CassandraFig cassandraFig, TableDefinitionImpl.CacheOption cacheOption) throws JsonProcessingException { // Cassandra 2.0 and 
below has a different CQL syntax for caching if( Double.parseDouble( cassandraFig.getVersion() ) <= 2.0 ){ http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java index ea76f92..5944b36 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java @@ -30,8 +30,12 @@ public interface DataStaxCluster { Session getApplicationSession(); + Session getApplicationLocalSession(); + void createApplicationKeyspace() throws Exception; + void createApplicationLocalKeyspace() throws Exception; + void waitForSchemaAgreement(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java index 8178129..e1c5afb 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java @@ -28,6 +28,8 @@ public interface TableDefinition { CREATE, UPDATE } + String getKeyspace(); + String getTableName(); String getTableCQL( 
CassandraFig cassandraFig, ACTION tableAction ) throws Exception; http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java index f926d1e..fe9803d 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java @@ -39,6 +39,7 @@ public class DataStaxClusterImpl implements DataStaxCluster { private final CassandraFig cassandraFig; private Cluster cluster; private Session applicationSession; + private Session queueMessageSession; private Session clusterSession; @Inject @@ -90,6 +91,17 @@ public class DataStaxClusterImpl implements DataStaxCluster { } + @Override + public Session getApplicationLocalSession(){ + + // always grab cluster from getCluster() in case it was prematurely closed + if ( queueMessageSession == null || queueMessageSession.isClosed() ){ + queueMessageSession = getCluster().connect( CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace() ) ); + } + return queueMessageSession; + } + + /** * Execute CQL that will create the keyspace if it doesn't exist and alter it if it does. * @throws Exception @@ -117,6 +129,36 @@ public class DataStaxClusterImpl implements DataStaxCluster { } + + + /** + * Execute CQL that will create the keyspace if it doesn't exist and alter it if it does. 
+ * @throws Exception + */ + @Override + public void createApplicationLocalKeyspace() throws Exception { + + boolean exists = getClusterSession().getCluster().getMetadata() + .getKeyspace(CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace())) != null; + + if (exists) { + return; + } + + final String createQueueMessageKeyspace = String.format( + "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s", + CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace()), + CQLUtils.getFormattedReplication(cassandraFig.getStrategy(), cassandraFig.getStrategyOptions()) + + ); + + getClusterSession().execute(createQueueMessageKeyspace); + + logger.info("Created keyspace: {}", cassandraFig.getApplicationLocalKeyspace()); + + } + + /** * Wait until all Cassandra nodes agree on the schema. Sleeps 100ms between checks. * http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionImpl.java index a39c47e..f22c450 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionImpl.java @@ -40,6 +40,7 @@ public class TableDefinitionImpl implements TableDefinition { ALL, KEYS, ROWS, NONE } + private final String keyspace; private final String tableName; private final Collection<String> partitionKeys; private final Collection<String> columnKeys; @@ -71,16 +72,21 @@ public class TableDefinitionImpl implements TableDefinition { static String COMPOSITE_TYPE = 
"'org.apache.cassandra.db.marshal.DynamicCompositeType(a=>org.apache.cassandra.db.marshal.AsciiType,A=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.AsciiType),b=>org.apache.cassandra.db.marshal.BytesType,B=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.BytesType),i=>org.apache.cassandra.db.marshal.IntegerType,I=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.IntegerType),l=>org.apache.cassandra.db.marshal.LongType,L=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.LongType),s=>org.apache.cassandra.db.marshal.UTF8Type,S=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.UTF8Type),t=>org.apache.cassandra.db.marshal.TimeUUIDType,T=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimeUUIDType),u=>org.apache.cassandra.db.marshal.UUIDType,U=>org.apache.cassandra.db.marshal.Revers edType(org.apache.cassandra.db.marshal.UUIDType),x=>org.apache.cassandra.db.marshal.LexicalUUIDType,X=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.LexicalUUIDType))'"; - public TableDefinitionImpl(final String tableName, final Collection<String> partitionKeys, - final Collection<String> columnKeys, final Map<String, DataType.Name> columns, - final CacheOption cacheOption, final Map<String, String> clusteringOrder){ + public TableDefinitionImpl( + final String keyspace, + final String tableName, + final Collection<String> partitionKeys, + final Collection<String> columnKeys, + final Map<String, DataType.Name> columns, + final CacheOption cacheOption, + final Map<String, String> clusteringOrder) { Preconditions.checkNotNull(tableName, "Table name cannot be null"); Preconditions.checkNotNull(partitionKeys, "Primary Key(s) cannot be null"); Preconditions.checkNotNull(columns, "Columns cannot be null"); Preconditions.checkNotNull(cacheOption, "CacheOption cannot be null"); - + this.keyspace = 
keyspace; this.tableName = tableName; this.partitionKeys = partitionKeys; this.columnKeys = columnKeys; @@ -97,9 +103,11 @@ public class TableDefinitionImpl implements TableDefinition { this.compression = new HashMap<>(1); compression.put("sstable_compression", "LZ4Compressor"); this.gcGraceSeconds = "864000"; + } - - + @Override + public String getKeyspace() { + return keyspace; } public String getTableName() { http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionStringImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionStringImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionStringImpl.java index 8e7d854..f4a58f8 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionStringImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/TableDefinitionStringImpl.java @@ -25,17 +25,24 @@ import org.apache.usergrid.persistence.core.datastax.TableDefinition; public class TableDefinitionStringImpl implements TableDefinition { + private String keyspace; private String tableName; private String cql; - public TableDefinitionStringImpl( String tableName, String cql ) { + public TableDefinitionStringImpl( String keyspace, String tableName, String cql ) { + this.keyspace = keyspace; this.tableName = tableName; this.cql = cql; } @Override + public String getKeyspace() { + return keyspace; + } + + @Override public String getTableName() { return tableName; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java 
---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java index b2ab031..2b3e7fa 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java @@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.core.migration.schema; +import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java index d8f3d1f..be2866a 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java @@ -38,4 +38,8 @@ public interface MigrationManagerFig extends GuicyFig { @Default("replication_factor:1") String getStrategyOptions(); + @Key( "collections.keyspace.strategy.options" ) + @Default("replication_factor:1") + String getLocalStrategyOptions(); + } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java index f5ec8ac..9178bde 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java @@ -73,6 +73,8 @@ public class MigrationManagerImpl implements MigrationManager { dataStaxCluster.createApplicationKeyspace(); + dataStaxCluster.createApplicationLocalKeyspace(); + for ( Migration migration : migrations ) { final Collection<MultiTenantColumnFamilyDefinition> columnFamilies = migration.getColumnFamilies(); @@ -143,10 +145,10 @@ public class MigrationManagerImpl implements MigrationManager { private void createTable(TableDefinition tableDefinition ) throws Exception { KeyspaceMetadata keyspaceMetadata = dataStaxCluster.getClusterSession().getCluster().getMetadata() - .getKeyspace(CQLUtils.quote(cassandraFig.getApplicationKeyspace())); + .getKeyspace(CQLUtils.quote( tableDefinition.getKeyspace() ) ); boolean exists = keyspaceMetadata != null - && keyspaceMetadata.getTable(tableDefinition.getTableName()) != null; + && keyspaceMetadata.getTable( tableDefinition.getTableName() ) != null; if( exists ){ return; @@ -156,10 +158,15 @@ public class MigrationManagerImpl implements MigrationManager { if (logger.isDebugEnabled()) { logger.debug(CQL); } - dataStaxCluster.getApplicationSession() - .execute(CQL); - logger.info("Created table: {}", 
tableDefinition.getTableName()); + if ( tableDefinition.getKeyspace().equals( cassandraFig.getApplicationKeyspace() )) { + dataStaxCluster.getApplicationSession().execute( CQL ); + } else { + dataStaxCluster.getApplicationLocalSession().execute( CQL ); + } + + logger.info("Created table: {} in keyspace {}", + tableDefinition.getTableName(), tableDefinition.getKeyspace()); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java index caa6294..c45fdd1 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java @@ -115,6 +115,17 @@ public class ColumnNameIteratorTest { public int[] getShardSettings() { return new int[]{20}; } + + @Override + public String getApplicationKeyspace() { + return cassandraFig.getApplicationKeyspace(); + } + + @Override + public String getApplicationLocalKeyspace() { + return cassandraFig.getApplicationLocalKeyspace(); + } + }; http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java 
b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java index b31fa2f..5cdf0e1 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java @@ -118,6 +118,17 @@ public class MultiKeyColumnNameIteratorTest { public int[] getShardSettings() { return new int[]{20}; } + + @Override + public String getApplicationKeyspace() { + return cassandraFig.getApplicationKeyspace(); + } + + @Override + public String getApplicationLocalKeyspace() { + return cassandraFig.getApplicationLocalKeyspace(); + } + }; http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java index ea5359e..6331941 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java @@ -122,6 +122,16 @@ public class MultiRowColumnIteratorTest { public int[] getShardSettings() { return new int[]{20}; } + + @Override + public String getApplicationKeyspace() { + return cassandraFig.getApplicationKeyspace(); + } + + @Override + public String getApplicationLocalKeyspace() { + return cassandraFig.getApplicationLocalKeyspace(); + } }; 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java index 37311ba..8531bfd 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java @@ -70,7 +70,7 @@ public class CQLUtilsTest { - TableDefinitionImpl table1 = new TableDefinitionImpl( + TableDefinitionImpl table1 = new TableDefinitionImpl( cassandraFig.getApplicationKeyspace(), CQLUtils.quote("table1"), partitionKeys, columnKeys, @@ -122,7 +122,7 @@ public class CQLUtilsTest { - TableDefinitionImpl table1 = new TableDefinitionImpl( + TableDefinitionImpl table1 = new TableDefinitionImpl( cassandraFig.getApplicationKeyspace(), CQLUtils.quote("table1"), partitionKeys, columnKeys, @@ -165,7 +165,7 @@ public class CQLUtilsTest { - TableDefinitionImpl table1 = new TableDefinitionImpl( + TableDefinitionImpl table1 = new TableDefinitionImpl( cassandraFig.getApplicationKeyspace(), CQLUtils.quote("table1"), partitionKeys, columnKeys, http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java index b5f98d9..ac9167b 100644 
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java @@ -33,7 +33,7 @@ public class TableDefinitionTest { public void testNullTableName(){ try{ - TableDefinitionImpl table1 = new TableDefinitionImpl(null, null, null, null, null, null); + TableDefinitionImpl table1 = new TableDefinitionImpl(null, null, null, null, null, null, null); } catch (NullPointerException npe){ assertEquals("Table name cannot be null", npe.getMessage()); } @@ -45,7 +45,7 @@ public class TableDefinitionTest { public void testNullPrimaryKeys(){ try{ - TableDefinitionImpl table1 = new TableDefinitionImpl("table1", null, null, null, null, null); + TableDefinitionImpl table1 = new TableDefinitionImpl(null, "table1", null, null, null, null, null); } catch (NullPointerException npe){ assertEquals("Primary Key(s) cannot be null", npe.getMessage()); } @@ -57,7 +57,7 @@ public class TableDefinitionTest { public void testNullColumns(){ try{ - TableDefinitionImpl table1 = new TableDefinitionImpl("table1", + TableDefinitionImpl table1 = new TableDefinitionImpl(null, "table1", new ArrayList<>(), null, null, null, null); } catch (NullPointerException npe){ assertEquals("Columns cannot be null", npe.getMessage()); @@ -70,7 +70,7 @@ public class TableDefinitionTest { public void testNullCacheOption(){ try{ - TableDefinitionImpl table1 = new TableDefinitionImpl("table1", + TableDefinitionImpl table1 = new TableDefinitionImpl(null, "table1", new ArrayList<>(), new ArrayList<>(), new HashMap<>(), null, null); } catch (NullPointerException npe){ http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java index 7e02a8e..4e5ac49 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MigrationManagerRule.java @@ -31,8 +31,6 @@ import com.google.inject.Inject; import com.google.inject.Singleton; -/** - */ @Singleton public class MigrationManagerRule extends ExternalResource { private static final Logger logger = LoggerFactory.getLogger( MigrationManagerRule.class ); @@ -47,11 +45,10 @@ public class MigrationManagerRule extends ExternalResource { this.migrationManager = migrationManager; try { - this.migrationManager.migrate(); - } - catch ( MigrationException e ) { - throw new RuntimeException(e); - } + this.migrationManager.migrate(); + } catch ( MigrationException e ) { + throw new RuntimeException(e); + } } @Inject http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java index 6e50c75..ab6087d 100644 --- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java +++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java @@ -309,15 +309,21 @@ public class MapSerializationImpl implements MapSerialization { @Override public Collection<TableDefinition> getTables() { - final 
TableDefinition mapEntries = - new TableDefinitionImpl( MAP_ENTRIES_TABLE, MAP_ENTRIES_PARTITION_KEYS, MAP_ENTRIES_COLUMN_KEYS, - MAP_ENTRIES_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, MAP_ENTRIES_CLUSTERING_ORDER); - - final TableDefinition mapKeys = - new TableDefinitionImpl( MAP_KEYS_TABLE, MAP_KEYS_PARTITION_KEYS, MAP_KEYS_COLUMN_KEYS, - MAP_KEYS_COLUMNS, TableDefinitionImpl.CacheOption.KEYS, MAP_KEYS_CLUSTERING_ORDER); - - + final TableDefinition mapEntries = new TableDefinitionImpl( cassandraConfig.getApplicationKeyspace(), + MAP_ENTRIES_TABLE, + MAP_ENTRIES_PARTITION_KEYS, + MAP_ENTRIES_COLUMN_KEYS, + MAP_ENTRIES_COLUMNS, + TableDefinitionImpl.CacheOption.KEYS, + MAP_ENTRIES_CLUSTERING_ORDER); + + final TableDefinition mapKeys = new TableDefinitionImpl( cassandraConfig.getApplicationKeyspace(), + MAP_KEYS_TABLE, + MAP_KEYS_PARTITION_KEYS, + MAP_KEYS_COLUMN_KEYS, + MAP_KEYS_COLUMNS, + TableDefinitionImpl.CacheOption.KEYS, + MAP_KEYS_CLUSTERING_ORDER); return Arrays.asList( mapEntries, mapKeys ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/pom.xml b/stack/corepersistence/queue/pom.xml index 488d9f2..153ed4b 100644 --- a/stack/corepersistence/queue/pom.xml +++ b/stack/corepersistence/queue/pom.xml @@ -66,6 +66,9 @@ <configuration> <forkCount>0</forkCount> <threadCount>0</threadCount> + <argLine> + -Xms2G -Xmx4G -Dlog4j.configuration=file:${basedir}/src/test/resources/log4j.properties + </argLine> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java index 6f3df11..3b901b2 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -38,7 +38,7 @@ public interface QakkaFig extends GuicyFig, Serializable { String QUEUE_TIMEOUT_SECONDS = "queue.timeout.seconds"; - String QUEUE_REFRESH_MILLISECONDS = "queue.refresh.milliseconds"; + String QUEUE_REFRESH_MILLISECONDS = "queue.refresh.millis"; String QUEUE_INMEMORY_SIZE = "queue.inmemory.size"; http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java index 9f40b51..d611feb 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java @@ -25,5 +25,8 @@ import com.datastax.driver.core.Session; * Created by Dave Johnson (snoopd...@apache.org) on 9/9/16. 
*/ public interface CassandraClient { - Session getSession(); + + Session getApplicationSession(); + + Session getQueueMessageSession(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java index ed665c2..d1ad442 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java @@ -31,19 +31,32 @@ import org.slf4j.LoggerFactory; public class CassandraClientImpl implements CassandraClient { private static final Logger logger = LoggerFactory.getLogger( CassandraClientImpl.class ); - private final Session session; + private final DataStaxCluster dataStaxCluster; + private Session applicationSession = null; + private Session queueMessageSession = null; + @Inject public CassandraClientImpl( DataStaxCluster dataStaxCluster) { - logger.info("Constructing Cassandra client"); + this.dataStaxCluster = dataStaxCluster; + } - this.session = dataStaxCluster.getApplicationSession(); + + @Override + public Session getApplicationSession() { + if ( applicationSession == null ) { + applicationSession = dataStaxCluster.getApplicationSession(); + } + return applicationSession; } @Override - public Session getSession() { - return session; + public Session getQueueMessageSession() { + if ( queueMessageSession == null ) { + queueMessageSession = dataStaxCluster.getApplicationLocalSession(); + } + return queueMessageSession; } } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java index 42557e6..6ec0774 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java @@ -143,7 +143,7 @@ public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage> .limit(PAGE_SIZE); } - List<Row> rows = cassandraClient.getSession().execute(query).all(); + List<Row> rows = cassandraClient.getQueueMessageSession().execute(query).all(); if ( (rows == null || rows.size() == 0) && shardIterator.hasNext()) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java index d9dbab6..ddbd345 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java +++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java @@ -24,6 +24,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.inject.Inject; +import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -46,6 +47,7 @@ public class AuditLogSerializationImpl implements AuditLogSerialization { private static final Logger logger = LoggerFactory.getLogger( AuditLogSerializationImpl.class ); private final CassandraClient cassandraClient; + private final CassandraFig cassandraFig; public final static String TABLE_AUDIT_LOG = "audit_log"; @@ -75,7 +77,8 @@ public class AuditLogSerializationImpl implements AuditLogSerialization { @Inject - public AuditLogSerializationImpl( CassandraClient cassandraClient ) { + public AuditLogSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) { + this.cassandraFig = cassandraFig; this.cassandraClient = cassandraClient; } @@ -97,7 +100,7 @@ public class AuditLogSerializationImpl implements AuditLogSerialization { .value(COLUMN_MESSAGE_ID, messageId ) .value(COLUMN_QUEUE_MESSAGE_ID, queueMessageId ) .value(COLUMN_TRANSFER_TIME, System.currentTimeMillis() ); - cassandraClient.getSession().execute(insert); + cassandraClient.getApplicationSession().execute(insert); } @@ -107,7 +110,7 @@ public class AuditLogSerializationImpl implements AuditLogSerialization { Statement query = QueryBuilder.select().all().from(TABLE_AUDIT_LOG) .where( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ) ); - ResultSet rs = cassandraClient.getSession().execute( query ); + ResultSet rs = 
cassandraClient.getApplicationSession().execute( query ); final List<AuditLog> auditLogs = rs.all().stream().map( row -> new AuditLog( @@ -143,6 +146,7 @@ public class AuditLogSerializationImpl implements AuditLogSerialization { @Override public Collection<TableDefinition> getTables() { - return Collections.singletonList( new TableDefinitionStringImpl( TABLE_AUDIT_LOG, CQL ) ); + return Collections.singletonList( + new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), TABLE_AUDIT_LOG, CQL ) ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java index 65ffc47..5206ec7 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java @@ -24,6 +24,7 @@ import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.inject.Inject; import com.google.inject.Singleton; +import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -50,6 +51,7 @@ public class 
MessageCounterSerializationImpl implements ShardCounterSerializatio private static final Logger logger = LoggerFactory.getLogger( MessageCounterSerializationImpl.class ); private final CassandraClient cassandraClient; + private final CassandraFig cassandraFig; final static String TABLE_SHARD_COUNTERS = "counters"; final static String COLUMN_QUEUE_NAME = "queue_name"; @@ -93,7 +95,8 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio @Inject - public MessageCounterSerializationImpl(QakkaFig qakkaFig, CassandraClient cassandraClient ) { + public MessageCounterSerializationImpl( CassandraFig cassandraFig, QakkaFig qakkaFig, CassandraClient cassandraClient ) { + this.cassandraFig = cassandraFig; this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter(); this.cassandraClient = cassandraClient; } @@ -166,7 +169,7 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString() ) ) .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) ) .with( QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) ); - cassandraClient.getSession().execute( update ); + cassandraClient.getQueueMessageSession().execute( update ); } @@ -177,7 +180,7 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) ) .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) ); - ResultSet resultSet = cassandraClient.getSession().execute( query ); + ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( query ); List<Row> all = resultSet.all(); if ( all.size() > 1 ) { @@ -198,7 +201,8 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio @Override public Collection<TableDefinition> getTables() { - return Collections.singletonList( new TableDefinitionStringImpl( TABLE_SHARD_COUNTERS, CQL ) ); + return Collections.singletonList( + new TableDefinitionStringImpl( 
cassandraFig.getApplicationLocalKeyspace(), TABLE_SHARD_COUNTERS, CQL ) ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java index f55b936..99ff783 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java @@ -28,6 +28,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.inject.Inject; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -52,6 +53,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class ); private final CassandraClient cassandraClient; + private final CassandraFig cassandraFig; private final ActorSystemFig actorSystemFig; private final ShardStrategy shardStrategy; @@ -107,11 +109,13 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization 
@Inject public QueueMessageSerializationImpl( + CassandraFig cassandraFig, ActorSystemFig actorSystemFig, ShardStrategy shardStrategy, ShardCounterSerialization shardCounterSerialization, CassandraClient cassandraClient ) { + this.cassandraFig = cassandraFig; this.actorSystemFig = actorSystemFig; this.shardStrategy = shardStrategy; this.shardCounterSerialization = shardCounterSerialization; @@ -149,7 +153,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization .value( COLUMN_INFLIGHT_AT, inflightAt ) .value( COLUMN_QUEUED_AT, queuedAt); - cassandraClient.getSession().execute(insert); + cassandraClient.getQueueMessageSession().execute(insert); shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 ); @@ -191,7 +195,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization .and(shardIdClause) .and(queueMessageIdClause); - Row row = cassandraClient.getSession().execute(select).one(); + Row row = cassandraClient.getQueueMessageSession().execute(select).one(); if (row == null) { return null; @@ -240,7 +244,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization .and(shardIdClause) .and(queueMessageIdClause); - ResultSet resultSet = cassandraClient.getSession().execute( delete ); + ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( delete ); String s = "s"; } @@ -253,7 +257,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization Statement select = QueryBuilder.select().from( TABLE_MESSAGE_DATA).where(messageIdClause); - Row row = cassandraClient.getSession().execute(select).one(); + Row row = cassandraClient.getApplicationSession().execute(select).one(); if ( row == null ) { return null; } @@ -273,7 +277,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization .value( COLUMN_MESSAGE_DATA, messageBody.getBlob()) .value( COLUMN_CONTENT_TYPE, 
messageBody.getContentType()); - cassandraClient.getSession().execute(insert); + cassandraClient.getApplicationSession().execute(insert); } @@ -285,7 +289,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA) .where(messageIdClause); - cassandraClient.getSession().execute(delete); + cassandraClient.getApplicationSession().execute(delete); } @@ -311,9 +315,15 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization @Override public Collection<TableDefinition> getTables() { return Lists.newArrayList( - new TableDefinitionStringImpl( TABLE_MESSAGES_AVAILABLE, MESSAGES_AVAILABLE ), - new TableDefinitionStringImpl( TABLE_MESSAGES_INFLIGHT, MESSAGES_INFLIGHT ), - new TableDefinitionStringImpl( TABLE_MESSAGE_DATA, MESSAGE_DATA ) + + new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), + TABLE_MESSAGES_AVAILABLE, MESSAGES_AVAILABLE ), + + new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), + TABLE_MESSAGES_INFLIGHT, MESSAGES_INFLIGHT ), + + new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), + TABLE_MESSAGE_DATA, MESSAGE_DATA ) ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java index 932097a..07a201c 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java +++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java @@ -26,6 +26,7 @@ import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.Clause; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.inject.Inject; +import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -46,6 +47,7 @@ public class QueueSerializationImpl implements QueueSerialization { private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class ); private final CassandraClient cassandraClient; + private final CassandraFig cassandraFig; public final static String COLUMN_QUEUE_NAME = "queue_name"; public final static String COLUMN_REGIONS = "regions"; @@ -72,7 +74,8 @@ public class QueueSerializationImpl implements QueueSerialization { @Inject - public QueueSerializationImpl( CassandraClient cassandraClient ) { + public QueueSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) { + this.cassandraFig = cassandraFig; this.cassandraClient = cassandraClient; } @@ -90,7 +93,7 @@ public class QueueSerializationImpl implements QueueSerialization { .value(COLUMN_DEAD_LETTER_QUEUE, queue.getDeadLetterQueue()); - cassandraClient.getSession().execute(insert); + cassandraClient.getApplicationSession().execute(insert); } @@ -102,7 +105,7 @@ public class QueueSerializationImpl implements QueueSerialization { Statement query = QueryBuilder.select().all().from(TABLE_QUEUES) .where(queueNameClause); - Row row = cassandraClient.getSession().execute(query).one(); + Row row = cassandraClient.getApplicationSession().execute(query).one(); if(row == null){ return null; @@ -129,14 +132,14 @@ 
public class QueueSerializationImpl implements QueueSerialization { Statement delete = QueryBuilder.delete().from(TABLE_QUEUES) .where(queueNameClause); - cassandraClient.getSession().execute(delete); + cassandraClient.getApplicationSession().execute(delete); } @Override public List<String> getListOfQueues() { Statement select = QueryBuilder.select().all().from( TABLE_QUEUES ); - ResultSet rs = cassandraClient.getSession().execute( select ); + ResultSet rs = cassandraClient.getApplicationSession().execute( select ); return rs.all().stream() .map( row -> row.getString( COLUMN_QUEUE_NAME )) @@ -151,7 +154,8 @@ public class QueueSerializationImpl implements QueueSerialization { @Override public Collection<TableDefinition> getTables() { - return Collections.singletonList( new TableDefinitionStringImpl( "queues", CQL ) ); + return Collections.singletonList( + new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), "queues", CQL ) ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java index 31e31ce..402d429 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java @@ -108,7 +108,7 @@ public class ShardIterator implements Iterator<Shard> { .and(shardIdClause) .limit(PAGE_SIZE); - List<Row> rows = cassandraClient.getSession().execute(query).all(); + List<Row> rows = 
cassandraClient.getQueueMessageSession().execute(query).all(); currentIterator = getIteratorFromRows(rows); http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java index 9158412..f14d234 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java @@ -24,6 +24,7 @@ import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.inject.Inject; import com.google.inject.Singleton; +import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -50,6 +51,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization private static final Logger logger = LoggerFactory.getLogger( ShardCounterSerializationImpl.class ); private final CassandraClient cassandraClient; + private final CassandraFig cassandraFig; final static String TABLE_COUNTERS = "shard_counters"; final static String COLUMN_QUEUE_NAME = "queue_name"; @@ -89,7 +91,8 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization 
@Inject - public ShardCounterSerializationImpl(QakkaFig qakkaFig, CassandraClient cassandraClient ) { + public ShardCounterSerializationImpl( CassandraFig cassandraFig, QakkaFig qakkaFig, CassandraClient cassandraClient ) { + this.cassandraFig = cassandraFig; this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter(); this.cassandraClient = cassandraClient; } @@ -162,7 +165,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString() ) ) .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) ) .with( QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) ); - cassandraClient.getSession().execute( update ); + cassandraClient.getQueueMessageSession().execute( update ); } @@ -173,7 +176,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) ) .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) ); - ResultSet resultSet = cassandraClient.getSession().execute( query ); + ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( query ); List<Row> all = resultSet.all(); if ( all.size() > 1 ) { @@ -193,6 +196,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization @Override public Collection<TableDefinition> getTables() { - return Collections.singletonList( new TableDefinitionStringImpl( "shard_counters", CQL ) ); + return Collections.singletonList( + new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), "shard_counters", CQL ) ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/483ca0f5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java index 7b9fd8e..989622b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java @@ -26,6 +26,7 @@ import com.datastax.driver.core.querybuilder.Clause; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.common.collect.Lists; import com.google.inject.Inject; +import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -45,6 +46,7 @@ public class ShardSerializationImpl implements ShardSerialization { private static final Logger logger = LoggerFactory.getLogger( ShardSerializationImpl.class ); private final CassandraClient cassandraClient; + private final CassandraFig cassandraFig; public final static String COLUMN_QUEUE_NAME = "queue_name"; public final static String COLUMN_REGION = "region"; @@ -80,7 +82,8 @@ public class ShardSerializationImpl implements ShardSerialization { @Inject - public ShardSerializationImpl( CassandraClient cassandraClient ) { + public ShardSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) { + this.cassandraFig = cassandraFig; this.cassandraClient = cassandraClient; } @@ -93,7 +96,7 @@ public class ShardSerializationImpl implements ShardSerialization { .value(COLUMN_ACTIVE, 1) .value(COLUMN_POINTER, shard.getPointer()); - cassandraClient.getSession().execute(insert); + cassandraClient.getQueueMessageSession().execute(insert); } @@ -112,7 +115,7 @@ public class ShardSerializationImpl 
implements ShardSerialization { .and(activeClause) .and(shardIdClause); - Row row = cassandraClient.getSession().execute(select).one(); + Row row = cassandraClient.getQueueMessageSession().execute(select).one(); if (row == null){ return null; @@ -145,7 +148,7 @@ public class ShardSerializationImpl implements ShardSerialization { .and(activeClause) .and(shardIdClause); - cassandraClient.getSession().execute(delete); + cassandraClient.getQueueMessageSession().execute(delete); } @@ -165,7 +168,7 @@ public class ShardSerializationImpl implements ShardSerialization { .and(activeClause) .and(shardIdClause); - cassandraClient.getSession().execute(update); + cassandraClient.getQueueMessageSession().execute(update); } @@ -192,8 +195,10 @@ public class ShardSerializationImpl implements ShardSerialization { @Override public Collection<TableDefinition> getTables() { return Lists.newArrayList( - new TableDefinitionStringImpl( TABLE_SHARDS_MESSAGES_AVAILABLE, SHARDS_MESSAGES_AVAILABLE ), - new TableDefinitionStringImpl( TABLE_SHARDS_MESSAGES_INFLIGHT, SHARDS_MESSAGES_AVAILABLE_INFLIGHT ) + new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), + TABLE_SHARDS_MESSAGES_AVAILABLE, SHARDS_MESSAGES_AVAILABLE ), + new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), + TABLE_SHARDS_MESSAGES_INFLIGHT, SHARDS_MESSAGES_AVAILABLE_INFLIGHT ) ); }