Repository: cassandra Updated Branches: refs/heads/trunk 3a6bcb5ae -> 8e115dac9
Fixes to index summary resampling on old sstable formats Patch by Tyler Hobbs; reviewed by Benedict Elliot Smith for CASSANDRA-8993 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7ff25f0d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7ff25f0d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7ff25f0d Branch: refs/heads/trunk Commit: 7ff25f0df55bf492e741730473b94bcba8ac6c0b Parents: 93156d7 Author: Tyler Hobbs <[email protected]> Authored: Thu Mar 26 17:36:19 2015 -0500 Committer: Tyler Hobbs <[email protected]> Committed: Thu Mar 26 17:36:19 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../cassandra/io/sstable/Downsampling.java | 22 ++--- .../cassandra/io/sstable/IndexSummary.java | 1 + .../io/sstable/IndexSummaryManager.java | 15 ++- .../cassandra/io/sstable/SSTableReader.java | 96 +++++++++++++++++--- .../cassandra/io/sstable/IndexSummaryTest.java | 2 +- 6 files changed, 108 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9bc314d..dba397c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,7 @@ 2.1.4 + * Avoid overwriting index summaries for sstables with an older format that + does not support downsampling; rebuild summaries on startup when this + is detected (CASSANDRA-8993) * Fix potential data loss in CompressedSequentialWriter (CASSANDRA-8949) * Make PasswordAuthenticator number of hashing rounds configurable (CASSANDRA-8085) * Fix AssertionError when binding nested collections in DELETE (CASSANDRA-8900) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/src/java/org/apache/cassandra/io/sstable/Downsampling.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Downsampling.java b/src/java/org/apache/cassandra/io/sstable/Downsampling.java index 6842b25..8455d0b 100644 --- a/src/java/org/apache/cassandra/io/sstable/Downsampling.java +++ b/src/java/org/apache/cassandra/io/sstable/Downsampling.java @@ -79,8 +79,8 @@ public class Downsampling * Returns a list that can be used to translate current index summary indexes to their original index before * downsampling. (This repeats every `samplingLevel`, so that's how many entries we return.) * - * For example, if [7, 15] is returned, the current index summary entry at index 0 was originally - * at index 7, and the current index 1 was originally at index 15. + * For example, if [0, 64] is returned, the current index summary entry at index 0 was originally + * at index 0, and the current index 1 was originally at index 64. * * @param samplingLevel the current sampling level for the index summary * @@ -115,21 +115,11 @@ public class Downsampling */ public static int getEffectiveIndexIntervalAfterIndex(int index, int samplingLevel, int minIndexInterval) { - assert index >= -1; - List<Integer> originalIndexes = getOriginalIndexes(samplingLevel); - if (index == -1) - return originalIndexes.get(0) * minIndexInterval; - + assert index >= 0; index %= samplingLevel; - if (index == originalIndexes.size() - 1) - { - // account for partitions after the "last" entry as well as partitions before the "first" entry - return ((BASE_SAMPLING_LEVEL - originalIndexes.get(index)) + originalIndexes.get(0)) * minIndexInterval; - } - else - { - return (originalIndexes.get(index + 1) - originalIndexes.get(index)) * minIndexInterval; - } + List<Integer> originalIndexes = getOriginalIndexes(samplingLevel); + int nextEntryOriginalIndex = (index == originalIndexes.size() - 1) ? BASE_SAMPLING_LEVEL : originalIndexes.get(index + 1); + return (nextEntryOriginalIndex - originalIndexes.get(index)) * minIndexInterval; } public static int[] getStartPoints(int currentSamplingLevel, int newSamplingLevel) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/src/java/org/apache/cassandra/io/sstable/IndexSummary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java index bad50b4..0ea0b48 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java @@ -86,6 +86,7 @@ public class IndexSummary extends WrappedSharedCloseable this.offsets = offsets; this.entries = entries; this.samplingLevel = samplingLevel; + assert samplingLevel > 0; } private IndexSummary(IndexSummary copy) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index 4144c32..0c196ff 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -259,6 +259,17 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting)) total += sstable.getIndexSummaryOffHeapSize(); + List<SSTableReader> oldFormatSSTables = new ArrayList<>(); + for (SSTableReader sstable : nonCompacting) + { + // We can't change the sampling level of sstables with the old format, because the serialization format + // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.) + logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable); + if (!sstable.descriptor.version.hasSamplingLevel) + oldFormatSSTables.add(sstable); + } + nonCompacting.removeAll(oldFormatSSTables); + logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB", nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0); @@ -280,7 +291,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean Collections.sort(sstablesByHotness, new ReadRateComparator(readRates)); long remainingBytes = memoryPoolBytes; - for (SSTableReader sstable : compacting) + for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables)) remainingBytes -= sstable.getIndexSummaryOffHeapSize(); logger.trace("Index summaries for compacting SSTables are using {} MB of space", @@ -288,7 +299,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes); total = 0; - for (SSTableReader sstable : Iterables.concat(compacting, newSSTables)) + for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables)) total += sstable.getIndexSummaryOffHeapSize(); logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB", total / 1024.0 / 1024.0); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index f5eef09..8fd7b85 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -26,16 +26,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -708,13 +699,39 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); boolean summaryLoaded = loadSummary(ibuilder, dbuilder); + boolean builtSummary = false; if (recreateBloomFilter || !summaryLoaded) + { buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL); + builtSummary = true; + } ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); - if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk + + // Check for an index summary that was downsampled even though the serialization format doesn't support + // that. If it was downsampled, rebuild it. See CASSANDRA-8993 for details. + if (!descriptor.version.hasSamplingLevel && !builtSummary && !validateSummarySamplingLevel()) + { + indexSummary.close(); + ifile.close(); + dfile.close(); + + logger.info("Detected erroneously downsampled index summary; will rebuild summary at full sampling"); + FileUtils.deleteWithConfirm(new File(descriptor.filenameFor(Component.SUMMARY))); + ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); + dbuilder = compression + ? SegmentedFile.getCompressedBuilder() + : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); + buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL); + ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); + dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); + saveSummary(ibuilder, dbuilder); + } + else if (saveSummaryIfCreated && builtSummary) + { saveSummary(ibuilder, dbuilder); + } } /** @@ -800,7 +817,9 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead try { iStream = new DataInputStream(new FileInputStream(summariesFile)); - indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel, metadata.getMinIndexInterval(), metadata.getMaxIndexInterval()); + indexSummary = IndexSummary.serializer.deserialize( + iStream, partitioner, descriptor.version.hasSamplingLevel, + metadata.getMinIndexInterval(), metadata.getMaxIndexInterval()); first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream)); last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream)); ibuilder.deserializeBounds(iStream); @@ -826,6 +845,57 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead } /** + * Validates that an index summary has full sampling, as expected when the serialization format does not support + * persisting the sampling level. + * @return true if the summary has full sampling, false otherwise + */ + private boolean validateSummarySamplingLevel() + { + // We need to check index summary entries against the index to verify that none of them were dropped due to + // downsampling. Downsampling can drop any of the first BASE_SAMPLING_LEVEL entries (repeating that drop pattern + // for the remainder of the summary). Unfortunately, the first entry to be dropped is the entry at + // index (BASE_SAMPLING_LEVEL - 1), so we need to check a full set of BASE_SAMPLING_LEVEL entries. + Iterator<FileDataInput> segments = ifile.iterator(0); + int i = 0; + int summaryEntriesChecked = 0; + int expectedIndexInterval = getMinIndexInterval(); + while (segments.hasNext()) + { + FileDataInput in = segments.next(); + try + { + while (!in.isEOF()) + { + ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); + if (i % expectedIndexInterval == 0) + { + ByteBuffer summaryKey = ByteBuffer.wrap(indexSummary.getKey(i / expectedIndexInterval)); + if (!summaryKey.equals(indexKey)) + return false; + summaryEntriesChecked++; + + if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL) + return true; + } + RowIndexEntry.Serializer.skip(in); + i++; + } + } + catch (IOException e) + { + markSuspect(); + throw new CorruptSSTableException(e, in.getPath()); + } + finally + { + FileUtils.closeQuietly(in); + } + } + + return true; + } + + /** * Save index summary to Summary.db file. * * @param ibuilder @@ -946,6 +1016,8 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead */ public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException { + assert descriptor.version.hasSamplingLevel; + synchronized (tidy.global) { assert openReason != OpenReason.EARLY; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java index 0760aa3..9ed5b32 100644 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java @@ -257,7 +257,7 @@ public class IndexSummaryTest assertEquals(128, BASE_SAMPLING_LEVEL); assertEquals(Arrays.asList(0, 32, 64, 96), Downsampling.getOriginalIndexes(4)); assertEquals(Arrays.asList(0, 64), Downsampling.getOriginalIndexes(2)); - assertEquals(Arrays.asList(), Downsampling.getOriginalIndexes(0)); + assertEquals(Arrays.asList(0), Downsampling.getOriginalIndexes(1)); } @Test
