Author: gdusbabek
Date: Fri Nov 19 15:28:07 2010
New Revision: 1036895
URL: http://svn.apache.org/viewvc?rev=1036895&view=rev
Log:
perform index maintenance outside of migration locks during CF update. patch by
gdusbabek, reviewed by jbellis. CASSANDRA-1715
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/test/system/test_thrift_server.py
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1036895&r1=1036894&r2=1036895&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri
Nov 19 15:28:07 2010
@@ -171,48 +171,40 @@ public class ColumnFamilyStore implement
if (!memops.isModified())
memops = new
DefaultDouble(metadata.getMemtableOperationsInMillions());
- // reset the memtable with new settings.
- try
- {
- forceBlockingFlush();
- }
- catch (InterruptedException ex)
- {
- throw new RuntimeException(ex);
- }
- catch (ExecutionException ex)
- {
- throw new IOError(ex.getCause());
- }
-
ssTables.updateCacheSizes();
// figure out what needs to be added and dropped.
- Set<ByteBuffer> indexesToDrop = new HashSet<ByteBuffer>();
- Set<ColumnDefinition> indexesToAdd = new HashSet<ColumnDefinition>();
+ final Set<ByteBuffer> indexesToDrop = new HashSet<ByteBuffer>();
+ final Set<ColumnDefinition> indexesToAdd = new
HashSet<ColumnDefinition>();
for (ColumnDefinition cdef : metadata.getColumn_metadata().values())
- {
if (!indexedColumns.containsKey(cdef.name))
indexesToAdd.add(cdef);
- }
for (ByteBuffer indexName : indexedColumns.keySet())
- {
if (!metadata.getColumn_metadata().containsKey(indexName))
indexesToDrop.add(indexName);
- }
- // drop indexes no longer needed.
- for (ByteBuffer indexName : indexesToDrop)
+ // future: if/when we have modifiable settings for secondary indexes,
they'll need to be handled here.
+
+ final Runnable indexMaintenance = new Runnable()
{
- ColumnFamilyStore indexCfs = indexedColumns.remove(indexName);
- assert indexCfs != null;
- SystemTable.setIndexRemoved(metadata.tableName, metadata.cfName);
- indexCfs.removeAllSSTables();
- }
- // add new indexes.
- for (ColumnDefinition info : indexesToAdd)
- if (info.getIndexType() != null)
- addIndex(info);
+ public void run()
+ {
+ // drop indexes no longer needed.
+ for (ByteBuffer indexName : indexesToDrop)
+ {
+ ColumnFamilyStore indexCfs =
indexedColumns.remove(indexName);
+ assert indexCfs != null;
+ SystemTable.setIndexRemoved(metadata.tableName,
metadata.cfName);
+ indexCfs.removeAllSSTables();
+ }
+ // add new indexes.
+ for (ColumnDefinition info : indexesToAdd)
+ if (info.getIndexType() != null)
+ addIndex(info);
+ }
+ };
+ // reset the memtable with new settings.
+ maybeSwitchMemtable(memtable, true, indexMaintenance);
}
private ColumnFamilyStore(Table table, String columnFamilyName,
IPartitioner partitioner, int generation, CFMetaData metadata)
@@ -616,7 +608,7 @@ public class ColumnFamilyStore implement
}
/** flush the given memtable and swap in a new one for its CFS, if it
hasn't been frozen already. threadsafe. */
- Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean
writeCommitLog)
+ Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean
writeCommitLog, final Runnable postFlush)
{
/*
* If we can get the writelock, that means no new updates can come in
and
@@ -671,6 +663,8 @@ public class ColumnFamilyStore implement
// the log header with "you can discard anything
written before the context" is not valid
CommitLog.instance.discardCompletedSegments(metadata.cfId, ctx);
}
+ if (postFlush != null)
+ postFlush.run();
}
});
}
@@ -702,7 +696,7 @@ public class ColumnFamilyStore implement
if (memtable.isClean())
return null;
- return maybeSwitchMemtable(memtable, true);
+ return maybeSwitchMemtable(memtable, true, null);
}
public void forceBlockingFlush() throws ExecutionException,
InterruptedException
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1036895&r1=1036894&r2=1036895&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Nov 19
15:28:07 2010
@@ -448,7 +448,7 @@ public class Table
// flush memtables that got filled up outside the readlock
(maybeSwitchMemtable acquires writeLock).
// usually mTF will be empty and this will be a no-op.
for (Memtable memtable : memtablesToFlush)
- memtable.cfs.maybeSwitchMemtable(memtable, writeCommitLog);
+ memtable.cfs.maybeSwitchMemtable(memtable, writeCommitLog, null);
}
private static List<Memtable> addFullMemtable(List<Memtable>
memtablesToFlush, Memtable fullMemtable)
@@ -594,7 +594,7 @@ public class Table
// during index build, we do flush index memtables separately
from master; otherwise we could OOM
for (Memtable memtable : memtablesToFlush)
- memtable.cfs.maybeSwitchMemtable(memtable, false);
+ memtable.cfs.maybeSwitchMemtable(memtable, false, null);
}
try
Modified: cassandra/trunk/test/system/test_thrift_server.py
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=1036895&r1=1036894&r2=1036895&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Fri Nov 19 15:28:07 2010
@@ -1329,12 +1329,16 @@ class TestMutations(ThriftTester):
modified_cf = CfDef('Keyspace1', 'ToBeIndexed',
column_metadata=[modified_cd])
modified_cf.id = cfid
client.system_update_column_family(modified_cf)
+
ks1 = client.describe_keyspace('Keyspace1')
server_cf = [x for x in ks1.cf_defs if x.name=='ToBeIndexed'][0]
assert server_cf
assert server_cf.column_metadata[0].index_type ==
modified_cd.index_type
assert server_cf.column_metadata[0].index_name ==
modified_cd.index_name
-
+
+ # sleep a bit to give time for the index to build.
+ time.sleep(0.1)
+
# simple query on one index expression
cp = ColumnParent('ToBeIndexed')
sp = SlicePredicate(slice_range=SliceRange('', ''))