http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java new file mode 100644 index 0000000..eb43968 --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.big; + +import com.google.common.collect.ImmutableList; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.AbstractCell; +import org.apache.cassandra.db.ColumnSerializer; +import org.apache.cassandra.db.OnDiskAtom; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.db.compaction.CompactionController; +import org.apache.cassandra.db.compaction.LazilyCompactedRow; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.FileDataInput; + +import java.util.Iterator; +import java.util.Set; + +/** + * Legacy bigtable format + */ +public class BigFormat implements SSTableFormat +{ + public static final BigFormat instance = new BigFormat(); + public static final BigVersion latestVersion = new BigVersion(BigVersion.current_version); + private static final SSTableReader.Factory readerFactory = new ReaderFactory(); + private static final SSTableWriter.Factory writerFactory = new WriterFactory(); + + private BigFormat() + { + + } + + @Override + public Version getLatestVersion() + { + return latestVersion; + } + + @Override + public Version getVersion(String version) + { + return new BigVersion(version); + } + + @Override + public SSTableWriter.Factory getWriterFactory() + { + return writerFactory; + } + + @Override + public SSTableReader.Factory getReaderFactory() + { + return readerFactory; + } + + @Override + public Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, ColumnSerializer.Flag flag, int expireBefore, CFMetaData cfm, 
Version version) + { + return AbstractCell.onDiskIterator(in, flag, expireBefore, version, cfm.comparator); + } + + @Override + public AbstractCompactedRow getCompactedRowWriter(CompactionController controller, ImmutableList<OnDiskAtomIterator> onDiskAtomIterators) + { + return new LazilyCompactedRow(controller, onDiskAtomIterators); + } + + @Override + public RowIndexEntry.IndexSerializer getIndexSerializer(CFMetaData cfMetaData) + { + return new RowIndexEntry.Serializer(new IndexHelper.IndexInfo.Serializer(cfMetaData.comparator)); + } + + static class WriterFactory extends SSTableWriter.Factory + { + @Override + public SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector) + { + return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector); + } + } + + static class ReaderFactory extends SSTableReader.Factory + { + @Override + public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason) + { + return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason); + } + } + + // versions are denoted as [major][minor]. Minor versions must be forward-compatible: + // new fields are allowed in e.g. the metadata component, but fields can't be removed + // or have their size changed. + // + // Minor versions were introduced with version "hb" for Cassandra 1.0.3; prior to that, + // we always incremented the major version. + static class BigVersion extends Version + { + public static final String current_version = "la"; + public static final String earliest_supported_version = "ja"; + + // ja (2.0.0): super columns are serialized as composites (note that there is no real format change, + // this is mostly a marker to know if we should expect super columns or not. 
We do need + // a major version bump however, because we should not allow streaming of super columns + // into this new format) + // tracks max local deletiontime in sstable metadata + // records bloom_filter_fp_chance in metadata component + // remove data size and column count from data file (CASSANDRA-4180) + // tracks max/min column values (according to comparator) + // jb (2.0.1): switch from crc32 to adler32 for compression checksums + // checksum the compressed data + // ka (2.1.0): new Statistics.db file format + // index summaries can be downsampled and the sampling level is persisted + // switch uncompressed checksums to adler32 + // tracks presense of legacy (local and remote) counter shards + // la (3.0.0): new file name format + + private final boolean isLatestVersion; + private final boolean hasPostCompressionAdlerChecksums; + private final boolean hasSamplingLevel; + private final boolean newStatsFile; + private final boolean hasAllAdlerChecksums; + private final boolean hasRepairedAt; + private final boolean tracksLegacyCounterShards; + private final boolean newFileName; + + public BigVersion(String version) + { + super(instance,version); + + isLatestVersion = version.compareTo(current_version) == 0; + hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0; + hasSamplingLevel = version.compareTo("ka") >= 0; + newStatsFile = version.compareTo("ka") >= 0; + hasAllAdlerChecksums = version.compareTo("ka") >= 0; + hasRepairedAt = version.compareTo("ka") >= 0; + tracksLegacyCounterShards = version.compareTo("ka") >= 0; + newFileName = version.compareTo("la") >= 0; + } + + @Override + public boolean isLatestVersion() + { + return isLatestVersion; + } + + @Override + public boolean hasPostCompressionAdlerChecksums() + { + return hasPostCompressionAdlerChecksums; + } + + @Override + public boolean hasSamplingLevel() + { + return hasSamplingLevel; + } + + @Override + public boolean hasNewStatsFile() + { + return newStatsFile; + } + + @Override + public boolean hasAllAdlerChecksums() + { + return hasAllAdlerChecksums; + } + + @Override + public boolean hasRepairedAt() + { + return hasRepairedAt; + } + + @Override + public boolean tracksLegacyCounterShards() + { + return tracksLegacyCounterShards; + } + + @Override + public boolean hasNewFileName() + { + return newFileName; + } + + @Override + public boolean isCompatible() + { + return version.compareTo(earliest_supported_version) >= 0 && version.charAt(0) <= current_version.charAt(0); + } + } +}
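
For reference, the BigVersion flags above are all derived from plain lexicographic comparison of the two-character version strings; a minimal standalone sketch of the same compatibility rule (not part of this patch, constants copied from BigVersion above):

    // Illustration only: two-character SSTable version strings compare
    // lexicographically, for minor bumps ("ja" -> "jb") and major bumps
    // ("ka" -> "la") alike.
    public class VersionCheckSketch
    {
        static final String CURRENT = "la";
        static final String EARLIEST_SUPPORTED = "ja";

        static boolean isCompatible(String version)
        {
            // same rule as BigVersion.isCompatible(): at least the earliest
            // supported version, and not a newer major version than current
            return version.compareTo(EARLIEST_SUPPORTED) >= 0
                   && version.charAt(0) <= CURRENT.charAt(0);
        }

        public static void main(String[] args)
        {
            System.out.println(isCompatible("ia")); // false: older than "ja"
            System.out.println(isCompatible("jb")); // true:  minor version of "j"
            System.out.println(isCompatible("ka")); // true:  2.1.0 format
            System.out.println(isCompatible("la")); // true:  current format
            System.out.println(isCompatible("ma")); // false: newer major than current
        }
    }

The per-feature booleans in BigVersion (hasSamplingLevel, hasRepairedAt, and so on) follow the same pattern: each is a one-time comparison against the version string that introduced the feature, computed in the constructor and then returned by a trivial accessor.
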
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java new file mode 100644 index 0000000..2488f86 --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.big; + +import com.google.common.util.concurrent.RateLimiter; +import org.apache.cassandra.cache.KeyCacheKey; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.compaction.ICompactionScanner; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +/** + * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen. + * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead. 
+ */ +public class BigTableReader extends SSTableReader +{ + private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class); + + BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason) + { + super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason); + } + + public OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns) + { + return new SSTableNamesIterator(this, key, columns); + } + + public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry ) + { + return new SSTableNamesIterator(this, input, key, columns, indexEntry); + } + + public OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse) + { + return new SSTableSliceIterator(this, key, slices, reverse); + } + + public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, ColumnSlice[] slices, boolean reverse, RowIndexEntry indexEntry) + { + return new SSTableSliceIterator(this, input, key, slices, reverse, indexEntry); + } + /** + * + * @param dataRange filter to use when reading the columns + * @return A Scanner for seeking over the rows of the SSTable. + */ + public ICompactionScanner getScanner(DataRange dataRange, RateLimiter limiter) + { + return new BigTableScanner(this, dataRange, limiter); + } + + + /** + * Direct I/O SSTableScanner over a defined collection of ranges of tokens. + * + * @param ranges the range of keys to cover + * @return A Scanner for seeking over the rows of the SSTable. + */ + public ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter) + { + // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249) + List<Pair<Long, Long>> positions = getPositionsForRanges(Range.normalize(ranges)); + if (positions.isEmpty()) + return new EmptyCompactionScanner(getFilename()); + else + return new BigTableScanner(this, ranges, limiter); + } + + + /** + * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to + * allow key selection by token bounds but only if op != * EQ + * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins. 
+ * @param updateCacheAndStats true if updating stats and cache + * @return The index entry corresponding to the key, or null if the key is not present + */ + public RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats) + { + // first, check bloom filter + if (op == Operator.EQ) + { + assert key instanceof DecoratedKey; // EQ only make sense if the key is a valid row key + if (!bf.isPresent(((DecoratedKey)key).getKey())) + { + Tracing.trace("Bloom filter allows skipping sstable {}", descriptor.generation); + return null; + } + } + + // next, the key cache (only make sense for valid row key) + if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey)) + { + DecoratedKey decoratedKey = (DecoratedKey)key; + KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, decoratedKey.getKey()); + RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats); + if (cachedPosition != null) + { + Tracing.trace("Key cache hit for sstable {}", descriptor.generation); + return cachedPosition; + } + } + + // check the smallest and greatest keys in the sstable to see if it can't be present + if (first.compareTo(key) > 0 || last.compareTo(key) < 0) + { + if (op == Operator.EQ && updateCacheAndStats) + bloomFilterTracker.addFalsePositive(); + + if (op.apply(1) < 0) + { + Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation); + return null; + } + } + + int binarySearchResult = indexSummary.binarySearch(key); + long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary); + int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult); + + // if we matched the -1th position, we'll start at the first position + sampledPosition = sampledPosition == -1 ? 0 : sampledPosition; + + int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex); + + // scan the on-disk index, starting at the nearest sampled position. + // The check against IndexInterval is to be exit the loop in the EQ case when the key looked for is not present + // (bloom filter false positive). But note that for non-EQ cases, we might need to check the first key of the + // next index position because the searched key can be greater the last key of the index interval checked if it + // is lesser than the first key of next interval (and in that case we must return the position of the first key + // of the next interval). + int i = 0; + Iterator<FileDataInput> segments = ifile.iterator(sampledPosition); + while (segments.hasNext() && i <= effectiveInterval) + { + FileDataInput in = segments.next(); + try + { + while (!in.isEOF() && i <= effectiveInterval) + { + i++; + + ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); + + boolean opSatisfied; // did we find an appropriate position for the op requested + boolean exactMatch; // is the current position an exact match for the key, suitable for caching + + // Compare raw keys if possible for performance, otherwise compare decorated keys. 
+ if (op == Operator.EQ) + { + opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey()); + } + else + { + DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey); + int comparison = indexDecoratedKey.compareTo(key); + int v = op.apply(comparison); + opSatisfied = (v == 0); + exactMatch = (comparison == 0); + if (v < 0) + { + Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation); + return null; + } + } + + if (opSatisfied) + { + // read data position from index entry + RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in, descriptor.version); + if (exactMatch && updateCacheAndStats) + { + assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key + DecoratedKey decoratedKey = (DecoratedKey)key; + + if (logger.isTraceEnabled()) + { + // expensive sanity check! see CASSANDRA-4687 + FileDataInput fdi = dfile.getSegment(indexEntry.position); + DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi)); + if (!keyInDisk.equals(key)) + throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath())); + fdi.close(); + } + + // store exact match for the key + cacheKey(decoratedKey, indexEntry); + } + if (op == Operator.EQ && updateCacheAndStats) + bloomFilterTracker.addTruePositive(); + Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation); + return indexEntry; + } + + RowIndexEntry.Serializer.skip(in); + } + } + catch (IOException e) + { + markSuspect(); + throw new CorruptSSTableException(e, in.getPath()); + } + finally + { + FileUtils.closeQuietly(in); + } + } + + if (op == SSTableReader.Operator.EQ && updateCacheAndStats) + bloomFilterTracker.addFalsePositive(); + Tracing.trace("Partition index lookup complete (bloom filter false positive) for sstable {}", descriptor.generation); + return null; + } + + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java new file mode 100644 index 0000000..c1fc079 --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.io.sstable.format.big; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import com.google.common.collect.AbstractIterator; +import com.google.common.util.concurrent.RateLimiter; + +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.columniterator.IColumnIteratorFactory; +import org.apache.cassandra.db.columniterator.LazyColumnIterator; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.compaction.ICompactionScanner; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.SSTableIdentityIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class BigTableScanner implements ICompactionScanner +{ + protected final RandomAccessReader dfile; + protected final RandomAccessReader ifile; + public final SSTableReader sstable; + + private final Iterator<AbstractBounds<RowPosition>> rangeIterator; + private AbstractBounds<RowPosition> currentRange; + + private final DataRange dataRange; + private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; + + protected Iterator<OnDiskAtomIterator> iterator; + + /** + * @param sstable SSTable to scan; must not be null + * @param dataRange a single range to scan; must not be null + * @param limiter background i/o RateLimiter; may be null + */ + public BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) + { + assert sstable != null; + + this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter); + this.ifile = sstable.openIndexReader(); + this.sstable = sstable; + this.dataRange = dataRange; + this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata); + + List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2); + if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum(sstable.partitioner)) + { + // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and + // 2) the part that comes before the wrap-around + boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey(), sstable.partitioner)); + boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound(), sstable.partitioner)); + } + else + { + boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey(), sstable.partitioner)); + } + this.rangeIterator = boundsList.iterator(); + } + + /** + * @param sstable SSTable to scan; must not be null + * @param tokenRanges A set of token ranges to scan + * @param limiter background i/o RateLimiter; may be null + */ + public BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter) + { + assert sstable != null; + + this.dfile = limiter == null ? 
sstable.openDataReader() : sstable.openDataReader(limiter); + this.ifile = sstable.openIndexReader(); + this.sstable = sstable; + this.dataRange = null; + this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata); + + List<Range<Token>> normalized = Range.normalize(tokenRanges); + List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size()); + for (Range<Token> range : normalized) + boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(sstable.partitioner), + range.right.maxKeyBound(sstable.partitioner), + sstable.partitioner)); + + this.rangeIterator = boundsList.iterator(); + } + + private void seekToCurrentRangeStart() + { + if (currentRange.left.isMinimum(sstable.partitioner)) + return; + + long indexPosition = sstable.getIndexScanPosition(currentRange.left); + // -1 means the key is before everything in the sstable. So just start from the beginning. + if (indexPosition == -1) + { + // Note: this method shouldn't assume we're at the start of the sstable already (see #6638) and + // the seeks are no-op anyway if we are. + ifile.seek(0); + dfile.seek(0); + return; + } + + ifile.seek(indexPosition); + try + { + + while (!ifile.isEOF()) + { + indexPosition = ifile.getFilePointer(); + DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); + int comparison = indexDecoratedKey.compareTo(currentRange.left); + // because our range start may be inclusive or exclusive, we need to also contains() + // instead of just checking (comparison >= 0) + if (comparison > 0 || currentRange.contains(indexDecoratedKey)) + { + // Found, just read the dataPosition and seek into index and data files + long dataPosition = ifile.readLong(); + ifile.seek(indexPosition); + dfile.seek(dataPosition); + break; + } + else + { + RowIndexEntry.Serializer.skip(ifile); + } + } + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, sstable.getFilename()); + } + } + + public void close() throws IOException + { + FileUtils.close(dfile, ifile); + } + + public long getLengthInBytes() + { + return dfile.length(); + } + + public long getCurrentPosition() + { + return dfile.getFilePointer(); + } + + public String getBackingFiles() + { + return sstable.toString(); + } + + public boolean hasNext() + { + if (iterator == null) + iterator = createIterator(); + return iterator.hasNext(); + } + + public OnDiskAtomIterator next() + { + if (iterator == null) + iterator = createIterator(); + return iterator.next(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + + private Iterator<OnDiskAtomIterator> createIterator() + { + return new KeyScanningIterator(); + } + + protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator> + { + private DecoratedKey nextKey; + private RowIndexEntry nextEntry; + private DecoratedKey currentKey; + private RowIndexEntry currentEntry; + + protected OnDiskAtomIterator computeNext() + { + try + { + if (nextEntry == null) + { + do + { + // we're starting the first range or we just passed the end of the previous range + if (!rangeIterator.hasNext()) + return endOfData(); + + currentRange = rangeIterator.next(); + seekToCurrentRangeStart(); + + if (ifile.isEOF()) + return endOfData(); + + currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); + currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version); + } while 
(!currentRange.contains(currentKey)); + } + else + { + // we're in the middle of a range + currentKey = nextKey; + currentEntry = nextEntry; + } + + long readEnd; + if (ifile.isEOF()) + { + nextEntry = null; + nextKey = null; + readEnd = dfile.length(); + } + else + { + // we need the position of the start of the next key, regardless of whether it falls in the current range + nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); + nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version); + readEnd = nextEntry.position; + + if (!currentRange.contains(nextKey)) + { + nextKey = null; + nextEntry = null; + } + } + + if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey())) + { + dfile.seek(currentEntry.position + currentEntry.headerOffset()); + ByteBufferUtil.readWithShortLength(dfile); // key + return new SSTableIdentityIterator(sstable, dfile, currentKey); + } + + return new LazyColumnIterator(currentKey, new IColumnIteratorFactory() + { + public OnDiskAtomIterator create() + { + return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry); + } + }); + + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, sstable.getFilename()); + } + } + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "(" + + "dfile=" + dfile + + " ifile=" + ifile + + " sstable=" + sstable + + ")"; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java new file mode 100644 index 0000000..ec53b4e --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.io.sstable.format.big; + +import java.io.Closeable; +import java.io.DataInput; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.CompressedSequentialWriter; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.MetadataComponent; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.FileMark; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.SegmentedFile; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FilterFactory; +import org.apache.cassandra.utils.IFilter; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.StreamingHistogram; + +public class BigTableWriter extends SSTableWriter +{ + private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class); + + // not very random, but the only value that can't be mistaken for a legal column-name length + public static final int END_OF_ROW = 0x0000; + + private IndexWriter iwriter; + private SegmentedFile.Builder dbuilder; + private final SequentialWriter dataFile; + private DecoratedKey lastWrittenKey; + private FileMark dataMark; + + BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector) + { + super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector); + + iwriter = new IndexWriter(keyCount); + + if (compression) + { + dataFile = SequentialWriter.open(getFilename(), + descriptor.filenameFor(Component.COMPRESSION_INFO), + metadata.compressionParameters(), + metadataCollector); + dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile); + } + else + { + dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC))); + dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); + } + } + + public void mark() + { + dataMark = dataFile.mark(); + iwriter.mark(); + } + + public void resetAndTruncate() + { + dataFile.resetAndTruncate(dataMark); + iwriter.resetAndTruncate(); + } + + /** + * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written + */ + 
private long beforeAppend(DecoratedKey decoratedKey) + { + assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values + if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0) + throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename()); + return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer(); + } + + private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index) + { + metadataCollector.addKey(decoratedKey.getKey()); + lastWrittenKey = decoratedKey; + last = lastWrittenKey; + if (first == null) + first = lastWrittenKey; + + if (logger.isTraceEnabled()) + logger.trace("wrote {} at {}", decoratedKey, dataPosition); + iwriter.append(decoratedKey, index); + dbuilder.addPotentialBoundary(dataPosition); + } + + /** + * @param row + * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row + */ + public RowIndexEntry append(AbstractCompactedRow row) + { + long currentPosition = beforeAppend(row.key); + RowIndexEntry entry; + try + { + entry = row.write(currentPosition, dataFile); + if (entry == null) + return null; + } + catch (IOException e) + { + throw new FSWriteError(e, dataFile.getPath()); + } + metadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats()); + afterAppend(row.key, currentPosition, entry); + return entry; + } + + public void append(DecoratedKey decoratedKey, ColumnFamily cf) + { + long startPosition = beforeAppend(decoratedKey); + try + { + RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream); + afterAppend(decoratedKey, startPosition, entry); + } + catch (IOException e) + { + throw new FSWriteError(e, dataFile.getPath()); + } + metadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats()); + } + + private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException + { + assert cf.hasColumns() || cf.isMarkedForDelete(); + + ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out); + ColumnIndex index = builder.build(cf); + + out.writeShort(END_OF_ROW); + return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index); + } + + /** + * @throws IOException if a read from the DataInput fails + * @throws FSWriteError if a write to the dataFile fails + */ + public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException + { + long currentPosition = beforeAppend(key); + + ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE); + ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE); + ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE); + List<ByteBuffer> minColumnNames = Collections.emptyList(); + List<ByteBuffer> maxColumnNames = Collections.emptyList(); + StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE); + boolean hasLegacyCounterShards = false; + + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata); + cf.delete(DeletionTime.serializer.deserialize(in)); + + ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream); + + if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < 
Integer.MAX_VALUE) + { + tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime); + maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime); + minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt); + maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt); + } + + Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator(); + while (rangeTombstoneIterator.hasNext()) + { + RangeTombstone rangeTombstone = rangeTombstoneIterator.next(); + tombstones.update(rangeTombstone.getLocalDeletionTime()); + minTimestampTracker.update(rangeTombstone.timestamp()); + maxTimestampTracker.update(rangeTombstone.timestamp()); + maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime()); + minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator); + maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator); + } + + Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator); + try + { + while (iter.hasNext()) + { + OnDiskAtom atom = iter.next(); + if (atom == null) + break; + + if (atom instanceof CounterCell) + { + atom = ((CounterCell) atom).markLocalToBeCleared(); + hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards(); + } + + int deletionTime = atom.getLocalDeletionTime(); + if (deletionTime < Integer.MAX_VALUE) + tombstones.update(deletionTime); + minTimestampTracker.update(atom.timestamp()); + maxTimestampTracker.update(atom.timestamp()); + minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator); + maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator); + maxDeletionTimeTracker.update(atom.getLocalDeletionTime()); + + columnIndexer.add(atom); // This write the atom on disk too + } + + columnIndexer.maybeWriteEmptyRowHeader(); + dataFile.stream.writeShort(END_OF_ROW); + } + catch (IOException e) + { + throw new FSWriteError(e, dataFile.getPath()); + } + + metadataCollector.updateMinTimestamp(minTimestampTracker.get()) + .updateMaxTimestamp(maxTimestampTracker.get()) + .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get()) + .addRowSize(dataFile.getFilePointer() - currentPosition) + .addColumnCount(columnIndexer.writtenAtomCount()) + .mergeTombstoneHistogram(tombstones) + .updateMinColumnNames(minColumnNames) + .updateMaxColumnNames(maxColumnNames) + .updateHasLegacyCounterShards(hasLegacyCounterShards); + + afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build())); + return currentPosition; + } + + /** + * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable + */ + public void abort(boolean closeBf) + { + assert descriptor.type.isTemporary; + if (iwriter == null && dataFile == null) + return; + if (iwriter != null) + { + FileUtils.closeQuietly(iwriter.indexFile); + if (closeBf) + { + iwriter.bf.close(); + } + } + if (dataFile!= null) + FileUtils.closeQuietly(dataFile); + + Set<Component> components = SSTable.componentsFor(descriptor); + try + { + if (!components.isEmpty()) + SSTable.delete(descriptor, components); + } + catch (FSWriteError e) + { + logger.error(String.format("Failed deleting temp components for %s", 
descriptor), e); + throw e; + } + } + + // we use this method to ensure any managed data we may have retained references to during the write are no + // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction + public void isolateReferences() + { + // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other + // data retention is done through copying + first = getMinimalKey(first); + last = lastWrittenKey = getMinimalKey(last); + } + + public SSTableReader openEarly(long maxDataAge) + { + StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(), + metadata.getBloomFilterFpChance(), + repairedAt).get(MetadataType.STATS); + + // find the max (exclusive) readable key + DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0); + if (exclusiveUpperBoundOfReadableIndex == null) + return null; + + // create temp links if they don't already exist + Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK); + if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists()) + { + FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX))); + FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA))); + } + + // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers + SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX)); + SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA)); + SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL), + components, metadata, + partitioner, ifile, + dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex), + iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY); + + // now it's open, find the ACTUAL last readable key (i.e. 
for which the data file has also been flushed) + sstable.first = getMinimalKey(first); + sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex); + DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1); + if (inclusiveUpperBoundOfReadableData == null) + return null; + int offset = 2; + while (true) + { + RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT); + if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset()) + break; + inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++); + if (inclusiveUpperBoundOfReadableData == null) + return null; + } + sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData); + return sstable; + } + + public SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt) + { + Pair<Descriptor, StatsMetadata> p = close(repairedAt); + Descriptor newdesc = p.left; + StatsMetadata sstableMetadata = p.right; + + // finalize in-memory state for the reader + SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX)); + SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA)); + SSTableReader sstable = SSTableReader.internalOpen(newdesc, + components, + metadata, + partitioner, + ifile, + dfile, + iwriter.summary.build(partitioner), + iwriter.bf, + maxDataAge, + sstableMetadata, + SSTableReader.OpenReason.NORMAL); + sstable.first = getMinimalKey(first); + sstable.last = getMinimalKey(last); + // try to save the summaries to disk + sstable.saveSummary(iwriter.builder, dbuilder); + iwriter = null; + dbuilder = null; + return sstable; + } + + // Close the writer and return the descriptor to the new sstable and it's metadata + public Pair<Descriptor, StatsMetadata> close() + { + return close(this.repairedAt); + } + + private Pair<Descriptor, StatsMetadata> close(long repairedAt) + { + + // index and filter + iwriter.close(); + // main data, close will truncate if necessary + dataFile.close(); + dataFile.writeFullChecksum(descriptor); + // write sstable statistics + Map<MetadataType, MetadataComponent> metadataComponents = metadataCollector.finalizeMetadata( + partitioner.getClass().getCanonicalName(), + metadata.getBloomFilterFpChance(), + repairedAt); + writeMetadata(descriptor, metadataComponents); + + // save the table of components + SSTable.appendTOC(descriptor, components); + + // remove the 'tmp' marker from all components + return Pair.create(SSTableWriter.rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS)); + + } + + + private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components) + { + SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS))); + try + { + desc.getMetadataSerializer().serialize(components, out.stream); + } + catch (IOException e) + { + throw new FSWriteError(e, out.getPath()); + } + finally + { + out.close(); + } + } + + public long getFilePointer() + { + return dataFile.getFilePointer(); + } + + public long getOnDiskFilePointer() + { + return dataFile.getOnDiskFilePointer(); + } + + /** + * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed. 
+ */ + class IndexWriter implements Closeable + { + private final SequentialWriter indexFile; + public final SegmentedFile.Builder builder; + public final IndexSummaryBuilder summary; + public final IFilter bf; + private FileMark mark; + + IndexWriter(long keyCount) + { + indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); + builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); + summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL); + bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true); + } + + // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file + DecoratedKey getMaxReadableKey(int offset) + { + long maxIndexLength = indexFile.getLastFlushOffset(); + return summary.getMaxReadableKey(maxIndexLength, offset); + } + + public void append(DecoratedKey key, RowIndexEntry indexEntry) + { + bf.add(key.getKey()); + long indexPosition = indexFile.getFilePointer(); + try + { + ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream); + rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream); + } + catch (IOException e) + { + throw new FSWriteError(e, indexFile.getPath()); + } + + if (logger.isTraceEnabled()) + logger.trace("wrote index entry: {} at {}", indexEntry, indexPosition); + + summary.maybeAddEntry(key, indexPosition); + builder.addPotentialBoundary(indexPosition); + } + + /** + * Closes the index and bloomfilter, making the public state of this writer valid for consumption. + */ + public void close() + { + if (components.contains(Component.FILTER)) + { + String path = descriptor.filenameFor(Component.FILTER); + try + { + // bloom filter + FileOutputStream fos = new FileOutputStream(path); + DataOutputStreamAndChannel stream = new DataOutputStreamAndChannel(fos); + FilterFactory.serialize(bf, stream); + stream.flush(); + fos.getFD().sync(); + stream.close(); + } + catch (IOException e) + { + throw new FSWriteError(e, path); + } + } + + // index + long position = indexFile.getFilePointer(); + indexFile.close(); // calls force + FileUtils.truncate(indexFile.getPath(), position); + } + + public void mark() + { + mark = indexFile.mark(); + } + + public void resetAndTruncate() + { + // we can't un-set the bloom filter addition, but extra keys in there are harmless. + // we can't reset dbuilder either, but that is the last thing called in afterappend so + // we assume that if that worked then we won't be trying to reset. + indexFile.resetAndTruncate(mark); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java new file mode 100644 index 0000000..a69cff9 --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.big; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.List; + +import com.google.common.collect.AbstractIterator; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.composites.CellNameType; +import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileMark; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.ByteBufferUtil; + +/** + * This is a reader that finds the block for a starting column and returns blocks before/after it for each next call. + * This function assumes that the CF is sorted by name and exploits the name index. + */ +class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator +{ + private final ColumnFamily emptyColumnFamily; + + private final SSTableReader sstable; + private final List<IndexHelper.IndexInfo> indexes; + private final FileDataInput originalInput; + private FileDataInput file; + private final boolean reversed; + private final ColumnSlice[] slices; + private final BlockFetcher fetcher; + private final Deque<OnDiskAtom> blockColumns = new ArrayDeque<OnDiskAtom>(); + private final CellNameType comparator; + + // Holds range tombstone in reverse queries. See addColumn() + private final Deque<OnDiskAtom> rangeTombstonesReversed; + + /** + * This slice reader assumes that slices are sorted correctly, e.g. that for forward lookup slices are in + * lexicographic order of start elements and that for reverse lookup they are in reverse lexicographic order of + * finish (reverse start) elements. i.e. forward: [a,b],[d,e],[g,h] reverse: [h,g],[e,d],[b,a]. This reader also + * assumes that validation has been performed in terms of intervals (no overlapping intervals). + */ + IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ColumnSlice[] slices, boolean reversed) + { + Tracing.trace("Seeking to partition indexed section in data file"); + this.sstable = sstable; + this.originalInput = input; + this.reversed = reversed; + this.slices = slices; + this.comparator = sstable.metadata.comparator; + this.rangeTombstonesReversed = reversed ? 
new ArrayDeque<OnDiskAtom>() : null; + + try + { + this.indexes = indexEntry.columnsIndex(); + emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata); + if (indexes.isEmpty()) + { + setToRowStart(indexEntry, input); + emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file)); + fetcher = new SimpleBlockFetcher(); + } + else + { + emptyColumnFamily.delete(indexEntry.deletionTime()); + fetcher = new IndexedBlockFetcher(indexEntry.position); + } + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, file.getPath()); + } + } + + /** + * Sets the seek position to the start of the row for column scanning. + */ + private void setToRowStart(RowIndexEntry rowEntry, FileDataInput in) throws IOException + { + if (in == null) + { + this.file = sstable.getFileDataInput(rowEntry.position); + } + else + { + this.file = in; + in.seek(rowEntry.position); + } + sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file)); + } + + public ColumnFamily getColumnFamily() + { + return emptyColumnFamily; + } + + public DecoratedKey getKey() + { + throw new UnsupportedOperationException(); + } + + protected OnDiskAtom computeNext() + { + while (true) + { + if (reversed) + { + // Return all tombstone for the block first (see addColumn() below) + OnDiskAtom column = rangeTombstonesReversed.poll(); + if (column != null) + return column; + } + + OnDiskAtom column = blockColumns.poll(); + if (column == null) + { + if (!fetcher.fetchMoreData()) + return endOfData(); + } + else + { + return column; + } + } + } + + public void close() throws IOException + { + if (originalInput == null && file != null) + file.close(); + } + + protected void addColumn(OnDiskAtom col) + { + if (reversed) + { + /* + * We put range tomstone markers at the beginning of the range they delete. But for reversed queries, + * the caller still need to know about a RangeTombstone before it sees any column that it covers. + * To make that simple, we keep said tombstones separate and return them all before any column for + * a given block. + */ + if (col instanceof RangeTombstone) + rangeTombstonesReversed.addFirst(col); + else + blockColumns.addFirst(col); + } + else + { + blockColumns.addLast(col); + } + } + + private abstract class BlockFetcher + { + protected int currentSliceIdx; + + protected BlockFetcher(int sliceIdx) + { + this.currentSliceIdx = sliceIdx; + } + + /* + * Return the smallest key selected by the current ColumnSlice. + */ + protected Composite currentStart() + { + return reversed ? slices[currentSliceIdx].finish : slices[currentSliceIdx].start; + } + + /* + * Return the biggest key selected by the current ColumnSlice. + */ + protected Composite currentFinish() + { + return reversed ? 
slices[currentSliceIdx].start : slices[currentSliceIdx].finish; + } + + protected abstract boolean setNextSlice(); + + protected abstract boolean fetchMoreData(); + + protected boolean isColumnBeforeSliceStart(OnDiskAtom column) + { + return isBeforeSliceStart(column.name()); + } + + protected boolean isBeforeSliceStart(Composite name) + { + Composite start = currentStart(); + return !start.isEmpty() && comparator.compare(name, start) < 0; + } + + protected boolean isColumnBeforeSliceFinish(OnDiskAtom column) + { + Composite finish = currentFinish(); + return finish.isEmpty() || comparator.compare(column.name(), finish) <= 0; + } + + protected boolean isAfterSliceFinish(Composite name) + { + Composite finish = currentFinish(); + return !finish.isEmpty() && comparator.compare(name, finish) > 0; + } + } + + private class IndexedBlockFetcher extends BlockFetcher + { + // where this row starts + private final long columnsStart; + + // the index entry for the next block to deserialize + private int nextIndexIdx = -1; + + // index of the last block we've read from disk; + private int lastDeserializedBlock = -1; + + // For reversed, keep columns at the beginning of the last deserialized block that + // may still match a slice + private final Deque<OnDiskAtom> prefetched; + + public IndexedBlockFetcher(long columnsStart) + { + super(-1); + this.columnsStart = columnsStart; + this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null; + setNextSlice(); + } + + protected boolean setNextSlice() + { + while (++currentSliceIdx < slices.length) + { + nextIndexIdx = IndexHelper.indexFor(slices[currentSliceIdx].start, indexes, comparator, reversed, nextIndexIdx); + if (nextIndexIdx < 0 || nextIndexIdx >= indexes.size()) + // no index block for that slice + continue; + + // Check if we can exclude this slice entirely from the index + IndexInfo info = indexes.get(nextIndexIdx); + if (reversed) + { + if (!isBeforeSliceStart(info.lastName)) + return true; + } + else + { + if (!isAfterSliceFinish(info.firstName)) + return true; + } + } + nextIndexIdx = -1; + return false; + } + + protected boolean hasMoreSlice() + { + return currentSliceIdx < slices.length; + } + + protected boolean fetchMoreData() + { + if (!hasMoreSlice()) + return false; + + // If we read blocks in reversed disk order, we may have columns from the previous block to handle. + // Note that prefetched keeps columns in reversed disk order. 
+ if (reversed && !prefetched.isEmpty()) + { + boolean gotSome = false; + // Avoids some comparisons when we know they are not useful + boolean inSlice = false; + + OnDiskAtom prefetchedCol; + while ((prefetchedCol = prefetched.peek()) != null) + { + // col is before slice, we update the slice + if (isColumnBeforeSliceStart(prefetchedCol)) + { + inSlice = false; + if (!setNextSlice()) + return false; + } + // col is within slice, all columns + // (we go in reverse, so as soon as we are in a slice, no need to check + // we're after the slice until we change slice) + else if (inSlice || isColumnBeforeSliceFinish(prefetchedCol)) + { + blockColumns.addLast(prefetched.poll()); + gotSome = true; + inSlice = true; + } + // if col is after slice, ignore + else + { + prefetched.poll(); + } + } + if (gotSome) + return true; + } + try + { + return getNextBlock(); + } + catch (IOException e) + { + throw new CorruptSSTableException(e, file.getPath()); + } + } + + private boolean getNextBlock() throws IOException + { + if (lastDeserializedBlock == nextIndexIdx) + { + if (reversed) + nextIndexIdx--; + else + nextIndexIdx++; + } + lastDeserializedBlock = nextIndexIdx; + + // Are we done? + if (lastDeserializedBlock < 0 || lastDeserializedBlock >= indexes.size()) + return false; + + IndexInfo currentIndex = indexes.get(lastDeserializedBlock); + + /* seek to the correct offset in the data file */ + long positionToSeek = columnsStart + currentIndex.offset; + + // With new promoted indexes, our first seek in the data file will happen at that point. + if (file == null) + file = originalInput == null ? sstable.getFileDataInput(positionToSeek) : originalInput; + + AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version); + + file.seek(positionToSeek); + FileMark mark = file.mark(); + + // We remember when we are within a slice to avoid some comparisons + boolean inSlice = false; + + // scan from index start + while (file.bytesPastMark(mark) < currentIndex.width || deserializer.hasUnprocessed()) + { + // col is before slice + // (If in slice, don't bother checking that until we change slice) + Composite start = currentStart(); + if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0) + { + if (reversed) + { + // the next slice selects columns that are before the current one, so it may + // match this column, so keep it around. + prefetched.addFirst(deserializer.readNext()); + } + else + { + deserializer.skipNext(); + } + } + // col is within slice + else + { + Composite finish = currentFinish(); + if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0) + { + inSlice = true; + addColumn(deserializer.readNext()); + } + // col is after slice. + else + { + // When reading forward, if we hit a column that sorts after the current slice, it means we're done with this slice. + // For reversed, this may either mean that we're done with the current slice, or that we need to read the previous + // index block. However, we can be sure that we are in the first case (the current slice is done) if the first + // columns of the block were not part of the current slice, i.e. if we have columns in prefetched. + if (reversed && prefetched.isEmpty()) + break; + + if (!setNextSlice()) + break; + + inSlice = false; + + // The next index block now corresponds to the first block that may have columns for the newly set slice. + // So if it's different from the current block, we're done with this block.
And in that case, we know + // that our prefetched columns won't match. + if (nextIndexIdx != lastDeserializedBlock) + { + if (reversed) + prefetched.clear(); + break; + } + + // Even if the next slice may have columns in this block, if we're reversed, those columns have been + // prefetched and we're done with that block + if (reversed) + break; + + // otherwise, we will deal with that column at the next iteration + } + } + } + return true; + } + } + + private class SimpleBlockFetcher extends BlockFetcher + { + public SimpleBlockFetcher() throws IOException + { + // Since we have to deserialize in order and will read all slices, we might as well reverse the slices and + // behave as if the query was not reversed + super(reversed ? slices.length - 1 : 0); + + // We remember when we are within a slice to avoid some comparisons + boolean inSlice = false; + + AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version); + while (deserializer.hasNext()) + { + // col is before slice + // (If in slice, don't bother checking that until we change slice) + Composite start = currentStart(); + if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0) + { + deserializer.skipNext(); + continue; + } + + // col is within slice + Composite finish = currentFinish(); + if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0) + { + inSlice = true; + addColumn(deserializer.readNext()); + } + // col is after slice. more slices? + else + { + inSlice = false; + if (!setNextSlice()) + break; + } + } + } + + protected boolean setNextSlice() + { + if (reversed) + { + if (currentSliceIdx <= 0) + return false; + + currentSliceIdx--; + } + else + { + if (currentSliceIdx >= slices.length - 1) + return false; + + currentSliceIdx++; + } + return true; + } + + protected boolean fetchMoreData() + { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java new file mode 100644 index 0000000..07dc59a --- /dev/null +++ b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.cassandra.io.sstable.format.big; + +import java.io.IOException; +import java.util.*; + +import com.google.common.collect.AbstractIterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.composites.CellNameType; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileMark; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.ByteBufferUtil; + +class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator +{ + private ColumnFamily cf; + private final SSTableReader sstable; + private FileDataInput fileToClose; + private Iterator<OnDiskAtom> iter; + public final SortedSet<CellName> columns; + public final DecoratedKey key; + + public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<CellName> columns) + { + assert columns != null; + this.sstable = sstable; + this.columns = columns; + this.key = key; + + RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); + if (indexEntry == null) + return; + + try + { + read(sstable, null, indexEntry); + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, sstable.getFilename()); + } + finally + { + if (fileToClose != null) + FileUtils.closeQuietly(fileToClose); + } + } + + public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry) + { + assert columns != null; + this.sstable = sstable; + this.columns = columns; + this.key = key; + + try + { + read(sstable, file, indexEntry); + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, sstable.getFilename()); + } + } + + private FileDataInput createFileDataInput(long position) + { + fileToClose = sstable.getFileDataInput(position); + return fileToClose; + } + + private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry) + throws IOException + { + List<IndexHelper.IndexInfo> indexList; + + // If the entry is not indexed or the index is not promoted, read from the row start + if (!indexEntry.isIndexed()) + { + if (file == null) + file = createFileDataInput(indexEntry.position); + else + file.seek(indexEntry.position); + + DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file)); + assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath()); + } + + indexList = indexEntry.columnsIndex(); + + if (!indexEntry.isIndexed()) + { + ColumnFamilySerializer serializer = ColumnFamily.serializer; + try + { + cf = ArrayBackedSortedColumns.factory.create(sstable.metadata); + cf.delete(DeletionTime.serializer.deserialize(file)); + } + catch (Exception e) + { + throw new IOException(serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e); + } + } + else + { + cf = ArrayBackedSortedColumns.factory.create(sstable.metadata); + cf.delete(indexEntry.deletionTime()); + } + + List<OnDiskAtom> result = new ArrayList<OnDiskAtom>(); + if (indexList.isEmpty()) + { + 
readSimpleColumns(file, columns, result); + } + else + { + readIndexedColumns(sstable.metadata, file, columns, indexList, indexEntry.position, result); + } + + // create an iterator view of the columns we read + iter = result.iterator(); + } + + private void readSimpleColumns(FileDataInput file, SortedSet<CellName> columnNames, List<OnDiskAtom> result) + { + Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, sstable.descriptor.version); + int n = 0; + while (atomIterator.hasNext()) + { + OnDiskAtom column = atomIterator.next(); + if (column instanceof Cell) + { + if (columnNames.contains(column.name())) + { + result.add(column); + if (++n >= columns.size()) + break; + } + } + else + { + result.add(column); + } + } + } + + private void readIndexedColumns(CFMetaData metadata, + FileDataInput file, + SortedSet<CellName> columnNames, + List<IndexHelper.IndexInfo> indexList, + long basePosition, + List<OnDiskAtom> result) + throws IOException + { + /* get the various column ranges we have to read */ + CellNameType comparator = metadata.comparator; + List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>(); + int lastIndexIdx = -1; + for (CellName name : columnNames) + { + int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx); + if (index < 0 || index == indexList.size()) + continue; + IndexHelper.IndexInfo indexInfo = indexList.get(index); + // Check the index block does contain the column names and that we haven't inserted this block yet. + if (comparator.compare(name, indexInfo.firstName) < 0 || index == lastIndexIdx) + continue; + + ranges.add(indexInfo); + lastIndexIdx = index; + } + + if (ranges.isEmpty()) + return; + + Iterator<CellName> toFetch = columnNames.iterator(); + CellName nextToFetch = toFetch.next(); + for (IndexHelper.IndexInfo indexInfo : ranges) + { + long positionToSeek = basePosition + indexInfo.offset; + + // With new promoted indexes, our first seek in the data file will happen at that point. + if (file == null) + file = createFileDataInput(positionToSeek); + + AtomDeserializer deserializer = cf.metadata().getOnDiskDeserializer(file, sstable.descriptor.version); + file.seek(positionToSeek); + FileMark mark = file.mark(); + while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch != null) + { + int cmp = deserializer.compareNextTo(nextToFetch); + if (cmp == 0) + { + nextToFetch = toFetch.hasNext() ? toFetch.next() : null; + result.add(deserializer.readNext()); + continue; + } + + deserializer.skipNext(); + if (cmp > 0) + nextToFetch = toFetch.hasNext() ? toFetch.next() : null; + } + } + } + + public DecoratedKey getKey() + { + return key; + } + + public ColumnFamily getColumnFamily() + { + return cf; + } + + protected OnDiskAtom computeNext() + { + if (iter == null || !iter.hasNext()) + return endOfData(); + return iter.next(); + } + + public void close() throws IOException { } +}
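
The block-selection step in SSTableNamesIterator.readIndexedColumns above is easy to lose in the flattened diff, so here is a stand-alone sketch of the same idea under simplified assumptions: plain String cell names instead of CellName/Composite, and a hypothetical IndexBlock record instead of IndexHelper.IndexInfo. It illustrates the technique only (binary-search each requested name to its candidate index block, skip names that fall in a gap before the block's first name, and collect each block at most once); it is not the Cassandra API.

import java.util.*;

// Illustrative stand-ins only; not the Cassandra IndexHelper/IndexInfo API.
public class IndexBlockSelection
{
    // An index block summarises a contiguous run of cells spanning [firstName, lastName].
    record IndexBlock(String firstName, String lastName, long offset, long width) {}

    // Binary search: index of the first block whose lastName >= name, or -1 if none.
    static int blockFor(String name, List<IndexBlock> blocks)
    {
        int lo = 0, hi = blocks.size() - 1, found = -1;
        while (lo <= hi)
        {
            int mid = (lo + hi) >>> 1;
            if (blocks.get(mid).lastName().compareTo(name) >= 0)
            {
                found = mid;
                hi = mid - 1;
            }
            else
            {
                lo = mid + 1;
            }
        }
        return found;
    }

    // Collect, in disk order and without duplicates, the blocks that may hold the requested names.
    static List<IndexBlock> selectBlocks(SortedSet<String> names, List<IndexBlock> blocks)
    {
        List<IndexBlock> ranges = new ArrayList<>();
        int lastIdx = -1;
        for (String name : names)
        {
            int idx = blockFor(name, blocks);
            if (idx < 0 || idx == lastIdx)
                continue; // no candidate block, or block already selected
            IndexBlock block = blocks.get(idx);
            if (name.compareTo(block.firstName()) < 0)
                continue; // name falls in a gap before this block
            ranges.add(block);
            lastIdx = idx;
        }
        return ranges;
    }

    public static void main(String[] args)
    {
        List<IndexBlock> blocks = List.of(new IndexBlock("a", "f", 0, 100),
                                          new IndexBlock("g", "m", 100, 100),
                                          new IndexBlock("n", "z", 200, 100));
        SortedSet<String> names = new TreeSet<>(List.of("c", "d", "q"));
        System.out.println(selectBlocks(names, blocks)); // first and third blocks only
    }
}

Tracking lastIdx mirrors the lastIndexIdx bookkeeping in the real method: when several requested names land in the same index block, the block is seeked and scanned once rather than once per name.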

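
The reversed-query handling in SSTableSliceIterator's addColumn()/computeNext() (earlier in this diff) rests on a small ordering trick: cells of a block are pushed with addFirst to reverse their disk order, while range tombstone markers go into a separate deque that is drained before any cell of the block, so a consumer always sees a tombstone before the cells it may cover. A minimal sketch of that pattern, using hypothetical Atom/Cell/RangeTombstoneMarker stand-ins rather than the real OnDiskAtom hierarchy:

import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative stand-ins only; not the Cassandra OnDiskAtom hierarchy.
public class ReversedBlockBuffer
{
    interface Atom { String name(); }
    record Cell(String name) implements Atom {}
    record RangeTombstoneMarker(String name) implements Atom {}

    private final Deque<Atom> blockColumns = new ArrayDeque<>();
    private final Deque<Atom> tombstonesFirst = new ArrayDeque<>();

    // Called in forward disk order while deserializing a block of a reversed query.
    void addForReversedQuery(Atom atom)
    {
        if (atom instanceof RangeTombstoneMarker)
            tombstonesFirst.addFirst(atom); // surfaced before any cell of the block
        else
            blockColumns.addFirst(atom);    // addFirst reverses the on-disk order of the cells
    }

    // Called by the iterator: drain the tombstones, then the reversed cells.
    Atom next()
    {
        Atom atom = tombstonesFirst.poll();
        return atom != null ? atom : blockColumns.poll();
    }

    public static void main(String[] args)
    {
        ReversedBlockBuffer buffer = new ReversedBlockBuffer();
        buffer.addForReversedQuery(new Cell("a"));
        buffer.addForReversedQuery(new RangeTombstoneMarker("b"));
        buffer.addForReversedQuery(new Cell("c"));
        for (Atom atom = buffer.next(); atom != null; atom = buffer.next())
            System.out.println(atom); // RangeTombstoneMarker[name=b], Cell[name=c], Cell[name=a]
    }
}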