Updated Branches: refs/heads/trunk fab27bd59 -> f388c9d69
SSTable/SSTableReader cleanup patch by yukim; reviewed by jbellis for CASSANDRA-6355 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f388c9d6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f388c9d6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f388c9d6 Branch: refs/heads/trunk Commit: f388c9d69b855f0c3b146864717a971034fd3dc5 Parents: fab27bd Author: Yuki Morishita <[email protected]> Authored: Fri Nov 15 15:23:55 2013 -0600 Committer: Yuki Morishita <[email protected]> Committed: Fri Nov 15 15:23:55 2013 -0600 ---------------------------------------------------------------------- .../cassandra/db/CollationController.java | 5 +- .../apache/cassandra/db/ColumnFamilyStore.java | 4 +- .../db/compaction/CompactionController.java | 3 +- .../cassandra/db/compaction/CompactionTask.java | 6 +- .../compaction/LeveledCompactionStrategy.java | 3 +- .../db/compaction/LeveledManifest.java | 14 ++-- .../cassandra/db/compaction/Upgrader.java | 9 +-- .../apache/cassandra/io/sstable/Component.java | 6 -- .../cassandra/io/sstable/KeyIterator.java | 2 +- .../apache/cassandra/io/sstable/SSTable.java | 55 ++------------- .../cassandra/io/sstable/SSTableMetadata.java | 2 +- .../cassandra/io/sstable/SSTableReader.java | 73 ++++++++++++++------ .../cassandra/io/sstable/SSTableWriter.java | 12 ++-- .../io/util/DataIntegrityMetadata.java | 4 +- .../LongLeveledCompactionStrategyTest.java | 3 +- .../cassandra/db/ColumnFamilyStoreTest.java | 2 +- .../LeveledCompactionStrategyTest.java | 5 +- .../cassandra/io/sstable/SSTableReaderTest.java | 2 +- 18 files changed, 88 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/CollationController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java index 758d523..9896fde 100644 --- a/src/java/org/apache/cassandra/db/CollationController.java +++ b/src/java/org/apache/cassandra/db/CollationController.java @@ -27,7 +27,6 @@ import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy; import org.apache.cassandra.db.filter.NamesQueryFilter; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.marshal.CounterColumnType; -import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.tracing.Tracing; @@ -99,7 +98,7 @@ public class CollationController QueryFilter reducedFilter = new QueryFilter(filter.key, filter.cfName, namesFilter.withUpdatedColumns(filterColumns), filter.timestamp); /* add the SSTables on disk */ - Collections.sort(view.sstables, SSTable.maxTimestampComparator); + Collections.sort(view.sstables, SSTableReader.maxTimestampComparator); // read sorted sstables long mostRecentRowTombstone = Long.MIN_VALUE; @@ -219,7 +218,7 @@ public class CollationController * In othere words, iterating in maxTimestamp order allow to do our mostRecentTombstone elimination * in one pass, and minimize the number of sstables for which we read a rowTombstone. */ - Collections.sort(view.sstables, SSTable.maxTimestampComparator); + Collections.sort(view.sstables, SSTableReader.maxTimestampComparator); List<SSTableReader> skippedSSTables = null; long mostRecentRowTombstone = Long.MIN_VALUE; long minTimestamp = Long.MAX_VALUE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 73704d4..1b8a1bf 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -422,7 +422,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean Descriptor desc = sstableFiles.getKey(); Set<Component> components = sstableFiles.getValue(); - if (components.contains(Component.COMPACTED_MARKER) || desc.temporary) + if (desc.temporary) { SSTable.delete(desc, components); continue; @@ -1010,7 +1010,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { if (operation != OperationType.CLEANUP || isIndex()) { - return SSTable.getTotalBytes(sstables); + return SSTableReader.getTotalBytes(sstables); } // cleanup size estimation only counts bytes for keys local to this node http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index 201cd0a..dc7730c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DataTracker; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.AlwaysPresentFilter; @@ -118,7 +117,7 @@ public class CompactionController // we still need to keep candidates that might shadow something in a // non-candidate sstable. And if we remove a sstable from the candidates, we // must take it's timestamp into account (hence the sorting below). - Collections.sort(candidates, SSTable.maxTimestampComparator); + Collections.sort(candidates, SSTableReader.maxTimestampComparator); Iterator<SSTableReader> iterator = candidates.iterator(); while (iterator.hasNext()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index bc419ad..f4cc500 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -118,7 +118,7 @@ public class CompactionTask extends AbstractCompactionTask long totalkeysWritten = 0; long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata)); - long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableSize()); + long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableSize()); long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); if (logger.isDebugEnabled()) logger.debug("Expected bloom filter size : {}", keysPerSSTable); @@ -244,8 +244,8 @@ public class CompactionTask extends AbstractCompactionTask // log a bunch of statistics about the result and save to system table compaction_history long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - long startsize = SSTable.getTotalBytes(toCompact); - long endsize = SSTable.getTotalBytes(sstables); + long startsize = SSTableReader.getTotalBytes(toCompact); + long endsize = SSTableReader.getTotalBytes(sstables); double ratio = (double) endsize / (double) startsize; StringBuilder builder = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index e992003..3eb5e91 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -32,7 +32,6 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; @@ -234,7 +233,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem } totalLength = length; - Collections.sort(this.sstables, SSTable.sstableComparator); + Collections.sort(this.sstables, SSTableReader.sstableComparator); sstableIterator = this.sstables.iterator(); assert sstableIterator.hasNext(); // caller should check intersecting first currentScanner = sstableIterator.next().getScanner(range, CompactionManager.instance.getRateLimiter()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 63ad2e4..2b79493 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -151,13 +151,13 @@ public class LeveledManifest minLevel = Math.min(minLevel, ssTableReader.getSSTableLevel()); add(ssTableReader); } - lastCompactedKeys[minLevel] = SSTable.sstableOrdering.max(added).last; + lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last; } public synchronized void repairOverlappingSSTables(int level) { SSTableReader previous = null; - Collections.sort(generations[level], SSTable.sstableComparator); + Collections.sort(generations[level], SSTableReader.sstableComparator); List<SSTableReader> outOfOrderSSTables = new ArrayList<SSTableReader>(); for (SSTableReader current : generations[level]) { @@ -264,7 +264,7 @@ public class LeveledManifest // we want to calculate score excluding compacting ones Set<SSTableReader> sstablesInLevel = Sets.newHashSet(sstables); Set<SSTableReader> remaining = Sets.difference(sstablesInLevel, cfs.getDataTracker().getCompacting()); - double score = (double)SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i); + double score = (double) SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i); logger.debug("Compaction score for level {} is {}", i, score); if (score > 1.001) @@ -454,7 +454,7 @@ public class LeveledManifest } // leave everything in L0 if we didn't end up with a full sstable's worth of data - if (SSTable.getTotalBytes(candidates) > maxSSTableSizeInBytes) + if (SSTableReader.getTotalBytes(candidates) > maxSSTableSizeInBytes) { // add sstables from L1 that overlap candidates // if the overlapping ones are already busy in a compaction, leave it out. @@ -468,7 +468,7 @@ public class LeveledManifest } // for non-L0 compactions, pick up where we left off last time - Collections.sort(generations[level], SSTable.sstableComparator); + Collections.sort(generations[level], SSTableReader.sstableComparator); int start = 0; // handles case where the prior compaction touched the very last range for (int i = 0; i < generations[level].size(); i++) { @@ -499,7 +499,7 @@ public class LeveledManifest private List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates) { List<SSTableReader> ageSortedCandidates = new ArrayList<SSTableReader>(candidates); - Collections.sort(ageSortedCandidates, SSTable.maxTimestampComparator); + Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampComparator); return ageSortedCandidates; } @@ -557,7 +557,7 @@ public class LeveledManifest } int newLevel; - if (minimumLevel == 0 && minimumLevel == maximumLevel && SSTable.getTotalBytes(sstables) < maxSSTableSizeInBytes) + if (minimumLevel == 0 && minimumLevel == maximumLevel && SSTableReader.getTotalBytes(sstables) < maxSSTableSizeInBytes) { newLevel = 0; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index b98c2ae..383ff00 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -25,13 +25,6 @@ import com.google.common.base.Throwables; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.compaction.AbstractCompactedRow; -import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; -import org.apache.cassandra.db.compaction.AbstractCompactionIterable; -import org.apache.cassandra.db.compaction.CompactionIterable; -import org.apache.cassandra.db.compaction.CompactionController; -import org.apache.cassandra.db.compaction.CompactionTask; -import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.OutputHandler; @@ -63,7 +56,7 @@ public class Upgrader this.strategy = cfs.getCompactionStrategy(); long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(toUpgrade, cfs.metadata)); - long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableSize()); + long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableSize()); this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/Component.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java index 599e0ba..4635251 100644 --- a/src/java/org/apache/cassandra/io/sstable/Component.java +++ b/src/java/org/apache/cassandra/io/sstable/Component.java @@ -43,10 +43,6 @@ public class Component PRIMARY_INDEX("Index.db"), // serialized bloom filter for the row keys in the sstable FILTER("Filter.db"), - // 0-length file that is created when an sstable is ready to be deleted - // @deprecated: deletion of compacted file is based on the lineag information stored in the compacted sstabl - // metadata. This ensure we can guarantee never using a sstable and some of its parents, even in case of failure. - COMPACTED_MARKER("Compacted"), // file to hold information about uncompressed data length, chunk offsets etc. COMPRESSION_INFO("CompressionInfo.db"), // statistical metadata about the content of the sstable @@ -81,7 +77,6 @@ public class Component public final static Component DATA = new Component(Type.DATA); public final static Component PRIMARY_INDEX = new Component(Type.PRIMARY_INDEX); public final static Component FILTER = new Component(Type.FILTER); - public final static Component COMPACTED_MARKER = new Component(Type.COMPACTED_MARKER); public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO); public final static Component STATS = new Component(Type.STATS); public final static Component DIGEST = new Component(Type.DIGEST); @@ -133,7 +128,6 @@ public class Component case DATA: component = Component.DATA; break; case PRIMARY_INDEX: component = Component.PRIMARY_INDEX; break; case FILTER: component = Component.FILTER; break; - case COMPACTED_MARKER: component = Component.COMPACTED_MARKER; break; case COMPRESSION_INFO: component = Component.COMPRESSION_INFO; break; case STATS: component = Component.STATS; break; case DIGEST: component = Component.DIGEST; break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/KeyIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java index f4f7ee5..0c36f62 100644 --- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java @@ -35,7 +35,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close public KeyIterator(Descriptor desc) { - File path = new File(desc.filenameFor(SSTable.COMPONENT_INDEX)); + File path = new File(desc.filenameFor(Component.PRIMARY_INDEX)); in = RandomAccessReader.open(path); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index e469b33..c13c423 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -24,7 +24,6 @@ import java.util.concurrent.CopyOnWriteArraySet; import com.google.common.base.Predicates; import com.google.common.collect.Collections2; -import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.io.Files; import org.slf4j.Logger; @@ -57,27 +56,10 @@ public abstract class SSTable { static final Logger logger = LoggerFactory.getLogger(SSTable.class); - // TODO: replace with 'Component' objects - public static final String COMPONENT_DATA = Component.Type.DATA.repr; - public static final String COMPONENT_INDEX = Component.Type.PRIMARY_INDEX.repr; - public static final String COMPONENT_FILTER = Component.Type.FILTER.repr; - public static final String COMPONENT_STATS = Component.Type.STATS.repr; - public static final String COMPONENT_DIGEST = Component.Type.DIGEST.repr; - public static final String TEMPFILE_MARKER = "tmp"; public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100; - public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>() - { - public int compare(SSTableReader o1, SSTableReader o2) - { - long ts1 = o1.getMaxTimestamp(); - long ts2 = o2.getMaxTimestamp(); - return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1)); - } - }; - public final Descriptor descriptor; protected final Set<Component> components; public final CFMetaData metadata; @@ -101,26 +83,13 @@ public abstract class SSTable assert partitioner != null; this.descriptor = descriptor; - Set<Component> dataComponents = new HashSet<Component>(components); - for (Component component : components) - assert component.type != Component.Type.COMPACTED_MARKER; - + Set<Component> dataComponents = new HashSet<>(components); this.compression = dataComponents.contains(Component.COMPRESSION_INFO); - this.components = new CopyOnWriteArraySet<Component>(dataComponents); + this.components = new CopyOnWriteArraySet<>(dataComponents); this.metadata = metadata; this.partitioner = partitioner; } - public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>() - { - public int compare(SSTableReader o1, SSTableReader o2) - { - return o1.first.compareTo(o2.first); - } - }; - - public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator); - /** * We use a ReferenceQueue to manage deleting files that have been compacted * and for which no more SSTable references exist. But this is not guaranteed @@ -139,15 +108,11 @@ public abstract class SSTable FileUtils.deleteWithConfirm(desc.filenameFor(Component.DATA)); for (Component component : components) { - if (component.equals(Component.DATA) || component.equals(Component.COMPACTED_MARKER) || component.equals(Component.SUMMARY)) + if (component.equals(Component.DATA) || component.equals(Component.SUMMARY)) continue; FileUtils.deleteWithConfirm(desc.filenameFor(component)); } - // remove the COMPACTED_MARKER component last if it exists - // Note: newly created sstable should not have a marker, but we keep this for now to make sure - // we don't leave older marker around - FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER)); FileUtils.delete(desc.filenameFor(Component.SUMMARY)); logger.debug("Deleted {}", desc); @@ -167,12 +132,12 @@ public abstract class SSTable public String getFilename() { - return descriptor.filenameFor(COMPONENT_DATA); + return descriptor.filenameFor(Component.DATA); } public String getIndexFilename() { - return descriptor.filenameFor(COMPONENT_INDEX); + return descriptor.filenameFor(Component.PRIMARY_INDEX); } public String getColumnFamilyName() @@ -262,16 +227,6 @@ public abstract class SSTable return estimatedRows; } - public static long getTotalBytes(Iterable<SSTableReader> sstables) - { - long sum = 0; - for (SSTableReader sstable : sstables) - { - sum += sstable.onDiskLength(); - } - return sum; - } - public long bytesOnDisk() { long bytes = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java index 140e08b..8ddfdd7 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java @@ -435,7 +435,7 @@ public class SSTableMetadata public Pair<SSTableMetadata, Set<Integer>> deserialize(Descriptor descriptor, boolean loadSSTableLevel) throws IOException { logger.debug("Load metadata for {}", descriptor); - File statsFile = new File(descriptor.filenameFor(SSTable.COMPONENT_STATS)); + File statsFile = new File(descriptor.filenameFor(Component.STATS)); if (!statsFile.exists()) { logger.debug("No sstable stats for {}", descriptor); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index c5b61d9..7fd9ca6 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Ordering; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; @@ -67,8 +68,28 @@ public class SSTableReader extends SSTable implements Closeable private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1); private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0); + public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>() + { + public int compare(SSTableReader o1, SSTableReader o2) + { + long ts1 = o1.getMaxTimestamp(); + long ts2 = o2.getMaxTimestamp(); + return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1)); + } + }; + + public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>() + { + public int compare(SSTableReader o1, SSTableReader o2) + { + return o1.first.compareTo(o2.first); + } + }; + + public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator); + /** - * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an uppper bound + * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created * later than maxDataAge. * @@ -168,7 +189,7 @@ public class SSTableReader extends SSTable implements Closeable SegmentedFile.Builder dbuilder = sstable.compression ? new CompressedSegmentedFile.Builder() : new BufferedSegmentedFile.Builder(); - if (!loadSummary(sstable, ibuilder, dbuilder, sstable.metadata)) + if (!sstable.loadSummary(ibuilder, dbuilder, sstable.metadata)) sstable.buildSummary(false, ibuilder, dbuilder, false); sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)); sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA)); @@ -218,7 +239,7 @@ public class SSTableReader extends SSTable implements Closeable assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor; assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; - logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length()); + logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length()); SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor).left; @@ -247,7 +268,7 @@ public class SSTableReader extends SSTable implements Closeable final CFMetaData metadata, final IPartitioner partitioner) { - final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>(); + final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>(); ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors()); for (final Map.Entry<Descriptor, Set<Component>> entry : entries) @@ -365,6 +386,16 @@ public class SSTableReader extends SSTable implements Closeable this.bf = bloomFilter; } + public static long getTotalBytes(Iterable<SSTableReader> sstables) + { + long sum = 0; + for (SSTableReader sstable : sstables) + { + sum += sstable.onDiskLength(); + } + return sum; + } + /** * Clean up all opened resources. * @@ -441,18 +472,18 @@ public class SSTableReader extends SSTable implements Closeable ? SegmentedFile.getCompressedBuilder() : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); - boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata); + boolean summaryLoaded = loadSummary(ibuilder, dbuilder, metadata); if (recreateBloomFilter || !summaryLoaded) buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded); ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk - saveSummary(this, ibuilder, dbuilder); + saveSummary(ibuilder, dbuilder); } - private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException - { + private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException + { // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary. RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); @@ -505,27 +536,27 @@ public class SSTableReader extends SSTable implements Closeable last = getMinimalKey(last); } - public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata) + public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata) { - File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY)); - if (!reader.descriptor.version.offHeapSummaries || !summariesFile.exists()) + File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY)); + if (!descriptor.version.offHeapSummaries || !summariesFile.exists()) return false; DataInputStream iStream = null; try { iStream = new DataInputStream(new FileInputStream(summariesFile)); - reader.indexSummary = IndexSummary.serializer.deserialize(iStream, reader.partitioner); - if (reader.indexSummary.getIndexInterval() != metadata.getIndexInterval()) + indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner); + if (indexSummary.getIndexInterval() != metadata.getIndexInterval()) { iStream.close(); logger.debug("Cannot read the saved summary for {} because Index Interval changed from {} to {}.", - reader.toString(), reader.indexSummary.getIndexInterval(), metadata.getIndexInterval()); + toString(), indexSummary.getIndexInterval(), metadata.getIndexInterval()); FileUtils.deleteWithConfirm(summariesFile); return false; } - reader.first = reader.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream)); - reader.last = reader.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream)); + first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream)); + last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream)); ibuilder.deserializeBounds(iStream); dbuilder.deserializeBounds(iStream); } @@ -544,9 +575,9 @@ public class SSTableReader extends SSTable implements Closeable return true; } - public static void saveSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder) + public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder) { - File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY)); + File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY)); if (summariesFile.exists()) summariesFile.delete(); @@ -554,9 +585,9 @@ public class SSTableReader extends SSTable implements Closeable try { oStream = new DataOutputStream(new FileOutputStream(summariesFile)); - IndexSummary.serializer.serialize(reader.indexSummary, oStream); - ByteBufferUtil.writeWithLength(reader.first.key, oStream); - ByteBufferUtil.writeWithLength(reader.last.key, oStream); + IndexSummary.serializer.serialize(indexSummary, oStream); + ByteBufferUtil.writeWithLength(first.key, oStream); + ByteBufferUtil.writeWithLength(last.key, oStream); ibuilder.serializeBounds(oStream); dbuilder.serializeBounds(oStream); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index 70c0b42..b5d50cf 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -313,8 +313,8 @@ public class SSTableWriter extends SSTable SSTableMetadata sstableMetadata = p.right; // finalize in-memory state for the reader - SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX)); - SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA)); + SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX)); + SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA)); SSTableReader sstable = SSTableReader.internalOpen(newdesc, components, metadata, @@ -328,7 +328,7 @@ public class SSTableWriter extends SSTable sstable.first = getMinimalKey(first); sstable.last = getMinimalKey(last); // try to save the summaries to disk - SSTableReader.saveSummary(sstable, iwriter.builder, dbuilder); + sstable.saveSummary(iwriter.builder, dbuilder); iwriter = null; dbuilder = null; return sstable; @@ -355,7 +355,7 @@ public class SSTableWriter extends SSTable private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata, Set<Integer> ancestors) { - SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_STATS)), true); + SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)), true); try { SSTableMetadata.serializer.serialize(sstableMetadata, ancestors, out.stream); @@ -411,7 +411,7 @@ public class SSTableWriter extends SSTable IndexWriter(long keyCount) { - indexFile = SequentialWriter.open(new File(descriptor.filenameFor(SSTable.COMPONENT_INDEX)), + indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), !metadata.populateIoCacheOnFlush()); builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); summary = new IndexSummaryBuilder(keyCount, metadata.getIndexInterval()); @@ -446,7 +446,7 @@ public class SSTableWriter extends SSTable { if (components.contains(Component.FILTER)) { - String path = descriptor.filenameFor(SSTable.COMPONENT_FILTER); + String path = descriptor.filenameFor(Component.FILTER); try { // bloom filter http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java index f334d08..0606941 100644 --- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java +++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java @@ -145,10 +145,10 @@ public class DataIntegrityMetadata byte[] bytes = digest.digest(); if (bytes == null) return; - SequentialWriter out = SequentialWriter.open(new File(descriptor.filenameFor(SSTable.COMPONENT_DIGEST)), true); + SequentialWriter out = SequentialWriter.open(new File(descriptor.filenameFor(Component.DIGEST)), true); // Writting output compatible with sha1sum Descriptor newdesc = descriptor.asTemporary(false); - String[] tmp = newdesc.filenameFor(SSTable.COMPONENT_DATA).split(Pattern.quote(File.separator)); + String[] tmp = newdesc.filenameFor(Component.DATA).split(Pattern.quote(File.separator)); String dataFileName = tmp[tmp.length - 1]; try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java index c6b6eb0..0eb44d0 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java @@ -31,7 +31,6 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -106,7 +105,7 @@ public class LongLeveledCompactionStrategyTest extends SchemaLoader { List<SSTableReader> sstables = manifest.getLevel(level); // score check - assert (double) SSTable.getTotalBytes(sstables) / manifest.maxBytesForLevel(level) < 1.00; + assert (double) SSTableReader.getTotalBytes(sstables) / manifest.maxBytesForLevel(level) < 1.00; // overlap check for levels greater than 0 if (level > 0) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index 32bc7df..7f7d5c9 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -938,7 +938,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader cfs.clearUnsafe(); assertEquals(0, cfs.getSSTables().size()); - new File(ssTables.iterator().next().descriptor.filenameFor(SSTable.COMPONENT_STATS)).delete(); + new File(ssTables.iterator().next().descriptor.filenameFor(Component.STATS)).delete(); cfs.loadNewSSTables(); // Add another column with a lower timestamp http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index b60f6d9..a008de1 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -20,9 +20,7 @@ package org.apache.cassandra.db.compaction; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.UUID; @@ -39,7 +37,6 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.Validator; @@ -145,7 +142,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader scanner.next(); // scanner.getCurrentPosition should be equal to total bytes of L1 sstables - assert scanner.getCurrentPosition() == SSTable.getTotalBytes(sstables); + assert scanner.getCurrentPosition() == SSTableReader.getTotalBytes(sstables); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/f388c9d6/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index 36d8fbe..b771e72 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -278,7 +278,7 @@ public class SSTableReaderTest extends SchemaLoader SegmentedFile.Builder dbuilder = sstable.compression ? SegmentedFile.getCompressedBuilder() : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); - SSTableReader.saveSummary(sstable, ibuilder, dbuilder); + sstable.saveSummary(ibuilder, dbuilder); SSTableReader reopened = SSTableReader.open(sstable.descriptor); assert reopened.first.token instanceof LocalToken;
