Validate levels when building LeveledScanner to avoid overlaps with orphaned sstables
Patch by Paulo Motta; reviewed by marcuse for CASSANDRA-9935 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc75de69 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc75de69 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc75de69 Branch: refs/heads/cassandra-2.2 Commit: cc75de6912ca5d72b5465e69b2b571af11bde3d3 Parents: c43cf8d Author: Paulo Motta <pauloricard...@gmail.com> Authored: Mon Apr 18 22:42:19 2016 -0300 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Apr 26 08:21:05 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../compaction/LeveledCompactionStrategy.java | 15 ++++- .../db/compaction/LeveledManifest.java | 15 ++++- .../LongLeveledCompactionStrategyTest.java | 62 ++++++++++++++++++++ 4 files changed, 90 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc75de69/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4f6a4db..d170def 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,8 @@ * cqlsh COPY FROM fails with []{} chars in UDT/tuple fields/values (CASSANDRA-11633) * clqsh: COPY FROM throws TypeError with Cython extensions enabled (CASSANDRA-11574) * cqlsh: COPY FROM ignores NULL values in conversion (CASSANDRA-11549) + * Validate levels when building LeveledScanner to avoid overlaps with orphaned sstables (CASSANDRA-9935) + 2.1.14 * (cqlsh) Fix potential COPY deadlock when parent process is terminating child http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc75de69/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 8afe6b6..ad39e04 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -160,10 +160,23 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range) { + Set<SSTableReader>[] sstablesPerLevel = manifest.getSStablesPerLevelSnapshot(); + Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create(); for (SSTableReader sstable : sstables) { - byLevel.get(sstable.getSSTableLevel()).add(sstable); + int level = sstable.getSSTableLevel(); + // if an sstable is not on the manifest, it was recently added or removed + // so we add it to level -1 and create exclusive scanners for it - see below (#9935) + if (level >= sstablesPerLevel.length || !sstablesPerLevel[level].contains(sstable)) + { + logger.warn("Live sstable {} from level {} is not on corresponding level in the leveled manifest." + + " This is not a problem per se, but may indicate an orphaned sstable due to a failed" + + " compaction not cleaned up properly.", + sstable.getFilename(), level); + level = -1; + } + byLevel.get(level).add(sstable); } List<ISSTableScanner> scanners = new ArrayList<ISSTableScanner>(sstables.size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc75de69/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 622d68b..11af6c4 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -26,6 +26,7 @@ import com.google.common.base.Predicates; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import org.slf4j.Logger; @@ -142,8 +143,6 @@ public class LeveledManifest } } - - public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added) { assert !removed.isEmpty(); // use add() instead of promote when adding new sstables @@ -470,7 +469,7 @@ public class LeveledManifest } @VisibleForTesting - public int remove(SSTableReader reader) + public synchronized int remove(SSTableReader reader) { int level = reader.getSSTableLevel(); assert level >= 0 : reader + " not present in manifest: "+level; @@ -669,6 +668,16 @@ public class LeveledManifest return ageSortedCandidates; } + public synchronized Set<SSTableReader>[] getSStablesPerLevelSnapshot() + { + Set<SSTableReader>[] sstablesPerLevel = new Set[generations.length]; + for (int i = 0; i < generations.length; i++) + { + sstablesPerLevel[i] = new HashSet<>(generations[i]); + } + return sstablesPerLevel; + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc75de69/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java index fa6a31b..b3dfb3c 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java @@ -34,6 +34,8 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.dht.BytesToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; @@ -226,4 +228,64 @@ public class LongLeveledCompactionStrategyTest extends SchemaLoader } } } + + @Test + public void testLeveledScanner() throws Exception + { + testParallelLeveledCompaction(); + String ksname = "Keyspace1"; + String cfname = "StandardLeveled"; + Keyspace keyspace = Keyspace.open(ksname); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(cfname); + store.disableAutoCompaction(); + + WrappingCompactionStrategy strategy = ((WrappingCompactionStrategy) store.getCompactionStrategy()); + LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) strategy.getWrappedStrategies().get(1); + + ByteBuffer value = ByteBuffer.wrap(new byte[10 * 1024]); // 10 KB value + + // Adds 10 partitions + for (int r = 0; r < 10; r++) + { + DecoratedKey key = Util.dk(String.valueOf(r)); + Mutation rm = new Mutation(ksname, key.getKey()); + for (int c = 0; c < 10; c++) + { + rm.add(cfname, Util.cellname("column" + c), value, 0); + } + rm.apply(); + } + + //Flush sstable + store.forceBlockingFlush(); + + Collection<SSTableReader> allSSTables = store.getSSTables(); + for (SSTableReader sstable : allSSTables) + { + if (sstable.getSSTableLevel() == 0) + { + System.out.println("Mutating L0-SSTABLE level to L1 to simulate a bug: " + sstable.getFilename()); + sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1); + sstable.reloadSSTableMetadata(); + } + } + + try (AbstractCompactionStrategy.ScannerList scannerList = lcs.getScanners(allSSTables)) + { + //Verify that leveled scanners will always iterate in ascending order (CASSANDRA-9935) + for (ISSTableScanner scanner : scannerList.scanners) + { + DecoratedKey lastKey = null; + while (scanner.hasNext()) + { + OnDiskAtomIterator row = scanner.next(); + if (lastKey != null) + { + assertTrue("row " + row.getKey() + " received out of order wrt " + lastKey, row.getKey().compareTo(lastKey) >= 0); + } + lastKey = row.getKey(); + } + } + } + } }