Author: gdusbabek
Date: Fri Sep 3 20:25:06 2010
New Revision: 992454
URL: http://svn.apache.org/viewvc?rev=992454&view=rev
Log:
avro update ks and cf implementations. patch by gdusbabek, reviewed by stuhood.
CASSANDRA-1285
Modified:
cassandra/trunk/interface/cassandra.genavro
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/test/system/test_avro_system.py
Modified: cassandra/trunk/interface/cassandra.genavro
URL:
http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=992454&r1=992453&r2=992454&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Fri Sep 3 20:25:06 2010
@@ -332,6 +332,12 @@ protocol Cassandra {
string system_rename_keyspace(string old_name, string new_name)
throws InvalidRequestException;
+
+ string system_update_column_family(CfDef cf_def)
+ throws InvalidRequestException;
+
+ string system_update_keyspace(KsDef ks_def)
+ throws InvalidRequestException;
array<string> describe_keyspaces();
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=992454&r1=992453&r2=992454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Fri
Sep 3 20:25:06 2010
@@ -43,8 +43,11 @@ import org.apache.avro.util.Utf8;
import org.apache.cassandra.avro.InvalidRequestException;
import org.apache.cassandra.db.migration.DropKeyspace;
import org.apache.cassandra.db.migration.RenameKeyspace;
+import org.apache.cassandra.db.migration.UpdateColumnFamily;
+import org.apache.cassandra.db.migration.UpdateKeyspace;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -709,6 +712,87 @@ public class CassandraServer implements
}
@Override
+ public CharSequence system_update_column_family(CfDef cf_def) throws
AvroRemoteException, InvalidRequestException
+ {
+ checkKeyspaceAndLoginAuthorized(Permission.WRITE);
+
+ if (cf_def.keyspace == null || cf_def.name == null)
+ throw newInvalidRequestException("Keyspace and CF name must be
set.");
+
+ CFMetaData oldCfm =
DatabaseDescriptor.getCFMetaData(CFMetaData.getId(cf_def.keyspace.toString(),
cf_def.name.toString()));
+ if (oldCfm == null)
+ throw newInvalidRequestException("Could not find column family
definition to modify.");
+
+ try
+ {
+ CFMetaData newCfm = oldCfm.apply(cf_def);
+ UpdateColumnFamily update = new UpdateColumnFamily(oldCfm, newCfm);
+ applyMigrationOnStage(update);
+ return DatabaseDescriptor.getDefsVersion().toString();
+ }
+ catch (ConfigurationException e)
+ {
+ InvalidRequestException ex =
newInvalidRequestException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ catch (IOException e)
+ {
+ InvalidRequestException ex =
newInvalidRequestException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ }
+
+ @Override
+ public CharSequence system_update_keyspace(KsDef ks_def) throws
AvroRemoteException, InvalidRequestException
+ {
+ checkKeyspaceAndLoginAuthorized(Permission.WRITE);
+
+ if (ks_def.cf_defs != null && ks_def.cf_defs.size() > 0)
+ throw newInvalidRequestException("Keyspace update must not contain
any column family definitions.");
+
+ if (StorageService.instance.getLiveNodes().size() <
ks_def.replication_factor)
+ throw newInvalidRequestException("Not enough live nodes to support
this keyspace");
+ if (DatabaseDescriptor.getTableDefinition(ks_def.name.toString()) ==
null)
+ throw newInvalidRequestException("Keyspace does not exist.");
+
+ try
+ {
+ // convert Map<CharSequence, CharSequence> to Map<String, String>
+ Map<String, String> strategyOptions = null;
+ if (ks_def.strategy_options != null &&
!ks_def.strategy_options.isEmpty())
+ {
+ strategyOptions = new HashMap<String, String>();
+ for (Map.Entry<CharSequence, CharSequence> option :
ks_def.strategy_options.entrySet())
+ {
+ strategyOptions.put(option.getKey().toString(),
option.getValue().toString());
+ }
+ }
+
+ KSMetaData ksm = new KSMetaData(
+ ks_def.name.toString(),
+ (Class<? extends AbstractReplicationStrategy>)
FBUtilities.<AbstractReplicationStrategy>classForName(ks_def.strategy_class.toString(),
"keyspace replication strategy"),
+ strategyOptions,
+ ks_def.replication_factor);
+ applyMigrationOnStage(new UpdateKeyspace(ksm));
+ return DatabaseDescriptor.getDefsVersion().toString();
+ }
+ catch (ConfigurationException e)
+ {
+ InvalidRequestException ex =
newInvalidRequestException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ catch (IOException e)
+ {
+ InvalidRequestException ex =
newInvalidRequestException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ }
+
+ @Override
public GenericArray<CharSequence> describe_keyspaces() throws
AvroRemoteException
{
Set<String> keyspaces = DatabaseDescriptor.getTables();
@@ -802,35 +886,6 @@ public class CassandraServer implements
DatabaseDescriptor.getComparator(validate),
ColumnDefinition.fromColumnDefs((Iterable<ColumnDef>) cf_def.column_metadata));
}
-
- private CfDef convertToCfDef(CFMetaData cfMetadata) throws
InvalidRequestException
- {
- CfDef cfDef = new CfDef();
- if (cfMetadata.subcolumnComparator != null)
- {
- cfDef.subcomparator_type =
cfMetadata.subcolumnComparator.getClass().getName();
- cfDef.column_type = "Super";
- }
- cfDef.keyspace = cfMetadata.tableName;
- cfDef.name = cfMetadata.cfName;
- cfDef.clock_type = cfMetadata.clockType.name();
- cfDef.column_type = cfMetadata.cfType.name();
- cfDef.comment = cfMetadata.comment;
- cfDef.comparator_type = cfMetadata.comparator.getClass().getName();
-
- GenericArray<ColumnDef> column_metadata = new
GenericData.Array<ColumnDef>(cfMetadata.column_metadata.size(),
Schema.createArray(ColumnDef.SCHEMA$));
- for (ColumnDefinition col_definition :
cfMetadata.column_metadata.values())
- {
- ColumnDef cdef = new ColumnDef();
- cdef.name = ByteBuffer.wrap(col_definition.name);
- cdef.validation_class =
col_definition.validator.getClass().getName();
- cdef.index_name = col_definition.index_name;
- cdef.index_type =
IndexType.valueOf(col_definition.index_type.name());
- column_metadata.add(cdef);
- }
- cfDef.column_metadata = column_metadata;
- return cfDef;
- }
@Override
public KsDef describe_keyspace(CharSequence keyspace) throws
AvroRemoteException, NotFoundException
@@ -852,7 +907,7 @@ public class CassandraServer implements
GenericArray<CfDef> cfDefs = new
GenericData.Array<CfDef>(ksMetadata.cfMetaData().size(),
Schema.createArray(CfDef.SCHEMA$));
for (CFMetaData cfm : ksMetadata.cfMetaData().values())
{
- cfDefs.add(convertToCfDef(cfm));
+ cfDefs.add(CFMetaData.convertToAvro(cfm));
}
ksDef.cf_defs = cfDefs;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=992454&r1=992453&r2=992454&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Fri
Sep 3 20:25:06 2010
@@ -338,6 +338,48 @@ public final class CFMetaData
return validator;
}
+ public CFMetaData apply(org.apache.cassandra.avro.CfDef cf_def) throws
ConfigurationException
+ {
+ // validate.
+ if (cf_def.id != cfId)
+ throw new ConfigurationException(String.format("ids do not match.
%d, %d", cf_def.id, cfId));
+ if (!cf_def.keyspace.toString().equals(tableName))
+ throw new ConfigurationException(String.format("keyspaces do not
match. %s, %s", cf_def.keyspace, tableName));
+ if (!cf_def.name.toString().equals(cfName))
+ throw new ConfigurationException("names do not match.");
+ if (!cf_def.column_type.toString().equals(cfType.name()))
+ throw new ConfigurationException("types do not match.");
+ if (!cf_def.clock_type.toString().equals(clockType.name()))
+ throw new ConfigurationException("clock types do not match.");
+ if (comparator !=
DatabaseDescriptor.getComparator(cf_def.comparator_type.toString()))
+ throw new ConfigurationException("comparators do not match.");
+ if (cf_def.subcomparator_type == null ||
cf_def.subcomparator_type.equals(""))
+ {
+ if (subcolumnComparator != null)
+ throw new ConfigurationException("subcolumncomparators do not
match.");
+ // else, it's null and we're good.
+ }
+ else if (subcolumnComparator !=
DatabaseDescriptor.getComparator(cf_def.subcomparator_type.toString()))
+ throw new ConfigurationException("subcolumncomparators do not
match.");
+
+ return new CFMetaData(tableName,
+ cfName,
+ cfType,
+ clockType,
+ comparator,
+ subcolumnComparator,
+ reconciler,
+ cf_def.comment == null ? "" :
cf_def.comment.toString(),
+ cf_def.row_cache_size,
+ cf_def.preload_row_cache,
+ cf_def.key_cache_size,
+ cf_def.read_repair_chance,
+ cf_def.gc_grace_seconds,
+
DatabaseDescriptor.getComparator(cf_def.default_validation_class == null ?
(String)null : cf_def.default_validation_class.toString()),
+ cfId,
+ column_metadata);
+ }
+
// merges some final fields from this CFM with modifiable fields from
CfDef into a new CFMetaData.
public CFMetaData apply(org.apache.cassandra.thrift.CfDef cf_def) throws
ConfigurationException
{
@@ -402,7 +444,7 @@ public final class CFMetaData
def.setRead_repair_chance(cfm.readRepairChance);
def.setGc_grace_seconds(cfm.gcGraceSeconds);
def.setDefault_validation_class(cfm.defaultValidator.getClass().getName());
- List< org.apache.cassandra.thrift.ColumnDef> column_meta = new
ArrayList< org.apache.cassandra.thrift.ColumnDef>();
+ List<org.apache.cassandra.thrift.ColumnDef> column_meta = new
ArrayList< org.apache.cassandra.thrift.ColumnDef>(cfm.column_metadata.size());
for (ColumnDefinition cd : cfm.column_metadata.values())
{
org.apache.cassandra.thrift.ColumnDef tcd = new
org.apache.cassandra.thrift.ColumnDef();
@@ -415,4 +457,41 @@ public final class CFMetaData
def.setColumn_metadata(column_meta);
return def;
}
+
+ // converts CFM to avro CfDef
+ public static org.apache.cassandra.avro.CfDef convertToAvro(CFMetaData cfm)
+ {
+ org.apache.cassandra.avro.CfDef def = new
org.apache.cassandra.avro.CfDef();
+ def.name = cfm.cfName;
+ def.keyspace = cfm.tableName;
+ def.id = cfm.cfId;
+ def.column_type = cfm.cfType.name();
+ def.clock_type = cfm.clockType.name();
+ def.comparator_type = cfm.comparator.getClass().getName();
+ if (cfm.subcolumnComparator != null)
+ {
+ def.subcomparator_type =
cfm.subcolumnComparator.getClass().getName();
+ def.column_type = "Super";
+ }
+ def.reconciler = cfm.reconciler == null ? "" :
cfm.reconciler.getClass().getName();
+ def.comment = cfm.comment == null ? "" : cfm.comment;
+ def.row_cache_size = cfm.rowCacheSize;
+ def.preload_row_cache = cfm.preloadRowCache;
+ def.key_cache_size = cfm.keyCacheSize;
+ def.read_repair_chance = cfm.readRepairChance;
+ def.gc_grace_seconds = cfm.gcGraceSeconds;
+ def.default_validation_class =
cfm.defaultValidator.getClass().getName();
+ List<org.apache.cassandra.avro.ColumnDef> column_meta = new
ArrayList<org.apache.cassandra.avro.ColumnDef>(cfm.column_metadata.size());
+ for (ColumnDefinition cd : cfm.column_metadata.values())
+ {
+ org.apache.cassandra.avro.ColumnDef tcd = new
org.apache.cassandra.avro.ColumnDef();
+ tcd.index_name = cd.index_name;
+ tcd.index_type =
org.apache.cassandra.avro.IndexType.valueOf(cd.index_type.name());
+ tcd.name = ByteBuffer.wrap(cd.name);
+ tcd.validation_class = cd.validator.getClass().getName();
+ column_meta.add(tcd);
+ }
+ def.column_metadata = column_meta;
+ return def;
+ }
}
Modified: cassandra/trunk/test/system/test_avro_system.py
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_avro_system.py?rev=992454&r1=992453&r2=992454&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_avro_system.py (original)
+++ cassandra/trunk/test/system/test_avro_system.py Fri Sep 3 20:25:06 2010
@@ -40,6 +40,26 @@ class TestSystemOperations(AvroTester):
s = self.client.request('system_add_keyspace', {'ks_def' : keyspace})
assert isinstance(s, unicode), 'returned type is %s, (not
\'unicode\')' % type(s)
+ self.client.request('set_keyspace', {'keyspace' : 'CreateKeyspace'})
+
+ # modify invalid
+ modified_keyspace = {'name': 'CreateKeyspace',
+ 'strategy_class':
'org.apache.cassandra.locator.OldNetworkTopologyStrategy',
+ 'strategy_options': {},
+ 'replication_factor': 2,
+ 'cf_defs': []}
+ avro_utils.assert_raises(AvroRemoteException,
+ self.client.request,
+ 'system_update_keyspace',
+ {'ks_def': modified_keyspace})
+
+ # modify valid
+ modified_keyspace['replication_factor'] = 1
+ self.client.request('system_update_keyspace', {'ks_def':
modified_keyspace})
+ modks = self.client.request('describe_keyspace', {'keyspace':
'CreateKeyspace'})
+ assert modks['replication_factor'] ==
modified_keyspace['replication_factor']
+ assert modks['strategy_class'] == modified_keyspace['strategy_class']
+
# rename
self.client.request('set_keyspace', {'keyspace' : 'CreateKeyspace'})
s = self.client.request(
@@ -57,6 +77,7 @@ class TestSystemOperations(AvroTester):
self.client.request,
'describe_keyspace',
{'keyspace' : 'RenameKeyspace'})
+
def test_system_column_family_operations(self):
"adding, renaming, and removing column families"
self.client.request('set_keyspace', {'keyspace': 'Keyspace1'})
@@ -70,6 +91,7 @@ class TestSystemOperations(AvroTester):
cfDef['keyspace'] = 'Keyspace1'
cfDef['name'] = 'NewColumnFamily'
cfDef['column_metadata'] = [columnDef]
+
s = self.client.request('system_add_column_family', {'cf_def' : cfDef})
assert isinstance(s, unicode), \
'returned type is %s, (not \'unicode\')' % type(s)
@@ -77,7 +99,27 @@ class TestSystemOperations(AvroTester):
ks1 = self.client.request(
'describe_keyspace', {'keyspace' : 'Keyspace1'})
assert 'NewColumnFamily' in [x['name'] for x in ks1['cf_defs']]
+ cfDef = [x for x in ks1['cf_defs'] if x['name']=='NewColumnFamily'][0]
+ assert cfDef['id'] > 1000, str(cfid)
+ # modify invalid
+ cfDef['comparator_type'] = 'LongType'
+ avro_utils.assert_raises(AvroRemoteException,
+ self.client.request,
+ 'system_update_column_family',
+ {'cf_def': cfDef})
+
+ # modify valid
+ cfDef['comparator_type'] = 'BytesType' # revert back to old value.
+ cfDef['row_cache_size'] = 25
+ cfDef['gc_grace_seconds'] = 1
+ self.client.request('system_update_column_family', {'cf_def': cfDef})
+ ks1 = self.client.request('describe_keyspace', {'keyspace':
'Keyspace1'})
+ server_cf = [x for x in ks1['cf_defs'] if
x['name']=='NewColumnFamily'][0]
+ assert server_cf
+ assert server_cf['row_cache_size'] == 25
+ assert server_cf['gc_grace_seconds'] == 1
+
# rename
self.client.request('system_rename_column_family',
{'old_name' : 'NewColumnFamily', 'new_name': 'RenameColumnFamily'})