Ensure compacted sstables are never used patch by slebresne; reviewed by jbellis for CASSANDRA-4436
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6a6b7ec1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6a6b7ec1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6a6b7ec1 Branch: refs/heads/trunk Commit: 6a6b7ec1f00d8084fb29379111a798c09c35e6d6 Parents: cc0be1b Author: Sylvain Lebresne <[email protected]> Authored: Wed Jul 18 19:54:20 2012 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Thu Jul 26 17:46:19 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 32 ++++++++++-- .../org/apache/cassandra/io/sstable/Component.java | 2 + .../apache/cassandra/io/sstable/Descriptor.java | 10 +++- .../org/apache/cassandra/io/sstable/SSTable.java | 2 + .../cassandra/io/sstable/SSTableMetadata.java | 35 +++++++++---- .../apache/cassandra/io/sstable/SSTableReader.java | 39 +++++---------- 7 files changed, 80 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 169f66d..84db73d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -19,6 +19,7 @@ * fix "Can't Modify Index Name" problem on CF update (CASSANDRA-4439) * Fix assertion error in getOverlappingSSTables during repair (CASSANDRA-4456) * fix nodetool's setcompactionthreshold command (CASSANDRA-4455) + * Ensure compacted files are never used, to avoid counter overcount (CASSANDRA-4436) Merged from 1.0: * allow dropping columns shadowed by not-yet-expired supercolumn or row tombstones in PrecompactedRow (CASSANDRA-4396) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index b93adc1..a39530a 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -234,8 +234,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (loadSSTables) { - Directories.SSTableLister sstables = directories.sstableLister().skipCompacted(true).skipTemporary(true); - data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), savedKeys, data, metadata, this.partitioner)); + Directories.SSTableLister sstableFiles = directories.sstableLister().skipCompacted(true).skipTemporary(true); + Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), savedKeys, data, metadata, this.partitioner); + + // Filter non-compacted sstables, remove compacted ones + Set<Integer> compactedSSTables = new HashSet<Integer>(); + for (SSTableReader sstable : sstables) + compactedSSTables.addAll(sstable.getAncestors()); + + Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>(); + for (SSTableReader sstable : sstables) + { + if (compactedSSTables.contains(sstable.descriptor.generation)) + sstable.releaseReference(); // this amounts to deleting the sstable + else + liveSSTables.add(sstable); + } + data.addInitialSSTables(liveSSTables); } // compaction strategy should be created after the CFS has been prepared @@ -492,7 +507,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean SSTableReader reader; try { - reader = SSTableReader.open(newDescriptor, entry.getValue(), Collections.<DecoratedKey>emptySet(), data, metadata, partitioner); + reader = SSTableReader.open(newDescriptor, entry.getValue(), Collections.<DecoratedKey>emptySet(), metadata, partitioner); } catch (IOException e) { @@ -1969,9 +1984,18 @@ public class 
ColumnFamilyStore implements ColumnFamilyStoreMBean ReplayPosition rp = ReplayPosition.getReplayPosition(sstables); SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(rp); - // get the max timestamp of the precompacted sstables + // Get the max timestamp of the precompacted sstables + // and add the generations of live ancestors for (SSTableReader sstable : sstables) + { sstableMetadataCollector.updateMaxTimestamp(sstable.getMaxTimestamp()); + sstableMetadataCollector.addAncestor(sstable.descriptor.generation); + for (Integer i : sstable.getAncestors()) + { + if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists()) + sstableMetadataCollector.addAncestor(i); + } + } return new SSTableWriter(getTempSSTablePath(location), estimatedRows, metadata, partitioner, sstableMetadataCollector); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/src/java/org/apache/cassandra/io/sstable/Component.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java index 4517244..a57f8c0 100644 --- a/src/java/org/apache/cassandra/io/sstable/Component.java +++ b/src/java/org/apache/cassandra/io/sstable/Component.java @@ -48,6 +48,8 @@ public class Component // serialized bloom filter for the row keys in the sstable FILTER("Filter.db"), // 0-length file that is created when an sstable is ready to be deleted + // @deprecated: deletion of compacted files is based on the lineage information stored in the compacted sstable + // metadata. This ensures we never use an sstable together with any of its parents, even in case of failure. COMPACTED_MARKER("Compacted"), // file to hold information about uncompressed data length, chunk offsets etc. 
COMPRESSION_INFO("CompressionInfo.db"), http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/src/java/org/apache/cassandra/io/sstable/Descriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index 3c63c8b..07bd6e8 100644 --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -58,7 +58,8 @@ public class Descriptor // hb (1.0.3): records compression ration in metadata component // hc (1.0.4): records partitioner in metadata component // hd (1.0.10): includes row tombstones in maxtimestamp - public static final String CURRENT_VERSION = "hd"; + // he (1.0.11): includes ancestors generation in metadata component + public static final String CURRENT_VERSION = "he"; public final File directory; /** version has the following format: <code>[a-z]+</code> */ @@ -78,6 +79,7 @@ public class Descriptor public final boolean tracksMaxTimestamp; public final boolean hasCompressionRatio; public final boolean hasPartitioner; + public final boolean hasAncestors; /** * A descriptor that assumes CURRENT_VERSION. 
@@ -106,9 +108,15 @@ public class Descriptor tracksMaxTimestamp = version.compareTo("hd") >= 0; hasCompressionRatio = version.compareTo("hb") >= 0; hasPartitioner = version.compareTo("hc") >= 0; + hasAncestors = version.compareTo("he") >= 0; isLatestVersion = version.compareTo(CURRENT_VERSION) == 0; } + public Descriptor withGeneration(int newGeneration) + { + return new Descriptor(version, directory, ksname, cfname, newGeneration, temporary); + } + public String filenameFor(Component component) { return filenameFor(component.name()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index aed3c62..9a29066 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -140,6 +140,8 @@ public abstract class SSTable FileUtils.deleteWithConfirm(desc.filenameFor(component)); } // remove the COMPACTED_MARKER component last if it exists + // Note: newly created sstable should not have a marker, but we keep this for now to make sure + // we don't leave older marker around FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER)); logger.debug("Deleted {}", desc); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java index bf17c6f..147f2b2 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java @@ -19,12 +19,8 @@ package org.apache.cassandra.io.sstable; -import 
java.io.BufferedInputStream; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; +import java.io.*; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +38,7 @@ import org.apache.cassandra.utils.EstimatedHistogram; * - max column timestamp * - compression ratio * - partitioner + * - generations of sstables from which this sstable was compacted, if any * * An SSTableMetadata should be instantiated via the Collector, openFromDescriptor() * or createDefaultInstance() @@ -58,6 +55,7 @@ public class SSTableMetadata public final long maxTimestamp; public final double compressionRatio; public final String partitioner; + public final Set<Integer> ancestors; private SSTableMetadata() { @@ -66,10 +64,11 @@ public class SSTableMetadata ReplayPosition.NONE, Long.MIN_VALUE, Double.MIN_VALUE, - null); + null, + Collections.<Integer>emptySet()); } - private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long maxTimestamp, double cr, String partitioner) + private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long maxTimestamp, double cr, String partitioner, Set<Integer> ancestors) { this.estimatedRowSize = rowSizes; this.estimatedColumnCount = columnCounts; @@ -77,6 +76,7 @@ public class SSTableMetadata this.maxTimestamp = maxTimestamp; this.compressionRatio = cr; this.partitioner = partitioner; + this.ancestors = ancestors; } public static SSTableMetadata createDefaultInstance() @@ -108,6 +108,7 @@ public class SSTableMetadata protected ReplayPosition replayPosition = ReplayPosition.NONE; protected long maxTimestamp = Long.MIN_VALUE; protected double compressionRatio = Double.MIN_VALUE; + protected Set<Integer> ancestors = new HashSet<Integer>(); public void addRowSize(long rowSize) { @@ -140,7 +141,8 @@ public class 
SSTableMetadata replayPosition, maxTimestamp, compressionRatio, - partitioner); + partitioner, + ancestors); } public Collector estimatedRowSize(EstimatedHistogram estimatedRowSize) @@ -160,6 +162,12 @@ public class SSTableMetadata this.replayPosition = replayPosition; return this; } + + public Collector addAncestor(int generation) + { + this.ancestors.add(generation); + return this; + } } public static class SSTableMetadataSerializer @@ -176,6 +184,9 @@ public class SSTableMetadata dos.writeLong(sstableStats.maxTimestamp); dos.writeDouble(sstableStats.compressionRatio); dos.writeUTF(sstableStats.partitioner); + dos.writeInt(sstableStats.ancestors.size()); + for (Integer g : sstableStats.ancestors) + dos.writeInt(g); } public SSTableMetadata deserialize(Descriptor descriptor) throws IOException @@ -213,7 +224,11 @@ public class SSTableMetadata ? dis.readDouble() : Double.MIN_VALUE; String partitioner = desc.hasPartitioner ? dis.readUTF() : null; - return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp, compressionRatio, partitioner); + int nbAncestors = desc.hasAncestors ? 
dis.readInt() : 0; + Set<Integer> ancestors = new HashSet<Integer>(nbAncestors); + for (int i = 0; i < nbAncestors; i++) + ancestors.add(dis.readInt()); + return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp, compressionRatio, partitioner, ancestors); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a6b7ec1/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 0aaa932..21dc71d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -140,23 +140,22 @@ public class SSTableReader extends SSTable public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException { - return open(descriptor, components, Collections.<DecoratedKey>emptySet(), null, metadata, StorageService.getPartitioner(), false); + return open(descriptor, components, Collections.<DecoratedKey>emptySet(), metadata, StorageService.getPartitioner(), false); } public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException { - return open(descriptor, components, Collections.<DecoratedKey>emptySet(), null, metadata, partitioner); + return open(descriptor, components, Collections.<DecoratedKey>emptySet(), metadata, partitioner); } - public static SSTableReader open(Descriptor descriptor, Set<Component> components, Set<DecoratedKey> savedKeys, DataTracker tracker, CFMetaData metadata, IPartitioner partitioner) throws IOException + public static SSTableReader open(Descriptor descriptor, Set<Component> components, Set<DecoratedKey> savedKeys, CFMetaData metadata, IPartitioner partitioner) throws IOException { - return 
open(descriptor, components, savedKeys, tracker, metadata, partitioner, true); + return open(descriptor, components, savedKeys, metadata, partitioner, true); } private static SSTableReader open(Descriptor descriptor, Set<Component> components, Set<DecoratedKey> savedKeys, - DataTracker tracker, CFMetaData metadata, IPartitioner partitioner, boolean validate) throws IOException @@ -191,8 +190,6 @@ public class SSTableReader extends SSTable null, System.currentTimeMillis(), sstableMetadata); - sstable.setTrackedBy(tracker); - // versions before 'c' encoded keys as utf-16 before hashing to the filter if (descriptor.hasStringsInBloomFilter) { @@ -242,7 +239,7 @@ public class SSTableReader extends SSTable SSTableReader sstable; try { - sstable = open(entry.getKey(), entry.getValue(), savedKeys, tracker, metadata, partitioner); + sstable = open(entry.getKey(), entry.getValue(), savedKeys, metadata, partitioner); } catch (IOException ex) { @@ -320,11 +317,8 @@ public class SSTableReader extends SSTable public void setTrackedBy(DataTracker tracker) { - if (tracker != null) - { - keyCache = CacheService.instance.keyCache; - deletingTask.setTracker(tracker); - } + keyCache = CacheService.instance.keyCache; + deletingTask.setTracker(tracker); } void loadBloomFilter() throws IOException @@ -827,19 +821,7 @@ public class SSTableReader extends SSTable if (logger.isDebugEnabled()) logger.debug("Marking " + getFilename() + " compacted"); - if (isCompacted.getAndSet(true)) - return false; - - try - { - if (!new File(descriptor.filenameFor(Component.COMPACTED_MARKER)).createNewFile()) - throw new IOException("Compaction marker already exists"); - } - catch (IOException e) - { - throw new IOError(e); - } - return true; + return !isCompacted.getAndSet(true); } public void markSuspect() @@ -1015,6 +997,11 @@ public class SSTableReader extends SSTable return sstableMetadata.maxTimestamp; } + public Set<Integer> getAncestors() + { + return sstableMetadata.ancestors; + } + public 
RandomAccessReader openDataReader(boolean skipIOCache) throws IOException { return compression
