Validate levels when building LeveledScanner to avoid overlaps with orphaned sstables

Patch by Paulo Motta; reviewed by marcuse for CASSANDRA-9935


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc75de69
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc75de69
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc75de69

Branch: refs/heads/cassandra-2.2
Commit: cc75de6912ca5d72b5465e69b2b571af11bde3d3
Parents: c43cf8d
Author: Paulo Motta <pauloricard...@gmail.com>
Authored: Mon Apr 18 22:42:19 2016 -0300
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue Apr 26 08:21:05 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../compaction/LeveledCompactionStrategy.java   | 15 ++++-
 .../db/compaction/LeveledManifest.java          | 15 ++++-
 .../LongLeveledCompactionStrategyTest.java      | 62 ++++++++++++++++++++
 4 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc75de69/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f6a4db..d170def 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,8 @@
  * cqlsh COPY FROM fails with []{} chars in UDT/tuple fields/values (CASSANDRA-11633)
  * clqsh: COPY FROM throws TypeError with Cython extensions enabled (CASSANDRA-11574)
  * cqlsh: COPY FROM ignores NULL values in conversion (CASSANDRA-11549)
+ * Validate levels when building LeveledScanner to avoid overlaps with orphaned sstables (CASSANDRA-9935)
+
 
 2.1.14
  * (cqlsh) Fix potential COPY deadlock when parent process is terminating child

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc75de69/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 8afe6b6..ad39e04 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -160,10 +160,23 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
 
     public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
+        Set<SSTableReader>[] sstablesPerLevel = manifest.getSStablesPerLevelSnapshot();
+
         Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
         for (SSTableReader sstable : sstables)
         {
-            byLevel.get(sstable.getSSTableLevel()).add(sstable);
+            int level = sstable.getSSTableLevel();
+            // if an sstable is not on the manifest, it was recently added or removed
+            // so we add it to level -1 and create exclusive scanners for it - see below (#9935)
+            if (level >= sstablesPerLevel.length || !sstablesPerLevel[level].contains(sstable))
+            {
+                logger.warn("Live sstable {} from level {} is not on corresponding level in the leveled manifest." +
+                            " This is not a problem per se, but may indicate an orphaned sstable due to a failed" +
+                            " compaction not cleaned up properly.",
+                             sstable.getFilename(), level);
+                level = -1;
+            }
+            byLevel.get(level).add(sstable);
         }
 
         List<ISSTableScanner> scanners = new ArrayList<ISSTableScanner>(sstables.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc75de69/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 622d68b..11af6c4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -26,6 +26,7 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
@@ -142,8 +143,6 @@ public class LeveledManifest
         }
     }
 
-
-
     public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added)
     {
         assert !removed.isEmpty(); // use add() instead of promote when adding new sstables
@@ -470,7 +469,7 @@ public class LeveledManifest
     }
 
     @VisibleForTesting
-    public int remove(SSTableReader reader)
+    public synchronized int remove(SSTableReader reader)
     {
         int level = reader.getSSTableLevel();
         assert level >= 0 : reader + " not present in manifest: "+level;
@@ -669,6 +668,16 @@ public class LeveledManifest
         return ageSortedCandidates;
     }
 
+    public synchronized Set<SSTableReader>[] getSStablesPerLevelSnapshot()
+    {
+        Set<SSTableReader>[] sstablesPerLevel = new Set[generations.length];
+        for (int i = 0; i < generations.length; i++)
+        {
+            sstablesPerLevel[i] = new HashSet<>(generations[i]);
+        }
+        return sstablesPerLevel;
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc75de69/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index fa6a31b..b3dfb3c 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -34,6 +34,8 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.BytesToken;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
@@ -226,4 +228,64 @@ public class LongLeveledCompactionStrategyTest extends SchemaLoader
             }
         }
     }
+
+    @Test
+    public void testLeveledScanner() throws Exception
+    {
+        testParallelLeveledCompaction();
+        String ksname = "Keyspace1";
+        String cfname = "StandardLeveled";
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(cfname);
+        store.disableAutoCompaction();
+
+        WrappingCompactionStrategy strategy = ((WrappingCompactionStrategy) store.getCompactionStrategy());
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) strategy.getWrappedStrategies().get(1);
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[10 * 1024]); // 10 KB value
+
+        // Adds 10 partitions
+        for (int r = 0; r < 10; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            Mutation rm = new Mutation(ksname, key.getKey());
+            for (int c = 0; c < 10; c++)
+            {
+                rm.add(cfname, Util.cellname("column" + c), value, 0);
+            }
+            rm.apply();
+        }
+
+        //Flush sstable
+        store.forceBlockingFlush();
+
+        Collection<SSTableReader> allSSTables = store.getSSTables();
+        for (SSTableReader sstable : allSSTables)
+        {
+            if (sstable.getSSTableLevel() == 0)
+            {
+                System.out.println("Mutating L0-SSTABLE level to L1 to simulate a bug: " + sstable.getFilename());
+                sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1);
+                sstable.reloadSSTableMetadata();
+            }
+        }
+
+        try (AbstractCompactionStrategy.ScannerList scannerList = lcs.getScanners(allSSTables))
+        {
+            //Verify that leveled scanners will always iterate in ascending order (CASSANDRA-9935)
+            for (ISSTableScanner scanner : scannerList.scanners)
+            {
+                DecoratedKey lastKey = null;
+                while (scanner.hasNext())
+                {
+                    OnDiskAtomIterator row = scanner.next();
+                    if (lastKey != null)
+                    {
+                        assertTrue("row " + row.getKey() + " received out of order wrt " + lastKey, row.getKey().compareTo(lastKey) >= 0);
+                    }
+                    lastKey = row.getKey();
+                }
+            }
+        }
+    }
 }

Reply via email to