Author: jbellis
Date: Thu Oct 7 18:12:13 2010
New Revision: 1005551
URL: http://svn.apache.org/viewvc?rev=1005551&view=rev
Log:
Allow dynamic secondary index creation and destruction
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1532
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
cassandra/trunk/test/system/test_thrift_server.py
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1005551&r1=1005550&r2=1005551&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Oct 7 18:12:13 2010
@@ -20,6 +20,7 @@ dev
* add cache save/load ability (CASSANDRA-1417)
* add StorageService.getDrainProgress (CASSANDRA-1588)
* Disallow bootstrap to an in-use token (CASSANDRA-1561)
+ * Allow dynamic secondary index creation and destruction (CASSANDRA-1532)
0.7-beta2
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1005551&r1=1005550&r2=1005551&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Thu
Oct 7 18:12:13 2010
@@ -547,6 +547,20 @@ public final class CFMetaData
validateMinMaxCompactionThresholds(cf_def);
+ Map<byte[], ColumnDefinition> metadata = new HashMap<byte[],
ColumnDefinition>();
+ if (cf_def.column_metadata == null)
+ {
+ metadata = column_metadata;
+ }
+ else
+ {
+ for (org.apache.cassandra.thrift.ColumnDef def :
cf_def.column_metadata)
+ {
+ ColumnDefinition cd = new ColumnDefinition(def.name,
def.validation_class, def.index_type, def.index_name);
+ metadata.put(cd.name, cd);
+ }
+ }
+
return new CFMetaData(tableName,
cfName,
cfType,
@@ -564,7 +578,7 @@ public final class CFMetaData
cf_def.min_compaction_threshold,
cf_def.max_compaction_threshold,
cfId,
- column_metadata,
+ metadata,
rowCacheSavePeriodInSeconds,
keyCacheSavePeriodInSeconds);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=1005551&r1=1005550&r2=1005551&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Thu Oct
7 18:12:13 2010
@@ -345,4 +345,20 @@ public class SystemTable
forceBlockingFlush(INDEX_CF);
}
+
+ public static void setIndexRemoved(String table, String indexName)
+ {
+ RowMutation rm = new RowMutation(Table.SYSTEM_TABLE,
table.getBytes(UTF_8));
+ rm.delete(new QueryPath(INDEX_CF, null, indexName.getBytes(UTF_8)),
new TimestampClock(System.currentTimeMillis()));
+ try
+ {
+ rm.apply();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+
+ forceBlockingFlush(INDEX_CF);
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java?rev=1005551&r1=1005550&r2=1005551&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
Thu Oct 7 18:12:13 2010
@@ -1,10 +1,8 @@
package org.apache.cassandra.db.migration;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -12,6 +10,7 @@ import org.apache.cassandra.utils.UUIDGe
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/**
* Licensed to the Apache Software Foundation (ASF) under one
@@ -51,10 +50,6 @@ public class UpdateColumnFamily extends
this.oldCfm = oldCfm;
this.newCfm = newCfm;
- // we'll allow this eventually.
- if (!oldCfm.column_metadata.equals(newCfm.column_metadata))
- throw new ConfigurationException("Column meta information is not
identical.");
-
// clone ksm but include the new cf def.
KSMetaData newKsm = makeNewKeyspaceDefinition(ksm);
rm = Migration.makeDefinitionMutation(newKsm, null, newVersion);
@@ -78,12 +73,30 @@ public class UpdateColumnFamily extends
void applyModels() throws IOException
{
- // all we really need to do is reload the cfstore.
+ logger.debug("Updating " + oldCfm + " to " + newCfm);
KSMetaData newKsm =
makeNewKeyspaceDefinition(DatabaseDescriptor.getTableDefinition(newCfm.tableName));
DatabaseDescriptor.setTableDefinition(newKsm, newVersion);
if (!clientMode)
- Table.open(oldCfm.tableName).reloadCf(newCfm.cfId);
+ {
+ Table table = Table.open(oldCfm.tableName);
+ ColumnFamilyStore oldCfs =
table.getColumnFamilyStore(oldCfm.cfName);
+ table.reloadCf(newCfm.cfId);
+
+ // clean up obsolete index data files
+ for (Map.Entry<byte[], ColumnDefinition> entry :
oldCfm.column_metadata.entrySet())
+ {
+ byte[] column = entry.getKey();
+ ColumnDefinition def = entry.getValue();
+ if (def.index_type != null
+ && (!newCfm.column_metadata.containsKey(column) ||
newCfm.column_metadata.get(column).index_type == null))
+ {
+ ColumnFamilyStore indexCfs =
oldCfs.getIndexedColumnFamilyStore(column);
+ SystemTable.setIndexRemoved(table.name,
indexCfs.columnFamily);
+ indexCfs.removeAllSSTables();
+ }
+ }
+ }
}
public void subdeflate(org.apache.cassandra.db.migration.avro.Migration mi)
Modified: cassandra/trunk/test/system/test_thrift_server.py
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=1005551&r1=1005550&r2=1005551&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Thu Oct 7 18:12:13 2010
@@ -1364,6 +1364,46 @@ class TestMutations(ThriftTester):
assert 'NewColumnFamily' not in [x.name for x in ks1.cf_defs]
assert 'Standard1' in [x.name for x in ks1.cf_defs]
+ def test_dynamic_indexes_with_system_update_cf(self):
+ _set_keyspace('Keyspace1')
+ cd = ColumnDef('birthdate', 'BytesType', None, None)
+ newcf = CfDef('Keyspace1', 'ToBeIndexed',
default_validation_class='LongType', column_metadata=[cd])
+ client.system_add_column_family(newcf)
+
+ client.insert('key1', ColumnParent('ToBeIndexed'), Column('birthdate',
_i64(1), 0), ConsistencyLevel.ONE)
+ client.insert('key2', ColumnParent('ToBeIndexed'), Column('birthdate',
_i64(2), 0), ConsistencyLevel.ONE)
+ client.insert('key2', ColumnParent('ToBeIndexed'), Column('b',
_i64(2), 0), ConsistencyLevel.ONE)
+ client.insert('key3', ColumnParent('ToBeIndexed'), Column('birthdate',
_i64(3), 0), ConsistencyLevel.ONE)
+ client.insert('key3', ColumnParent('ToBeIndexed'), Column('b',
_i64(3), 0), ConsistencyLevel.ONE)
+
+ # Should fail without index
+ cp = ColumnParent('ToBeIndexed')
+ sp = SlicePredicate(slice_range=SliceRange('', ''))
+ clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ,
_i64(1))], '')
+ _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp,
ConsistencyLevel.ONE), InvalidRequestException)
+
+ # add an index on 'birthdate'
+ ks1 = client.describe_keyspace('Keyspace1')
+ cfid = [x.id for x in ks1.cf_defs if x.name=='ToBeIndexed'][0]
+ modified_cd = ColumnDef('birthdate', 'BytesType', IndexType.KEYS, None)
+ modified_cf = CfDef('Keyspace1', 'ToBeIndexed',
column_metadata=[modified_cd])
+ modified_cf.id = cfid
+ client.system_update_column_family(modified_cf)
+ ks1 = client.describe_keyspace('Keyspace1')
+ server_cf = [x for x in ks1.cf_defs if x.name=='ToBeIndexed'][0]
+ assert server_cf
+ assert server_cf.column_metadata[0].index_type ==
modified_cd.index_type
+ assert server_cf.column_metadata[0].index_name ==
modified_cd.index_name
+
+ # simple query on one index expression
+ cp = ColumnParent('ToBeIndexed')
+ sp = SlicePredicate(slice_range=SliceRange('', ''))
+ clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ,
_i64(1))], '')
+ result = client.get_indexed_slices(cp, clause, sp,
ConsistencyLevel.ONE)
+ assert len(result) == 1, result
+ assert result[0].key == 'key1'
+ assert len(result[0].columns) == 1, result[0].columns
+
def test_system_super_column_family_operations(self):
_set_keyspace('Keyspace1')