http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 1458461..a38e60f 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -41,10 +41,9 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.*; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.commitlog.ReplayPosition; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.filter.ColumnSlice; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.lifecycle.Tracker; import org.apache.cassandra.dht.*; @@ -199,6 +198,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS // not final since we need to be able to change level on a file. protected volatile StatsMetadata sstableMetadata; + public final SerializationHeader header; + protected final AtomicLong keyCacheHit = new AtomicLong(0); protected final AtomicLong keyCacheRequest = new AtomicLong(0); @@ -331,7 +332,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS String parentName = descriptor.cfname.substring(0, i); CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName); ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1)); - metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def)); + metadata = SecondaryIndex.newIndexMetadata(parent, def); } else { @@ -375,10 +376,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor; assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; - Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, - EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS)); + EnumSet<MetadataType> types = EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER); + Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, types); + ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION); StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS); + SerializationHeader.Component header = (SerializationHeader.Component) sstableMetadata.get(MetadataType.HEADER); // Check if sstable is created using same partitioner. // Partitioner can be null, which indicates older version of sstable or no stats available. 
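As an aside on the open() changes in the hunks above and below: every open path now follows the same lookup pattern — request a set of metadata component types from the serializer, cast each entry out of the returned map, and treat the HEADER entry as optional because pre-3.0 sstables never wrote one. The following minimal standalone sketch shows just that pattern; the types here are hypothetical stand-ins, not the real Cassandra classes.

import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

public class MetadataLookupSketch
{
    enum MetadataType { VALIDATION, STATS, HEADER }

    // Hypothetical stand-ins for ValidationMetadata, StatsMetadata and SerializationHeader.Component.
    interface MetadataComponent {}
    static class Validation implements MetadataComponent {}
    static class Stats implements MetadataComponent {}
    static class Header implements MetadataComponent {}

    // Simulates descriptor.getMetadataSerializer().deserialize(descriptor, types):
    // a legacy (pre-3.0) sstable simply has no HEADER entry in the returned map.
    static Map<MetadataType, MetadataComponent> deserialize(EnumSet<MetadataType> types, boolean legacy)
    {
        Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class);
        if (types.contains(MetadataType.VALIDATION))
            components.put(MetadataType.VALIDATION, new Validation());
        if (types.contains(MetadataType.STATS))
            components.put(MetadataType.STATS, new Stats());
        if (types.contains(MetadataType.HEADER) && !legacy)
            components.put(MetadataType.HEADER, new Header());
        return components;
    }

    public static void main(String[] args)
    {
        EnumSet<MetadataType> types = EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER);
        Map<MetadataType, MetadataComponent> sstableMetadata = deserialize(types, true);

        Stats stats = (Stats) sstableMetadata.get(MetadataType.STATS);
        Header header = (Header) sstableMetadata.get(MetadataType.HEADER); // null for a legacy file

        // Mirrors the null-guard around header.toHeader(metadata) in the hunks at this point.
        System.out.println(stats != null && header == null ? "legacy sstable: stats only, no serialization header"
                                                           : "3.0+ sstable: serialization header present");
    }
}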
@@ -392,8 +395,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length()); - SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(), - statsMetadata, OpenReason.NORMAL); + SSTableReader sstable = internalOpen(descriptor, + components, + metadata, + partitioner, + System.currentTimeMillis(), + statsMetadata, + OpenReason.NORMAL, + header.toHeader(metadata)); // special implementation of load to use non-pooled SegmentedFile builders try(SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder(); @@ -421,10 +430,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor; assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor; - Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, - EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS)); + // For the 3.0+ sstable format, the (misnomed) stats component holds the serialization header, which we need to deserialize the sstable content + assert !descriptor.version.storeRows() || components.contains(Component.STATS) : "Stats component is missing for sstable " + descriptor; + + EnumSet<MetadataType> types = EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER); + Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor, types); ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION); StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS); + SerializationHeader.Component header = (SerializationHeader.Component) sstableMetadata.get(MetadataType.HEADER); + assert !descriptor.version.storeRows() || header != null; // Check if sstable is created using same partitioner. // Partitioner can be null, which indicates older version of sstable or no stats available. @@ -438,8 +452,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length()); - SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(), - statsMetadata, OpenReason.NORMAL); + SSTableReader sstable = internalOpen(descriptor, + components, + metadata, + partitioner, + System.currentTimeMillis(), + statsMetadata, + OpenReason.NORMAL, + header == null ?
null : header.toHeader(metadata)); // load index and filter long start = System.nanoTime(); @@ -520,11 +540,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS IFilter bf, long maxDataAge, StatsMetadata sstableMetadata, - OpenReason openReason) + OpenReason openReason, + SerializationHeader header) { assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null; - SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason); + SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header); reader.bf = bf; reader.ifile = ifile; @@ -542,11 +563,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, - OpenReason openReason) + OpenReason openReason, + SerializationHeader header) { Factory readerFactory = descriptor.getFormat().getReaderFactory(); - return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason); + return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header); } protected SSTableReader(final Descriptor desc, @@ -555,13 +577,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS IPartitioner partitioner, long maxDataAge, StatsMetadata sstableMetadata, - OpenReason openReason) + OpenReason openReason, + SerializationHeader header) { super(desc, components, metadata, partitioner); this.sstableMetadata = sstableMetadata; + this.header = header; this.maxDataAge = maxDataAge; this.openReason = openReason; - this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata); + this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, desc.version, header); } public static long getTotalBytes(Iterable<SSTableReader> sstables) @@ -736,12 +760,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? 
null : new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel)) { long indexPosition; - RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata); + RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata, descriptor.version, header); while ((indexPosition = primaryIndex.getFilePointer()) != indexSize) { ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex); - RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version); + RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex); DecoratedKey decoratedKey = partitioner.decorateKey(key); if (first == null) first = decoratedKey; @@ -979,7 +1003,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS bf.sharedCopy(), maxDataAge, sstableMetadata, - reason); + reason, + header); replacement.first = newFirst; replacement.last = last; replacement.isSuspect.set(isSuspect.get()); @@ -1140,7 +1165,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away, * modulo downsampling of the index summary). Always returns a value >= 0 */ - public long getIndexScanPosition(RowPosition key) + public long getIndexScanPosition(PartitionPosition key) { if (openReason == OpenReason.MOVED_START && key.compareTo(first) < 0) key = first; @@ -1287,8 +1312,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS for (Range<Token> range : Range.normalize(ranges)) { - RowPosition leftPosition = range.left.maxKeyBound(); - RowPosition rightPosition = range.right.maxKeyBound(); + PartitionPosition leftPosition = range.left.maxKeyBound(); + PartitionPosition rightPosition = range.right.maxKeyBound(); int left = summary.binarySearch(leftPosition); if (left < 0) @@ -1382,9 +1407,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { assert !range.isWrapAround() || range.right.isMinimum(); // truncate the range so it at most covers the sstable - AbstractBounds<RowPosition> bounds = Range.makeRowRange(range); - RowPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound(); - RowPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right; + AbstractBounds<PartitionPosition> bounds = Range.makeRowRange(range); + PartitionPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound(); + PartitionPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right; if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0) continue; @@ -1455,14 +1480,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS /** * Get position updating key cache and stats. 
- * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean) + * @see #getPosition(PartitionPosition, SSTableReader.Operator, boolean) */ - public RowIndexEntry getPosition(RowPosition key, Operator op) + public RowIndexEntry getPosition(PartitionPosition key, Operator op) { return getPosition(key, op, true, false); } - public RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats) + public RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats) { return getPosition(key, op, updateCacheAndStats, false); } @@ -1473,20 +1498,15 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS * @param updateCacheAndStats true if updating stats and cache * @return The index entry corresponding to the key, or null if the key is not present */ - protected abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast); + protected abstract RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast); - //Corresponds to a name column - public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns); - public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry); - - //Corresponds to a slice query - public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse); - public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry); + public abstract SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift); + public abstract SliceableUnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift); /** * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists. */ - public DecoratedKey firstKeyBeyond(RowPosition token) + public DecoratedKey firstKeyBeyond(PartitionPosition token) { if (token.compareTo(first) < 0) return first; @@ -1589,19 +1609,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return getScanner((RateLimiter) null); } - public ISSTableScanner getScanner(RateLimiter limiter) - { - return getScanner(DataRange.allData(partitioner), limiter); - } - /** - * + * @param columns the columns to return. * @param dataRange filter to use when reading the columns * @return A Scanner for seeking over the rows of the SSTable. */ - public ISSTableScanner getScanner(DataRange dataRange) + public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, boolean isForThrift) { - return getScanner(dataRange, null); + return getScanner(columns, dataRange, null, isForThrift); } /** @@ -1618,6 +1633,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } /** + * Direct I/O SSTableScanner over the entirety of the sstable. + * + * @return A Scanner over the full content of the SSTable. + */ + public abstract ISSTableScanner getScanner(RateLimiter limiter); + + /** * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
* * @param ranges the range of keys to cover @@ -1626,11 +1648,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public abstract ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter); /** - * + * @param columns the columns to return. * @param dataRange filter to use when reading the columns * @return A Scanner for seeking over the rows of the SSTable. */ - public abstract ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter); + public abstract ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift); @@ -1761,6 +1783,43 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return sstableMetadata.maxTimestamp; } + public int getMinLocalDeletionTime() + { + return sstableMetadata.minLocalDeletionTime; + } + + public int getMaxLocalDeletionTime() + { + return sstableMetadata.maxLocalDeletionTime; + } + + public int getMinTTL() + { + return sstableMetadata.minTTL; + } + + public int getMaxTTL() + { + return sstableMetadata.maxTTL; + } + + public long getTotalColumnsSet() + { + return sstableMetadata.totalColumnsSet; + } + + public long getTotalRows() + { + return sstableMetadata.totalRows; + } + + public int getAvgColumnSetPerRow() + { + return sstableMetadata.totalRows < 0 + ? -1 + : (sstableMetadata.totalRows == 0 ? 0 : (int)(sstableMetadata.totalColumnsSet / sstableMetadata.totalRows)); + } + public Set<Integer> getAncestors() { try @@ -1850,6 +1909,34 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS readMeter.mark(); } + /** + * Checks if this sstable can overlap with another one based on the min/max clustering values. + * If this method returns false, we're guaranteed that {@code this} and {@code other} have no overlapping + * data, i.e. no cells to reconcile. + */ + public boolean mayOverlapsWith(SSTableReader other) + { + StatsMetadata m1 = getSSTableMetadata(); + StatsMetadata m2 = other.getSSTableMetadata(); + + if (m1.minClusteringValues.isEmpty() || m1.maxClusteringValues.isEmpty() || m2.minClusteringValues.isEmpty() || m2.maxClusteringValues.isEmpty()) + return true; + + return !(compare(m1.maxClusteringValues, m2.minClusteringValues) < 0 || compare(m1.minClusteringValues, m2.maxClusteringValues) > 0); + } + + private int compare(List<ByteBuffer> values1, List<ByteBuffer> values2) + { + ClusteringComparator comparator = metadata.comparator; + for (int i = 0; i < Math.min(values1.size(), values2.size()); i++) + { + int cmp = comparator.subtype(i).compare(values1.get(i), values2.get(i)); + if (cmp != 0) + return cmp; + } + return 0; + } + public static class SizeComparator implements Comparator<SSTableReader> { public int compare(SSTableReader o1, SSTableReader o2) @@ -2172,7 +2259,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, - OpenReason openReason); + OpenReason openReason, + SerializationHeader header); } }
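The new mayOverlapsWith/compare pair above boils down to a lexicographic interval test over each sstable's min/max clustering bounds: two sstables can only share clustering prefixes when neither's max bound sorts strictly before the other's min bound, and missing stats are treated conservatively as a possible overlap. A standalone sketch of that logic, with plain integers standing in for the ByteBuffer clustering values and the table's ClusteringComparator (a simplification, not the real types):

import java.util.Arrays;
import java.util.List;

public class ClusteringOverlapSketch
{
    // Lexicographic comparison over the shared prefix, like compare() above does
    // per-component via ClusteringComparator.subtype(i). Prefixes of different
    // lengths that agree on the shared components compare as equal, which keeps
    // the test conservative.
    static int compare(List<Integer> a, List<Integer> b)
    {
        for (int i = 0; i < Math.min(a.size(), b.size()); i++)
        {
            int cmp = Integer.compare(a.get(i), b.get(i));
            if (cmp != 0)
                return cmp;
        }
        return 0;
    }

    // Mirror of mayOverlapsWith: empty bounds mean "no stats", so assume overlap;
    // otherwise the two ranges are disjoint iff one ends before the other starts.
    static boolean mayOverlap(List<Integer> min1, List<Integer> max1, List<Integer> min2, List<Integer> max2)
    {
        if (min1.isEmpty() || max1.isEmpty() || min2.isEmpty() || max2.isEmpty())
            return true;
        return !(compare(max1, min2) < 0 || compare(min1, max2) > 0);
    }

    public static void main(String[] args)
    {
        System.out.println(mayOverlap(Arrays.asList(1), Arrays.asList(5), Arrays.asList(6), Arrays.asList(9))); // false: disjoint
        System.out.println(mayOverlap(Arrays.asList(1), Arrays.asList(5), Arrays.asList(5), Arrays.asList(9))); // true: ranges touch at 5
        System.out.println(mayOverlap(Arrays.<Integer>asList(), Arrays.<Integer>asList(), Arrays.asList(5), Arrays.asList(9))); // true: no stats
    }
}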
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index f99292e..c3c69b3 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -18,14 +18,19 @@ package org.apache.cassandra.io.sstable.format; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import com.google.common.collect.Sets; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; @@ -37,13 +42,6 @@ import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.concurrent.Transactional; -import java.io.DataInput; -import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - /** * This is the API all table writers must implement. * @@ -57,6 +55,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional protected final long keyCount; protected final MetadataCollector metadataCollector; protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; + protected final SerializationHeader header; protected final TransactionalProxy txnProxy = txnProxy(); protected abstract TransactionalProxy txnProxy(); @@ -69,30 +68,37 @@ public abstract class SSTableWriter extends SSTable implements Transactional protected boolean openResult; } - protected SSTableWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector) + protected SSTableWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header) { super(descriptor, components(metadata), metadata, partitioner); this.keyCount = keyCount; this.repairedAt = repairedAt; this.metadataCollector = metadataCollector; - this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata); + this.header = header; + this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header); } - public static SSTableWriter create(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector) + public static SSTableWriter create(Descriptor descriptor, + Long keyCount, + Long repairedAt, + CFMetaData metadata, + IPartitioner partitioner, + MetadataCollector metadataCollector, + SerializationHeader header) { Factory writerFactory = descriptor.getFormat().getWriterFactory(); - return 
writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector); + return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header); } - public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt) + public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, SerializationHeader header) { - return create(descriptor, keyCount, repairedAt, 0); + return create(descriptor, keyCount, repairedAt, 0, header); } - public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel) + public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header) { CFMetaData metadata = Schema.instance.getCFMetaData(descriptor); - return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner()); + return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner(), header); } public static SSTableWriter create(CFMetaData metadata, @@ -100,20 +106,21 @@ public abstract class SSTableWriter extends SSTable implements Transactional long keyCount, long repairedAt, int sstableLevel, - IPartitioner partitioner) + IPartitioner partitioner, + SerializationHeader header) { MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel); - return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector); + return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector, header); } - public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel) + public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header) { - return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel); + return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header); } - public static SSTableWriter create(String filename, long keyCount, long repairedAt) + public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header) { - return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0); + return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header); } private static Set<Component> components(CFMetaData metadata) @@ -141,19 +148,18 @@ public abstract class SSTableWriter extends SSTable implements Transactional return components; } - public abstract void mark(); - /** - * @param row - * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row + * Appends partition data to this writer. + * + * @param iterator the partition to write + * @return the created index entry if something was written, that is if {@code iterator} + * wasn't empty, {@code null} otherwise. 
+ * + * @throws FSWriteError if a write to the dataFile fails */ - public abstract RowIndexEntry append(AbstractCompactedRow row); - - public abstract void append(DecoratedKey decoratedKey, ColumnFamily cf); - - public abstract long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException; + public abstract RowIndexEntry append(UnfilteredRowIterator iterator); public abstract long getFilePointer(); @@ -244,7 +250,9 @@ public abstract class SSTableWriter extends SSTable implements Transactional protected Map<MetadataType, MetadataComponent> finalizeMetadata() { return metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(), - metadata.getBloomFilterFpChance(), repairedAt); + metadata.getBloomFilterFpChance(), + repairedAt, + header); } protected StatsMetadata statsMetadata() @@ -276,6 +284,12 @@ public abstract class SSTableWriter extends SSTable implements Transactional public static abstract class Factory { - public abstract SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector); + public abstract SSTableWriter open(Descriptor descriptor, + long keyCount, + long repairedAt, + CFMetaData metadata, + IPartitioner partitioner, + MetadataCollector metadataCollector, + SerializationHeader header); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/Version.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java index faaa89e..8077a45 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/Version.java +++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java @@ -52,6 +52,10 @@ public abstract class Version public abstract boolean hasNewFileName(); + public abstract boolean storeRows(); + + public abstract int correspondingMessagingVersion(); // Only used by storage that 'storeRows' so far + public String getVersion() { return version; @@ -73,6 +77,7 @@ public abstract class Version } abstract public boolean isCompatible(); + abstract public boolean isCompatibleForStreaming(); @Override public String toString() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index a1e32cf..fd0b5d5 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -17,16 +17,14 @@ */ package org.apache.cassandra.io.sstable.format.big; +import java.util.Iterator; +import java.util.Set; + import com.google.common.collect.ImmutableList; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.AbstractCell; -import org.apache.cassandra.db.ColumnSerializer; -import org.apache.cassandra.db.OnDiskAtom; import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.compaction.CompactionController; -import
org.apache.cassandra.db.compaction.LazilyCompactedRow; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; @@ -38,9 +36,7 @@ import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.io.util.FileDataInput; - -import java.util.Iterator; -import java.util.Set; +import org.apache.cassandra.net.MessagingService; /** * Legacy bigtable format @@ -82,38 +78,26 @@ public class BigFormat implements SSTableFormat } @Override - public Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, ColumnSerializer.Flag flag, int expireBefore, CFMetaData cfm, Version version) - { - return AbstractCell.onDiskIterator(in, flag, expireBefore, version, cfm.comparator); - } - - @Override - public AbstractCompactedRow getCompactedRowWriter(CompactionController controller, ImmutableList<OnDiskAtomIterator> onDiskAtomIterators) - { - return new LazilyCompactedRow(controller, onDiskAtomIterators); - } - - @Override - public RowIndexEntry.IndexSerializer getIndexSerializer(CFMetaData cfMetaData) + public RowIndexEntry.IndexSerializer getIndexSerializer(CFMetaData metadata, Version version, SerializationHeader header) { - return new RowIndexEntry.Serializer(new IndexHelper.IndexInfo.Serializer(cfMetaData.comparator)); + return new RowIndexEntry.Serializer(metadata, version, header); } static class WriterFactory extends SSTableWriter.Factory { @Override - public SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector) + public SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header) { - return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector); + return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header); } } static class ReaderFactory extends SSTableReader.Factory { @Override - public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason) + public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header) { - return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason); + return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header); } } @@ -143,6 +127,8 @@ public class BigFormat implements SSTableFormat private final boolean hasRepairedAt; private final boolean tracksLegacyCounterShards; private final boolean newFileName; + public final boolean storeRows; + public final int correspondingMessagingVersion; // Only used by storage that 'storeRows' so far public BigVersion(String version) { @@ -155,6 +141,8 @@ public class BigFormat implements SSTableFormat hasRepairedAt = version.compareTo("ka") >= 0; tracksLegacyCounterShards = version.compareTo("ka") >= 0; newFileName = version.compareTo("la") >= 0; + storeRows =
version.compareTo("la") >= 0; + correspondingMessagingVersion = storeRows ? MessagingService.VERSION_30 : MessagingService.VERSION_21; } @Override @@ -200,9 +188,27 @@ public class BigFormat implements SSTableFormat } @Override + public boolean storeRows() + { + return storeRows; + } + + @Override + public int correspondingMessagingVersion() + { + return correspondingMessagingVersion; + } + + @Override public boolean isCompatible() { return version.compareTo(earliest_supported_version) >= 0 && version.charAt(0) <= current_version.charAt(0); } + + @Override + public boolean isCompatibleForStreaming() + { + return isCompatible() && version.charAt(0) == current_version.charAt(0); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index 3f375e7..7a7b913 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@ -20,13 +20,11 @@ package org.apache.cassandra.io.sstable.format.big; import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.cache.KeyCacheKey; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.DataRange; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.db.RowPosition; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.columniterator.SSTableIterator; +import org.apache.cassandra.db.columniterator.SSTableReversedIterator; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -37,7 +35,6 @@ import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; import org.slf4j.Logger; @@ -55,41 +52,45 @@ public class BigTableReader extends SSTableReader { private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class); - BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason) + BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header) { - super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason); + super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header); } - public OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns) + public SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter 
selectedColumns, boolean reversed, boolean isForThrift) { - return new SSTableNamesIterator(this, key, columns); + return reversed + ? new SSTableReversedIterator(this, key, selectedColumns, isForThrift) + : new SSTableIterator(this, key, selectedColumns, isForThrift); } - public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry ) + public SliceableUnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift) { - return new SSTableNamesIterator(this, input, key, columns, indexEntry); + return reversed + ? new SSTableReversedIterator(this, file, key, indexEntry, selectedColumns, isForThrift) + : new SSTableIterator(this, file, key, indexEntry, selectedColumns, isForThrift); } - public OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse) + /** + * @param columns the columns to return. + * @param dataRange filter to use when reading the columns + * @return A Scanner for seeking over the rows of the SSTable. + */ + public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift) { - return new SSTableSliceIterator(this, key, slices, reverse); + return BigTableScanner.getScanner(this, columns, dataRange, limiter, isForThrift); } - public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, ColumnSlice[] slices, boolean reverse, RowIndexEntry indexEntry) - { - return new SSTableSliceIterator(this, input, key, slices, reverse, indexEntry); - } /** + * Direct I/O SSTableScanner over the full sstable. * - * @param dataRange filter to use when reading the columns - * @return A Scanner for seeking over the rows of the SSTable. + * @return A Scanner for reading the full SSTable. */ - public ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter) + public ISSTableScanner getScanner(RateLimiter limiter) { - return BigTableScanner.getScanner(this, dataRange, limiter); + return BigTableScanner.getScanner(this, limiter); } - /** * Direct I/O SSTableScanner over a defined collection of ranges of tokens. 
* @@ -109,7 +110,7 @@ public class BigTableReader extends SSTableReader * @param updateCacheAndStats true if updating stats and cache * @return The index entry corresponding to the key, or null if the key is not present */ - protected RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast) + protected RowIndexEntry getPosition(PartitionPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast) { if (op == Operator.EQ) { @@ -212,7 +213,7 @@ public class BigTableReader extends SSTableReader if (opSatisfied) { // read data position from index entry - RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in, descriptor.version); + RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in); if (exactMatch && updateCacheAndStats) { assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java index d477152..0794e90 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java @@ -22,16 +22,13 @@ import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Ordering; +import com.google.common.collect.Iterators; import com.google.common.util.concurrent.RateLimiter; -import org.apache.cassandra.db.DataRange; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.db.RowPosition; -import org.apache.cassandra.db.columniterator.IColumnIteratorFactory; -import org.apache.cassandra.db.columniterator.LazyColumnIterator; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.AbstractBounds.Boundary; import org.apache.cassandra.dht.Bounds; @@ -57,18 +54,27 @@ public class BigTableScanner implements ISSTableScanner protected final RandomAccessReader ifile; public final SSTableReader sstable; - private final Iterator<AbstractBounds<RowPosition>> rangeIterator; - private AbstractBounds<RowPosition> currentRange; + private final Iterator<AbstractBounds<PartitionPosition>> rangeIterator; + private AbstractBounds<PartitionPosition> currentRange; + private final ColumnFilter columns; private final DataRange dataRange; private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; + private final boolean isForThrift; - protected Iterator<OnDiskAtomIterator> iterator; + protected UnfilteredPartitionIterator iterator; - public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) + // Full scan of the sstables + public static ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter) { - return new BigTableScanner(sstable, dataRange, limiter); + return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, limiter, false, 
Iterators.singletonIterator(fullRange(sstable))); } + + public static ISSTableScanner getScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift) + { + return new BigTableScanner(sstable, columns, dataRange, limiter, isForThrift, makeBounds(sstable, dataRange).iterator()); + } + public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter) { // We want to avoid allocating a SSTableScanner if the ranges don't overlap the sstable (#5249) @@ -76,60 +82,54 @@ public class BigTableScanner implements ISSTableScanner if (positions.isEmpty()) return new EmptySSTableScanner(sstable.getFilename()); - return new BigTableScanner(sstable, tokenRanges, limiter); + return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, limiter, false, makeBounds(sstable, tokenRanges).iterator()); } - /** - * @param sstable SSTable to scan; must not be null - * @param dataRange a single range to scan; must not be null - * @param limiter background i/o RateLimiter; may be null - */ - private BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter) + private BigTableScanner(SSTableReader sstable, ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift, Iterator<AbstractBounds<PartitionPosition>> rangeIterator) { assert sstable != null; this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter); this.ifile = sstable.openIndexReader(); this.sstable = sstable; + this.columns = columns; this.dataRange = dataRange; - this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata); - - List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2); - addRange(dataRange.keyRange(), boundsList); - this.rangeIterator = boundsList.iterator(); + this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata, + sstable.descriptor.version, + sstable.header); + this.isForThrift = isForThrift; + this.rangeIterator = rangeIterator; } - /** - * @param sstable SSTable to scan; must not be null - * @param tokenRanges A set of token ranges to scan - * @param limiter background i/o RateLimiter; may be null - */ - private BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter) + private static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges) { - assert sstable != null; - - this.dfile = limiter == null ?
sstable.openDataReader() : sstable.openDataReader(limiter); - this.ifile = sstable.openIndexReader(); - this.sstable = sstable; - this.dataRange = null; - this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata); - - List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(tokenRanges.size()); + List<AbstractBounds<PartitionPosition>> boundsList = new ArrayList<>(tokenRanges.size()); for (Range<Token> range : Range.normalize(tokenRanges)) - addRange(Range.makeRowRange(range), boundsList); + addRange(sstable, Range.makeRowRange(range), boundsList); + return boundsList; + } - this.rangeIterator = boundsList.iterator(); + private static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, DataRange dataRange) + { + List<AbstractBounds<PartitionPosition>> boundsList = new ArrayList<>(2); + addRange(sstable, dataRange.keyRange(), boundsList); + return boundsList; + } + + private static AbstractBounds<PartitionPosition> fullRange(SSTableReader sstable) + { + return new Bounds<PartitionPosition>(sstable.first, sstable.last); } - private void addRange(AbstractBounds<RowPosition> requested, List<AbstractBounds<RowPosition>> boundsList) + private static void addRange(SSTableReader sstable, AbstractBounds<PartitionPosition> requested, List<AbstractBounds<PartitionPosition>> boundsList) { if (requested instanceof Range && ((Range)requested).isWrapAround()) { if (requested.right.compareTo(sstable.first) >= 0) { // since we wrap, we must contain the whole sstable prior to stopKey() - Boundary<RowPosition> left = new Boundary<RowPosition>(sstable.first, true); - Boundary<RowPosition> right; + Boundary<PartitionPosition> left = new Boundary<PartitionPosition>(sstable.first, true); + Boundary<PartitionPosition> right; right = requested.rightBoundary(); right = minRight(right, sstable.last, true); if (!isEmpty(left, right)) @@ -138,8 +138,8 @@ public class BigTableScanner implements ISSTableScanner if (requested.left.compareTo(sstable.last) <= 0) { // since we wrap, we must contain the whole sstable after dataRange.startKey() - Boundary<RowPosition> right = new Boundary<RowPosition>(sstable.last, true); - Boundary<RowPosition> left; + Boundary<PartitionPosition> right = new Boundary<PartitionPosition>(sstable.last, true); + Boundary<PartitionPosition> left; left = requested.leftBoundary(); left = maxLeft(left, sstable.first, true); if (!isEmpty(left, right)) @@ -149,12 +149,12 @@ public class BigTableScanner implements ISSTableScanner else { assert requested.left.compareTo(requested.right) <= 0 || requested.right.isMinimum(); - Boundary<RowPosition> left, right; + Boundary<PartitionPosition> left, right; left = requested.leftBoundary(); right = requested.rightBoundary(); left = maxLeft(left, sstable.first, true); // apparently isWrapAround() doesn't count Bounds that extend to the limit (min) as wrapping - right = requested.right.isMinimum() ? new Boundary<RowPosition>(sstable.last, true) + right = requested.right.isMinimum() ? 
new Boundary<PartitionPosition>(sstable.last, true) : minRight(right, sstable.last, true); if (!isEmpty(left, right)) boundsList.add(AbstractBounds.bounds(left, right)); @@ -193,10 +193,18 @@ public class BigTableScanner implements ISSTableScanner } } - public void close() throws IOException + public void close() { - if (isClosed.compareAndSet(false, true)) - FileUtils.close(dfile, ifile); + try + { + if (isClosed.compareAndSet(false, true)) + FileUtils.close(dfile, ifile); + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, sstable.getFilename()); + } } public long getLengthInBytes() @@ -214,6 +222,11 @@ public class BigTableScanner implements ISSTableScanner return sstable.toString(); } + public boolean isForThrift() + { + return isForThrift; + } + public boolean hasNext() { if (iterator == null) @@ -221,7 +234,7 @@ public class BigTableScanner implements ISSTableScanner return iterator.hasNext(); } - public OnDiskAtomIterator next() + public UnfilteredRowIterator next() { if (iterator == null) iterator = createIterator(); @@ -233,19 +246,24 @@ public class BigTableScanner implements ISSTableScanner throw new UnsupportedOperationException(); } - private Iterator<OnDiskAtomIterator> createIterator() + private UnfilteredPartitionIterator createIterator() { return new KeyScanningIterator(); } - protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator> + protected class KeyScanningIterator extends AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator { private DecoratedKey nextKey; private RowIndexEntry nextEntry; private DecoratedKey currentKey; private RowIndexEntry currentEntry; - protected OnDiskAtomIterator computeNext() + public boolean isForThrift() + { + return isForThrift; + } + + protected UnfilteredRowIterator computeNext() { try { @@ -264,7 +282,7 @@ public class BigTableScanner implements ISSTableScanner return endOfData(); currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); - currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version); + currentEntry = rowIndexEntrySerializer.deserialize(ifile); } while (!currentRange.contains(currentKey)); } else @@ -283,7 +301,7 @@ public class BigTableScanner implements ISSTableScanner { // we need the position of the start of the next key, regardless of whether it falls in the current range nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile)); - nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version); + nextEntry = rowIndexEntrySerializer.deserialize(ifile); if (!currentRange.contains(nextKey)) { @@ -292,21 +310,34 @@ public class BigTableScanner implements ISSTableScanner } } - if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey())) + /* + * For a given partition key, we want to avoid hitting the data + * file unless we're explicitly asked to. This is important + * for PartitionRangeReadCommand#checkCacheFilter.
+ */ + return new LazilyInitializedUnfilteredRowIterator(currentKey) { - dfile.seek(currentEntry.position + currentEntry.headerOffset()); - ByteBufferUtil.readWithShortLength(dfile); // key - return new SSTableIdentityIterator(sstable, dfile, currentKey); - } - - return new LazyColumnIterator(currentKey, new IColumnIteratorFactory() - { - public OnDiskAtomIterator create() + protected UnfilteredRowIterator initializeIterator() { - return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry); + try + { + if (dataRange == null) + { + dfile.seek(currentEntry.position + currentEntry.headerOffset()); + ByteBufferUtil.readWithShortLength(dfile); // key + return new SSTableIdentityIterator(sstable, dfile, partitionKey()); + } + + ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(partitionKey()); + return filter.filter(sstable.iterator(dfile, partitionKey(), currentEntry, columns, filter.isReversed(), isForThrift)); + } + catch (CorruptSSTableException | IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, sstable.getFilename()); + } } - }); - + }; } catch (CorruptSSTableException | IOException e) { @@ -314,6 +345,11 @@ public class BigTableScanner implements ISSTableScanner throw new CorruptSSTableException(e, sstable.getFilename()); } } + + public void close() + { + BigTableScanner.this.close(); + } } @Override @@ -326,7 +362,7 @@ public class BigTableScanner implements ISSTableScanner ")"; } - public static class EmptySSTableScanner implements ISSTableScanner + public static class EmptySSTableScanner extends AbstractUnfilteredPartitionIterator implements ISSTableScanner { private final String filename; @@ -350,20 +386,19 @@ public class BigTableScanner implements ISSTableScanner return filename; } + public boolean isForThrift() + { + return false; + } + public boolean hasNext() { return false; } - public OnDiskAtomIterator next() + public UnfilteredRowIterator next() { return null; } - - public void close() throws IOException { } - - public void remove() { } } - - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 30b55a0..66b8ac0 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -18,10 +18,7 @@ package org.apache.cassandra.io.sstable.format.big; import java.io.*; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -29,12 +26,13 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; -import org.apache.cassandra.io.sstable.format.Version; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.compaction.AbstractCompactedRow; +import org.apache.cassandra.db.rows.*; import 
org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.compress.CompressedSequentialWriter; @@ -47,7 +45,6 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.FilterFactory; import org.apache.cassandra.utils.IFilter; -import org.apache.cassandra.utils.StreamingHistogram; import org.apache.cassandra.utils.concurrent.Transactional; import static org.apache.cassandra.utils.Throwables.merge; @@ -66,9 +63,9 @@ public class BigTableWriter extends SSTableWriter private DecoratedKey lastWrittenKey; private FileMark dataMark; - BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector) + public BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header) { - super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector); + super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header); if (compression) { @@ -124,21 +121,38 @@ public class BigTableWriter extends SSTableWriter } /** - * @param row - * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row + * Appends partition data to this writer. + * + * @param iterator the partition to write + * @return the created index entry if something was written, that is if {@code iterator} + * wasn't empty, {@code null} otherwise. + * + * @throws FSWriteError if a write to the dataFile fails */ - public RowIndexEntry append(AbstractCompactedRow row) + public RowIndexEntry append(UnfilteredRowIterator iterator) { - long startPosition = beforeAppend(row.key); - RowIndexEntry entry; - try + DecoratedKey key = iterator.partitionKey(); + + if (key.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT) + { + logger.error("Key size {} exceeds maximum of {}, skipping row", key.getKey().remaining(), FBUtilities.MAX_UNSIGNED_SHORT); + return null; + } + + if (iterator.isEmpty()) + return null; + + long startPosition = beforeAppend(key); + + try (StatsCollector withStats = new StatsCollector(iterator, metadataCollector)) { - entry = row.write(startPosition, dataFile); - if (entry == null) - return null; + ColumnIndex index = ColumnIndex.writeAndBuildIndex(withStats, dataFile, header, descriptor.version); + + RowIndexEntry entry = RowIndexEntry.create(startPosition, iterator.partitionLevelDeletion(), index); + long endPosition = dataFile.getFilePointer(); - metadataCollector.update(endPosition - startPosition, row.columnStats()); - afterAppend(row.key, endPosition, entry); + metadataCollector.addPartitionSizeInBytes(endPosition - startPosition); + afterAppend(key, endPosition, entry); return entry; } catch (IOException e) @@ -147,130 +161,77 @@ public class BigTableWriter extends SSTableWriter } } - public void append(DecoratedKey decoratedKey, ColumnFamily cf) + private static class StatsCollector extends WrappingUnfilteredRowIterator { - if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT) - { - logger.error("Key size {} exceeds maximum of {}, skipping row", - decoratedKey.getKey().remaining(), - FBUtilities.MAX_UNSIGNED_SHORT); - return; - } + private int cellCount; + private final MetadataCollector collector; + private final Set<ColumnDefinition> complexColumnsSetInRow = new HashSet<>(); - long startPosition = 
beforeAppend(decoratedKey); - long endPosition; - try - { - RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream); - endPosition = dataFile.getFilePointer(); - afterAppend(decoratedKey, endPosition, entry); - } - catch (IOException e) + StatsCollector(UnfilteredRowIterator iter, MetadataCollector collector) { - throw new FSWriteError(e, dataFile.getPath()); + super(iter); + this.collector = collector; + collector.update(iter.partitionLevelDeletion()); } - metadataCollector.update(endPosition - startPosition, cf.getColumnStats()); - } - - private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException - { - assert cf.hasColumns() || cf.isMarkedForDelete(); - ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out); - ColumnIndex index = builder.build(cf); + @Override + public Unfiltered next() + { + Unfiltered unfiltered = super.next(); + collector.updateClusteringValues(unfiltered.clustering()); - out.writeShort(END_OF_ROW); - return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index); - } + switch (unfiltered.kind()) + { + case ROW: + Row row = (Row) unfiltered; + collector.update(row.primaryKeyLivenessInfo()); + collector.update(row.deletion()); - /** - * @throws IOException if a read from the DataInput fails - * @throws FSWriteError if a write to the dataFile fails - */ - public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException - { - long currentPosition = beforeAppend(key); + int simpleColumnsSet = 0; + complexColumnsSetInRow.clear(); - ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE); - ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE); - ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE); - List<ByteBuffer> minColumnNames = Collections.emptyList(); - List<ByteBuffer> maxColumnNames = Collections.emptyList(); - StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE); - boolean hasLegacyCounterShards = false; + for (Cell cell : row) + { + if (cell.column().isComplex()) + complexColumnsSetInRow.add(cell.column()); + else + ++simpleColumnsSet; - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata); - cf.delete(DeletionTime.serializer.deserialize(in)); + ++cellCount; + collector.update(cell.livenessInfo()); - ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream); + if (cell.isCounterCell()) + collector.updateHasLegacyCounterShards(CounterCells.hasLegacyShards(cell)); + } - if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE) - { - tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime); - maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime); - minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt); - maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt); - } + for (int i = 0; i < row.columns().complexColumnCount(); i++) + collector.update(row.getDeletion(row.columns().getComplex(i))); - Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator(); - while (rangeTombstoneIterator.hasNext()) - { - RangeTombstone rangeTombstone = rangeTombstoneIterator.next(); - 
tombstones.update(rangeTombstone.getLocalDeletionTime()); - minTimestampTracker.update(rangeTombstone.timestamp()); - maxTimestampTracker.update(rangeTombstone.timestamp()); - maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime()); - minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator); - maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator); - } + collector.updateColumnSetPerRow(simpleColumnsSet + complexColumnsSetInRow.size()); - Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator); - try - { - while (iter.hasNext()) - { - OnDiskAtom atom = iter.next(); - if (atom == null) break; - - if (atom instanceof CounterCell) - { - atom = ((CounterCell) atom).markLocalToBeCleared(); - hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards(); - } - - int deletionTime = atom.getLocalDeletionTime(); - if (deletionTime < Integer.MAX_VALUE) - tombstones.update(deletionTime); - minTimestampTracker.update(atom.timestamp()); - maxTimestampTracker.update(atom.timestamp()); - minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator); - maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator); - maxDeletionTimeTracker.update(atom.getLocalDeletionTime()); - - columnIndexer.add(atom); // This write the atom on disk too + case RANGE_TOMBSTONE_MARKER: + if (((RangeTombstoneMarker) unfiltered).isBoundary()) + { + RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)unfiltered; + collector.update(bm.endDeletionTime()); + collector.update(bm.startDeletionTime()); + } + else + { + collector.update(((RangeTombstoneBoundMarker)unfiltered).deletionTime()); + } + break; } - - columnIndexer.maybeWriteEmptyRowHeader(); - dataFile.stream.writeShort(END_OF_ROW); + return unfiltered; } - catch (IOException e) + + @Override + public void close() { - throw new FSWriteError(e, dataFile.getPath()); + collector.addCellPerPartitionCount(cellCount); + super.close(); } - - metadataCollector.updateMinTimestamp(minTimestampTracker.get()) - .updateMaxTimestamp(maxTimestampTracker.get()) - .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get()) - .addRowSize(dataFile.getFilePointer() - currentPosition) - .addColumnCount(columnIndexer.writtenAtomCount()) - .mergeTombstoneHistogram(tombstones) - .updateMinColumnNames(minColumnNames) - .updateMaxColumnNames(maxColumnNames) - .updateHasLegacyCounterShards(hasLegacyCounterShards); - - afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build())); - return currentPosition; } private Descriptor makeTmpLinks() @@ -303,7 +264,7 @@ public class BigTableWriter extends SSTableWriter components, metadata, partitioner, ifile, dfile, iwriter.summary.build(partitioner, boundary), - iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY); + iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY, header); // now it's open, find the ACTUAL last readable key (i.e. 
for which the data file has also been flushed) sstable.first = getMinimalKey(first); @@ -339,7 +300,8 @@ public class BigTableWriter extends SSTableWriter iwriter.bf.sharedCopy(), maxDataAge, stats, - openReason); + openReason, + header); sstable.first = getMinimalKey(first); sstable.last = getMinimalKey(last); return sstable;
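For reference, the BigVersion flags introduced across this commit are all derived the same way: a lexicographic comparison of the two-letter sstable version string against the version that introduced a feature, with "la" marking the new 3.0 row-oriented format, and streaming compatibility additionally requiring the same major version letter. A standalone sketch of those predicates follows; the "jb" earliest-supported value is an assumption from the surrounding 2.1/3.0-era code, not shown in this diff.

public class VersionFlagsSketch
{
    static final String CURRENT_VERSION = "la";     // first version of the 3.0 row-oriented format
    static final String EARLIEST_SUPPORTED = "jb";  // assumption, not shown in this diff

    static boolean storeRows(String v)
    {
        return v.compareTo("la") >= 0;
    }

    // Mirrors BigVersion.isCompatible(): at least the earliest supported version,
    // and no newer major letter than the current one.
    static boolean isCompatible(String v)
    {
        return v.compareTo(EARLIEST_SUPPORTED) >= 0 && v.charAt(0) <= CURRENT_VERSION.charAt(0);
    }

    // Streaming additionally requires the same major version letter as the current format.
    static boolean isCompatibleForStreaming(String v)
    {
        return isCompatible(v) && v.charAt(0) == CURRENT_VERSION.charAt(0);
    }

    // Mirrors the mapping in BigVersion's constructor: row-storing sstables pair
    // with the 3.0 messaging version, older ones with the 2.1 messaging version.
    static String correspondingMessagingVersion(String v)
    {
        return storeRows(v) ? "MessagingService.VERSION_30" : "MessagingService.VERSION_21";
    }

    public static void main(String[] args)
    {
        for (String v : new String[]{ "jb", "ka", "la" })
            System.out.printf("%s: storeRows=%b streaming=%b messaging=%s%n",
                              v, storeRows(v), isCompatibleForStreaming(v), correspondingMessagingVersion(v));
    }
}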
