Author: jbellis
Date: Wed May 11 14:19:20 2011
New Revision: 1101893

URL: http://svn.apache.org/viewvc?rev=1101893&view=rev
Log:
avoid attempting to replay mutations from dropped keyspaces
patch by jbellis; reviewed by slebresne for CASSANDRA-2631

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1101893&r1=1101892&r2=1101893&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed May 11 14:19:20 2011
@@ -18,6 +18,7 @@
  * Fix regression where bootstrapping a node with no schema fails
    (CASSANDRA-2625)
  * Allow removing LocationInfo sstables (CASSANDRA-2632)
+ * avoid attempting to replay mutations from dropped keyspaces (CASSANDRA-2631)
 
 
 0.7.5

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1101893&r1=1101892&r2=1101893&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
 Wed May 11 14:19:20 2011
@@ -165,7 +165,7 @@ public class CommitLog
 
     public static void recover(File[] clogs) throws IOException
     {
-        Set<Table> tablesRecovered = new HashSet<Table>();
+        final Set<Table> tablesRecovered = new HashSet<Table>();
         List<Future<?>> futures = new ArrayList<Future<?>>();
         byte[] bytes = new byte[4096];
         Map<Integer, AtomicInteger> invalidMutations = new HashMap<Integer, 
AtomicInteger>();
@@ -270,9 +270,7 @@ public class CommitLog
                                                     rm.getTable(),
                                                     
ByteBufferUtil.bytesToHex(rm.key()),
                                                     "{" + 
StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
-                    final Table table = Table.open(rm.getTable());
-                    tablesRecovered.add(table);
-                    final Collection<ColumnFamily> columnFamilies = new 
ArrayList<ColumnFamily>(rm.getColumnFamilies());
+
                     final long entryLocation = reader.getFilePointer();
                     final CommitLogHeader finalHeader = clHeader;
                     final RowMutation frm = rm;
@@ -280,12 +278,15 @@ public class CommitLog
                     {
                         public void runMayThrow() throws IOException
                         {
+                            if 
(DatabaseDescriptor.getKSMetaData(frm.getTable()) == null)
+                                return;
+                            final Table table = Table.open(frm.getTable());
                             RowMutation newRm = new 
RowMutation(frm.getTable(), frm.key());
 
                             // Rebuild the row mutation, omitting column 
families that a) have already been flushed,
                             // b) are part of a cf that was dropped. Keep in 
mind that the cf.name() is suspect. do every
                             // thing based on the cfid instead.
-                            for (ColumnFamily columnFamily : columnFamilies)
+                            for (ColumnFamily columnFamily : 
frm.getColumnFamilies())
                             {
                                 if (CFMetaData.getCF(columnFamily.id()) == 
null)
                                     // null means the cf has been dropped
@@ -297,6 +298,7 @@ public class CommitLog
                             if (!newRm.isEmpty())
                             {
                                 Table.open(newRm.getTable()).apply(newRm, 
null, false);
+                                tablesRecovered.add(table);
                             }
                         }
                     };


Reply via email to