Author: gdusbabek
Date: Fri Nov 19 15:28:07 2010
New Revision: 1036895

URL: http://svn.apache.org/viewvc?rev=1036895&view=rev
Log:
perform index maintenance outside of migration locks during CF update. patch by 
gdusbabek, reviewe by jbellis. CASSANDRA-1715

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/test/system/test_thrift_server.py

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1036895&r1=1036894&r2=1036895&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri 
Nov 19 15:28:07 2010
@@ -171,48 +171,40 @@ public class ColumnFamilyStore implement
         if (!memops.isModified())
             memops = new 
DefaultDouble(metadata.getMemtableOperationsInMillions());
         
-        // reset the memtable with new settings.
-        try
-        {
-            forceBlockingFlush();
-        }
-        catch (InterruptedException ex)
-        {
-            throw new RuntimeException(ex);
-        }
-        catch (ExecutionException ex)
-        {
-            throw new IOError(ex.getCause());
-        }
-        
         ssTables.updateCacheSizes();
         
         // figure out what needs to be added and dropped.
-        Set<ByteBuffer> indexesToDrop = new HashSet<ByteBuffer>();
-        Set<ColumnDefinition> indexesToAdd = new HashSet<ColumnDefinition>();
+        final Set<ByteBuffer> indexesToDrop = new HashSet<ByteBuffer>();
+        final Set<ColumnDefinition> indexesToAdd = new 
HashSet<ColumnDefinition>();
         
         for (ColumnDefinition cdef : metadata.getColumn_metadata().values())
-        {
             if (!indexedColumns.containsKey(cdef.name))
                 indexesToAdd.add(cdef);
-        }
         for (ByteBuffer indexName : indexedColumns.keySet())
-        {
             if (!metadata.getColumn_metadata().containsKey(indexName))
                 indexesToDrop.add(indexName);
-        }
-        // drop indexes no longer needed.
-        for (ByteBuffer indexName : indexesToDrop)
+        // future: if/when we have modifiable settings for secondary indexes, 
they'll need to be handled here.
+        
+        final Runnable indexMaintenance = new Runnable() 
         {
-            ColumnFamilyStore indexCfs = indexedColumns.remove(indexName);
-            assert indexCfs != null;
-            SystemTable.setIndexRemoved(metadata.tableName, metadata.cfName);
-            indexCfs.removeAllSSTables();
-        }
-        // add new indexes.
-        for (ColumnDefinition info : indexesToAdd)
-            if (info.getIndexType() != null)
-                addIndex(info);
+            public void run() 
+            {
+                // drop indexes no longer needed.
+                for (ByteBuffer indexName : indexesToDrop)
+                {
+                    ColumnFamilyStore indexCfs = 
indexedColumns.remove(indexName);
+                    assert indexCfs != null;
+                    SystemTable.setIndexRemoved(metadata.tableName, 
metadata.cfName);
+                    indexCfs.removeAllSSTables();
+                }
+                // add new indexes.
+                for (ColumnDefinition info : indexesToAdd)
+                    if (info.getIndexType() != null)
+                        addIndex(info);        
+            }
+        };
+        // reset the memtable with new settings.
+        maybeSwitchMemtable(memtable, true, indexMaintenance);
     }
 
     private ColumnFamilyStore(Table table, String columnFamilyName, 
IPartitioner partitioner, int generation, CFMetaData metadata)
@@ -616,7 +608,7 @@ public class ColumnFamilyStore implement
     }
 
     /** flush the given memtable and swap in a new one for its CFS, if it 
hasn't been frozen already.  threadsafe. */
-    Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean 
writeCommitLog)
+    Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean 
writeCommitLog, final Runnable postFlush)
     {
         /*
          * If we can get the writelock, that means no new updates can come in 
and
@@ -671,6 +663,8 @@ public class ColumnFamilyStore implement
                         // the log header with "you can discard anything 
written before the context" is not valid
                         
CommitLog.instance.discardCompletedSegments(metadata.cfId, ctx);
                     }
+                    if (postFlush != null)
+                        postFlush.run();
                 }
             });
         }
@@ -702,7 +696,7 @@ public class ColumnFamilyStore implement
         if (memtable.isClean())
             return null;
 
-        return maybeSwitchMemtable(memtable, true);
+        return maybeSwitchMemtable(memtable, true, null);
     }
 
     public void forceBlockingFlush() throws ExecutionException, 
InterruptedException

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1036895&r1=1036894&r2=1036895&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Nov 19 
15:28:07 2010
@@ -448,7 +448,7 @@ public class Table
         // flush memtables that got filled up outside the readlock 
(maybeSwitchMemtable acquires writeLock).
         // usually mTF will be empty and this will be a no-op.
         for (Memtable memtable : memtablesToFlush)
-            memtable.cfs.maybeSwitchMemtable(memtable, writeCommitLog);
+            memtable.cfs.maybeSwitchMemtable(memtable, writeCommitLog, null);
     }
 
     private static List<Memtable> addFullMemtable(List<Memtable> 
memtablesToFlush, Memtable fullMemtable)
@@ -594,7 +594,7 @@ public class Table
 
                 // during index build, we do flush index memtables separately 
from master; otherwise we could OOM
                 for (Memtable memtable : memtablesToFlush)
-                    memtable.cfs.maybeSwitchMemtable(memtable, false);
+                    memtable.cfs.maybeSwitchMemtable(memtable, false, null);
             }
 
             try

Modified: cassandra/trunk/test/system/test_thrift_server.py
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=1036895&r1=1036894&r2=1036895&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Fri Nov 19 15:28:07 2010
@@ -1329,12 +1329,16 @@ class TestMutations(ThriftTester):
         modified_cf = CfDef('Keyspace1', 'ToBeIndexed', 
column_metadata=[modified_cd])
         modified_cf.id = cfid
         client.system_update_column_family(modified_cf)
+        
         ks1 = client.describe_keyspace('Keyspace1')
         server_cf = [x for x in ks1.cf_defs if x.name=='ToBeIndexed'][0]
         assert server_cf
         assert server_cf.column_metadata[0].index_type == 
modified_cd.index_type
         assert server_cf.column_metadata[0].index_name == 
modified_cd.index_name
-
+        
+        # sleep a bit to give time for the index to build.
+        time.sleep(0.1)
+        
         # simple query on one index expression
         cp = ColumnParent('ToBeIndexed')
         sp = SlicePredicate(slice_range=SliceRange('', ''))


Reply via email to