Author: jbellis
Date: Fri Dec  2 15:47:07 2011
New Revision: 1209543

URL: http://svn.apache.org/viewvc?rev=1209543&view=rev
Log:
fix adding to leveled manifest after streaming
patch by jbellis; reviewed by Ben Coverston and tested by Joe Siegrist for CASSANDRA-3536

Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/DataTracker.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1209543&r1=1209542&r2=1209543&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Fri Dec  2 15:47:07 2011
@@ -1,4 +1,5 @@
 1.0.6
+ * fix adding to leveled manifest after streaming (CASSANDRA-3536)
  * add command to stop compactions (CASSANDRA-1740)
  * filter out unavailable cipher suites when using encryption (CASSANDRA-3178)
  * (HADOOP) add old-style api support for CFIF and CFRR (CASSANDRA-2799)

Modified: 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1209543&r1=1209542&r2=1209543&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 (original)
+++ 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 Fri Dec  2 15:47:07 2011
@@ -230,7 +230,7 @@ public class ColumnFamilyStore implement
         data = new DataTracker(this);
         Set<DecoratedKey> savedKeys = keyCache.readSaved();
         Set<Map.Entry<Descriptor, Set<Component>>> entries = files(table.name, 
columnFamilyName, false, false).entrySet();
-        data.addSSTables(SSTableReader.batchOpen(entries, savedKeys, data, 
metadata, this.partitioner));
+        data.addInitialSSTables(SSTableReader.batchOpen(entries, savedKeys, 
data, metadata, this.partitioner));
 
         // compaction strategy should be created after the CFS has been 
prepared
         this.compactionStrategy = 
metadata.createCompactionStrategyInstance(this);
@@ -916,7 +916,7 @@ public class ColumnFamilyStore implement
     public void addSSTable(SSTableReader sstable)
     {
         assert sstable.getColumnFamilyName().equals(columnFamily);
-        data.addStreamedSSTable(sstable);
+        data.addSSTables(Arrays.asList(sstable));
         CompactionManager.instance.submitBackground(this);
     }
 
@@ -971,6 +971,7 @@ public class ColumnFamilyStore implement
 
     public void markCompacted(Collection<SSTableReader> sstables)
     {
+        assert !sstables.isEmpty();
         data.markCompacted(sstables);
     }
 

Modified: 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/DataTracker.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/DataTracker.java?rev=1209543&r1=1209542&r2=1209543&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/DataTracker.java
 (original)
+++ 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/DataTracker.java
 Fri Dec  2 15:47:07 2011
@@ -225,28 +225,36 @@ public class DataTracker
     public void markCompacted(Collection<SSTableReader> sstables)
     {
         replace(sstables, Collections.<SSTableReader>emptyList());
+        notifySSTablesChanged(sstables, 
Collections.<SSTableReader>emptyList());
     }
 
     public void replaceCompactedSSTables(Collection<SSTableReader> sstables, 
Iterable<SSTableReader> replacements)
     {
         replace(sstables, replacements);
+        notifySSTablesChanged(sstables, replacements);
     }
 
-    public void addSSTables(Collection<SSTableReader> sstables)
+    public void addInitialSSTables(Collection<SSTableReader> sstables)
     {
         replace(Collections.<SSTableReader>emptyList(), sstables);
+        // no notifications or backup necessary
     }
 
-    public void addStreamedSSTable(SSTableReader sstable)
+    public void addSSTables(Collection<SSTableReader> sstables)
     {
-        addSSTables(Arrays.asList(sstable));
-        incrementallyBackup(sstable);
-        notifyAdded(sstable);
+        replace(Collections.<SSTableReader>emptyList(), sstables);
+        for (SSTableReader sstable : sstables)
+        {
+            incrementallyBackup(sstable);
+            notifyAdded(sstable);
+        }
     }
 
     public void removeAllSSTables()
     {
-        replace(getSSTables(), Collections.<SSTableReader>emptyList());
+        List<SSTableReader> sstables = getSSTables();
+        replace(sstables, Collections.<SSTableReader>emptyList());
+        notifySSTablesChanged(sstables, 
Collections.<SSTableReader>emptyList());
     }
 
     /** (Re)initializes the tracker, purging all references. */
@@ -272,7 +280,6 @@ public class DataTracker
         addNewSSTablesSize(replacements);
         removeOldSSTablesSize(oldSSTables);
 
-        notifySSTablesChanged(replacements, oldSSTables);
         cfstore.updateCacheSizes();
     }
 
@@ -473,7 +480,7 @@ public class DataTracker
         return (double) falseCount / (trueCount + falseCount);
     }
 
-    public void notifySSTablesChanged(Iterable<SSTableReader> added, 
Iterable<SSTableReader> removed)
+    public void notifySSTablesChanged(Iterable<SSTableReader> removed, 
Iterable<SSTableReader> added)
     {
         for (INotificationConsumer subscriber : subscribers)
         {

Modified: 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1209543&r1=1209542&r2=1209543&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 (original)
+++ 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 Fri Dec  2 15:47:07 2011
@@ -937,7 +937,8 @@ public class CompactionManager implement
                             if (!sstable.newSince(truncatedAt))
                                 truncatedSSTables.add(sstable);
                         }
-                        cfs.markCompacted(truncatedSSTables);
+                        if (!truncatedSSTables.isEmpty())
+                            cfs.markCompacted(truncatedSSTables);
                     }
                 }
                 finally

Modified: 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java?rev=1209543&r1=1209542&r2=1209543&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
 (original)
+++ 
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
 Fri Dec  2 15:47:07 2011
@@ -153,9 +153,10 @@ public class LeveledManifest
 
     public synchronized void promote(Iterable<SSTableReader> removed, 
Iterable<SSTableReader> added)
     {
+        assert !Iterables.isEmpty(removed); // use add() instead of promote 
when adding new sstables
         logDistribution();
         if (logger.isDebugEnabled())
-            logger.debug((Iterables.isEmpty(added) ? "Removing [" : "Replacing 
[") + toString(removed) + "]");
+            logger.debug("Replacing [" + toString(removed) + "]");
 
         // the level for the added sstables is the max of the removed ones,
         // plus one if the removed were all on the same level


Reply via email to