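Ensure the local keyspace has a unique name per data center/region by appending "_" plus the local data center name (with "-" replaced by "_") to the configured local keyspace prefix.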
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/6c204b9f Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/6c204b9f Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/6c204b9f Branch: refs/heads/usergrid-1318-queue Commit: 6c204b9f046fa9ea234d7e608b940ead60fb6720 Parents: 434e53e Author: Dave Johnson <[email protected]> Authored: Tue Sep 20 13:27:17 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Tue Sep 20 13:27:17 2016 -0400 ---------------------------------------------------------------------- .../persistence/core/CassandraConfigImpl.java | 7 +++++-- .../usergrid/persistence/core/CassandraFig.java | 1 + .../auditlog/impl/AuditLogSerializationImpl.java | 10 +++++----- .../impl/MessageCounterSerializationImpl.java | 10 +++++----- .../impl/QueueMessageSerializationImpl.java | 14 +++++++------- .../queues/impl/QueueSerializationImpl.java | 10 +++++----- .../impl/ShardCounterSerializationImpl.java | 10 +++++----- .../sharding/impl/ShardSerializationImpl.java | 12 ++++++------ .../impl/TransferLogSerializationImpl.java | 10 +++++----- .../persistence/queue/impl/QakkaQueueManager.java | 6 ------ .../queue/impl/SNSQueueManagerImpl.java | 10 +++++----- ...ultiShardDatabaseQueueMessageIteratorTest.java | 6 +++--- .../serialization/sharding/ShardIteratorTest.java | 14 +++++++------- .../sharding/ShardSerializationTest.java | 18 +++++++++--------- 14 files changed, 68 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java index 729f5b2..77f7228 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java @@ -64,13 +64,16 @@ public class CassandraConfigImpl implements CassandraConfig { this.dataStaxReadCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getReadCl()); - this.dataStaxReadConsistentCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getReadClConsistent()); + this.dataStaxReadConsistentCl = com.datastax.driver.core.ConsistencyLevel.valueOf( + cassandraFig.getReadClConsistent()); this.dataStaxWriteCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getWriteCl() ); this.applicationKeyspace = cassandraFig.getApplicationKeyspace(); - this.applicationLocalKeyspace = cassandraFig.getApplicationLocalKeyspace(); + this.applicationLocalKeyspace = + cassandraFig.getApplicationLocalKeyspace() + "_" + + cassandraFig.getLocalDataCenter().replace("-", "_"); //add the listeners to update the values cassandraFig.addPropertyChangeListener( new PropertyChangeListener() { http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java index bc8d087..1faf1e7 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java @@ -91,6 +91,7 @@ public 
interface CassandraFig extends GuicyFig { @Default( "Usergrid_Applications" ) String getApplicationKeyspace(); + /** Prefix for local keyspace name. Name will be this prefix plus "_" plus local data center name. */ @Key( "cassandra.keyspace.application.local" ) @Default( "Usergrid_Applications_Local" ) String getApplicationLocalKeyspace(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java index ddbd345..93dfe4b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java @@ -24,7 +24,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.inject.Inject; -import org.apache.usergrid.persistence.core.CassandraFig; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -47,7 +47,7 @@ public class AuditLogSerializationImpl implements AuditLogSerialization { private static final Logger logger = LoggerFactory.getLogger( AuditLogSerializationImpl.class ); private final 
CassandraClient cassandraClient; - private final CassandraFig cassandraFig; + private final CassandraConfig cassandraConfig; public final static String TABLE_AUDIT_LOG = "audit_log"; @@ -77,8 +77,8 @@ public class AuditLogSerializationImpl implements AuditLogSerialization { @Inject - public AuditLogSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) { - this.cassandraFig = cassandraFig; + public AuditLogSerializationImpl( CassandraConfig cassandraConfig, CassandraClient cassandraClient ) { + this.cassandraConfig = cassandraConfig; this.cassandraClient = cassandraClient; } @@ -147,6 +147,6 @@ public class AuditLogSerializationImpl implements AuditLogSerialization { @Override public Collection<TableDefinition> getTables() { return Collections.singletonList( - new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), TABLE_AUDIT_LOG, CQL ) ); + new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), TABLE_AUDIT_LOG, CQL ) ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java index 5206ec7..f198d05 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java @@ -24,7 +24,7 @@ import com.datastax.driver.core.Statement; 
import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.inject.Inject; import com.google.inject.Singleton; -import org.apache.usergrid.persistence.core.CassandraFig; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -51,7 +51,7 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio private static final Logger logger = LoggerFactory.getLogger( MessageCounterSerializationImpl.class ); private final CassandraClient cassandraClient; - private final CassandraFig cassandraFig; + private final CassandraConfig cassandraConfig; final static String TABLE_SHARD_COUNTERS = "counters"; final static String COLUMN_QUEUE_NAME = "queue_name"; @@ -95,8 +95,8 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio @Inject - public MessageCounterSerializationImpl( CassandraFig cassandraFig, QakkaFig qakkaFig, CassandraClient cassandraClient ) { - this.cassandraFig = cassandraFig; + public MessageCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) { + this.cassandraConfig = cassandraConfig; this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter(); this.cassandraClient = cassandraClient; } @@ -202,7 +202,7 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio @Override public Collection<TableDefinition> getTables() { return Collections.singletonList( - new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), TABLE_SHARD_COUNTERS, CQL ) ); + new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), TABLE_SHARD_COUNTERS, CQL ) ); } } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java index 99ff783..d868021 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java @@ -28,7 +28,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.inject.Inject; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; -import org.apache.usergrid.persistence.core.CassandraFig; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -53,7 +53,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class ); private final CassandraClient cassandraClient; - private final CassandraFig cassandraFig; + private final CassandraConfig cassandraConfig; private final ActorSystemFig actorSystemFig; private final ShardStrategy shardStrategy; @@ -109,13 +109,13 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization @Inject public QueueMessageSerializationImpl( - CassandraFig cassandraFig, + CassandraConfig cassandraConfig, ActorSystemFig actorSystemFig, ShardStrategy shardStrategy, ShardCounterSerialization shardCounterSerialization, CassandraClient cassandraClient ) { - this.cassandraFig = cassandraFig; + this.cassandraConfig = cassandraConfig; this.actorSystemFig = actorSystemFig; this.shardStrategy = shardStrategy; this.shardCounterSerialization = shardCounterSerialization; @@ -316,13 +316,13 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization public Collection<TableDefinition> getTables() { return Lists.newArrayList( - new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), + new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), TABLE_MESSAGES_AVAILABLE, MESSAGES_AVAILABLE ), - new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), + new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), TABLE_MESSAGES_INFLIGHT, MESSAGES_INFLIGHT ), - new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), + new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), TABLE_MESSAGE_DATA, MESSAGE_DATA ) ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java index 07a201c..17a48c6 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java +++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java @@ -26,7 +26,7 @@ import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.Clause; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.inject.Inject; -import org.apache.usergrid.persistence.core.CassandraFig; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -47,7 +47,7 @@ public class QueueSerializationImpl implements QueueSerialization { private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class ); private final CassandraClient cassandraClient; - private final CassandraFig cassandraFig; + private final CassandraConfig cassandraConfig; public final static String COLUMN_QUEUE_NAME = "queue_name"; public final static String COLUMN_REGIONS = "regions"; @@ -74,8 +74,8 @@ public class QueueSerializationImpl implements QueueSerialization { @Inject - public QueueSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) { - this.cassandraFig = cassandraFig; + public QueueSerializationImpl( CassandraConfig cassandraConfig, CassandraClient cassandraClient ) { + this.cassandraConfig = cassandraConfig; this.cassandraClient = cassandraClient; } @@ -155,7 +155,7 @@ public class QueueSerializationImpl implements QueueSerialization { @Override public Collection<TableDefinition> getTables() { return Collections.singletonList( - new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), "queues", CQL ) ); + new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), "queues", CQL ) ); } } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java index f14d234..bcfb74d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java @@ -24,7 +24,7 @@ import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.inject.Inject; import com.google.inject.Singleton; -import org.apache.usergrid.persistence.core.CassandraFig; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -51,7 +51,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization private static final Logger logger = LoggerFactory.getLogger( ShardCounterSerializationImpl.class ); private final CassandraClient cassandraClient; - private final CassandraFig cassandraFig; + private final CassandraConfig cassandraConfig; final static String TABLE_COUNTERS = "shard_counters"; final static String COLUMN_QUEUE_NAME = "queue_name"; @@ -91,8 +91,8 @@ public class ShardCounterSerializationImpl implements 
ShardCounterSerialization @Inject - public ShardCounterSerializationImpl( CassandraFig cassandraFig, QakkaFig qakkaFig, CassandraClient cassandraClient ) { - this.cassandraFig = cassandraFig; + public ShardCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) { + this.cassandraConfig = cassandraConfig; this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter(); this.cassandraClient = cassandraClient; } @@ -197,6 +197,6 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization @Override public Collection<TableDefinition> getTables() { return Collections.singletonList( - new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), "shard_counters", CQL ) ); + new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), "shard_counters", CQL ) ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java index 989622b..cc5caab 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java @@ -26,7 +26,7 @@ import com.datastax.driver.core.querybuilder.Clause; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.common.collect.Lists; import com.google.inject.Inject; -import 
org.apache.usergrid.persistence.core.CassandraFig; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -46,7 +46,7 @@ public class ShardSerializationImpl implements ShardSerialization { private static final Logger logger = LoggerFactory.getLogger( ShardSerializationImpl.class ); private final CassandraClient cassandraClient; - private final CassandraFig cassandraFig; + private final CassandraConfig cassandraConfig; public final static String COLUMN_QUEUE_NAME = "queue_name"; public final static String COLUMN_REGION = "region"; @@ -82,8 +82,8 @@ public class ShardSerializationImpl implements ShardSerialization { @Inject - public ShardSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) { - this.cassandraFig = cassandraFig; + public ShardSerializationImpl( CassandraConfig cassandraConfig, CassandraClient cassandraClient ) { + this.cassandraConfig = cassandraConfig; this.cassandraClient = cassandraClient; } @@ -195,9 +195,9 @@ public class ShardSerializationImpl implements ShardSerialization { @Override public Collection<TableDefinition> getTables() { return Lists.newArrayList( - new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), + new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), TABLE_SHARDS_MESSAGES_AVAILABLE, SHARDS_MESSAGES_AVAILABLE ), - new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), + new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), TABLE_SHARDS_MESSAGES_INFLIGHT, SHARDS_MESSAGES_AVAILABLE_INFLIGHT ) ); } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java index 9ebb841..51a168e 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java @@ -25,7 +25,7 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.inject.Inject; -import org.apache.usergrid.persistence.core.CassandraFig; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; @@ -45,7 +45,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization { private static final Logger logger = LoggerFactory.getLogger( TransferLogSerializationImpl.class ); private final CassandraClient cassandraClient; - private final CassandraFig cassandraFig; + private final CassandraConfig cassandraConfig; public final static String TABLE_TRANSFER_LOG = "transfer_log"; @@ -67,8 +67,8 @@ public class TransferLogSerializationImpl implements TransferLogSerialization { @Inject - public 
TransferLogSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) { - this.cassandraFig = cassandraFig; + public TransferLogSerializationImpl( CassandraConfig cassandraConfig, CassandraClient cassandraClient ) { + this.cassandraConfig = cassandraConfig; this.cassandraClient = cassandraClient; } @@ -164,7 +164,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization { @Override public Collection<TableDefinition> getTables() { return Collections.singletonList( - new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), TABLE_TRANSFER_LOG, CQL ) ); + new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), TABLE_TRANSFER_LOG, CQL ) ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java index 832cecd..0eb609d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java @@ -42,27 +42,21 @@ public class QakkaQueueManager implements LegacyQueueManager { private static final Logger logger = LoggerFactory.getLogger( QakkaQueueManager.class ); private final LegacyQueueScope scope; - private final LegacyQueueFig fig; private final QueueManager queueManager; private final QueueMessageManager queueMessageManager; - private final QakkaFig qakkaFig; private final Regions regions; @Inject public QakkaQueueManager( @Assisted LegacyQueueScope scope, - LegacyQueueFig fig, QueueManager queueManager, QueueMessageManager 
queueMessageManager, - QakkaFig qakkaFig, Regions regions ) { this.scope = scope; - this.fig = fig; this.queueManager = queueManager; - this.qakkaFig = qakkaFig; this.queueMessageManager = queueMessageManager; this.regions = regions; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index 6d4e0c4..637f157 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -32,7 +32,7 @@ import com.amazonaws.ClientConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.usergrid.persistence.core.CassandraFig; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory; import org.apache.usergrid.persistence.core.guicyfig.ClusterFig; import org.apache.usergrid.persistence.queue.LegacyQueue; @@ -87,7 +87,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { private final LegacyQueueScope scope; private final LegacyQueueFig fig; private final ClusterFig clusterFig; - private final CassandraFig cassandraFig; + private final CassandraConfig cassandraConfig; private final ClientConfiguration clientConfiguration; private final AmazonSQSClient sqs; private final AmazonSNSClient sns; @@ -154,11 +154,11 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { @Inject public SNSQueueManagerImpl(@Assisted LegacyQueueScope scope, LegacyQueueFig fig, 
ClusterFig clusterFig, - CassandraFig cassandraFig, LegacyQueueFig queueFig ) { + CassandraConfig cassandraConfig, LegacyQueueFig queueFig ) { this.scope = scope; this.fig = fig; this.clusterFig = clusterFig; - this.cassandraFig = cassandraFig; + this.cassandraConfig = cassandraConfig; // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks @@ -382,7 +382,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager { private String getName() { String name = - clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace() + "_" + scope.getName() + "_" + clusterFig.getClusterName() + "_" + cassandraConfig.getApplicationKeyspace() + "_" + scope.getName() + "_" + scope.getRegionImplementation(); name = name.toLowerCase(); //user lower case values Preconditions.checkArgument( name.length() <= 80, "Your name must be < than 80 characters" ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java index 5fa3434..053fdd1 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java @@ -20,7 +20,7 @@ package org.apache.usergrid.persistence.qakka.serialization; import org.apache.commons.lang.RandomStringUtils; -import org.apache.usergrid.persistence.core.CassandraFig; +import 
org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; import org.apache.usergrid.persistence.qakka.AbstractTest; import org.apache.usergrid.persistence.qakka.core.CassandraClient; @@ -51,8 +51,8 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest { public void testIterator() throws InterruptedException { CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); + CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient ); QueueMessageSerialization queueMessageSerialization = getInjector().getInstance( QueueMessageSerialization.class ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java index 0d593aa..0c305fa 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java @@ -20,7 +20,7 @@ package org.apache.usergrid.persistence.qakka.serialization.sharding; import org.apache.commons.lang.RandomStringUtils; -import 
org.apache.usergrid.persistence.core.CassandraFig; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl; import org.apache.usergrid.persistence.qakka.AbstractTest; @@ -47,8 +47,8 @@ public class ShardIteratorTest extends AbstractTest { public void getActiveShards(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); + CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient ); String queueName = "queue_sit_" + RandomStringUtils.randomAlphanumeric( 10 ); @@ -80,8 +80,8 @@ public class ShardIteratorTest extends AbstractTest { public void seekActiveShards(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); + CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient ); String queueName = "queue_sit_" + RandomStringUtils.randomAlphanumeric( 10 ); @@ -114,8 +114,8 @@ public class ShardIteratorTest extends AbstractTest { public void shardIteratorOrdering() throws Exception { CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); - ShardSerialization shardSerialization = new 
ShardSerializationImpl( cassandraFig, cassandraClient ); + CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient ); int numShards = 10; String region = "default"; http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java index e67db28..debfdd3 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java @@ -19,7 +19,7 @@ package org.apache.usergrid.persistence.qakka.serialization.sharding; -import org.apache.usergrid.persistence.core.CassandraFig; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; import org.apache.usergrid.persistence.qakka.AbstractTest; import org.apache.usergrid.persistence.qakka.core.CassandraClient; @@ -47,8 +47,8 @@ public class ShardSerializationTest extends AbstractTest { public void writeNewShard(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); + CassandraConfig cassandraConfig = getInjector().getInstance( 
CassandraConfig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient ); Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); shardSerialization.createShard(shard1); @@ -58,8 +58,8 @@ public class ShardSerializationTest extends AbstractTest { public void deleteShard(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); + CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient ); Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); @@ -75,8 +75,8 @@ public class ShardSerializationTest extends AbstractTest { public void loadNullShard(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); + CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient ); Shard shard1 = new Shard("junk", "region1", Shard.Type.DEFAULT, 100L, null); @@ -90,8 +90,8 @@ public class ShardSerializationTest extends AbstractTest { public void updatePointer(){ CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class ); - CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class ); - ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient ); + CassandraConfig cassandraConfig = getInjector().getInstance( 
CassandraConfig.class ); + ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient ); Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null); shardSerialization.createShard(shard1);
