Integrate datastax cluster into migration manager for creation of keyspaces using database/setup.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/01c4970a Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/01c4970a Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/01c4970a Branch: refs/heads/datastax-cass-driver Commit: 01c4970a80fa080f7deccf3f004112d9112f624d Parents: a631975 Author: Michael Russo <[email protected]> Authored: Tue Feb 9 10:18:27 2016 -0800 Committer: Michael Russo <[email protected]> Committed: Tue Feb 9 10:18:27 2016 -0800 ---------------------------------------------------------------------- .../usergrid/corepersistence/CpSetup.java | 15 +-- .../persistence/core/datastax/CQLUtils.java | 49 ++++++++ .../core/datastax/DatastaxSessionProvider.java | 22 +++- .../core/datastax/impl/DatastaxClusterImpl.java | 68 +++-------- .../persistence/core/guice/CommonModule.java | 10 +- .../migration/schema/MigrationManagerImpl.java | 113 +++++++------------ 6 files changed, 142 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/01c4970a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java index e97be3f..d2c38e4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java @@ -83,6 +83,9 @@ public class CpSetup implements Setup { //a no op, creating the injector creates the connections //init our index if required this.emf.initializeManagementIndex(); + + logger.info( "Initialize application keyspace" ); + migrate(); setupStaticKeyspace(); setupSystemKeyspace(); @@ -129,9 +132,7 @@ public class CpSetup implements Setup { @Override public void setupSystemKeyspace() throws Exception { - logger.info( "Initialize system keyspace" ); - - migrate(); + logger.info( "Initialize system tables" ); cass.createColumnFamily( getApplicationKeyspace(), createColumnFamilyDefinition( getApplicationKeyspace(), APPLICATIONS_CF, ComparatorType.BYTESTYPE ) ); @@ -145,7 +146,7 @@ public class CpSetup implements Setup { cass.createColumnFamily( getApplicationKeyspace(), createColumnFamilyDefinition( getApplicationKeyspace(), PRINCIPAL_TOKEN_CF, ComparatorType.UUIDTYPE ) ); - logger.info( "System keyspace initialized" ); + logger.info( "System tables initialized" ); } @@ -167,11 +168,9 @@ public class CpSetup implements Setup { @Override public void setupStaticKeyspace() throws Exception { - migrate(); - // Need this legacy stuff for queues - logger.info( "Creating static application keyspace {}", getApplicationKeyspace() ); + logger.info( "Initialize application tables" ); cass.createColumnFamily( getApplicationKeyspace(), createColumnFamilyDefinition( getApplicationKeyspace(), APPLICATIONS_CF, @@ -183,6 +182,8 @@ public class CpSetup implements Setup { cass.createColumnFamilies( getApplicationKeyspace(), getCfDefs( QueuesCF.class, getApplicationKeyspace() ) ); + logger.info( "Application tables initialized" ); + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/01c4970a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java new file mode 100644 index 0000000..b663934 --- /dev/null +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.core.datastax; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.HashMap; +import java.util.Map; + +public class CQLUtils { + + public static String getFormattedReplication(String strategy, String strategyOptions) throws JsonProcessingException { + + Map<String, String> replicationSettings = new HashMap<>(); + replicationSettings.put("class", strategy); + String[] strategyOptionsSplit = strategyOptions.split(","); + for ( String option : strategyOptionsSplit){ + String[] splitOptions = option.split(":"); + replicationSettings.put(splitOptions[0], splitOptions[1]); + } + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(replicationSettings).replace("\"", "'"); + } + + + public static void createColumnFamily(){ + + + } + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/01c4970a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DatastaxSessionProvider.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DatastaxSessionProvider.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DatastaxSessionProvider.java index eeca763..1b39cb8 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DatastaxSessionProvider.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DatastaxSessionProvider.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.usergrid.persistence.core.datastax; @@ -5,12 +23,12 @@ import com.datastax.driver.core.Session; import com.google.inject.Inject; import com.google.inject.Provider; -public class DatastaxSessionProvider implements Provider<Session> { +public class DataStaxSessionProvider implements Provider<Session> { private final DataStaxCluster dataStaxCluster; @Inject - public DatastaxSessionProvider( final DataStaxCluster dataStaxCluster){ + public DataStaxSessionProvider( final DataStaxCluster dataStaxCluster ){ this.dataStaxCluster = dataStaxCluster; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/01c4970a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DatastaxClusterImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DatastaxClusterImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DatastaxClusterImpl.java index 0582c4e..ffe61e6 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DatastaxClusterImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DatastaxClusterImpl.java @@ -21,8 +21,6 @@ package org.apache.usergrid.persistence.core.datastax.impl; import com.datastax.driver.core.*; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; import com.datastax.driver.core.policies.LoadBalancingPolicy; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.usergrid.persistence.core.astyanax.CassandraFig; @@ -30,15 +28,11 @@ import org.apache.usergrid.persistence.core.datastax.DataStaxCluster; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - @Singleton -public class DatastaxClusterImpl implements DataStaxCluster { +public class DataStaxClusterImpl implements DataStaxCluster { - private static final Logger logger = LoggerFactory.getLogger( DatastaxClusterImpl.class ); + private static final Logger logger = LoggerFactory.getLogger( DataStaxClusterImpl.class ); private final CassandraFig cassandraFig; @@ -47,7 +41,7 @@ public class DatastaxClusterImpl implements DataStaxCluster { private Session clusterSession; @Inject - public DatastaxClusterImpl( final CassandraFig cassandraFig ) throws Exception { + public DataStaxClusterImpl(final CassandraFig cassandraFig ) throws Exception { this.cassandraFig = cassandraFig; ConsistencyLevel defaultConsistencyLevel; @@ -99,15 +93,12 @@ public class DatastaxClusterImpl implements DataStaxCluster { } this.cluster = datastaxCluster.build(); - this.clusterSession = cluster.connect(); - logger.info("Initialized datastax client cluster. Hosts={}, Idle Timeout={}s, Request Timeout={}s", + logger.info("Initialized datastax cluster client. Hosts={}, Idle Timeout={}s, Request Timeout={}s", cluster.getMetadata().getAllHosts().toString(), cluster.getConfiguration().getPoolingOptions().getIdleTimeoutSeconds(), cluster.getConfiguration().getPoolingOptions().getPoolTimeoutMillis() / 1000); - logger.info("Creating keyspaces if they do not already exist."); - createKeyspaces(); - this.applicationSession = cluster.connect( "\""+cassandraFig.getApplicationKeyspace()+"\"" ); + @@ -115,55 +106,26 @@ public class DatastaxClusterImpl implements DataStaxCluster { } public Cluster getCluster(){ + return cluster; } public Session getClusterSession(){ - return clusterSession; - } - - public Session getApplicationSession(){ - return applicationSession; - } - - private void createKeyspaces() throws Exception{ - - final String createApplicationKeyspace = String.format( - "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH replication = %s", - cassandraFig.getApplicationKeyspace(), - getFormattedReplication( cassandraFig.getStrategy(), cassandraFig.getStrategyOptions() ) - - ); - - final String createLocksKeyspace = String.format( - "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH replication = %s", - cassandraFig.getLocksKeyspace(), - getFormattedReplication( - cassandraFig.getLocksKeyspaceStrategy(), - cassandraFig.getLocksKeyspaceReplication() - ) - ); - - logger.info("Creating application keyspace with the following CQL: {}", createApplicationKeyspace); - clusterSession.execute(createApplicationKeyspace); - - logger.info("Creating locks keyspace with the following CQL: {}", createLocksKeyspace); - clusterSession.execute(createLocksKeyspace); + if ( clusterSession == null || clusterSession.isClosed() ){ + clusterSession = cluster.connect(); + } + return clusterSession; } - private String getFormattedReplication(String strategy, String strategyOptions) throws JsonProcessingException{ + public Session getApplicationSession(){ - Map<String, String> replicationSettings = new HashMap<>(); - replicationSettings.put("class", strategy); - String[] strategyOptionsSplit = strategyOptions.split(","); - for ( String option : strategyOptionsSplit){ - String[] splitOptions = option.split(":"); - replicationSettings.put(splitOptions[0], splitOptions[1]); + if ( applicationSession == null || applicationSession.isClosed() ){ + applicationSession = cluster.connect( "\""+cassandraFig.getApplicationKeyspace()+"\"" ); } - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(replicationSettings).replace("\"", "'"); + return applicationSession; } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/01c4970a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java index a06a6e7..460efa5 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java @@ -19,9 +19,12 @@ package org.apache.usergrid.persistence.core.guice; +import com.datastax.driver.core.Session; import com.netflix.astyanax.Keyspace; import org.apache.usergrid.persistence.core.astyanax.*; -import org.apache.usergrid.persistence.core.datastax.impl.DatastaxClusterImpl; +import org.apache.usergrid.persistence.core.datastax.DataStaxCluster; +import org.apache.usergrid.persistence.core.datastax.DataStaxSessionProvider; +import org.apache.usergrid.persistence.core.datastax.impl.DataStaxClusterImpl; import org.safehaus.guicyfig.GuicyFigModule; import org.apache.usergrid.persistence.core.consistency.TimeService; @@ -63,7 +66,10 @@ public class CommonModule extends AbstractModule { bind(CassandraCluster.class).to(CassandraClusterImpl.class).asEagerSingleton(); // bind our Datastax cluster - bind(DatastaxClusterImpl.class).asEagerSingleton(); + bind(DataStaxCluster.class).to(DataStaxClusterImpl.class).asEagerSingleton(); + + // bind our Session to the DataStaxSessionProvider + bind(Session.class).toProvider(DataStaxSessionProvider.class).asEagerSingleton(); // bind our keyspace to the AstyanaxKeyspaceProvider bind(Keyspace.class).toProvider(AstyanaxKeyspaceProvider.class).asEagerSingleton(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/01c4970a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java index db694fe..e7e0cb5 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java @@ -20,22 +20,21 @@ package org.apache.usergrid.persistence.core.migration.schema; import java.util.Collection; -import java.util.List; -import java.util.Map; import java.util.Set; +import com.datastax.driver.core.Session; +import org.apache.usergrid.persistence.core.astyanax.CassandraFig; +import org.apache.usergrid.persistence.core.datastax.CQLUtils; +import org.apache.usergrid.persistence.core.datastax.DataStaxCluster; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; -import org.apache.usergrid.persistence.core.migration.util.AstayanxUtils; -import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.google.inject.Singleton; import com.netflix.astyanax.Keyspace; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; -import com.netflix.astyanax.connectionpool.exceptions.NotFoundException; import com.netflix.astyanax.ddl.ColumnFamilyDefinition; import com.netflix.astyanax.ddl.KeyspaceDefinition; @@ -51,18 +50,20 @@ public class MigrationManagerImpl implements MigrationManager { private static final Logger logger = LoggerFactory.getLogger( MigrationManagerImpl.class ); + private final CassandraFig cassandraFig; private final Set<Migration> migrations; private final Keyspace keyspace; - - private final MigrationManagerFig fig; + private final DataStaxCluster dataStaxCluster; @Inject - public MigrationManagerImpl( final Keyspace keyspace, final Set<Migration> migrations, - MigrationManagerFig fig ) { + public MigrationManagerImpl( final CassandraFig cassandraFig, final Keyspace keyspace, + final Set<Migration> migrations, final DataStaxCluster dataStaxCluster) { + + this.cassandraFig = cassandraFig; this.keyspace = keyspace; this.migrations = migrations; - this.fig = fig; + this.dataStaxCluster = dataStaxCluster; } @@ -72,7 +73,7 @@ public class MigrationManagerImpl implements MigrationManager { try { - testAndCreateKeyspace(); + createOrUpdateKeyspace(); for ( Migration migration : migrations ) { @@ -116,86 +117,54 @@ public class MigrationManagerImpl implements MigrationManager { logger.info( "Created column family {}", columnFamily.getColumnFamily().getName() ); - waitForMigration(); + waitForSchemaAgreement(); } /** - * Check if they keyspace exists. If it doesn't create it + * Execute CQL to create the keyspace if it does not already exists. Always update the keyspace with the + * configured strategy options to allow for real time replication updates. + * + * @throws Exception */ - private void testAndCreateKeyspace() throws ConnectionException { - - - KeyspaceDefinition keyspaceDefinition = null; - - try { - keyspaceDefinition = keyspace.describeKeyspace(); - - }catch( NotFoundException nfe){ - //if we execute this immediately after a drop keyspace in 1.2.x, Cassandra is returning the NFE instead of a BadRequestException - //swallow and log, then continue to create the keyspaces. - logger.info( "Received a NotFoundException when attempting to describe keyspace. It does not exist" ); - } - catch(Exception e){ - AstayanxUtils.isKeyspaceMissing("Unable to connect to cassandra", e); - } - - - if ( keyspaceDefinition != null ) { - return; - } - + private void createOrUpdateKeyspace() throws Exception { - ImmutableMap.Builder<String, Object> strategyOptions = getKeySpaceProps(); + Session clusterSession = dataStaxCluster.getClusterSession(); + final String createApplicationKeyspace = String.format( + "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH replication = %s", + cassandraFig.getApplicationKeyspace(), + CQLUtils.getFormattedReplication( cassandraFig.getStrategy(), cassandraFig.getStrategyOptions() ) - ImmutableMap<String, Object> options = - ImmutableMap.<String, Object>builder().put( "strategy_class", fig.getStrategyClass() ) - .put( "strategy_options", strategyOptions.build() ).build(); + ); + final String updateApplicationKeyspace = String.format( + "ALTER KEYSPACE \"%s\" WITH replication = %s", + cassandraFig.getApplicationKeyspace(), + CQLUtils.getFormattedReplication( cassandraFig.getStrategy(), cassandraFig.getStrategyOptions() ) + ); - keyspace.createKeyspace( options ); + logger.info("Creating application keyspace with the following CQL: {}", createApplicationKeyspace); + clusterSession.execute(createApplicationKeyspace); + logger.info("Updating application keyspace with the following CQL: {}", updateApplicationKeyspace); + clusterSession.execute(updateApplicationKeyspace); - strategyOptions.toString(); + // this session pool is only used when running database setup so close it when finished to clear resources + clusterSession.close(); - logger.info( "Created keyspace {} with options {}", keyspace.getKeyspaceName(), options.toString() ); - - waitForMigration(); + waitForSchemaAgreement(); } /** - * Get keyspace properties + * Wait until all Cassandra nodes agree on the schema. Sleeps 100ms between checks. + * */ - private ImmutableMap.Builder<String, Object> getKeySpaceProps() { - ImmutableMap.Builder<String, Object> keyspaceProps = ImmutableMap.<String, Object>builder(); - - String optionString = fig.getStrategyOptions(); - - if(optionString == null){ - return keyspaceProps; - } - - - - for ( String key : optionString.split( "," ) ) { - - final String[] options = key.split( ":" ); - - keyspaceProps.put( options[0], options[1] ); - } - - return keyspaceProps; - } - - - private void waitForMigration() throws ConnectionException { + private void waitForSchemaAgreement() { while ( true ) { - final Map<String, List<String>> versions = keyspace.describeSchemaVersions(); - - if ( versions != null && versions.size() == 1 ) { + if( dataStaxCluster.getCluster().getMetadata().checkSchemaAgreement() ){ return; } @@ -208,4 +177,6 @@ public class MigrationManagerImpl implements MigrationManager { } } } + + }
