Author: jbellis
Date: Wed Feb 23 15:09:28 2011
New Revision: 1073768
URL: http://svn.apache.org/viewvc?rev=1073768&view=rev
Log:
add validateSchemaAgreement call + synchronization to schema modification calls
patch by jbellis; reviewed by gdusbabek for CASSANDRA-2222
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1073768&r1=1073767&r2=1073768&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Feb 23 15:09:28 2011
@@ -21,6 +21,8 @@
* fix BRAF performance when seeking to EOF (CASSANDRA-2218)
* check for memtable flush_after_mins exceeded every 10s (CASSANDRA-2183)
* fix cache saving on Windows (CASSANDRA-2207)
+ * add validateSchemaAgreement call + synchronization to schema
+ modification operations (CASSANDRA-2222)
0.7.2
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1073768&r1=1073767&r2=1073768&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Wed Feb 23 15:09:28 2011
@@ -616,17 +616,18 @@ public class StorageProxy implements Sto
}
hosts.add(host.getHostAddress());
}
+
+ // we're done: the results map is ready to return to the client. the
rest is just debug logging:
if (results.get(UNREACHABLE) != null)
logger.debug("Hosts not in agreement. Didn't get a response from
everybody: " + StringUtils.join(results.get(UNREACHABLE), ","));
- // check for version disagreement. log the hosts that don't agree.
for (Map.Entry<String, List<String>> entry : results.entrySet())
{
+ // check for version disagreement. log the hosts that don't agree.
if (entry.getKey().equals(UNREACHABLE) ||
entry.getKey().equals(myVersion))
continue;
for (String host : entry.getValue())
logger.debug("%s disagrees (%s)", host, entry.getKey());
}
-
if (results.size() == 1)
logger.debug("Schemas are in agreement.");
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1073768&r1=1073767&r2=1073768&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
Wed Feb 23 15:09:28 2011
@@ -26,6 +26,8 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -661,11 +663,13 @@ public class CassandraServer implements
}
}
- public String system_add_column_family(CfDef cf_def) throws
InvalidRequestException, TException
+ public synchronized String system_add_column_family(CfDef cf_def) throws
InvalidRequestException, TException
{
logger.debug("add_column_family");
state().hasColumnFamilyListAccess(Permission.WRITE);
ThriftValidation.validateCfDef(cf_def);
+ validateSchemaAgreement();
+
try
{
applyMigrationOnStage(new
AddColumnFamily(convertToCFMetaData(cf_def)));
@@ -685,10 +689,11 @@ public class CassandraServer implements
}
}
- public String system_drop_column_family(String column_family) throws
InvalidRequestException, TException
+ public synchronized String system_drop_column_family(String column_family)
throws InvalidRequestException, TException
{
logger.debug("drop_column_family");
state().hasColumnFamilyListAccess(Permission.WRITE);
+ validateSchemaAgreement();
try
{
@@ -709,10 +714,11 @@ public class CassandraServer implements
}
}
- public String system_add_keyspace(KsDef ks_def) throws
InvalidRequestException, TException
+ public synchronized String system_add_keyspace(KsDef ks_def) throws
InvalidRequestException, TException
{
logger.debug("add_keyspace");
state().hasKeyspaceListAccess(Permission.WRITE);
+ validateSchemaAgreement();
// generate a meaningful error if the user setup keyspace and/or
column definition incorrectly
for (CfDef cf : ks_def.cf_defs)
@@ -754,10 +760,11 @@ public class CassandraServer implements
}
}
- public String system_drop_keyspace(String keyspace) throws
InvalidRequestException, TException
+ public synchronized String system_drop_keyspace(String keyspace) throws
InvalidRequestException, TException
{
logger.debug("drop_keyspace");
state().hasKeyspaceListAccess(Permission.WRITE);
+ validateSchemaAgreement();
try
{
@@ -779,15 +786,15 @@ public class CassandraServer implements
}
/** update an existing keyspace, but do not allow column family
modifications. */
- public String system_update_keyspace(KsDef ks_def) throws
InvalidRequestException, TException
+ public synchronized String system_update_keyspace(KsDef ks_def) throws
InvalidRequestException, TException
{
logger.debug("update_keyspace");
state().hasKeyspaceListAccess(Permission.WRITE);
-
ThriftValidation.validateTable(ks_def.name);
if (ks_def.getCf_defs() != null && ks_def.getCf_defs().size() > 0)
throw new InvalidRequestException("Keyspace update must not
contain any column family definitions.");
-
+ validateSchemaAgreement();
+
try
{
KSMetaData ksm = new KSMetaData(
@@ -812,18 +819,17 @@ public class CassandraServer implements
}
}
- public String system_update_column_family(CfDef cf_def) throws
InvalidRequestException, TException
+ public synchronized String system_update_column_family(CfDef cf_def)
throws InvalidRequestException, TException
{
logger.debug("update_column_family");
state().hasColumnFamilyListAccess(Permission.WRITE);
-
if (cf_def.keyspace == null || cf_def.name == null)
throw new InvalidRequestException("Keyspace and CF name must be
set.");
-
CFMetaData oldCfm =
DatabaseDescriptor.getCFMetaData(CFMetaData.getId(cf_def.keyspace,
cf_def.name));
if (oldCfm == null)
throw new InvalidRequestException("Could not find column family
definition to modify.");
-
+ validateSchemaAgreement();
+
try
{
// ideally, apply() would happen on the stage with the
@@ -846,6 +852,15 @@ public class CassandraServer implements
}
}
+ private void validateSchemaAgreement() throws InvalidRequestException
+ {
+ // unreachable hosts don't count towards disagreement
+ Map<String, List<String>> versions =
Maps.filterKeys(StorageProxy.describeSchemaVersions(),
+
Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE)));
+ if (versions.size() > 1)
+ throw new InvalidRequestException("Cluster schema does not yet
agree");
+ }
+
// @see CFMetaData.applyImplicitDefaults().
private CFMetaData convertToCFMetaData(CfDef cf_def) throws
InvalidRequestException, ConfigurationException
{