Updated Branches: refs/heads/trunk 7b3349f6e -> 048741868
Save IndexSummary into new SSTable 'Summary' component patch by Vijay and Pavel Yaskevich; reviewed by Pavel Yaskevich for CASSANDRA-2392 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/04874186 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/04874186 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/04874186 Branch: refs/heads/trunk Commit: 04874186892c86a20181a2f64c5dc24285021b2c Parents: 7b3349f Author: Pavel Yaskevich <[email protected]> Authored: Mon Apr 23 16:28:14 2012 -0700 Committer: Pavel Yaskevich <[email protected]> Committed: Mon Apr 23 16:28:14 2012 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/io/sstable/Component.java | 6 +- .../apache/cassandra/io/sstable/IndexSummary.java | 44 ++++ .../org/apache/cassandra/io/sstable/SSTable.java | 4 +- .../apache/cassandra/io/sstable/SSTableReader.java | 154 ++++++++++----- .../apache/cassandra/io/sstable/SSTableWriter.java | 12 +- .../cassandra/io/util/MmappedSegmentedFile.java | 29 +++- .../apache/cassandra/io/util/SegmentedFile.java | 14 ++ 8 files changed, 204 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0ecba36..7538980 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,6 +5,7 @@ * (CLI) track elapsed time for `delete' operation (CASSANDRA-4060) * (CLI) jline version is bumped to 1.0 to properly support 'delete' key function (CASSANDRA-4132) + * Save IndexSummary into new SSTable 'Summary' component (CASSANDRA-2392) 1.1.1-dev http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/Component.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java index 1f34222..45f2875 100644 --- a/src/java/org/apache/cassandra/io/sstable/Component.java +++ b/src/java/org/apache/cassandra/io/sstable/Component.java @@ -50,7 +50,9 @@ public class Component // statistical metadata about the content of the sstable STATS("Statistics.db"), // holds sha1 sum of the data file (to be checked by sha1sum) - DIGEST("Digest.sha1"); + DIGEST("Digest.sha1"), + // holds SSTable Index Summary and Boundaries + SUMMARY("Summary.db"); final String repr; Type(String repr) @@ -75,6 +77,7 @@ public class Component public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO, -1); public final static Component STATS = new Component(Type.STATS, -1); public final static Component DIGEST = new Component(Type.DIGEST, -1); + public final static Component SUMMARY = new Component(Type.SUMMARY, -1); public final Type type; public final int id; @@ -122,6 +125,7 @@ public class Component case COMPRESSION_INFO: component = Component.COMPRESSION_INFO; break; case STATS: component = Component.STATS; break; case DIGEST: component = Component.DIGEST; break; + case SUMMARY: component = Component.SUMMARY; break; default: throw new IllegalStateException(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/IndexSummary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java index e36bc90..3cac781 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java @@ -17,11 +17,17 @@ */ package org.apache.cassandra.io.sstable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; /** * Two approaches to building an IndexSummary: @@ -30,6 +36,7 @@ import org.apache.cassandra.db.DecoratedKey; */ public class IndexSummary { + public static final IndexSummarySerializer serializer = new IndexSummarySerializer(); private final ArrayList<Long> positions; private final ArrayList<DecoratedKey> keys; private long keysWritten = 0; @@ -44,6 +51,12 @@ public class IndexSummary keys = new ArrayList<DecoratedKey>((int)expectedEntries); } + private IndexSummary() + { + positions = new ArrayList<Long>(); + keys = new ArrayList<DecoratedKey>(); + } + public void incrementRowid() { keysWritten++; @@ -82,4 +95,35 @@ public class IndexSummary keys.trimToSize(); positions.trimToSize(); } + + public static class IndexSummarySerializer + { + public void serialize(IndexSummary t, DataOutput dos) throws IOException + { + assert t.keys.size() == t.positions.size() : "keysize and the position sizes are not same."; + dos.writeInt(DatabaseDescriptor.getIndexInterval()); + dos.writeInt(t.keys.size()); + for (int i = 0; i < t.keys.size(); i++) + { + dos.writeLong(t.positions.get(i)); + ByteBufferUtil.writeWithLength(t.keys.get(i).key, dos); + } + } + + public IndexSummary deserialize(DataInput dis) throws IOException + { + IndexSummary summary = new IndexSummary(); + if (dis.readInt() != DatabaseDescriptor.getIndexInterval()) + throw new IOException("Cannot read the saved summary because Index Interval changed."); + + int size = dis.readInt(); + for (int i = 0; i < size; i++) + { + long location = dis.readLong(); + ByteBuffer key = ByteBufferUtil.readWithLength(dis); + summary.addEntry(StorageService.getPartitioner().decorateKey(key), location); + } + return summary; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index a18a973..a5148eb 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -59,6 +59,7 @@ public abstract class SSTable public static final String COMPONENT_FILTER = Component.Type.FILTER.repr; public static final String COMPONENT_STATS = Component.Type.STATS.repr; public static final String COMPONENT_DIGEST = Component.Type.DIGEST.repr; + public static final String COMPONENT_SUMMARY = Component.Type.SUMMARY.repr; public static final String TEMPFILE_MARKER = "tmp"; @@ -135,13 +136,14 @@ public abstract class SSTable FileUtils.deleteWithConfirm(desc.filenameFor(Component.DATA)); for (Component component : components) { - if (component.equals(Component.DATA) || component.equals(Component.COMPACTED_MARKER)) + if (component.equals(Component.DATA) || component.equals(Component.COMPACTED_MARKER) || component.equals(Component.SUMMARY)) continue; FileUtils.deleteWithConfirm(desc.filenameFor(component)); } // remove the COMPACTED_MARKER component last if it exists FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER)); + FileUtils.delete(desc.filenameFor(Component.SUMMARY)); logger.debug("Deleted {}", desc); return true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 6108f3f..68cee15 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -338,79 +338,125 @@ public class SSTableReader extends SSTable : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary. - RandomAccessReader input = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true); - DecoratedKey left = null, right = null; + RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true); + + // try to load summaries from the disk and check if we need + // to read primary index because we should re-create a BloomFilter or pre-load KeyCache + final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder); + final boolean readIndex = recreatebloom || cacheLoading || !summaryLoaded; try { - long indexSize = input.length(); + long indexSize = primaryIndex.length(); long histogramCount = sstableMetadata.estimatedRowSize.count(); long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed() ? histogramCount - : estimateRowsFromIndex(input); // statistics is supposed to be optional - indexSummary = new IndexSummary(estimatedKeys); + : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional if (recreatebloom) bf = LegacyBloomFilter.getFilter(estimatedKeys, 15); - while (true) - { - long indexPosition = input.getFilePointer(); - if (indexPosition == indexSize) - break; - - DecoratedKey decoratedKey = null; - int len = ByteBufferUtil.readShortLength(input); - - // when primary index file contains info other than data position, there is noway to determine - // the last key without deserializing index entry - boolean firstKey = left == null; - boolean lastKeyForUnpromoted = indexPosition + DBConstants.SHORT_SIZE + len + DBConstants.LONG_SIZE == indexSize; - boolean shouldAddEntry = indexSummary.shouldAddEntry(); - if (shouldAddEntry || cacheLoading || recreatebloom || firstKey || lastKeyForUnpromoted || descriptor.hasPromotedIndexes) - { - decoratedKey = decodeKey(partitioner, descriptor, ByteBufferUtil.read(input, len)); - if (firstKey) - left = decoratedKey; - right = decoratedKey; - } - else - { - FileUtils.skipBytesFully(input, len); - } + if (!summaryLoaded) + indexSummary = new IndexSummary(estimatedKeys); - RowIndexEntry indexEntry = null; - if (decoratedKey != null) + long indexPosition; + while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize) + { + ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex); + RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor); + DecoratedKey decoratedKey = decodeKey(partitioner, descriptor, key); + if(null == first) + first = decoratedKey; + last = decoratedKey; + + if (recreatebloom) + bf.add(decoratedKey.key); + if (cacheLoading && keysToLoadInCache.contains(decoratedKey)) + cacheKey(decoratedKey, indexEntry); + + // if summary was already read from disk we don't want to re-populate it using primary index + if (!summaryLoaded) { - if (recreatebloom) - bf.add(decoratedKey.key); - if (shouldAddEntry) - indexSummary.addEntry(decoratedKey, indexPosition); - // if key cache could be used and we have key already pre-loaded - if (cacheLoading && keysToLoadInCache.contains(decoratedKey)) - { - indexEntry = RowIndexEntry.serializer.deserialize(input, descriptor); - cacheKey(decoratedKey, indexEntry); - } + indexSummary.maybeAddEntry(decoratedKey, indexPosition); + ibuilder.addPotentialBoundary(indexPosition); + dbuilder.addPotentialBoundary(indexEntry.position); } - if (indexEntry == null) - indexEntry = RowIndexEntry.serializer.deserializePositionOnly(input, descriptor); - - indexSummary.incrementRowid(); - ibuilder.addPotentialBoundary(indexPosition); - dbuilder.addPotentialBoundary(indexEntry.position); } - indexSummary.complete(); } finally { - FileUtils.closeQuietly(input); + FileUtils.closeQuietly(primaryIndex); } - this.first = getMinimalKey(left); - this.last = getMinimalKey(right); - assert this.first.compareTo(this.last) <= 0: String.format("SSTable first key %s > last key %s", this.first, this.last); - + first = getMinimalKey(first); + last = getMinimalKey(last); + // finalize the load. + indexSummary.complete(); // finalize the state of the reader ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); + if (readIndex) // save summary information to disk + saveSummary(this, ibuilder, dbuilder); + } + + public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder) + { + File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY)); + if (!summariesFile.exists()) + return false; + + DataInputStream iStream = null; + try + { + iStream = new DataInputStream(new FileInputStream(summariesFile)); + reader.indexSummary = IndexSummary.serializer.deserialize(iStream); + reader.first = decodeKey(StorageService.getPartitioner(), reader.descriptor, ByteBufferUtil.readWithLength(iStream)); + reader.last = decodeKey(StorageService.getPartitioner(), reader.descriptor, ByteBufferUtil.readWithLength(iStream)); + ibuilder.deserializeBounds(iStream); + dbuilder.deserializeBounds(iStream); + } + catch (IOException e) + { + logger.debug("Cannot deserialize SSTable Summary: ", e); + // corrupted hence delete it and let it load it now. + if (summariesFile.exists()) + summariesFile.delete(); + + return false; + } + finally + { + FileUtils.closeQuietly(iStream); + } + + return true; + } + + public static void saveSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder) + { + File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY)); + if (summariesFile.exists()) + summariesFile.delete(); + + DataOutputStream oStream = null; + try + { + oStream = new DataOutputStream(new FileOutputStream(summariesFile)); + IndexSummary.serializer.serialize(reader.indexSummary, oStream); + ByteBufferUtil.writeWithLength(reader.first.key, oStream); + ByteBufferUtil.writeWithLength(reader.last.key, oStream); + ibuilder.serializeBounds(oStream); + dbuilder.serializeBounds(oStream); + } + catch (IOException e) + { + logger.debug("Cannot save SSTable Summary: ", e); + + // corrupted hence delete it and let it load it now. + if (summariesFile.exists()) + summariesFile.delete(); + } + finally + { + FileUtils.closeQuietly(oStream); + } } /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index e46e407..1a225e4 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -61,7 +61,12 @@ public class SSTableWriter extends SSTable private static Set<Component> components(CFMetaData metadata) { - Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA, Component.FILTER, Component.PRIMARY_INDEX, Component.STATS)); + Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA, + Component.FILTER, + Component.PRIMARY_INDEX, + Component.STATS, + Component.SUMMARY)); + if (metadata.compressionParameters().sstableCompressor != null) components.add(Component.COMPRESSION_INFO); else @@ -303,6 +308,8 @@ public class SSTableWriter extends SSTable sstableMetadata); sstable.first = getMinimalKey(first); sstable.last = getMinimalKey(last); + // try to save the summaries to disk + SSTableReader.saveSummary(sstable, iwriter.builder, dbuilder); iwriter = null; dbuilder = null; return sstable; @@ -342,7 +349,8 @@ public class SSTableWriter extends SSTable try { // do -Data last because -Data present should mean the sstable was completely renamed before crash - for (Component component : Sets.difference(components, Collections.singleton(Component.DATA))) + // don't rename -Summary component as it is not created yet and created when SSTable is loaded. + for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY))) FBUtilities.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component)); FBUtilities.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java index 3803963..ae81a08 100644 --- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.io.util; +import java.io.DataInput; +import java.io.DataOutput; import java.io.File; import java.io.IOError; import java.io.IOException; @@ -147,7 +149,7 @@ public class MmappedSegmentedFile extends SegmentedFile static class Builder extends SegmentedFile.Builder { // planned segment boundaries - private final List<Long> boundaries; + private List<Long> boundaries; // offset of the open segment (first segment begins at 0). private long currentStart = 0; @@ -193,7 +195,8 @@ public class MmappedSegmentedFile extends SegmentedFile { long length = new File(path).length(); // add a sentinel value == length - boundaries.add(Long.valueOf(length)); + if (length != boundaries.get(boundaries.size() - 1)) + boundaries.add(length); // create the segments return new MmappedSegmentedFile(path, length, createSegments(path)); } @@ -226,5 +229,27 @@ public class MmappedSegmentedFile extends SegmentedFile } return segments; } + + @Override + public void serializeBounds(DataOutput dos) throws IOException + { + super.serializeBounds(dos); + dos.writeInt(boundaries.size()); + for (long position: boundaries) + dos.writeLong(position); + } + + @Override + public void deserializeBounds(DataInput dis) throws IOException + { + super.deserializeBounds(dis); + List<Long> temp = new ArrayList<Long>(); + + int size = dis.readInt(); + for (int i = 0; i < size; i++) + temp.add(dis.readLong()); + + boundaries = temp; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/util/SegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java index fd8bfcd..03de78b 100644 --- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.io.util; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOError; import java.io.IOException; import java.nio.MappedByteBuffer; @@ -24,6 +26,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.utils.Pair; /** @@ -106,6 +109,17 @@ public abstract class SegmentedFile * @param path The file on disk. */ public abstract SegmentedFile complete(String path); + + public void serializeBounds(DataOutput dos) throws IOException + { + dos.writeUTF(DatabaseDescriptor.getDiskAccessMode().name()); + } + + public void deserializeBounds(DataInput dis) throws IOException + { + if (!dis.readUTF().equals(DatabaseDescriptor.getDiskAccessMode().name())) + throw new IOException("Cannot deserialize SSTable Summary component because the DiskAccessMode was changed!"); + } } static final class Segment extends Pair<Long, MappedByteBuffer> implements Comparable<Segment>
