Make schema creation more robust so parallel test execution doesn't overload the datastax driver's control connection.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/320cf00d Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/320cf00d Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/320cf00d Branch: refs/heads/asf-site Commit: 320cf00d0acd51ed9efc1ecd3a99dd9e5b2b1ae4 Parents: fe197af Author: Michael Russo <[email protected]> Authored: Thu Aug 18 22:20:39 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Thu Aug 18 22:20:39 2016 -0700 ---------------------------------------------------------------------- .../collection/guice/TestCollectionModule.java | 22 +-------------- .../core/datastax/DataStaxCluster.java | 2 +- .../core/datastax/impl/DataStaxClusterImpl.java | 29 ++++++++++---------- .../migration/schema/MigrationManagerImpl.java | 25 ++++++++++++----- 4 files changed, 34 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/320cf00d/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java index dd618a9..5bef2e0 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java @@ -20,8 +20,6 @@ package org.apache.usergrid.persistence.collection.guice; -import com.google.inject.Guice; -import com.google.inject.Injector; import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.guice.CommonModule; import org.apache.usergrid.persistence.core.guice.TestModule; @@ -30,8 +28,6 @@ import org.apache.usergrid.persistence.core.migration.data.TestMigrationDataProv import com.google.inject.TypeLiteral; -import java.util.HashMap; -import java.util.Map; public class TestCollectionModule extends TestModule { @@ -43,29 +39,13 @@ public class TestCollectionModule extends TestModule { install( new CollectionModule() { @Override public void configureMigrationProvider() { - //configure our migration data provider + //configure our migration data provider TestMigrationDataProvider<EntityIdScope> migrationDataProvider = new TestMigrationDataProvider<>(); bind(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){}).toInstance( migrationDataProvider ); } } ); - /** - * Test modules - */ -// install(new MaxMigrationModule()); - - } - - - private static Map<String, Injector> injectorsByName = new HashMap<>(); - - public static Injector getInjector( String name ) { - Injector i = injectorsByName.get( name ); - if ( i == null ) { - i = Guice.createInjector( new TestCollectionModule() ); - } - return i; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/320cf00d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java index 768a7a2..ea76f92 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java 
@@ -30,7 +30,7 @@ public interface DataStaxCluster { Session getApplicationSession(); - void createOrUpdateKeyspace() throws Exception; + void createApplicationKeyspace() throws Exception; void waitForSchemaAgreement(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/320cf00d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java index ceb5f33..3146b65 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java @@ -47,7 +47,7 @@ public class DataStaxClusterImpl implements DataStaxCluster { this.cluster = buildCluster(); // always initialize the keyspaces - this.createOrUpdateKeyspace(); + this.createApplicationKeyspace(); logger.info("Initialized datastax cluster client. 
Hosts={}, Idle Timeout={}s, Pool Timeout={}s", cluster.getMetadata().getAllHosts().toString(), @@ -95,27 +95,26 @@ public class DataStaxClusterImpl implements DataStaxCluster { * @throws Exception */ @Override - public void createOrUpdateKeyspace() throws Exception { + public void createApplicationKeyspace() throws Exception { - clusterSession = getClusterSession(); + boolean exists = getClusterSession().getCluster().getMetadata() + .getKeyspace(CQLUtils.quote(cassandraFig.getApplicationKeyspace())) != null; + + if(exists){ + return; + } final String createApplicationKeyspace = String.format( "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s", CQLUtils.quote(cassandraFig.getApplicationKeyspace()), - CQLUtils.getFormattedReplication( cassandraFig.getStrategy(), cassandraFig.getStrategyOptions() ) - - ); + CQLUtils.getFormattedReplication(cassandraFig.getStrategy(), cassandraFig.getStrategyOptions()) - final String updateApplicationKeyspace = String.format( - "ALTER KEYSPACE %s WITH replication = %s", - CQLUtils.quote(cassandraFig.getApplicationKeyspace()), - CQLUtils.getFormattedReplication( cassandraFig.getStrategy(), cassandraFig.getStrategyOptions() ) ); - clusterSession.execute(createApplicationKeyspace); - clusterSession.execute(updateApplicationKeyspace); + getClusterSession().execute(createApplicationKeyspace); + waitForSchemaAgreement(); - logger.info("Created/Updated keyspace: {}", cassandraFig.getApplicationKeyspace()); + logger.info("Created keyspace: {}", cassandraFig.getApplicationKeyspace()); } @@ -126,8 +125,7 @@ public class DataStaxClusterImpl implements DataStaxCluster { public void waitForSchemaAgreement() { while ( true ) { - - if( getCluster().getMetadata().checkSchemaAgreement() ){ + if( getClusterSession().getCluster().getMetadata().checkSchemaAgreement() ){ return; } @@ -177,6 +175,7 @@ public class DataStaxClusterImpl implements DataStaxCluster { Cluster.Builder datastaxCluster = Cluster.builder() 
.withClusterName(cassandraFig.getClusterName()) .addContactPoints(cassandraFig.getHosts().split(",")) + .withMaxSchemaAgreementWaitSeconds(30) .withCompression(ProtocolOptions.Compression.LZ4) .withLoadBalancingPolicy(loadBalancingPolicy) .withPoolingOptions(poolingOptions) http://git-wip-us.apache.org/repos/asf/usergrid/blob/320cf00d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java index e95c2ae..d6f66c5 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java @@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.core.migration.schema; import java.util.Collection; import java.util.Set; +import com.datastax.driver.core.KeyspaceMetadata; import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.datastax.CQLUtils; import org.apache.usergrid.persistence.core.datastax.DataStaxCluster; @@ -72,7 +73,7 @@ public class MigrationManagerImpl implements MigrationManager { try { - dataStaxCluster.createOrUpdateKeyspace(); + dataStaxCluster.createApplicationKeyspace(); for ( Migration migration : migrations ) { @@ -93,6 +94,9 @@ public class MigrationManagerImpl implements MigrationManager { for (MultiTenantColumnFamilyDefinition cf : columnFamilies) { testAndCreateColumnFamilyDef(cf); } + // creation of tables happens with the datastax driver and it auto checks schema on schema queries + // the CF def creation uses Astyanax, so 
manually check the schema agreement + dataStaxCluster.waitForSchemaAgreement(); } @@ -107,8 +111,6 @@ public class MigrationManagerImpl implements MigrationManager { } - dataStaxCluster.waitForSchemaAgreement(); - } catch ( Throwable t ) { logger.error( "Unable to perform migration", t ); @@ -139,12 +141,21 @@ public class MigrationManagerImpl implements MigrationManager { private void createTable(TableDefinition tableDefinition ) throws Exception { - String CQL = CQLUtils.getTableCQL( cassandraFig, tableDefinition, CQLUtils.ACTION.CREATE ); - if (logger.isDebugEnabled()){ - logger.debug( CQL ); + KeyspaceMetadata keyspaceMetadata = dataStaxCluster.getClusterSession().getCluster().getMetadata() + .getKeyspace(CQLUtils.quote(cassandraFig.getApplicationKeyspace())); + + boolean exists = keyspaceMetadata != null && keyspaceMetadata.getTable(tableDefinition.getTableName()) != null; + + if( exists ){ + return; + } + + String CQL = CQLUtils.getTableCQL(cassandraFig, tableDefinition, CQLUtils.ACTION.CREATE); + if (logger.isDebugEnabled()) { + logger.debug(CQL); } dataStaxCluster.getApplicationSession() - .execute( CQL ); + .execute(CQL); logger.info("Created table: {}", tableDefinition.getTableName());
