http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
deleted file mode 100644
index 1bc7a24..0000000
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ /dev/null
@@ -1,2055 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Ordering;
-import com.google.common.primitives.Longs;
-import com.google.common.util.concurrent.RateLimiter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.ICardinality;
-import org.apache.cassandra.cache.CachingOptions;
-import org.apache.cassandra.cache.InstrumentingCache;
-import org.apache.cassandra.cache.KeyCacheKey;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataRange;
-import org.apache.cassandra.db.DataTracker;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.compaction.ICompactionScanner;
-import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.LocalPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
-import org.apache.cassandra.io.compress.CompressedThrottledReader;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
-import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
-import org.apache.cassandra.io.sstable.metadata.MetadataType;
-import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
-import org.apache.cassandra.io.util.BufferedSegmentedFile;
-import org.apache.cassandra.io.util.CompressedSegmentedFile;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.ICompressedFile;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SegmentedFile;
-import org.apache.cassandra.io.util.ThrottledReader;
-import org.apache.cassandra.metrics.RestorableMeter;
-import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CLibrary;
-import org.apache.cassandra.utils.EstimatedHistogram;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.FilterFactory;
-import org.apache.cassandra.utils.IFilter;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
-
-/**
- * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
- * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
- */
-public class SSTableReader extends SSTable
-{
-    private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
-
-    private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
-    private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
-
-    public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
-    {
-        public int compare(SSTableReader o1, SSTableReader o2)
-        {
-            long ts1 = o1.getMaxTimestamp();
-            long ts2 = o2.getMaxTimestamp();
-            return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
-        }
-    };
-
-    public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
-    {
-        public int compare(SSTableReader o1, SSTableReader o2)
-        {
-            return o1.first.compareTo(o2.first);
-        }
-    };
-
-    public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
-
-    /**
-     * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound
-     * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
-     * later than maxDataAge.
-     *
-     * The field is not serialized to disk, so relying on it for more than what truncate does is not advised.
-     *
-     * When a new sstable is flushed, maxDataAge is set to the time of creation.
-     * When a sstable is created from compaction, maxDataAge is set to max of all merged sstables.
-     *
-     * The age is in milliseconds since epoc and is local to this host.
-     */
-    public final long maxDataAge;
-
-    public enum OpenReason
-    {
-        NORMAL,
-        EARLY,
-        METADATA_CHANGE
-    }
-
-    public final OpenReason openReason;
-
-    // indexfile and datafile: might be null before a call to load()
-    private SegmentedFile ifile;
-    private SegmentedFile dfile;
-
-    private IndexSummary indexSummary;
-    private IFilter bf;
-
-    private InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
-
-    private final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
-
-    private final AtomicInteger references = new AtomicInteger(1);
-    // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
-    // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
-    private final AtomicBoolean isCompacted = new AtomicBoolean(false);
-    private final AtomicBoolean isSuspect = new AtomicBoolean(false);
-
-    // not final since we need to be able to change level on a file.
-    private volatile StatsMetadata sstableMetadata;
-
-    private final AtomicLong keyCacheHit = new AtomicLong(0);
-    private final AtomicLong keyCacheRequest = new AtomicLong(0);
-
-    /**
-     * To support replacing this sstablereader with another object that represents that same underlying sstable, but with different associated resources,
-     * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple.
-     * On close we check if any of the closeable resources differ between any chains either side of us; any that are in neither of the adjacent links (if any) are closed.
-     * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare against only other still opened resources.
-     */
-    private Object replaceLock = new Object();
-    private SSTableReader replacedBy;
-    private SSTableReader replaces;
-    private SSTableDeletingTask deletingTask;
-    private Runnable runOnClose;
-
-    @VisibleForTesting
-    public RestorableMeter readMeter;
-    private ScheduledFuture readMeterSyncFuture;
-
-    /**
-     * Calculate approximate key count.
-     * If cardinality estimator is available on all given sstables, then this method use them to estimate
-     * key count.
-     * If not, then this uses index summaries.
-     *
-     * @param sstables SSTables to calculate key count
-     * @return estimated key count
-     */
-    public static long getApproximateKeyCount(Collection<SSTableReader> sstables)
-    {
-        long count = -1;
-
-        // check if cardinality estimator is available for all SSTables
-        boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate<SSTableReader>()
-        {
-            public boolean apply(SSTableReader sstable)
-            {
-                return sstable.descriptor.version.newStatsFile;
-            }
-        });
-
-        // if it is, load them to estimate key count
-        if (cardinalityAvailable)
-        {
-            boolean failed = false;
-            ICardinality cardinality = null;
-            for (SSTableReader sstable : sstables)
-            {
-                try
-                {
-                    CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
-                    if (cardinality == null)
-                        cardinality = metadata.cardinalityEstimator;
-                    else
-                        cardinality = cardinality.merge(metadata.cardinalityEstimator);
-                }
-                catch (IOException e)
-                {
-                    logger.warn("Reading cardinality from Statistics.db failed.", e);
-                    failed = true;
-                    break;
-                }
-                catch (CardinalityMergeException e)
-                {
-                    logger.warn("Cardinality merge failed.", e);
-                    failed = true;
-                    break;
-                }
-            }
-            if (cardinality != null && !failed)
-                count = cardinality.cardinality();
-        }
-
-        // if something went wrong above or cardinality is not available, calculate using index summary
-        if (count < 0)
-        {
-            for (SSTableReader sstable : sstables)
-                count += sstable.estimatedKeys();
-        }
-        return count;
-    }
-
-    public static SSTableReader open(Descriptor descriptor) throws IOException
-    {
-        CFMetaData metadata;
-        if (descriptor.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR))
-        {
-            int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
-            String parentName = descriptor.cfname.substring(0, i);
-            CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
-            ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
-            metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def));
-        }
-        else
-        {
-            metadata = Schema.instance.getCFMetaData(descriptor.ksname, descriptor.cfname);
-        }
-        return open(descriptor, metadata);
-    }
-
-    public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
-    {
-        IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
-                       ? new LocalPartitioner(metadata.getKeyValidator())
-                       : StorageService.getPartitioner();
-        return open(desc, componentsFor(desc), metadata, p);
-    }
-
-    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
-    {
-        return open(descriptor, components, metadata, partitioner, true);
-    }
-
-    public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
-    {
-        return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
-    }
-
-    /**
-     * Open SSTable reader to be used in batch mode(such as sstableloader).
-     *
-     * @param descriptor
-     * @param components
-     * @param metadata
-     * @param partitioner
-     * @return opened SSTableReader
-     * @throws IOException
-     */
-    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
-    {
-        // Minimum components without which we can't do anything
-        assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
-        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
-
-        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
-                                                                                                              EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
-        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
-        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
-
-        // Check if sstable is created using same partitioner.
-        // Partitioner can be null, which indicates older version of sstable or no stats available.
-        // In that case, we skip the check.
-        String partitionerName = partitioner.getClass().getCanonicalName();
-        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
-        {
-            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
-                                       descriptor, validationMetadata.partitioner, partitionerName));
-            System.exit(1);
-        }
-
-        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
-        SSTableReader sstable = new SSTableReader(descriptor,
-                                                  components,
-                                                  metadata,
-                                                  partitioner,
-                                                  System.currentTimeMillis(),
-                                                  statsMetadata,
-                                                  OpenReason.NORMAL);
-
-        // special implementation of load to use non-pooled SegmentedFile builders
-        SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
-        SegmentedFile.Builder dbuilder = sstable.compression
-                                       ? new CompressedSegmentedFile.Builder(null)
-                                       : new BufferedSegmentedFile.Builder();
-        if (!sstable.loadSummary(ibuilder, dbuilder))
-            sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
-        sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
-        sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
-        sstable.bf = FilterFactory.AlwaysPresent;
-
-        return sstable;
-    }
-
-    private static SSTableReader open(Descriptor descriptor,
-                                      Set<Component> components,
-                                      CFMetaData metadata,
-                                      IPartitioner partitioner,
-                                      boolean validate) throws IOException
-    {
-        // Minimum components without which we can't do anything
-        assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
-        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
-
-        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
-                                                                                                              EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
-        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
-        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
-
-        // Check if sstable is created using same partitioner.
-        // Partitioner can be null, which indicates older version of sstable or no stats available.
-        // In that case, we skip the check.
-        String partitionerName = partitioner.getClass().getCanonicalName();
-        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
-        {
-            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
-                                       descriptor, validationMetadata.partitioner, partitionerName));
-            System.exit(1);
-        }
-
-        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
-        SSTableReader sstable = new SSTableReader(descriptor,
-                                                  components,
-                                                  metadata,
-                                                  partitioner,
-                                                  System.currentTimeMillis(),
-                                                  statsMetadata,
-                                                  OpenReason.NORMAL);
-
-        // load index and filter
-        long start = System.nanoTime();
-        sstable.load(validationMetadata);
-        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
-        if (validate)
-            sstable.validate();
-
-        if (sstable.getKeyCache() != null)
-            logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
-
-        return sstable;
-    }
-
-    public static void logOpenException(Descriptor descriptor, IOException e)
-    {
-        if (e instanceof FileNotFoundException)
-            logger.error("Missing sstable component in {}; skipped because of {}", descriptor, e.getMessage());
-        else
-            logger.error("Corrupt sstable {}; skipped", descriptor, e);
-    }
-
-    public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
-                                                    final CFMetaData metadata,
-                                                    final IPartitioner partitioner)
-    {
-        final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
-
-        ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
-        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
-        {
-            Runnable runnable = new Runnable()
-            {
-                public void run()
-                {
-                    SSTableReader sstable;
-                    try
-                    {
-                        sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
-                    }
-                    catch (IOException ex)
-                    {
-                        logger.error("Corrupt sstable {}; skipped", entry, ex);
-                        return;
-                    }
-                    sstables.add(sstable);
-                }
-            };
-            executor.submit(runnable);
-        }
-
-        executor.shutdown();
-        try
-        {
-            executor.awaitTermination(7, TimeUnit.DAYS);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-
-        return sstables;
-
-    }
-
-    /**
-     * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
-     */
-    static SSTableReader internalOpen(Descriptor desc,
-                                      Set<Component> components,
-                                      CFMetaData metadata,
-                                      IPartitioner partitioner,
-                                      SegmentedFile ifile,
-                                      SegmentedFile dfile,
-                                      IndexSummary isummary,
-                                      IFilter bf,
-                                      long maxDataAge,
-                                      StatsMetadata sstableMetadata,
-                                      OpenReason openReason)
-    {
-        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
-        return new SSTableReader(desc,
-                                 components,
-                                 metadata,
-                                 partitioner,
-                                 ifile, dfile,
-                                 isummary,
-                                 bf,
-                                 maxDataAge,
-                                 sstableMetadata,
-                                 openReason);
-    }
-
-
-    private SSTableReader(final Descriptor desc,
-                          Set<Component> components,
-                          CFMetaData metadata,
-                          IPartitioner partitioner,
-                          long maxDataAge,
-                          StatsMetadata sstableMetadata,
-                          OpenReason openReason)
-    {
-        super(desc, components, metadata, partitioner);
-        this.sstableMetadata = sstableMetadata;
-        this.maxDataAge = maxDataAge;
-        this.openReason = openReason;
-
-        deletingTask = new SSTableDeletingTask(this);
-
-        // Don't track read rates for tables in the system keyspace. Also don't track reads for special operations (like early open)
-        // this is to avoid overflowing the executor queue (see CASSANDRA-8066)
-        if (Keyspace.SYSTEM_KS.equals(desc.ksname) || openReason != OpenReason.NORMAL)
-        {
-            readMeter = null;
-            readMeterSyncFuture = null;
-            return;
-        }
-
-        readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
-        // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
-        readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
-        {
-            public void run()
-            {
-                if (!isCompacted.get())
-                {
-                    meterSyncThrottle.acquire();
-                    SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
-                }
-            }
-        }, 1, 5, TimeUnit.MINUTES);
-    }
-
-    private SSTableReader(Descriptor desc,
-                          Set<Component> components,
-                          CFMetaData metadata,
-                          IPartitioner partitioner,
-                          SegmentedFile ifile,
-                          SegmentedFile dfile,
-                          IndexSummary indexSummary,
-                          IFilter bloomFilter,
-                          long maxDataAge,
-                          StatsMetadata sstableMetadata,
-                          OpenReason openReason)
-    {
-        this(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
-
-        this.ifile = ifile;
-        this.dfile = dfile;
-        this.indexSummary = indexSummary;
-        this.bf = bloomFilter;
-    }
-
-    public static long getTotalBytes(Iterable<SSTableReader> sstables)
-    {
-        long sum = 0;
-        for (SSTableReader sstable : sstables)
-        {
-            sum += sstable.onDiskLength();
-        }
-        return sum;
-    }
-
-    private void tidy(boolean release)
-    {
-        if (readMeterSyncFuture != null)
-            readMeterSyncFuture.cancel(false);
-
-        if (references.get() != 0)
-        {
-            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
-        }
-
-        synchronized (replaceLock)
-        {
-            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
-
-            if (replacedBy != null)
-            {
-                closeBf = replacedBy.bf != bf;
-                closeSummary = replacedBy.indexSummary != indexSummary;
-                closeFiles = replacedBy.dfile != dfile;
-                // if the replacement sstablereader uses a different path, clean up our paths
-                deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
-            }
-
-            if (replaces != null)
-            {
-                closeBf &= replaces.bf != bf;
-                closeSummary &= replaces.indexSummary != indexSummary;
-                closeFiles &= replaces.dfile != dfile;
-                deleteFiles &= !dfile.path.equals(replaces.dfile.path);
-            }
-
-            boolean deleteAll = false;
-            if (release && isCompacted.get())
-            {
-                assert replacedBy == null;
-                if (replaces != null)
-                {
-                    replaces.replacedBy = null;
-                    replaces.deletingTask = deletingTask;
-                    replaces.markObsolete();
-                }
-                else
-                {
-                    deleteAll = true;
-                }
-            }
-            else
-            {
-                if (replaces != null)
-                    replaces.replacedBy = replacedBy;
-                if (replacedBy != null)
-                    replacedBy.replaces = replaces;
-            }
-
-            scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
-        }
-    }
-
-    private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
-    {
-        if (references.get() != 0)
-            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
-
-        final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
-        final OpOrder.Barrier barrier;
-        if (cfs != null)
-        {
-            barrier = cfs.readOrdering.newBarrier();
-            barrier.issue();
-        }
-        else
-            barrier = null;
-
-        StorageService.tasks.execute(new Runnable()
-        {
-            public void run()
-            {
-                if (barrier != null)
-                    barrier.await();
-                if (closeBf)
-                    bf.close();
-                if (closeSummary)
-                    indexSummary.close();
-                if (closeFiles)
-                {
-                    ifile.cleanup();
-                    dfile.cleanup();
-                }
-                if (runOnClose != null)
-                    runOnClose.run();
-                if (deleteAll)
-                {
-                    /**
-                     * Do the OS a favour and suggest (using fadvice call) that we
-                     * don't want to see pages of this SSTable in memory anymore.
-                     *
-                     * NOTE: We can't use madvice in java because it requires the address of
-                     * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
-                     */
-                    dropPageCache();
-                    deletingTask.run();
-                }
-                else if (deleteFiles)
-                {
-                    FileUtils.deleteWithConfirm(new File(dfile.path));
-                    FileUtils.deleteWithConfirm(new File(ifile.path));
-                }
-            }
-        });
-    }
-
-    public boolean equals(Object that)
-    {
-        return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
-    }
-
-    public int hashCode()
-    {
-        return this.descriptor.hashCode();
-    }
-
-    public String getFilename()
-    {
-        return dfile.path;
-    }
-
-    public String getIndexFilename()
-    {
-        return ifile.path;
-    }
-
-    public void setTrackedBy(DataTracker tracker)
-    {
-        deletingTask.setTracker(tracker);
-        // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
-        // e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache
-        // here when we know we're being wired into the rest of the server infrastructure.
-        keyCache = CacheService.instance.keyCache;
-    }
-
-    private void load(ValidationMetadata validation) throws IOException
-    {
-        if (metadata.getBloomFilterFpChance() == 1.0)
-        {
-            // bf is disabled.
-            load(false, true);
-            bf = FilterFactory.AlwaysPresent;
-        }
-        else if (!components.contains(Component.FILTER) || validation == null)
-        {
-            // bf is enabled, but filter component is missing.
-            load(true, true);
-        }
-        else if (validation.bloomFilterFPChance != metadata.getBloomFilterFpChance())
-        {
-            // bf fp chance in sstable metadata and it has changed since compaction.
-            load(true, true);
-        }
-        else
-        {
-            // bf is enabled and fp chance matches the currently configured value.
-            load(false, true);
-            loadBloomFilter();
-        }
-    }
-
-    /**
-     * Load bloom filter from Filter.db file.
-     *
-     * @throws IOException
-     */
-    private void loadBloomFilter() throws IOException
-    {
-        DataInputStream stream = null;
-        try
-        {
-            stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
-            bf = FilterFactory.deserialize(stream, true);
-        }
-        finally
-        {
-            FileUtils.closeQuietly(stream);
-        }
-    }
-
-    /**
-     * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
-     * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
-     *                             avoid persisting it to disk by setting this to false
-     */
-    private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
-    {
-        SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
-        SegmentedFile.Builder dbuilder = compression
-                                       ? SegmentedFile.getCompressedBuilder()
-                                       : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-
-        boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
-        if (recreateBloomFilter || !summaryLoaded)
-            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
-
-        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
-        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
-        if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
-            saveSummary(ibuilder, dbuilder);
-    }
-
-    /**
-     * Build index summary(and optionally bloom filter) by reading through Index.db file.
-     *
-     * @param recreateBloomFilter true if recreate bloom filter
-     * @param ibuilder
-     * @param dbuilder
-     * @param summaryLoaded true if index summary is already loaded and not need to build again
-     * @throws IOException
-     */
-    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
-    {
-        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
-        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
-
-        try
-        {
-            long indexSize = primaryIndex.length();
-            long histogramCount = sstableMetadata.estimatedRowSize.count();
-            long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
-                               ? histogramCount
-                               : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
-
-            if (recreateBloomFilter)
-                bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
-
-            IndexSummaryBuilder summaryBuilder = null;
-            if (!summaryLoaded)
-                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel);
-
-            long indexPosition;
-            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
-            {
-                ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
-                RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(primaryIndex, descriptor.version);
-                DecoratedKey decoratedKey = partitioner.decorateKey(key);
-                if (first == null)
-                    first = decoratedKey;
-                last = decoratedKey;
-
-                if (recreateBloomFilter)
-                    bf.add(decoratedKey.getKey());
-
-                // if summary was already read from disk we don't want to re-populate it using primary index
-                if (!summaryLoaded)
-                {
-                    summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
-                    ibuilder.addPotentialBoundary(indexPosition);
-                    dbuilder.addPotentialBoundary(indexEntry.position);
-                }
-            }
-
-            if (!summaryLoaded)
-                indexSummary = summaryBuilder.build(partitioner);
-        }
-        finally
-        {
-            FileUtils.closeQuietly(primaryIndex);
-        }
-
-        first = getMinimalKey(first);
-        last = getMinimalKey(last);
-    }
-
-    /**
-     * Load index summary from Summary.db file if it exists.
-     *
-     * if loaded index summary has different index interval from current value stored in schema,
-     * then Summary.db file will be deleted and this returns false to rebuild summary.
-     *
-     * @param ibuilder
-     * @param dbuilder
-     * @return true if index summary is loaded successfully from Summary.db file.
-     */
-    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
-    {
-        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
-        if (!summariesFile.exists())
-            return false;
-
-        DataInputStream iStream = null;
-        try
-        {
-            iStream = new DataInputStream(new FileInputStream(summariesFile));
-            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel, metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
-            first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
-            last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
-            ibuilder.deserializeBounds(iStream);
-            dbuilder.deserializeBounds(iStream);
-        }
-        catch (IOException e)
-        {
-            logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
-            // corrupted; delete it and fall back to creating a new summary
-            FileUtils.closeQuietly(iStream);
-            // delete it and fall back to creating a new summary
-            FileUtils.deleteWithConfirm(summariesFile);
-            return false;
-        }
-        finally
-        {
-            FileUtils.closeQuietly(iStream);
-        }
-
-        return true;
-    }
-
-    /**
-     * Save index summary to Summary.db file.
-     *
-     * @param ibuilder
-     * @param dbuilder
-     */
-    public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
-    {
-        saveSummary(ibuilder, dbuilder, indexSummary);
-    }
-
-    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
-    {
-        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
-        if (summariesFile.exists())
-            FileUtils.deleteWithConfirm(summariesFile);
-
-        DataOutputStreamAndChannel oStream = null;
-        try
-        {
-            oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
-            IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel);
-            ByteBufferUtil.writeWithLength(first.getKey(), oStream);
-            ByteBufferUtil.writeWithLength(last.getKey(), oStream);
-            ibuilder.serializeBounds(oStream);
-            dbuilder.serializeBounds(oStream);
-        }
-        catch (IOException e)
-        {
-            logger.debug("Cannot save SSTable Summary: ", e);
-
-            // corrupted hence delete it and let it load it now.
-            if (summariesFile.exists())
-                FileUtils.deleteWithConfirm(summariesFile);
-        }
-        finally
-        {
-            FileUtils.closeQuietly(oStream);
-        }
-    }
-
-    public void setReplacedBy(SSTableReader replacement)
-    {
-        synchronized (replaceLock)
-        {
-            assert replacedBy == null;
-            replacedBy = replacement;
-            replacement.replaces = this;
-            replacement.replaceLock = replaceLock;
-        }
-    }
-
-    public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
-    {
-        synchronized (replaceLock)
-        {
-            assert replacedBy == null;
-
-            if (newStart.compareTo(this.first) > 0)
-            {
-                if (newStart.compareTo(this.last) > 0)
-                {
-                    this.runOnClose = new Runnable()
-                    {
-                        public void run()
-                        {
-                            CLibrary.trySkipCache(dfile.path, 0, 0);
-                            CLibrary.trySkipCache(ifile.path, 0, 0);
-                            runOnClose.run();
-                        }
-                    };
-                }
-                else
-                {
-                    final long dataStart = getPosition(newStart, Operator.GE).position;
-                    final long indexStart = getIndexScanPosition(newStart);
-                    this.runOnClose = new Runnable()
-                    {
-                        public void run()
-                        {
-                            CLibrary.trySkipCache(dfile.path, 0, dataStart);
-                            CLibrary.trySkipCache(ifile.path, 0, indexStart);
-                            runOnClose.run();
-                        }
-                    };
-                }
-            }
-
-            SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata,
-                                                          openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
-            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
-            replacement.readMeter = this.readMeter;
-            replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
-            replacement.last = this.last;
-            setReplacedBy(replacement);
-            return replacement;
-        }
-    }
-
-    /**
-     * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
-     * be built at the target samplingLevel. This (original) SSTableReader instance will be marked as replaced, have
-     * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
-     * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
-     * @return a new SSTableReader
-     * @throws IOException
-     */
-    public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
-    {
-        synchronized (replaceLock)
-        {
-            assert replacedBy == null;
-
-            int minIndexInterval = metadata.getMinIndexInterval();
-            int maxIndexInterval = metadata.getMaxIndexInterval();
-            double effectiveInterval = indexSummary.getEffectiveIndexInterval();
-
-            IndexSummary newSummary;
-            long oldSize = bytesOnDisk();
-
-            // We have to rebuild the summary from the on-disk primary index in three cases:
-            // 1. The sampling level went up, so we need to read more entries off disk
-            // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
-            //    at full sampling (and consequently at any other sampling level)
-            // 3. The max_index_interval was lowered, forcing us to raise the sampling level
-            if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
-            {
-                newSummary = buildSummaryAtLevel(samplingLevel);
-            }
-            else if (samplingLevel < indexSummary.getSamplingLevel())
-            {
-                // we can use the existing index summary to make a smaller one
-                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
-
-                SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
-                SegmentedFile.Builder dbuilder = compression
-                                               ? SegmentedFile.getCompressedBuilder()
-                                               : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-                saveSummary(ibuilder, dbuilder, newSummary);
-            }
-            else
-            {
-                throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
-                                         "no adjustments to min/max_index_interval");
-            }
-
-            long newSize = bytesOnDisk();
-            StorageMetrics.load.inc(newSize - oldSize);
-            parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
-
-            SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata,
-                                                          openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
-            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
-            replacement.readMeter = this.readMeter;
-            replacement.first = this.first;
-            replacement.last = this.last;
-            setReplacedBy(replacement);
-            return replacement;
-        }
-    }
-
-    private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
-    {
-        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
-        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
-        try
-        {
-            long indexSize = primaryIndex.length();
-            IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel);
-
-            long indexPosition;
-            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
-            {
-                summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
-                RowIndexEntry.Serializer.skip(primaryIndex);
-            }
-
-            return summaryBuilder.build(partitioner);
-        }
-        finally
-        {
-            FileUtils.closeQuietly(primaryIndex);
-        }
-    }
-
-    public int getIndexSummarySamplingLevel()
-    {
-        return indexSummary.getSamplingLevel();
-    }
-
-    public long getIndexSummaryOffHeapSize()
-    {
-        return indexSummary.getOffHeapSize();
-    }
-
-    public int getMinIndexInterval()
-    {
-        return indexSummary.getMinIndexInterval();
-    }
-
-    public double getEffectiveIndexInterval()
-    {
-        return indexSummary.getEffectiveIndexInterval();
-    }
-
-    public void releaseSummary()
-    {
-        indexSummary.close();
-        indexSummary = null;
-    }
-
-    private void validate()
-    {
-        if (this.first.compareTo(this.last) > 0)
-            throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
-    }
-
-    /**
-     * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
-     * modulo downsampling of the index summary).
-     */
-    public long getIndexScanPosition(RowPosition key)
-    {
-        return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
-    }
-
-    private static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
-    {
-        if (binarySearchResult == -1)
-            return -1;
-        else
-            return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult));
-    }
-
-    private static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult)
-    {
-        if (binarySearchResult < 0)
-        {
-            // binary search gives us the first index _greater_ than the key searched for,
-            // i.e., its insertion position
-            int greaterThan = (binarySearchResult + 1) * -1;
-            if (greaterThan == 0)
-                return -1;
-            return greaterThan - 1;
-        }
-        else
-        {
-            return binarySearchResult;
-        }
-    }
-
-    /**
-     * Returns the compression metadata for this sstable.
-     * @throws IllegalStateException if the sstable is not compressed
-     */
-    public CompressionMetadata getCompressionMetadata()
-    {
-        if (!compression)
-            throw new IllegalStateException(this + " is not compressed");
-
-        CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
-
-        //We need the parent cf metadata
-        String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
-        cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
-
-        return cmd;
-    }
-
-    /**
-     * For testing purposes only.
-     */
-    public void forceFilterFailures()
-    {
-        bf = FilterFactory.AlwaysPresent;
-    }
-
-    public IFilter getBloomFilter()
-    {
-        return bf;
-    }
-
-    public long getBloomFilterSerializedSize()
-    {
-        return bf.serializedSize();
-    }
-
-    /**
-     * @return An estimate of the number of keys in this SSTable based on the index summary.
-     */
-    public long estimatedKeys()
-    {
-        return indexSummary.getEstimatedKeyCount();
-    }
-
-    /**
-     * @param ranges
-     * @return An estimate of the number of keys for given ranges in this SSTable.
-     */
-    public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
-    {
-        long sampleKeyCount = 0;
-        List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
-        for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
-            sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
-
-        // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling
-        long estimatedKeys = sampleKeyCount * (Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel();
-        return Math.max(1, estimatedKeys);
-    }
-
-    /**
-     * Returns the number of entries in the IndexSummary. At full sampling, this is approximately 1/INDEX_INTERVALth of
-     * the keys in this SSTable.
-     */
-    public int getIndexSummarySize()
-    {
-        return indexSummary.size();
-    }
-
-    /**
-     * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling.
-     */
-    public int getMaxIndexSummarySize()
-    {
-        return indexSummary.getMaxNumberOfEntries();
-    }
-
-    /**
-     * Returns the key for the index summary entry at `index`.
-     */
-    public byte[] getIndexSummaryKey(int index)
-    {
-        return indexSummary.getKey(index);
-    }
-
-    private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
-    {
-        // use the index to determine a minimal section for each range
-        List<Pair<Integer,Integer>> positions = new ArrayList<>();
-
-        for (Range<Token> range : Range.normalize(ranges))
-        {
-            RowPosition leftPosition = range.left.maxKeyBound();
-            RowPosition rightPosition = range.right.maxKeyBound();
-
-            int left = summary.binarySearch(leftPosition);
-            if (left < 0)
-                left = (left + 1) * -1;
-            else
-                // left range are start exclusive
-                left = left + 1;
-            if (left == summary.size())
-                // left is past the end of the sampling
-                continue;
-
-            int right = Range.isWrapAround(range.left, range.right)
-                      ? summary.size() - 1
-                      : summary.binarySearch(rightPosition);
-            if (right < 0)
-            {
-                // range are end inclusive so we use the previous index from what binarySearch give us
-                // since that will be the last index we will return
-                right = (right + 1) * -1;
-                if (right == 0)
-                    // Means the first key is already stricly greater that the right bound
-                    continue;
-                right--;
-            }
-
-            if (left > right)
-                // empty range
-                continue;
-            positions.add(Pair.create(left, right));
-        }
-        return positions;
-    }
-
-    public Iterable<DecoratedKey> getKeySamples(final Range<Token> range)
-    {
-        final List<Pair<Integer, Integer>> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range));
-
-        if (indexRanges.isEmpty())
-            return Collections.emptyList();
-
-        return new Iterable<DecoratedKey>()
-        {
-            public Iterator<DecoratedKey> iterator()
-            {
-                return new Iterator<DecoratedKey>()
-                {
-                    private Iterator<Pair<Integer, Integer>> rangeIter = indexRanges.iterator();
-                    private Pair<Integer, Integer> current;
-                    private int idx;
-
-                    public boolean hasNext()
-                    {
-                        if (current == null || idx > current.right)
-                        {
-                            if (rangeIter.hasNext())
-                            {
-                                current = rangeIter.next();
-                                idx = current.left;
-                                return true;
-                            }
-                            return false;
-                        }
-
-                        return true;
-                    }
-
-                    public DecoratedKey next()
-                    {
-                        byte[] bytes = indexSummary.getKey(idx++);
-                        return partitioner.decorateKey(ByteBuffer.wrap(bytes));
-                    }
-
-                    public void remove()
-                    {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-            }
-        };
-    }
-
-    /**
-     * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
-     * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
-     */
-    public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
-    {
-        // use the index to determine a minimal section for each range
-        List<Pair<Long,Long>> positions = new ArrayList<>();
-        for (Range<Token> range : Range.normalize(ranges))
-        {
-            AbstractBounds<RowPosition> keyRange = range.toRowBounds();
-            RowIndexEntry idxLeft = getPosition(keyRange.left, Operator.GT);
-            long left = idxLeft == null ? -1 : idxLeft.position;
-            if (left == -1)
-                // left is past the end of the file
-                continue;
-            RowIndexEntry idxRight = getPosition(keyRange.right, Operator.GT);
-            long right = idxRight == null ? -1 : idxRight.position;
-            if (right == -1 || Range.isWrapAround(range.left, range.right))
-                // right is past the end of the file, or it wraps
-                right = uncompressedLength();
-            if (left == right)
-                // empty range
-                continue;
-            positions.add(Pair.create(left, right));
-        }
-        return positions;
-    }
-
-    public void invalidateCacheKey(DecoratedKey key)
-    {
-        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
-        keyCache.remove(cacheKey);
-    }
-
-    public void cacheKey(DecoratedKey key, RowIndexEntry info)
-    {
-        CachingOptions caching = metadata.getCaching();
-
-        if (!caching.keyCache.isEnabled()
-            || keyCache == null
-            || keyCache.getCapacity() == 0)
-        {
-            return;
-        }
-
-        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
-        logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
-        keyCache.put(cacheKey, info);
-    }
-
-    public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
-    {
-        return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats);
-    }
-
-    private RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
-    {
-        if (keyCache != null && keyCache.getCapacity() > 0) {
-            if (updateStats)
-            {
-                RowIndexEntry cachedEntry = keyCache.get(unifiedKey);
-                keyCacheRequest.incrementAndGet();
-                if (cachedEntry != null)
-                    keyCacheHit.incrementAndGet();
-                return cachedEntry;
-            }
-            else
-            {
-                return keyCache.getInternal(unifiedKey);
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Get position updating key cache and stats.
-     * @see #getPosition(org.apache.cassandra.db.RowPosition, org.apache.cassandra.io.sstable.SSTableReader.Operator, boolean)
-     */
-    public RowIndexEntry getPosition(RowPosition key, Operator op)
-    {
-        return getPosition(key, op, true);
-    }
-
-    /**
-     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
-     * allow key selection by token bounds but only if op != * EQ
-     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
-     * @param updateCacheAndStats true if updating stats and cache
-     * @return The index entry corresponding to the key, or null if the key is not present
-     */
-    public RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats)
-    {
-        // first, check bloom filter
-        if (op == Operator.EQ)
-        {
-            assert key instanceof DecoratedKey; // EQ only make sense if the key is a valid row key
-            if (!bf.isPresent(((DecoratedKey)key).getKey()))
-            {
-                Tracing.trace("Bloom filter allows skipping sstable {}", descriptor.generation);
-                return null;
-            }
-        }
-
-        // next, the key cache (only make sense for valid row key)
-        if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey))
-        {
-            DecoratedKey decoratedKey = (DecoratedKey)key;
-            KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, decoratedKey.getKey());
-            RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats);
-            if (cachedPosition != null)
-            {
-                Tracing.trace("Key cache hit for sstable {}", descriptor.generation);
-                return cachedPosition;
-            }
-        }
-
-        // check the smallest and greatest keys in the sstable to see if it can't be present
-        if (first.compareTo(key) > 0 || last.compareTo(key) < 0)
-        {
-            if (op == Operator.EQ && updateCacheAndStats)
-                bloomFilterTracker.addFalsePositive();
-
-            if (op.apply(1) < 0)
-            {
-                Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation);
-                return null;
-            }
-        }
-
-        int binarySearchResult = indexSummary.binarySearch(key);
-        long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary);
-        int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult);
-
-        // if we matched the -1th position, we'll start at the first position
-        sampledPosition = sampledPosition == -1 ? 0 : sampledPosition;
-
-        int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex);
-
-        // scan the on-disk index, starting at the nearest sampled position.
-        // The check against IndexInterval is to be exit the loop in the EQ case when the key looked for is not present
-        // (bloom filter false positive). But note that for non-EQ cases, we might need to check the first key of the
-        // next index position because the searched key can be greater the last key of the index interval checked if it
-        // is lesser than the first key of next interval (and in that case we must return the position of the first key
-        // of the next interval).
-        int i = 0;
-        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
-        while (segments.hasNext() && i <= effectiveInterval)
-        {
-            FileDataInput in = segments.next();
-            try
-            {
-                while (!in.isEOF() && i <= effectiveInterval)
-                {
-                    i++;
-
-                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
-
-                    boolean opSatisfied; // did we find an appropriate position for the op requested
-                    boolean exactMatch; // is the current position an exact match for the key, suitable for caching
-
-                    // Compare raw keys if possible for performance, otherwise compare decorated keys.
-                    if (op == Operator.EQ)
-                    {
-                        opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey());
-                    }
-                    else
-                    {
-                        DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
-                        int comparison = indexDecoratedKey.compareTo(key);
-                        int v = op.apply(comparison);
-                        opSatisfied = (v == 0);
-                        exactMatch = (comparison == 0);
-                        if (v < 0)
-                        {
-                            Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation);
-                            return null;
-                        }
-                    }
-
-                    if (opSatisfied)
-                    {
-                        // read data position from index entry
-                        RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(in, descriptor.version);
-                        if (exactMatch && updateCacheAndStats)
-                        {
-                            assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key
-                            DecoratedKey decoratedKey = (DecoratedKey)key;
-
-                            if (logger.isTraceEnabled())
-                            {
-                                // expensive sanity check! see CASSANDRA-4687
-                                FileDataInput fdi = dfile.getSegment(indexEntry.position);
-                                DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
-                                if (!keyInDisk.equals(key))
-                                    throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
-                                fdi.close();
-                            }
-
-                            // store exact match for the key
-                            cacheKey(decoratedKey, indexEntry);
-                        }
-                        if (op == Operator.EQ && updateCacheAndStats)
-                            bloomFilterTracker.addTruePositive();
-                        Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation);
-                        return indexEntry;
-                    }
-
-                    RowIndexEntry.Serializer.skip(in);
-                }
-            }
-            catch (IOException e)
-            {
-                markSuspect();
-                throw new CorruptSSTableException(e, in.getPath());
-            }
-            finally
-            {
-                FileUtils.closeQuietly(in);
-            }
-        }
-
-        if (op == Operator.EQ && updateCacheAndStats)
-            bloomFilterTracker.addFalsePositive();
-        Tracing.trace("Partition index lookup complete (bloom filter false positive) for sstable {}", descriptor.generation);
-        return null;
-    }
-
-    /**
-     * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
-     */
-    public DecoratedKey firstKeyBeyond(RowPosition token)
-    {
-        long sampledPosition = getIndexScanPosition(token);
-        if (sampledPosition == -1)
-            sampledPosition = 0;
-
-        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
-        while (segments.hasNext())
-        {
-            FileDataInput in = segments.next();
-            try
-            {
-                while (!in.isEOF())
-                {
-                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
-                    DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
-                    if (indexDecoratedKey.compareTo(token) > 0)
-                        return indexDecoratedKey;
-
-                    RowIndexEntry.Serializer.skip(in);
-                }
-            }
-            catch (IOException e)
-            {
-                markSuspect();
-                throw new CorruptSSTableException(e, in.getPath());
-            }
-            finally
-            {
-                FileUtils.closeQuietly(in);
-            }
-        }
-
-        return null;
-    }
-
-    /**
-     * @return The length in bytes of the data for this SSTable. For
-     * compressed files, this is not the same thing as the on disk size (see
-     * onDiskLength())
-     */
-    public long uncompressedLength()
-    {
-        return dfile.length;
-    }
-
-    /**
-     * @return The length in bytes of the on disk size for this SSTable. For
-     * compressed files, this is not the same thing as the data length (see
-     * length())
-     */
-    public long onDiskLength()
-    {
-        return dfile.onDiskLength;
-    }
-
-    public boolean acquireReference()
-    {
-        while (true)
-        {
-            int n = references.get();
-            if (n <= 0)
-                return false;
-            if (references.compareAndSet(n, n + 1))
-                return true;
-        }
-    }
-
-    /**
-     * Release reference to this SSTableReader.
-     * If there is no one referring to this SSTable, and is marked as compacted,
-     * all resources are cleaned up and files are deleted eventually.
-     */
-    public void releaseReference()
-    {
-        if (references.decrementAndGet() == 0)
-            tidy(true);
-        assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path;
-    }
-
-    /**
-     * Mark the sstable as obsolete, i.e., compacted into newer sstables.
-     *
-     * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
-     * except for threads holding a reference.
-     *
-     * @return true if the this is the first time the file was marked obsolete. Calling this
-     * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize).
-     */
-    public boolean markObsolete()
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("Marking {} compacted", getFilename());
-
-        synchronized (replaceLock)
-        {
-            assert replacedBy == null;
-        }
-        return !isCompacted.getAndSet(true);
-    }
-
-    public boolean isMarkedCompacted()
-    {
-        return isCompacted.get();
-    }
-
-    public void markSuspect()
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("Marking {} as a suspect for blacklisting.", getFilename());
-
-        isSuspect.getAndSet(true);
-    }
-
-    public boolean isMarkedSuspect()
-    {
-        return isSuspect.get();
-    }
-
-    /**
-     *
-     * @param dataRange filter to use when reading the columns
-     * @return A Scanner for seeking over the rows of the SSTable.
-     */
-    public SSTableScanner getScanner(DataRange dataRange)
-    {
-        return new SSTableScanner(this, dataRange, null);
-    }
-
-    /**
-     * I/O SSTableScanner
-     * @return A Scanner for seeking over the rows of the SSTable.
-     */
-    public SSTableScanner getScanner()
-    {
-        return getScanner((RateLimiter) null);
-    }
-
-    public SSTableScanner getScanner(RateLimiter limiter)
-    {
-        return new SSTableScanner(this, DataRange.allData(partitioner), limiter);
-    }
-
-    /**
-     * Direct I/O SSTableScanner over a defined range of tokens.
-     *
-     * @param range the range of keys to cover
-     * @return A Scanner for seeking over the rows of the SSTable.
-     */
-    public ICompactionScanner getScanner(Range<Token> range, RateLimiter limiter)
-    {
-        if (range == null)
-            return getScanner(limiter);
-        return getScanner(Collections.singletonList(range), limiter);
-    }
-
-    /**
-     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
-     *
-     * @param ranges the range of keys to cover
-     * @return A Scanner for seeking over the rows of the SSTable.
-     */
-    public ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter)
-    {
-        // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
-        List<Pair<Long, Long>> positions = getPositionsForRanges(Range.normalize(ranges));
-        if (positions.isEmpty())
-            return new EmptyCompactionScanner(getFilename());
-        else
-            return new SSTableScanner(this, ranges, limiter);
-    }
-
-    public FileDataInput getFileDataInput(long position)
-    {
-        return dfile.getSegment(position);
-    }
-
-    /**
-     * Tests if the sstable contains data newer than the given age param (in localhost currentMilli time).
- * This works in conjunction with maxDataAge which is an upper bound on the create of data in this sstable. - * @param age The age to compare the maxDataAre of this sstable. Measured in millisec since epoc on this host - * @return True iff this sstable contains data that's newer than the given age parameter. - */ - public boolean newSince(long age) - { - return maxDataAge > age; - } - - public void createLinks(String snapshotDirectoryPath) - { - for (Component component : components) - { - File sourceFile = new File(descriptor.filenameFor(component)); - File targetLink = new File(snapshotDirectoryPath, sourceFile.getName()); - FileUtils.createHardLink(sourceFile, targetLink); - } - } - - public boolean isRepaired() - { - return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE; - } - - public SSTableReader getCurrentReplacement() - { - synchronized (replaceLock) - { - SSTableReader cur = this, next = replacedBy; - while (next != null) - { - cur = next; - next = next.replacedBy; - } - return cur; - } - } - - /** - * TODO: Move someplace reusable - */ - public abstract static class Operator - { - public static final Operator EQ = new Equals(); - public static final Operator GE = new GreaterThanOrEqualTo(); - public static final Operator GT = new GreaterThan(); - - /** - * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs. - * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward. - */ - public abstract int apply(int comparison); - - final static class Equals extends Operator - { - public int apply(int comparison) { return -comparison; } - } - - final static class GreaterThanOrEqualTo extends Operator - { - public int apply(int comparison) { return comparison >= 0 ? 0 : -comparison; } - } - - final static class GreaterThan extends Operator - { - public int apply(int comparison) { return comparison > 0 ? 
0 : 1; } - } - } - - public long getBloomFilterFalsePositiveCount() - { - return bloomFilterTracker.getFalsePositiveCount(); - } - - public long getRecentBloomFilterFalsePositiveCount() - { - return bloomFilterTracker.getRecentFalsePositiveCount(); - } - - public long getBloomFilterTruePositiveCount() - { - return bloomFilterTracker.getTruePositiveCount(); - } - - public long getRecentBloomFilterTruePositiveCount() - { - return bloomFilterTracker.getRecentTruePositiveCount(); - } - - public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache() - { - return keyCache; - } - - public EstimatedHistogram getEstimatedRowSize() - { - return sstableMetadata.estimatedRowSize; - } - - public EstimatedHistogram getEstimatedColumnCount() - { - return sstableMetadata.estimatedColumnCount; - } - - public double getEstimatedDroppableTombstoneRatio(int gcBefore) - { - return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore); - } - - public double getDroppableTombstonesBefore(int gcBefore) - { - return sstableMetadata.getDroppableTombstonesBefore(gcBefore); - } - - public double getCompressionRatio() - { - return sstableMetadata.compressionRatio; - } - - public ReplayPosition getReplayPosition() - { - return sstableMetadata.replayPosition; - } - - public long getMinTimestamp() - { - return sstableMetadata.minTimestamp; - } - - public long getMaxTimestamp() - { - return sstableMetadata.maxTimestamp; - } - - public Set<Integer> getAncestors() - { - try - { - CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION); - return compactionMetadata.ancestors; - } - catch (IOException e) - { - SSTableReader.logOpenException(descriptor, e); - return Collections.emptySet(); - } - } - - public int getSSTableLevel() - { - return sstableMetadata.sstableLevel; - } - - /** - * Reloads the sstable metadata from disk. - * - * Called after level is changed on sstable, for example if the sstable is dropped to L0 - * - * Might be possible to remove in future versions - * - * @throws IOException - */ - public void reloadSSTableMetadata() throws IOException - { - this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS); - } - - public StatsMetadata getSSTableMetadata() - { - return sstableMetadata; - } - - public RandomAccessReader openDataReader(RateLimiter limiter) - { - assert limiter != null; - return compression - ? CompressedThrottledReader.open(getFilename(), getCompressionMetadata(), limiter) - : ThrottledReader.open(new File(getFilename()), limiter); - } - - public RandomAccessReader openDataReader() - { - return compression - ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata()) - : RandomAccessReader.open(new File(getFilename())); - } - - public RandomAccessReader openIndexReader() - { - return RandomAccessReader.open(new File(getIndexFilename())); - } - - /** - * @param component component to get timestamp. - * @return last modified time for given component. 0 if given component does not exist or IO error occurs. 
- */ - public long getCreationTimeFor(Component component) - { - return new File(descriptor.filenameFor(component)).lastModified(); - } - - /** - * @return Number of key cache hit - */ - public long getKeyCacheHit() - { - return keyCacheHit.get(); - } - - /** - * @return Number of key cache request - */ - public long getKeyCacheRequest() - { - return keyCacheRequest.get(); - } - - /** - * @param sstables - * @return true if all desired references were acquired. Otherwise, it will unreference any partial acquisition, and return false. - */ - public static boolean acquireReferences(Iterable<SSTableReader> sstables) - { - SSTableReader failed = null; - for (SSTableReader sstable : sstables) - { - if (!sstable.acquireReference()) - { - failed = sstable; - break; - } - } - - if (failed == null) - return true; - - for (SSTableReader sstable : sstables) - { - if (sstable == failed) - break; - sstable.releaseReference(); - } - return false; - } - - public static void releaseReferences(Iterable<SSTableReader> sstables) - { - for (SSTableReader sstable : sstables) - { - sstable.releaseReference(); - } - } - - private void dropPageCache() - { - dropPageCache(dfile.path); - dropPageCache(ifile.path); - } - - private void dropPageCache(String filePath) - { - RandomAccessFile file = null; - - try - { - file = new RandomAccessFile(filePath, "r"); - - int fd = CLibrary.getfd(file.getFD()); - - if (fd > 0) - { - if (logger.isDebugEnabled()) - logger.debug(String.format("Dropping page cache of file %s.", filePath)); - - CLibrary.trySkipCache(fd, 0, 0); - } - } - catch (IOException e) - { - // we don't care if cache cleanup fails - } - finally - { - FileUtils.closeQuietly(file); - } - } - - /** - * Increment the total row read count and read rate for this SSTable. This should not be incremented for range - * slice queries, row cache hits, or non-query reads, like compaction. - */ - public void incrementReadCount() - { - if (readMeter != null) - readMeter.mark(); - } - - protected class EmptyCompactionScanner implements ICompactionScanner - { - private final String filename; - - public EmptyCompactionScanner(String filename) - { - this.filename = filename; - } - - public long getLengthInBytes() - { - return 0; - } - - public long getCurrentPosition() - { - return 0; - } - - public String getBackingFiles() - { - return filename; - } - - public boolean hasNext() - { - return false; - } - - public OnDiskAtomIterator next() - { - return null; - } - - public void close() throws IOException { } - - public void remove() { } - } - - public static class SizeComparator implements Comparator<SSTableReader> - { - public int compare(SSTableReader o1, SSTableReader o2) - { - return Longs.compare(o1.onDiskLength(), o2.onDiskLength()); - } - } -}
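A note on the Operator contract deleted above, since its Javadoc is easy to misread: apply converts a raw compareTo result into a three-way scan decision for the index walk (negative means no match is possible further ahead, zero means the current entry satisfies the operator, positive means keep scanning forward). Below is a minimal standalone sketch of that mapping; the inner classes mirror the deleted source, but the class name OperatorSketch and the assertions in main() are illustrative additions, not part of the Cassandra tree.

// Standalone sketch of the deleted Operator contract; main() is
// illustrative only (run with -ea to enable the assertions).
public class OperatorSketch
{
    public abstract static class Operator
    {
        public static final Operator EQ = new Equals();
        public static final Operator GE = new GreaterThanOrEqualTo();
        public static final Operator GT = new GreaterThan();

        // comparison is indexKey.compareTo(searchKey); the sign of the
        // result drives the scan: < 0 stop, == 0 match, > 0 keep scanning
        public abstract int apply(int comparison);

        static final class Equals extends Operator
        {
            public int apply(int comparison) { return -comparison; }
        }

        static final class GreaterThanOrEqualTo extends Operator
        {
            public int apply(int comparison) { return comparison >= 0 ? 0 : -comparison; }
        }

        static final class GreaterThan extends Operator
        {
            public int apply(int comparison) { return comparison > 0 ? 0 : 1; }
        }
    }

    public static void main(String[] args)
    {
        // index key before the search key: EQ may still match further on
        assert Operator.EQ.apply(-1) > 0;
        // exact hit: EQ matches
        assert Operator.EQ.apply(0) == 0;
        // index key past the search key: EQ can never match, stop scanning
        assert Operator.EQ.apply(1) < 0;
        // GE matches the first entry at or after the search key
        assert Operator.GE.apply(0) == 0 && Operator.GE.apply(1) == 0;
        // GT skips the exact key and matches the first strictly greater entry
        assert Operator.GT.apply(0) == 1 && Operator.GT.apply(1) == 0;
        System.out.println("Operator contract holds");
    }
}

This is why getPosition can bail out early with "Partition index lookup allows skipping sstable": once apply goes negative, no later index entry can satisfy the operator.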
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 76677ac..18825cb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -35,6 +35,8 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.FBUtilities;

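A closing aside on the acquireReference/releaseReference pair removed above: the CAS loop is what guarantees that a reader whose count has dropped to zero (and whose files may already be tidied) can never be resurrected, and decrementAndGet reaching zero is observed by exactly one thread, so cleanup runs exactly once. A self-contained sketch of the pattern follows; the class name RefCountedSketch and the tidy() hook are hypothetical stand-ins for the reader's real resource teardown, not the actual SSTableReader code.

import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the acquire/release pattern used by the deleted
// SSTableReader code; tidy() is a hypothetical stand-in for cleanup.
public class RefCountedSketch
{
    // starts at 1: the creator holds the initial reference
    private final AtomicInteger references = new AtomicInteger(1);

    public boolean acquireReference()
    {
        while (true)
        {
            int n = references.get();
            // once the count hits zero the resource may already be tidied,
            // so a new reference must never be handed out
            if (n <= 0)
                return false;
            // CAS only succeeds if the observed positive count is unchanged,
            // closing the race with a concurrent release
            if (references.compareAndSet(n, n + 1))
                return true;
        }
    }

    public void releaseReference()
    {
        // exactly one thread sees the transition to zero
        if (references.decrementAndGet() == 0)
            tidy();
    }

    private void tidy()
    {
        // hypothetical: close files, delete obsolete components, etc.
    }
}

The static acquireReferences helper in the deleted code builds on the same primitive: it acquires each sstable in turn and, on the first failure, releases only the references it already took, which is what makes the all-or-nothing contract in its Javadoc hold.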