Updated Branches: refs/heads/trunk 7ddb5c7a4 -> 71f5d91ab
remove schema agreement checking from all external APIs (Thrift, CQL and CQL3) patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-4487 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/71f5d91a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/71f5d91a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/71f5d91a Branch: refs/heads/trunk Commit: 71f5d91ab7825196990a2744cf3e40e654917d33 Parents: 7ddb5c7 Author: Pavel Yaskevich <xe...@apache.org> Authored: Wed Aug 15 14:00:28 2012 +0300 Committer: Pavel Yaskevich <xe...@apache.org> Committed: Fri Aug 17 01:54:13 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + interface/cassandra.thrift | 7 ++- src/java/org/apache/cassandra/cli/CliClient.java | 49 +-------------- src/java/org/apache/cassandra/cli/CliMain.java | 2 +- .../org/apache/cassandra/cql/QueryProcessor.java | 50 +-------------- .../org/apache/cassandra/cql3/CQLStatement.java | 5 +- .../org/apache/cassandra/cql3/QueryProcessor.java | 10 +-- .../cql3/statements/CreateKeyspaceStatement.java | 3 +- .../cql3/statements/DropKeyspaceStatement.java | 3 +- .../cql3/statements/SchemaAlteringStatement.java | 46 +------------ .../apache/cassandra/thrift/CassandraServer.java | 16 ----- .../cassandra/transport/messages/ErrorMessage.java | 4 - .../cassandra/transport/messages/QueryMessage.java | 4 +- 13 files changed, 24 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/71f5d91a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 75de54e..39c92b1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -37,6 +37,7 @@ * improve DynamicEndpointSnitch by using reservoir sampling (CASSANDRA-4038) * (cql3) Add support for 2ndary indexes (CASSANDRA-3680) 
* (cql3) fix defining more than one PK to be invalid (CASSANDRA-4477) + * remove schema agreement checking from all external APIs (Thrift, CQL and CQL3) (CASSANDRA-4487) 1.1.4 http://git-wip-us.apache.org/repos/asf/cassandra/blob/71f5d91a/interface/cassandra.thrift ---------------------------------------------------------------------- diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift index 5e933d7..1f735e6 100644 --- a/interface/cassandra.thrift +++ b/interface/cassandra.thrift @@ -158,7 +158,12 @@ exception AuthorizationException { 1: required string why } -/** schemas are not in agreement across all nodes */ +/** + * NOTE: This is an outdated exception left for backward compatibility reasons, + * no actual schema agreement validation is done starting from Cassandra 1.2 + * + * schemas are not in agreement across all nodes + */ exception SchemaDisagreementException { } http://git-wip-us.apache.org/repos/asf/cassandra/blob/71f5d91a/src/java/org/apache/cassandra/cli/CliClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/CliClient.java b/src/java/org/apache/cassandra/cli/CliClient.java index f2f492a..176f70a 100644 --- a/src/java/org/apache/cassandra/cli/CliClient.java +++ b/src/java/org/apache/cassandra/cli/CliClient.java @@ -198,7 +198,7 @@ public class CliClient } // Execute a CLI Statement - public void executeCLIStatement(String statement) throws CharacterCodingException, TException, TimedOutException, NotFoundException, NoSuchFieldException, InvalidRequestException, UnavailableException, InstantiationException, IllegalAccessException, ClassNotFoundException, SchemaDisagreementException + public void executeCLIStatement(String statement) throws CharacterCodingException, TException, TimedOutException, NotFoundException, NoSuchFieldException, InvalidRequestException, UnavailableException, InstantiationException, IllegalAccessException, ClassNotFoundException { Tree tree = 
CliCompiler.compileQuery(statement); try @@ -1006,7 +1006,6 @@ public class CliClient { String mySchemaVersion = thriftClient.system_add_keyspace(updateKsDefAttributes(statement, ksDef)); sessionState.out.println(mySchemaVersion); - validateSchemaIsSettled(mySchemaVersion); keyspacesMap.put(keyspaceName, thriftClient.describe_keyspace(keyspaceName)); } @@ -1037,7 +1036,6 @@ public class CliClient { String mySchemaVersion = thriftClient.system_add_column_family(updateCfDefAttributes(statement, cfDef)); sessionState.out.println(mySchemaVersion); - validateSchemaIsSettled(mySchemaVersion); keyspacesMap.put(keySpace, thriftClient.describe_keyspace(keySpace)); } catch (InvalidRequestException e) @@ -1068,7 +1066,6 @@ public class CliClient String mySchemaVersion = thriftClient.system_update_keyspace(updatedKsDef); sessionState.out.println(mySchemaVersion); - validateSchemaIsSettled(mySchemaVersion); keyspacesMap.remove(keyspaceName); getKSMetaData(keySpace); } @@ -1103,7 +1100,6 @@ public class CliClient String mySchemaVersion = thriftClient.system_update_column_family(updateCfDefAttributes(statement, cfDef)); sessionState.out.println(mySchemaVersion); - validateSchemaIsSettled(mySchemaVersion); keyspacesMap.put(keySpace, thriftClient.describe_keyspace(keySpace)); } catch (InvalidRequestException e) @@ -1293,7 +1289,6 @@ public class CliClient String keyspaceName = CliCompiler.getKeySpace(statement, thriftClient.describe_keyspaces()); String version = thriftClient.system_drop_keyspace(keyspaceName); sessionState.out.println(version); - validateSchemaIsSettled(version); if (keyspaceName.equals(keySpace)) //we just deleted the keyspace we were authenticated too keySpace = null; @@ -1316,7 +1311,6 @@ public class CliClient String cfName = CliCompiler.getColumnFamily(statement, keyspacesMap.get(keySpace).cf_defs); String mySchemaVersion = thriftClient.system_drop_column_family(cfName); sessionState.out.println(mySchemaVersion); - validateSchemaIsSettled(mySchemaVersion); } 
private void executeList(Tree statement) @@ -1466,7 +1460,6 @@ public class CliClient String mySchemaVersion = thriftClient.system_update_column_family(cfDef); sessionState.out.println(mySchemaVersion); - validateSchemaIsSettled(mySchemaVersion); keyspacesMap.put(keySpace, thriftClient.describe_keyspace(keySpace)); } @@ -2898,46 +2891,6 @@ public class CliClient } } - /** validates schema is propagated to all nodes */ - private void validateSchemaIsSettled(String currentVersionId) - { - sessionState.out.println("Waiting for schema agreement..."); - Map<String, List<String>> versions = null; - - long limit = System.currentTimeMillis() + sessionState.schema_mwt; - boolean inAgreement = false; - outer: - while (limit - System.currentTimeMillis() >= 0 && !inAgreement) - { - try - { - versions = thriftClient.describe_schema_versions(); // getting schema version for nodes of the ring - } - catch (Exception e) - { - sessionState.err.println((e instanceof InvalidRequestException) ? ((InvalidRequestException) e).getWhy() : e.getMessage()); - continue; - } - - for (String version : versions.keySet()) - { - if (!version.equals(currentVersionId) && !version.equals(StorageProxy.UNREACHABLE)) - continue outer; - } - inAgreement = true; - } - - if (versions.containsKey(StorageProxy.UNREACHABLE)) - sessionState.err.printf("Warning: unreachable nodes %s", Joiner.on(", ").join(versions.get(StorageProxy.UNREACHABLE))); - if (!inAgreement) - { - sessionState.err.printf("The schema has not settled in %d seconds; further migrations are ill-advised until it does.%nVersions are %s%n", - sessionState.schema_mwt / 1000, FBUtilities.toString(versions)); - System.exit(-1); - } - sessionState.out.println("... 
schemas agree across the cluster"); - } - private static class CfDefNamesComparator implements Comparator<CfDef> { public int compare(CfDef a, CfDef b) http://git-wip-us.apache.org/repos/asf/cassandra/blob/71f5d91a/src/java/org/apache/cassandra/cli/CliMain.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/CliMain.java b/src/java/org/apache/cassandra/cli/CliMain.java index ff73aea..ac7a3c3 100644 --- a/src/java/org/apache/cassandra/cli/CliMain.java +++ b/src/java/org/apache/cassandra/cli/CliMain.java @@ -206,7 +206,7 @@ public class CliMain completer.setCandidateStrings(strs); } - public static void processStatement(String query) throws CharacterCodingException, ClassNotFoundException, TException, TimedOutException, SchemaDisagreementException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException + public static void processStatement(String query) throws CharacterCodingException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException { cliClient.executeCLIStatement(query); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/71f5d91a/src/java/org/apache/cassandra/cql/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java index b23cf44..5cd2d1d 100644 --- a/src/java/org/apache/cassandra/cql/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java @@ -62,8 +62,6 @@ public class QueryProcessor private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class); - private static final long timeLimitForSchemaAgreement = 10 * 1000; - public static final String DEFAULT_KEY_NAME = 
bufferToString(CFMetaData.DEFAULT_KEY_NAME); private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData metadata, SelectStatement select, List<ByteBuffer> variables) @@ -399,13 +397,6 @@ public class QueryProcessor throw new InvalidRequestException("range finish must come after start in traversal order"); } - // Copypasta from CassandraServer (where it is private). - private static void validateSchemaAgreement() throws SchemaDisagreementException - { - if (describeSchemaVersions().size() > 1) - throw new SchemaDisagreementException(); - } - private static Map<String, List<String>> describeSchemaVersions() { // unreachable hosts don't count towards disagreement @@ -414,7 +405,7 @@ public class QueryProcessor } public static CqlResult processStatement(CQLStatement statement,ClientState clientState, List<ByteBuffer> variables ) - throws UnavailableException, InvalidRequestException, TimedOutException, SchemaDisagreementException + throws UnavailableException, InvalidRequestException, TimedOutException { String keyspace = null; @@ -663,7 +654,6 @@ public class QueryProcessor create.validate(); ThriftValidation.validateKeyspaceNotSystem(create.getName()); clientState.hasKeyspaceSchemaAccess(Permission.WRITE); - validateSchemaAgreement(); try { @@ -672,7 +662,6 @@ public class QueryProcessor create.getStrategyOptions()); ThriftValidation.validateKeyspaceNotYetExisting(ksm.name); MigrationManager.announceNewKeyspace(ksm); - validateSchemaIsSettled(); } catch (ConfigurationException e) { @@ -687,12 +676,10 @@ public class QueryProcessor case CREATE_COLUMNFAMILY: CreateColumnFamilyStatement createCf = (CreateColumnFamilyStatement)statement.statement; clientState.hasColumnFamilySchemaAccess(Permission.WRITE); - validateSchemaAgreement(); try { MigrationManager.announceNewColumnFamily(createCf.getCFMetaData(keyspace, variables)); - validateSchemaIsSettled(); } catch (ConfigurationException e) { @@ -707,7 +694,6 @@ public class QueryProcessor case CREATE_INDEX: 
CreateIndexStatement createIdx = (CreateIndexStatement)statement.statement; clientState.hasColumnFamilySchemaAccess(Permission.WRITE); - validateSchemaAgreement(); CFMetaData oldCfm = Schema.instance.getCFMetaData(keyspace, createIdx.getColumnFamily()); if (oldCfm == null) throw new InvalidRequestException("No such column family: " + createIdx.getColumnFamily()); @@ -737,7 +723,6 @@ public class QueryProcessor { cfm.addDefaultIndexNames(); MigrationManager.announceColumnFamilyUpdate(cfm); - validateSchemaIsSettled(); } catch (ConfigurationException e) { @@ -752,12 +737,10 @@ public class QueryProcessor case DROP_INDEX: DropIndexStatement dropIdx = (DropIndexStatement)statement.statement; clientState.hasColumnFamilySchemaAccess(Permission.WRITE); - validateSchemaAgreement(); try { MigrationManager.announceColumnFamilyUpdate(dropIdx.generateCFMetadataUpdate(clientState.getKeyspace())); - validateSchemaIsSettled(); } catch (ConfigurationException e) { @@ -779,12 +762,10 @@ public class QueryProcessor String deleteKeyspace = (String)statement.statement; ThriftValidation.validateKeyspaceNotSystem(deleteKeyspace); clientState.hasKeyspaceSchemaAccess(Permission.WRITE); - validateSchemaAgreement(); try { MigrationManager.announceKeyspaceDrop(deleteKeyspace); - validateSchemaIsSettled(); } catch (ConfigurationException e) { @@ -799,12 +780,10 @@ public class QueryProcessor case DROP_COLUMNFAMILY: String deleteColumnFamily = (String)statement.statement; clientState.hasColumnFamilySchemaAccess(Permission.WRITE); - validateSchemaAgreement(); try { MigrationManager.announceColumnFamilyDrop(keyspace, deleteColumnFamily); - validateSchemaIsSettled(); } catch (ConfigurationException e) { @@ -821,12 +800,10 @@ public class QueryProcessor validateColumnFamily(keyspace, alterTable.columnFamily); clientState.hasColumnFamilyAccess(alterTable.columnFamily, Permission.WRITE); - validateSchemaAgreement(); try { 
MigrationManager.announceColumnFamilyUpdate(alterTable.getCFMetaData(keyspace)); - validateSchemaIsSettled(); } catch (ConfigurationException e) { @@ -842,7 +819,7 @@ public class QueryProcessor } public static CqlResult process(String queryString, ClientState clientState) - throws UnavailableException, InvalidRequestException, TimedOutException, SchemaDisagreementException + throws UnavailableException, InvalidRequestException, TimedOutException { logger.trace("CQL QUERY: {}", queryString); return processStatement(getStatement(queryString), clientState, new ArrayList<ByteBuffer>(0)); @@ -866,7 +843,7 @@ public class QueryProcessor } public static CqlResult processPrepared(CQLStatement statement, ClientState clientState, List<ByteBuffer> variables) - throws UnavailableException, InvalidRequestException, TimedOutException, SchemaDisagreementException + throws UnavailableException, InvalidRequestException, TimedOutException { // Check to see if there are any bound variables to verify if (!(variables.isEmpty() && (statement.boundTerms == 0))) @@ -948,27 +925,6 @@ public class QueryProcessor } } - private static void validateSchemaIsSettled() throws SchemaDisagreementException - { - long limit = System.currentTimeMillis() + timeLimitForSchemaAgreement; - - outer: - while (limit - System.currentTimeMillis() >= 0) - { - String currentVersionId = Schema.instance.getVersion().toString(); - for (String version : describeSchemaVersions().keySet()) - { - if (!version.equals(currentVersionId)) - continue outer; - } - - // schemas agree - return; - } - - throw new SchemaDisagreementException(); - } - private static void validateCountOperation(SelectStatement select) throws InvalidRequestException { if (select.isWildcard()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/71f5d91a/src/java/org/apache/cassandra/cql3/CQLStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java 
b/src/java/org/apache/cassandra/cql3/CQLStatement.java index 5cb6288..c1daa52 100644 --- a/src/java/org/apache/cassandra/cql3/CQLStatement.java +++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java @@ -23,7 +23,6 @@ import java.util.List; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.thrift.InvalidRequestException; -import org.apache.cassandra.thrift.SchemaDisagreementException; import org.apache.cassandra.thrift.TimedOutException; import org.apache.cassandra.thrift.UnavailableException; @@ -47,7 +46,7 @@ public interface CQLStatement * * @param state the current client state */ - public void validate(ClientState state) throws InvalidRequestException, SchemaDisagreementException; + public void validate(ClientState state) throws InvalidRequestException; /** * Execute the statement and return the resulting result or null if there is no result. @@ -56,5 +55,5 @@ public interface CQLStatement * @param variables the values for bounded variables. The implementation * can assume that each bound term have a corresponding value. 
*/ - public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException; + public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, UnavailableException, TimedOutException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/71f5d91a/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 55c9450..d5eb486 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -100,7 +100,7 @@ public class QueryProcessor } private static ResultMessage processStatement(CQLStatement statement, ClientState clientState, List<ByteBuffer> variables) - throws UnavailableException, InvalidRequestException, TimedOutException, SchemaDisagreementException + throws UnavailableException, InvalidRequestException, TimedOutException { statement.checkAccess(clientState); statement.validate(clientState); @@ -109,7 +109,7 @@ public class QueryProcessor } public static ResultMessage process(String queryString, ClientState clientState) - throws UnavailableException, InvalidRequestException, TimedOutException, SchemaDisagreementException + throws UnavailableException, InvalidRequestException, TimedOutException { logger.trace("CQL QUERY: {}", queryString); return processStatement(getStatement(queryString, clientState).statement, clientState, Collections.<ByteBuffer>emptyList()); @@ -138,10 +138,6 @@ public class QueryProcessor { throw new RuntimeException(e); } - catch (SchemaDisagreementException e) - { - throw new RuntimeException(e); - } } public static UntypedResultSet resultify(String query, Row row) @@ -175,7 +171,7 @@ public class QueryProcessor } public static 
ResultMessage processPrepared(CQLStatement statement, ClientState clientState, List<ByteBuffer> variables) - throws UnavailableException, InvalidRequestException, TimedOutException, SchemaDisagreementException + throws UnavailableException, InvalidRequestException, TimedOutException { // Check to see if there are any bound variables to verify if (!(variables.isEmpty() && (statement.getBoundsTerms() == 0))) http://git-wip-us.apache.org/repos/asf/cassandra/blob/71f5d91a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java index 9e53f23..262c6aa 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java @@ -29,7 +29,6 @@ import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.InvalidRequestException; -import org.apache.cassandra.thrift.SchemaDisagreementException; import org.apache.cassandra.thrift.ThriftValidation; /** A <code>CREATE KEYSPACE</code> statement parsed from a CQL query. 
*/ @@ -62,7 +61,7 @@ public class CreateKeyspaceStatement extends SchemaAlteringStatement * @throws InvalidRequestException if arguments are missing or unacceptable */ @Override - public void validate(ClientState state) throws InvalidRequestException, SchemaDisagreementException + public void validate(ClientState state) throws InvalidRequestException { super.validate(state); ThriftValidation.validateKeyspaceNotSystem(name); http://git-wip-us.apache.org/repos/asf/cassandra/blob/71f5d91a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java index 9c3b345..af9b0a2 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java @@ -22,7 +22,6 @@ import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.thrift.InvalidRequestException; -import org.apache.cassandra.thrift.SchemaDisagreementException; import org.apache.cassandra.thrift.ThriftValidation; public class DropKeyspaceStatement extends SchemaAlteringStatement @@ -36,7 +35,7 @@ public class DropKeyspaceStatement extends SchemaAlteringStatement } @Override - public void validate(ClientState state) throws InvalidRequestException, SchemaDisagreementException + public void validate(ClientState state) throws InvalidRequestException { super.validate(state); ThriftValidation.validateKeyspaceNotSystem(keyspace); http://git-wip-us.apache.org/repos/asf/cassandra/blob/71f5d91a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java index c393eb2..8d85351 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java @@ -30,7 +30,6 @@ import org.apache.cassandra.cql3.CFName; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.thrift.InvalidRequestException; -import org.apache.cassandra.thrift.SchemaDisagreementException; import com.google.common.base.Predicates; import com.google.common.collect.Maps; @@ -79,12 +78,10 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL } @Override - public void validate(ClientState state) throws InvalidRequestException, SchemaDisagreementException - { - validateSchemaAgreement(); - } + public void validate(ClientState state) throws InvalidRequestException + {} - public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, SchemaDisagreementException + public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException { try { @@ -96,42 +93,7 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL ex.initCause(e); throw ex; } - validateSchemaIsSettled(); - return null; - } - // Copypasta from CassandraServer (where it is private). 
- private static void validateSchemaAgreement() throws SchemaDisagreementException - { - if (describeSchemaVersions().size() > 1) - throw new SchemaDisagreementException(); - } - - private static Map<String, List<String>> describeSchemaVersions() - { - // unreachable hosts don't count towards disagreement - return Maps.filterKeys(StorageProxy.describeSchemaVersions(), - Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE))); - } - - private static void validateSchemaIsSettled() throws SchemaDisagreementException - { - long limit = System.currentTimeMillis() + timeLimitForSchemaAgreement; - - outer: - while (limit - System.currentTimeMillis() >= 0) - { - String currentVersionId = Schema.instance.getVersion().toString(); - for (String version : describeSchemaVersions().keySet()) - { - if (!version.equals(currentVersionId)) - continue outer; - } - - // schemas agree - return; - } - - throw new SchemaDisagreementException(); + return null; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/71f5d91a/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 0b5205a..1437cb8 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -918,8 +918,6 @@ public class CassandraServer implements Cassandra.Iface logger.debug("add_column_family"); state().hasColumnFamilySchemaAccess(Permission.WRITE); - validateSchemaAgreement(); - try { cf_def.unsetId(); // explicitly ignore any id set by client (Hector likes to set zero) @@ -945,7 +943,6 @@ public class CassandraServer implements Cassandra.Iface ClientState cState = state(); cState.hasColumnFamilySchemaAccess(Permission.WRITE); - validateSchemaAgreement(); try { @@ -966,7 +963,6 @@ public class CassandraServer implements 
Cassandra.Iface logger.debug("add_keyspace"); ThriftValidation.validateKeyspaceNotSystem(ks_def.name); state().hasKeyspaceSchemaAccess(Permission.WRITE); - validateSchemaAgreement(); ThriftValidation.validateKeyspaceNotYetExisting(ks_def.name); // generate a meaningful error if the user setup keyspace and/or column definition incorrectly @@ -1005,7 +1001,6 @@ public class CassandraServer implements Cassandra.Iface logger.debug("drop_keyspace"); ThriftValidation.validateKeyspaceNotSystem(keyspace); state().hasKeyspaceSchemaAccess(Permission.WRITE); - validateSchemaAgreement(); try { @@ -1032,7 +1027,6 @@ public class CassandraServer implements Cassandra.Iface ThriftValidation.validateTable(ks_def.name); if (ks_def.getCf_defs() != null && ks_def.getCf_defs().size() > 0) throw new InvalidRequestException("Keyspace update must not contain any column family definitions."); - validateSchemaAgreement(); try { @@ -1057,7 +1051,6 @@ public class CassandraServer implements Cassandra.Iface CFMetaData oldCfm = Schema.instance.getCFMetaData(cf_def.keyspace, cf_def.name); if (oldCfm == null) throw new InvalidRequestException("Could not find column family definition to modify."); - validateSchemaAgreement(); try { @@ -1075,15 +1068,6 @@ public class CassandraServer implements Cassandra.Iface } } - private void validateSchemaAgreement() throws SchemaDisagreementException - { - // unreachable hosts don't count towards disagreement - Map<String, List<String>> versions = Maps.filterKeys(StorageProxy.describeSchemaVersions(), - Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE))); - if (versions.size() > 1) - throw new SchemaDisagreementException(); - } - public void truncate(String cfname) throws InvalidRequestException, UnavailableException, TimedOutException, TException { ClientState cState = state(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/71f5d91a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index 1cec3fc..7204f97 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -27,7 +27,6 @@ import org.apache.cassandra.transport.Message; import org.apache.cassandra.transport.ProtocolException; import org.apache.cassandra.thrift.AuthenticationException; import org.apache.cassandra.thrift.InvalidRequestException; -import org.apache.cassandra.thrift.SchemaDisagreementException; import org.apache.cassandra.thrift.TimedOutException; import org.apache.cassandra.thrift.UnavailableException; @@ -40,7 +39,6 @@ import org.apache.cassandra.thrift.UnavailableException; * 0x0002: Authentication error * 0x0100: Unavailable exception * 0x0101: Timeout exception - * 0x0102: Schema disagreement exception * 0x0200: Request exception */ public class ErrorMessage extends Message.Response @@ -81,8 +79,6 @@ public class ErrorMessage extends Message.Response return new ErrorMessage(0x0101, msg); else if (t instanceof UnavailableException) return new ErrorMessage(0x0100, msg); - else if (t instanceof SchemaDisagreementException) - return new ErrorMessage(0x0102, msg); else if (t instanceof InvalidRequestException) return new ErrorMessage(0x0200, msg); else if (t instanceof ProtocolException) http://git-wip-us.apache.org/repos/asf/cassandra/blob/71f5d91a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index 6fcdc67..2aefcb8 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ 
b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -22,7 +22,6 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.transport.*; import org.apache.cassandra.thrift.InvalidRequestException; -import org.apache.cassandra.thrift.SchemaDisagreementException; import org.apache.cassandra.thrift.TimedOutException; import org.apache.cassandra.thrift.UnavailableException; @@ -68,8 +67,7 @@ public class QueryMessage extends Message.Request { if (!((e instanceof UnavailableException) || (e instanceof InvalidRequestException) - || (e instanceof TimedOutException) - || (e instanceof SchemaDisagreementException))) + || (e instanceof TimedOutException))) { logger.error("Unexpected error during query", e); }