http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
new file mode 100644
index 0000000..eb43968
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.AbstractCell;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.LazilyCompactedRow;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.FileDataInput;
+
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Legacy bigtable format
+ */
+public class BigFormat implements SSTableFormat
+{
+    public static final BigFormat instance = new BigFormat();
+    public static final BigVersion latestVersion = new BigVersion(BigVersion.current_version);
+    private static final SSTableReader.Factory readerFactory = new ReaderFactory();
+    private static final SSTableWriter.Factory writerFactory = new WriterFactory();
+
+    private BigFormat()
+    {
+
+    }
+
+    @Override
+    public Version getLatestVersion()
+    {
+        return latestVersion;
+    }
+
+    @Override
+    public Version getVersion(String version)
+    {
+        return new BigVersion(version);
+    }
+
+    @Override
+    public SSTableWriter.Factory getWriterFactory()
+    {
+        return writerFactory;
+    }
+
+    @Override
+    public SSTableReader.Factory getReaderFactory()
+    {
+        return readerFactory;
+    }
+
+    @Override
+    public Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, ColumnSerializer.Flag flag, int expireBefore, CFMetaData cfm, Version version)
+    {
+        return AbstractCell.onDiskIterator(in, flag, expireBefore, version, cfm.comparator);
+    }
+
+    @Override
+    public AbstractCompactedRow getCompactedRowWriter(CompactionController controller, ImmutableList<OnDiskAtomIterator> onDiskAtomIterators)
+    {
+        return new LazilyCompactedRow(controller, onDiskAtomIterators);
+    }
+
+    @Override
+    public RowIndexEntry.IndexSerializer getIndexSerializer(CFMetaData cfMetaData)
+    {
+        return new RowIndexEntry.Serializer(new IndexHelper.IndexInfo.Serializer(cfMetaData.comparator));
+    }
+
+    static class WriterFactory extends SSTableWriter.Factory
+    {
+        @Override
+        public SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+        {
+            return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+        }
+    }
+
+    static class ReaderFactory extends SSTableReader.Factory
+    {
+        @Override
+        public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason)
+        {
+            return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+        }
+    }
+
+    // versions are denoted as [major][minor].  Minor versions must be forward-compatible:
+    // new fields are allowed in e.g. the metadata component, but fields can't be removed
+    // or have their size changed.
+    //
+    // Minor versions were introduced with version "hb" for Cassandra 1.0.3; prior to that,
+    // we always incremented the major version.
+    static class BigVersion extends Version
+    {
+        public static final String current_version = "la";
+        public static final String earliest_supported_version = "ja";
+
+        // ja (2.0.0): super columns are serialized as composites (note that there is no real format change,
+        //               this is mostly a marker to know if we should expect super columns or not. We do need
+        //               a major version bump however, because we should not allow streaming of super columns
+        //               into this new format)
+        //             tracks max local deletion time in sstable metadata
+        //             records bloom_filter_fp_chance in metadata component
+        //             remove data size and column count from data file (CASSANDRA-4180)
+        //             tracks max/min column values (according to comparator)
+        // jb (2.0.1): switch from crc32 to adler32 for compression checksums
+        //             checksum the compressed data
+        // ka (2.1.0): new Statistics.db file format
+        //             index summaries can be downsampled and the sampling level is persisted
+        //             switch uncompressed checksums to adler32
+        //             tracks presence of legacy (local and remote) counter shards
+        // la (3.0.0): new file name format
+
+        private final boolean isLatestVersion;
+        private final boolean hasPostCompressionAdlerChecksums;
+        private final boolean hasSamplingLevel;
+        private final boolean newStatsFile;
+        private final boolean hasAllAdlerChecksums;
+        private final boolean hasRepairedAt;
+        private final boolean tracksLegacyCounterShards;
+        private final boolean newFileName;
+
+        public BigVersion(String version)
+        {
+            super(instance,version);
+
+            isLatestVersion = version.compareTo(current_version) == 0;
+            hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
+            hasSamplingLevel = version.compareTo("ka") >= 0;
+            newStatsFile = version.compareTo("ka") >= 0;
+            hasAllAdlerChecksums = version.compareTo("ka") >= 0;
+            hasRepairedAt = version.compareTo("ka") >= 0;
+            tracksLegacyCounterShards = version.compareTo("ka") >= 0;
+            newFileName = version.compareTo("la") >= 0;
+        }
+
+        @Override
+        public boolean isLatestVersion()
+        {
+            return isLatestVersion;
+        }
+
+        @Override
+        public boolean hasPostCompressionAdlerChecksums()
+        {
+            return hasPostCompressionAdlerChecksums;
+        }
+
+        @Override
+        public boolean hasSamplingLevel()
+        {
+            return hasSamplingLevel;
+        }
+
+        @Override
+        public boolean hasNewStatsFile()
+        {
+            return newStatsFile;
+        }
+
+        @Override
+        public boolean hasAllAdlerChecksums()
+        {
+            return hasAllAdlerChecksums;
+        }
+
+        @Override
+        public boolean hasRepairedAt()
+        {
+            return hasRepairedAt;
+        }
+
+        @Override
+        public boolean tracksLegacyCounterShards()
+        {
+            return tracksLegacyCounterShards;
+        }
+
+        @Override
+        public boolean hasNewFileName()
+        {
+            return newFileName;
+        }
+
+        @Override
+        public boolean isCompatible()
+        {
+            return version.compareTo(earliest_supported_version) >= 0 && version.charAt(0) <= current_version.charAt(0);
+        }
+    }
+}
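
As a reading aid for the version scheme described in the comments above: the two-letter version strings are compared lexicographically, so both minor bumps ("ja" to "jb") and major bumps ("k*" to "l*") sort naturally. Below is a minimal, self-contained sketch (not part of this patch; the class and constant names are invented) of how a check like BigVersion.isCompatible() behaves.

// VersionCheckSketch.java -- illustrative only; names invented, not Cassandra API.
public class VersionCheckSketch
{
    static final String CURRENT = "la";
    static final String EARLIEST_SUPPORTED = "ja";

    // Mirrors the compatibility rule: the version must not predate the earliest
    // supported version, and its major letter must not be newer than the current one.
    static boolean isCompatible(String version)
    {
        return version.compareTo(EARLIEST_SUPPORTED) >= 0
               && version.charAt(0) <= CURRENT.charAt(0);
    }

    public static void main(String[] args)
    {
        System.out.println(isCompatible("jb")); // true: readable by "la"
        System.out.println(isCompatible("ka")); // true
        System.out.println(isCompatible("ic")); // false: predates "ja"
        System.out.println(isCompatible("ma")); // false: major version from the future
    }
}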

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
new file mode 100644
index 0000000..2488f86
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.cassandra.cache.KeyCacheKey;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
+ * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
+ */
+public class BigTableReader extends SSTableReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
+
+    BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason)
+    {
+        super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
+    }
+
+    public OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns)
+    {
+        return new SSTableNamesIterator(this, key, columns);
+    }
+
+    public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry)
+    {
+        return new SSTableNamesIterator(this, input, key, columns, indexEntry);
+    }
+
+    public OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse)
+    {
+        return new SSTableSliceIterator(this, key, slices, reverse);
+    }
+
+    public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, ColumnSlice[] slices, boolean reverse, RowIndexEntry indexEntry)
+    {
+        return new SSTableSliceIterator(this, input, key, slices, reverse, indexEntry);
+    }
+    /**
+     *
+     * @param dataRange filter to use when reading the columns
+     * @return A Scanner for seeking over the rows of the SSTable.
+     */
+    public ICompactionScanner getScanner(DataRange dataRange, RateLimiter limiter)
+    {
+        return new BigTableScanner(this, dataRange, limiter);
+    }
+
+
+    /**
+     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
+     *
+     * @param ranges the range of keys to cover
+     * @return A Scanner for seeking over the rows of the SSTable.
+     */
+    public ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter)
+    {
+        // We want to avoid allocating a SSTableScanner if the ranges don't overlap the sstable (#5249)
+        List<Pair<Long, Long>> positions = getPositionsForRanges(Range.normalize(ranges));
+        if (positions.isEmpty())
+            return new EmptyCompactionScanner(getFilename());
+        else
+            return new BigTableScanner(this, ranges, limiter);
+    }
+
+
+    /**
+     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
+     * allow key selection by token bounds, but only if op != EQ
+     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
+     * @param updateCacheAndStats true if updating stats and cache
+     * @return The index entry corresponding to the key, or null if the key is not present
+     */
+    public RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats)
+    {
+        // first, check bloom filter
+        if (op == Operator.EQ)
+        {
+            assert key instanceof DecoratedKey; // EQ only makes sense if the key is a valid row key
+            if (!bf.isPresent(((DecoratedKey)key).getKey()))
+            {
+                Tracing.trace("Bloom filter allows skipping sstable {}", descriptor.generation);
+                return null;
+            }
+        }
+
+        // next, the key cache (only makes sense for valid row keys)
+        if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey))
+        {
+            DecoratedKey decoratedKey = (DecoratedKey)key;
+            KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, decoratedKey.getKey());
+            RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats);
+            if (cachedPosition != null)
+            {
+                Tracing.trace("Key cache hit for sstable {}", descriptor.generation);
+                return cachedPosition;
+            }
+        }
+
+        // check the smallest and greatest keys in the sstable to see if the key can't be present
+        if (first.compareTo(key) > 0 || last.compareTo(key) < 0)
+        {
+            if (op == Operator.EQ && updateCacheAndStats)
+                bloomFilterTracker.addFalsePositive();
+
+            if (op.apply(1) < 0)
+            {
+                Tracing.trace("Check against min and max keys allows skipping 
sstable {}", descriptor.generation);
+                return null;
+            }
+        }
+
+        int binarySearchResult = indexSummary.binarySearch(key);
+        long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary);
+        int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult);
+
+        // if we matched the -1th position, we'll start at the first position
+        sampledPosition = sampledPosition == -1 ? 0 : sampledPosition;
+
+        int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex);
+
+        // scan the on-disk index, starting at the nearest sampled position.
+        // The check against IndexInterval is to exit the loop in the EQ case when the key looked for is not present
+        // (bloom filter false positive). But note that for non-EQ cases, we might need to check the first key of the
+        // next index position, because the searched key can be greater than the last key of the index interval checked
+        // while being less than the first key of the next interval (and in that case we must return the position of
+        // the first key of the next interval).
+        int i = 0;
+        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
+        while (segments.hasNext() && i <= effectiveInterval)
+        {
+            FileDataInput in = segments.next();
+            try
+            {
+                while (!in.isEOF() && i <= effectiveInterval)
+                {
+                    i++;
+
+                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+
+                    boolean opSatisfied; // did we find an appropriate position for the op requested
+                    boolean exactMatch; // is the current position an exact match for the key, suitable for caching
+
+                    // Compare raw keys if possible for performance, otherwise compare decorated keys.
+                    if (op == Operator.EQ)
+                    {
+                        opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey());
+                    }
+                    else
+                    {
+                        DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
+                        int comparison = indexDecoratedKey.compareTo(key);
+                        int v = op.apply(comparison);
+                        opSatisfied = (v == 0);
+                        exactMatch = (comparison == 0);
+                        if (v < 0)
+                        {
+                            Tracing.trace("Partition index lookup allows 
skipping sstable {}", descriptor.generation);
+                            return null;
+                        }
+                    }
+
+                    if (opSatisfied)
+                    {
+                        // read data position from index entry
+                        RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in, descriptor.version);
+                        if (exactMatch && updateCacheAndStats)
+                        {
+                            assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key
+                            DecoratedKey decoratedKey = (DecoratedKey)key;
+
+                            if (logger.isTraceEnabled())
+                            {
+                                // expensive sanity check!  see CASSANDRA-4687
+                                FileDataInput fdi = dfile.getSegment(indexEntry.position);
+                                DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
+                                if (!keyInDisk.equals(key))
+                                    throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
+                                fdi.close();
+                            }
+
+                            // store exact match for the key
+                            cacheKey(decoratedKey, indexEntry);
+                        }
+                        if (op == Operator.EQ && updateCacheAndStats)
+                            bloomFilterTracker.addTruePositive();
+                        Tracing.trace("Partition index with {} entries found 
for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation);
+                        return indexEntry;
+                    }
+
+                    RowIndexEntry.Serializer.skip(in);
+                }
+            }
+            catch (IOException e)
+            {
+                markSuspect();
+                throw new CorruptSSTableException(e, in.getPath());
+            }
+            finally
+            {
+                FileUtils.closeQuietly(in);
+            }
+        }
+
+        if (op == SSTableReader.Operator.EQ && updateCacheAndStats)
+            bloomFilterTracker.addFalsePositive();
+        Tracing.trace("Partition index lookup complete (bloom filter false 
positive) for sstable {}", descriptor.generation);
+        return null;
+    }
+
+
+}
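
The getPosition() method above scans the on-disk primary index starting at the nearest entry recorded in the in-memory index summary, reading at most one sampling interval of entries before giving up. Here is a rough, self-contained sketch of that lookup pattern, using plain arrays as stand-ins for the summary and the index file (all names are invented; this is not Cassandra API).

// IndexSampleScanSketch.java -- illustrative only; invented stand-ins for the
// index summary and the on-disk primary index consulted by getPosition() above.
import java.util.Arrays;

public class IndexSampleScanSketch
{
    // All index keys, in sorted order (stand-in for the on-disk primary index).
    static final String[] INDEX = { "a", "c", "e", "g", "i", "k", "m", "o" };
    static final int INTERVAL = 3; // one sampled entry every INTERVAL index entries

    // Sampled keys kept in memory (stand-in for the index summary): positions 0, 3, 6, ...
    static String[] samples()
    {
        String[] s = new String[(INDEX.length + INTERVAL - 1) / INTERVAL];
        for (int i = 0; i < s.length; i++)
            s[i] = INDEX[i * INTERVAL];
        return s;
    }

    // Find the index position of an exact key: binary-search the samples for the
    // nearest preceding sample, then scan forward until the next sampled entry.
    static int findExact(String key)
    {
        String[] samples = samples();
        int r = Arrays.binarySearch(samples, key);
        int sampledIndex = r >= 0 ? r : -r - 2;     // nearest sample <= key
        if (sampledIndex < 0)
            return -1;                              // key sorts before everything
        int start = sampledIndex * INTERVAL;
        for (int i = start; i < INDEX.length && i <= start + INTERVAL; i++)
        {
            int cmp = INDEX[i].compareTo(key);
            if (cmp == 0)
                return i;
            if (cmp > 0)
                return -1;                          // passed the slot: key not present
        }
        return -1;
    }

    public static void main(String[] args)
    {
        System.out.println(findExact("g"));  // 3
        System.out.println(findExact("h"));  // -1 (analogous to a bloom filter false positive)
    }
}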

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
new file mode 100644
index 0000000..c1fc079
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
+import org.apache.cassandra.db.columniterator.LazyColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class BigTableScanner implements ICompactionScanner
+{
+    protected final RandomAccessReader dfile;
+    protected final RandomAccessReader ifile;
+    public final SSTableReader sstable;
+
+    private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
+    private AbstractBounds<RowPosition> currentRange;
+
+    private final DataRange dataRange;
+    private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+
+    protected Iterator<OnDiskAtomIterator> iterator;
+
+    /**
+     * @param sstable SSTable to scan; must not be null
+     * @param dataRange a single range to scan; must not be null
+     * @param limiter background i/o RateLimiter; may be null
+     */
+    public BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
+    {
+        assert sstable != null;
+
+        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
+        this.ifile = sstable.openIndexReader();
+        this.sstable = sstable;
+        this.dataRange = dataRange;
+        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+
+        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
+        if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum(sstable.partitioner))
+        {
+            // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and
+            // 2) the part that comes before the wrap-around
+            boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey(), sstable.partitioner));
+            boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound(), sstable.partitioner));
+        }
+        else
+        {
+            boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey(), sstable.partitioner));
+        }
+        this.rangeIterator = boundsList.iterator();
+    }
+
+    /**
+     * @param sstable SSTable to scan; must not be null
+     * @param tokenRanges A set of token ranges to scan
+     * @param limiter background i/o RateLimiter; may be null
+     */
+    public BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
+    {
+        assert sstable != null;
+
+        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
+        this.ifile = sstable.openIndexReader();
+        this.sstable = sstable;
+        this.dataRange = null;
+        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+
+        List<Range<Token>> normalized = Range.normalize(tokenRanges);
+        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size());
+        for (Range<Token> range : normalized)
+            boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(sstable.partitioner),
+                                                  range.right.maxKeyBound(sstable.partitioner),
+                                                  sstable.partitioner));
+
+        this.rangeIterator = boundsList.iterator();
+    }
+
+    private void seekToCurrentRangeStart()
+    {
+        if (currentRange.left.isMinimum(sstable.partitioner))
+            return;
+
+        long indexPosition = sstable.getIndexScanPosition(currentRange.left);
+        // -1 means the key is before everything in the sstable. So just start from the beginning.
+        if (indexPosition == -1)
+        {
+            // Note: this method shouldn't assume we're at the start of the sstable already (see #6638) and
+            // the seeks are no-op anyway if we are.
+            ifile.seek(0);
+            dfile.seek(0);
+            return;
+        }
+
+        ifile.seek(indexPosition);
+        try
+        {
+
+            while (!ifile.isEOF())
+            {
+                indexPosition = ifile.getFilePointer();
+                DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+                int comparison = indexDecoratedKey.compareTo(currentRange.left);
+                // because our range start may be inclusive or exclusive, we also need to check contains()
+                // instead of just checking (comparison >= 0)
+                if (comparison > 0 || currentRange.contains(indexDecoratedKey))
+                {
+                    // Found it: read the dataPosition and seek into the index and data files
+                    long dataPosition = ifile.readLong();
+                    ifile.seek(indexPosition);
+                    dfile.seek(dataPosition);
+                    break;
+                }
+                else
+                {
+                    RowIndexEntry.Serializer.skip(ifile);
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, sstable.getFilename());
+        }
+    }
+
+    public void close() throws IOException
+    {
+        FileUtils.close(dfile, ifile);
+    }
+
+    public long getLengthInBytes()
+    {
+        return dfile.length();
+    }
+
+    public long getCurrentPosition()
+    {
+        return dfile.getFilePointer();
+    }
+
+    public String getBackingFiles()
+    {
+        return sstable.toString();
+    }
+
+    public boolean hasNext()
+    {
+        if (iterator == null)
+            iterator = createIterator();
+        return iterator.hasNext();
+    }
+
+    public OnDiskAtomIterator next()
+    {
+        if (iterator == null)
+            iterator = createIterator();
+        return iterator.next();
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    private Iterator<OnDiskAtomIterator> createIterator()
+    {
+        return new KeyScanningIterator();
+    }
+
+    protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
+    {
+        private DecoratedKey nextKey;
+        private RowIndexEntry nextEntry;
+        private DecoratedKey currentKey;
+        private RowIndexEntry currentEntry;
+
+        protected OnDiskAtomIterator computeNext()
+        {
+            try
+            {
+                if (nextEntry == null)
+                {
+                    do
+                    {
+                        // we're starting the first range or we just passed the end of the previous range
+                        if (!rangeIterator.hasNext())
+                            return endOfData();
+
+                        currentRange = rangeIterator.next();
+                        seekToCurrentRangeStart();
+
+                        if (ifile.isEOF())
+                            return endOfData();
+
+                        currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+                        currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
+                    } while (!currentRange.contains(currentKey));
+                }
+                else
+                {
+                    // we're in the middle of a range
+                    currentKey = nextKey;
+                    currentEntry = nextEntry;
+                }
+
+                long readEnd;
+                if (ifile.isEOF())
+                {
+                    nextEntry = null;
+                    nextKey = null;
+                    readEnd = dfile.length();
+                }
+                else
+                {
+                    // we need the position of the start of the next key, regardless of whether it falls in the current range
+                    nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
+                    nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
+                    readEnd = nextEntry.position;
+
+                    if (!currentRange.contains(nextKey))
+                    {
+                        nextKey = null;
+                        nextEntry = null;
+                    }
+                }
+
+                if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey()))
+                {
+                    dfile.seek(currentEntry.position + currentEntry.headerOffset());
+                    ByteBufferUtil.readWithShortLength(dfile); // key
+                    return new SSTableIdentityIterator(sstable, dfile, currentKey);
+                }
+
+                return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
+                {
+                    public OnDiskAtomIterator create()
+                    {
+                        return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
+                    }
+                });
+
+            }
+            catch (IOException e)
+            {
+                sstable.markSuspect();
+                throw new CorruptSSTableException(e, sstable.getFilename());
+            }
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return getClass().getSimpleName() + "(" +
+               "dfile=" + dfile +
+               " ifile=" + ifile +
+               " sstable=" + sstable +
+               ")";
+    }
+}
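
KeyScanningIterator above buffers one (key, index entry) pair ahead of the one it returns, so it can detect when the next key falls outside the current range without losing that key. Below is a small stand-alone sketch of the same one-entry read-ahead over a sorted, range-filtered sequence (invented names, no Cassandra or Guava dependencies).

// RangeScanSketch.java -- illustrative only; a stand-alone analogue of the
// read-ahead in KeyScanningIterator, restricted to a single [lo, hi] range.
import java.util.Iterator;
import java.util.NoSuchElementException;

public class RangeScanSketch implements Iterator<String>
{
    private final String[] sortedKeys;   // stand-in for the index file contents
    private final String lo, hi;         // stand-in for currentRange
    private int pos;                     // stand-in for the index file pointer
    private String next;                 // read-ahead slot, like nextKey/nextEntry above

    RangeScanSketch(String[] sortedKeys, String lo, String hi)
    {
        this.sortedKeys = sortedKeys;
        this.lo = lo;
        this.hi = hi;
        // seek to the first key in range (seekToCurrentRangeStart() does this against the index)
        while (pos < sortedKeys.length && sortedKeys[pos].compareTo(lo) < 0)
            pos++;
        advance();
    }

    private void advance()
    {
        // buffer the next key only if it is still inside the range; otherwise we're done
        next = (pos < sortedKeys.length && sortedKeys[pos].compareTo(hi) <= 0) ? sortedKeys[pos++] : null;
    }

    public boolean hasNext() { return next != null; }

    public String next()
    {
        if (next == null)
            throw new NoSuchElementException();
        String current = next;
        advance();
        return current;
    }

    public static void main(String[] args)
    {
        Iterator<String> it = new RangeScanSketch(new String[]{ "a", "c", "e", "g", "i" }, "c", "g");
        while (it.hasNext())
            System.out.println(it.next()); // c, e, g
    }
}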

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
new file mode 100644
index 0000000..ec53b4e
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.StreamingHistogram;
+
+public class BigTableWriter extends SSTableWriter
+{
+    private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
+
+    // not very random, but the only value that can't be mistaken for a legal column-name length
+    public static final int END_OF_ROW = 0x0000;
+
+    private IndexWriter iwriter;
+    private SegmentedFile.Builder dbuilder;
+    private final SequentialWriter dataFile;
+    private DecoratedKey lastWrittenKey;
+    private FileMark dataMark;
+
+    BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+    {
+        super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+
+        iwriter = new IndexWriter(keyCount);
+
+        if (compression)
+        {
+            dataFile = SequentialWriter.open(getFilename(),
+                                             descriptor.filenameFor(Component.COMPRESSION_INFO),
+                                             metadata.compressionParameters(),
+                                             metadataCollector);
+            dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
+        }
+        else
+        {
+            dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
+            dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+        }
+    }
+
+    public void mark()
+    {
+        dataMark = dataFile.mark();
+        iwriter.mark();
+    }
+
+    public void resetAndTruncate()
+    {
+        dataFile.resetAndTruncate(dataMark);
+        iwriter.resetAndTruncate();
+    }
+
+    /**
+     * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
+     */
+    private long beforeAppend(DecoratedKey decoratedKey)
+    {
+        assert decoratedKey != null : "Keys must not be null"; // empty keys 
ARE allowed b/c of indexed column values
+        if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) 
>= 0)
+            throw new RuntimeException("Last written key " + lastWrittenKey + 
" >= current key " + decoratedKey + " writing into " + getFilename());
+        return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
+    }
+
+    private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
+    {
+        metadataCollector.addKey(decoratedKey.getKey());
+        lastWrittenKey = decoratedKey;
+        last = lastWrittenKey;
+        if (first == null)
+            first = lastWrittenKey;
+
+        if (logger.isTraceEnabled())
+            logger.trace("wrote {} at {}", decoratedKey, dataPosition);
+        iwriter.append(decoratedKey, index);
+        dbuilder.addPotentialBoundary(dataPosition);
+    }
+
+    /**
+     * @param row
+     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+     */
+    public RowIndexEntry append(AbstractCompactedRow row)
+    {
+        long currentPosition = beforeAppend(row.key);
+        RowIndexEntry entry;
+        try
+        {
+            entry = row.write(currentPosition, dataFile);
+            if (entry == null)
+                return null;
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, dataFile.getPath());
+        }
+        metadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
+        afterAppend(row.key, currentPosition, entry);
+        return entry;
+    }
+
+    public void append(DecoratedKey decoratedKey, ColumnFamily cf)
+    {
+        long startPosition = beforeAppend(decoratedKey);
+        try
+        {
+            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
+            afterAppend(decoratedKey, startPosition, entry);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, dataFile.getPath());
+        }
+        metadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
+    }
+
+    private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
+    {
+        assert cf.hasColumns() || cf.isMarkedForDelete();
+
+        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
+        ColumnIndex index = builder.build(cf);
+
+        out.writeShort(END_OF_ROW);
+        return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
+    }
+
+    /**
+     * @throws IOException if a read from the DataInput fails
+     * @throws FSWriteError if a write to the dataFile fails
+     */
+    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
+    {
+        long currentPosition = beforeAppend(key);
+
+        ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
+        ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
+        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
+        List<ByteBuffer> minColumnNames = Collections.emptyList();
+        List<ByteBuffer> maxColumnNames = Collections.emptyList();
+        StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
+        boolean hasLegacyCounterShards = false;
+
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+        cf.delete(DeletionTime.serializer.deserialize(in));
+
+        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
+
+        if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
+        {
+            tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
+            maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
+            minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
+            maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
+        }
+
+        Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
+        while (rangeTombstoneIterator.hasNext())
+        {
+            RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
+            tombstones.update(rangeTombstone.getLocalDeletionTime());
+            minTimestampTracker.update(rangeTombstone.timestamp());
+            maxTimestampTracker.update(rangeTombstone.timestamp());
+            maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
+            minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
+            maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
+        }
+
+        Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
+        try
+        {
+            while (iter.hasNext())
+            {
+                OnDiskAtom atom = iter.next();
+                if (atom == null)
+                    break;
+
+                if (atom instanceof CounterCell)
+                {
+                    atom = ((CounterCell) atom).markLocalToBeCleared();
+                    hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
+                }
+
+                int deletionTime = atom.getLocalDeletionTime();
+                if (deletionTime < Integer.MAX_VALUE)
+                    tombstones.update(deletionTime);
+                minTimestampTracker.update(atom.timestamp());
+                maxTimestampTracker.update(atom.timestamp());
+                minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
+                maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
+                maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
+
+                columnIndexer.add(atom); // This writes the atom to disk too
+            }
+
+            columnIndexer.maybeWriteEmptyRowHeader();
+            dataFile.stream.writeShort(END_OF_ROW);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, dataFile.getPath());
+        }
+
+        metadataCollector.updateMinTimestamp(minTimestampTracker.get())
+                         .updateMaxTimestamp(maxTimestampTracker.get())
+                         .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
+                         .addRowSize(dataFile.getFilePointer() - currentPosition)
+                         .addColumnCount(columnIndexer.writtenAtomCount())
+                         .mergeTombstoneHistogram(tombstones)
+                         .updateMinColumnNames(minColumnNames)
+                         .updateMaxColumnNames(maxColumnNames)
+                         .updateHasLegacyCounterShards(hasLegacyCounterShards);
+
+        afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
+        return currentPosition;
+    }
+
+    /**
+     * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
+     */
+    public void abort(boolean closeBf)
+    {
+        assert descriptor.type.isTemporary;
+        if (iwriter == null && dataFile == null)
+            return;
+        if (iwriter != null)
+        {
+            FileUtils.closeQuietly(iwriter.indexFile);
+            if (closeBf)
+            {
+                iwriter.bf.close();
+            }
+        }
+        if (dataFile != null)
+            FileUtils.closeQuietly(dataFile);
+
+        Set<Component> components = SSTable.componentsFor(descriptor);
+        try
+        {
+            if (!components.isEmpty())
+                SSTable.delete(descriptor, components);
+        }
+        catch (FSWriteError e)
+        {
+            logger.error(String.format("Failed deleting temp components for 
%s", descriptor), e);
+            throw e;
+        }
+    }
+
+    // we use this method to ensure any managed data we may have retained references to during the write are no
+    // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction
+    public void isolateReferences()
+    {
+        // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other
+        // data retention is done through copying
+        first = getMinimalKey(first);
+        last = lastWrittenKey = getMinimalKey(last);
+    }
+
+    public SSTableReader openEarly(long maxDataAge)
+    {
+        StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+                                                                                           metadata.getBloomFilterFpChance(),
+                                                                                           repairedAt).get(MetadataType.STATS);
+
+        // find the max (exclusive) readable key
+        DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
+        if (exclusiveUpperBoundOfReadableIndex == null)
+            return null;
+
+        // create temp links if they don't already exist
+        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
+        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
+        {
+            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
+            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
+        }
+
+        // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
+        SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX));
+        SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA));
+        SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
+                                                           components, metadata,
+                                                           partitioner, ifile,
+                                                           dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
+                                                           iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
+
+        // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
+        sstable.first = getMinimalKey(first);
+        sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
+        DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
+        if (inclusiveUpperBoundOfReadableData == null)
+            return null;
+        int offset = 2;
+        while (true)
+        {
+            RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
+            if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
+                break;
+            inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
+            if (inclusiveUpperBoundOfReadableData == null)
+                return null;
+        }
+        sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
+        return sstable;
+    }
+
+    public SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt)
+    {
+        Pair<Descriptor, StatsMetadata> p = close(repairedAt);
+        Descriptor newdesc = p.left;
+        StatsMetadata sstableMetadata = p.right;
+
+        // finalize in-memory state for the reader
+        SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
+        SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA));
+        SSTableReader sstable = SSTableReader.internalOpen(newdesc,
+                                                           components,
+                                                           metadata,
+                                                           partitioner,
+                                                           ifile,
+                                                           dfile,
+                                                           iwriter.summary.build(partitioner),
+                                                           iwriter.bf,
+                                                           maxDataAge,
+                                                           sstableMetadata,
+                                                           SSTableReader.OpenReason.NORMAL);
+        sstable.first = getMinimalKey(first);
+        sstable.last = getMinimalKey(last);
+        // try to save the summaries to disk
+        sstable.saveSummary(iwriter.builder, dbuilder);
+        iwriter = null;
+        dbuilder = null;
+        return sstable;
+    }
+
+    // Close the writer and return the descriptor for the new sstable and its metadata
+    public Pair<Descriptor, StatsMetadata> close()
+    {
+        return close(this.repairedAt);
+    }
+
+    private Pair<Descriptor, StatsMetadata> close(long repairedAt)
+    {
+
+        // index and filter
+        iwriter.close();
+        // main data, close will truncate if necessary
+        dataFile.close();
+        dataFile.writeFullChecksum(descriptor);
+        // write sstable statistics
+        Map<MetadataType, MetadataComponent> metadataComponents = metadataCollector.finalizeMetadata(
+                                                                                    partitioner.getClass().getCanonicalName(),
+                                                                                    metadata.getBloomFilterFpChance(),
+                                                                                    repairedAt);
+        writeMetadata(descriptor, metadataComponents);
+
+        // save the table of components
+        SSTable.appendTOC(descriptor, components);
+
+        // remove the 'tmp' marker from all components
+        return Pair.create(SSTableWriter.rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS));
+
+    }
+
+
+    private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
+    {
+        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
+        try
+        {
+            desc.getMetadataSerializer().serialize(components, out.stream);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, out.getPath());
+        }
+        finally
+        {
+            out.close();
+        }
+    }
+
+    public long getFilePointer()
+    {
+        return dataFile.getFilePointer();
+    }
+
+    public long getOnDiskFilePointer()
+    {
+        return dataFile.getOnDiskFilePointer();
+    }
+
+    /**
+     * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
+     */
+    class IndexWriter implements Closeable
+    {
+        private final SequentialWriter indexFile;
+        public final SegmentedFile.Builder builder;
+        public final IndexSummaryBuilder summary;
+        public final IFilter bf;
+        private FileMark mark;
+
+        IndexWriter(long keyCount)
+        {
+            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+            builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+            summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
+            bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
+        }
+
+        // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file
+        DecoratedKey getMaxReadableKey(int offset)
+        {
+            long maxIndexLength = indexFile.getLastFlushOffset();
+            return summary.getMaxReadableKey(maxIndexLength, offset);
+        }
+
+        public void append(DecoratedKey key, RowIndexEntry indexEntry)
+        {
+            bf.add(key.getKey());
+            long indexPosition = indexFile.getFilePointer();
+            try
+            {
+                ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
+                rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
+            }
+            catch (IOException e)
+            {
+                throw new FSWriteError(e, indexFile.getPath());
+            }
+
+            if (logger.isTraceEnabled())
+                logger.trace("wrote index entry: {} at {}", indexEntry, 
indexPosition);
+
+            summary.maybeAddEntry(key, indexPosition);
+            builder.addPotentialBoundary(indexPosition);
+        }
+
+        /**
+         * Closes the index and bloomfilter, making the public state of this 
writer valid for consumption.
+         */
+        public void close()
+        {
+            if (components.contains(Component.FILTER))
+            {
+                String path = descriptor.filenameFor(Component.FILTER);
+                try
+                {
+                    // bloom filter
+                    FileOutputStream fos = new FileOutputStream(path);
+                    DataOutputStreamAndChannel stream = new 
DataOutputStreamAndChannel(fos);
+                    FilterFactory.serialize(bf, stream);
+                    stream.flush();
+                    fos.getFD().sync();
+                    stream.close();
+                }
+                catch (IOException e)
+                {
+                    throw new FSWriteError(e, path);
+                }
+            }
+
+            // index
+            long position = indexFile.getFilePointer();
+            indexFile.close(); // calls force
+            FileUtils.truncate(indexFile.getPath(), position);
+        }
+
+        public void mark()
+        {
+            mark = indexFile.mark();
+        }
+
+        public void resetAndTruncate()
+        {
+            // we can't un-set the bloom filter addition, but extra keys in 
there are harmless.
+            // we can't reset dbuilder either, but that is the last thing 
called in afterappend so
+            // we assume that if that worked then we won't be trying to reset.
+            indexFile.resetAndTruncate(mark);
+        }
+    }
+}
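
As an illustration (not part of this patch): IndexWriter.close() above writes the bloom filter with flush() followed by getFD().sync() before closing the stream, so the FILTER component is forced to disk before the writer moves on. A minimal sketch of that write/flush/sync/close pattern using only JDK classes; the class name, file name and payload are hypothetical.

import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class SyncedWrite
{
    // Write a small payload and force it to the device before returning, mirroring the
    // flush() + getFD().sync() sequence used for the bloom filter above.
    public static void writeDurably(String path, byte[] payload) throws IOException
    {
        FileOutputStream fos = new FileOutputStream(path);
        DataOutputStream out = new DataOutputStream(fos);
        try
        {
            out.write(payload);
            out.flush();        // push buffered bytes down to the OS
            fos.getFD().sync(); // ask the OS to flush them to the storage device
        }
        finally
        {
            out.close();        // also closes the underlying FileOutputStream
        }
    }

    public static void main(String[] args) throws IOException
    {
        writeDurably("example-Filter.db", new byte[]{ 1, 2, 3 });
    }
}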

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
new file mode 100644
index 0000000..a69cff9
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * This is a reader that finds the block for a starting column and returns blocks before/after it for each next call.
+ * It assumes that the CF is sorted by name and exploits the name index.
+ */
+class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements 
OnDiskAtomIterator
+{
+    private final ColumnFamily emptyColumnFamily;
+
+    private final SSTableReader sstable;
+    private final List<IndexHelper.IndexInfo> indexes;
+    private final FileDataInput originalInput;
+    private FileDataInput file;
+    private final boolean reversed;
+    private final ColumnSlice[] slices;
+    private final BlockFetcher fetcher;
+    private final Deque<OnDiskAtom> blockColumns = new 
ArrayDeque<OnDiskAtom>();
+    private final CellNameType comparator;
+
+    // Holds range tombstone in reverse queries. See addColumn()
+    private final Deque<OnDiskAtom> rangeTombstonesReversed;
+
+    /**
+     * This slice reader assumes that slices are sorted correctly, i.e. that for forward lookups slices are in
+     * lexicographic order of start elements and that for reverse lookups they are in reverse lexicographic order of
+     * finish (reverse start) elements, e.g. forward: [a,b],[d,e],[g,h]; reverse: [h,g],[e,d],[b,a]. This reader also
+     * assumes that validation has been performed in terms of intervals (no 
overlapping intervals).
+     */
+    IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, 
FileDataInput input, ColumnSlice[] slices, boolean reversed)
+    {
+        Tracing.trace("Seeking to partition indexed section in data file");
+        this.sstable = sstable;
+        this.originalInput = input;
+        this.reversed = reversed;
+        this.slices = slices;
+        this.comparator = sstable.metadata.comparator;
+        this.rangeTombstonesReversed = reversed ? new ArrayDeque<OnDiskAtom>() 
: null;
+
+        try
+        {
+            this.indexes = indexEntry.columnsIndex();
+            emptyColumnFamily = 
ArrayBackedSortedColumns.factory.create(sstable.metadata);
+            if (indexes.isEmpty())
+            {
+                setToRowStart(indexEntry, input);
+                
emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file));
+                fetcher = new SimpleBlockFetcher();
+            }
+            else
+            {
+                emptyColumnFamily.delete(indexEntry.deletionTime());
+                fetcher = new IndexedBlockFetcher(indexEntry.position);
+            }
+        }
+        catch (IOException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, file.getPath());
+        }
+    }
+
+    /**
+     * Sets the seek position to the start of the row for column scanning.
+     */
+    private void setToRowStart(RowIndexEntry rowEntry, FileDataInput in) 
throws IOException
+    {
+        if (in == null)
+        {
+            this.file = sstable.getFileDataInput(rowEntry.position);
+        }
+        else
+        {
+            this.file = in;
+            in.seek(rowEntry.position);
+        }
+        
sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
+    }
+
+    public ColumnFamily getColumnFamily()
+    {
+        return emptyColumnFamily;
+    }
+
+    public DecoratedKey getKey()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    protected OnDiskAtom computeNext()
+    {
+        while (true)
+        {
+            if (reversed)
+            {
+                // Return all range tombstones for the block first (see addColumn() below)
+                OnDiskAtom column = rangeTombstonesReversed.poll();
+                if (column != null)
+                    return column;
+            }
+
+            OnDiskAtom column = blockColumns.poll();
+            if (column == null)
+            {
+                if (!fetcher.fetchMoreData())
+                    return endOfData();
+            }
+            else
+            {
+                return column;
+            }
+        }
+    }
+
+    public void close() throws IOException
+    {
+        if (originalInput == null && file != null)
+            file.close();
+    }
+
+    protected void addColumn(OnDiskAtom col)
+    {
+        if (reversed)
+        {
+            /*
+             * We put range tombstone markers at the beginning of the range they delete. But for reversed queries,
+             * the caller still needs to know about a RangeTombstone before it sees any column that it covers.
+             * To make that simple, we keep said tombstones separate and 
return them all before any column for
+             * a given block.
+             */
+            if (col instanceof RangeTombstone)
+                rangeTombstonesReversed.addFirst(col);
+            else
+                blockColumns.addFirst(col);
+        }
+        else
+        {
+            blockColumns.addLast(col);
+        }
+    }
+
+    private abstract class BlockFetcher
+    {
+        protected int currentSliceIdx;
+
+        protected BlockFetcher(int sliceIdx)
+        {
+            this.currentSliceIdx = sliceIdx;
+        }
+
+        /*
+         * Return the smallest key selected by the current ColumnSlice.
+         */
+        protected Composite currentStart()
+        {
+            return reversed ? slices[currentSliceIdx].finish : 
slices[currentSliceIdx].start;
+        }
+
+        /*
+         * Return the biggest key selected by the current ColumnSlice.
+         */
+        protected Composite currentFinish()
+        {
+            return reversed ? slices[currentSliceIdx].start : 
slices[currentSliceIdx].finish;
+        }
+
+        protected abstract boolean setNextSlice();
+
+        protected abstract boolean fetchMoreData();
+
+        protected boolean isColumnBeforeSliceStart(OnDiskAtom column)
+        {
+            return isBeforeSliceStart(column.name());
+        }
+
+        protected boolean isBeforeSliceStart(Composite name)
+        {
+            Composite start = currentStart();
+            return !start.isEmpty() && comparator.compare(name, start) < 0;
+        }
+
+        protected boolean isColumnBeforeSliceFinish(OnDiskAtom column)
+        {
+            Composite finish = currentFinish();
+            return finish.isEmpty() || comparator.compare(column.name(), 
finish) <= 0;
+        }
+
+        protected boolean isAfterSliceFinish(Composite name)
+        {
+            Composite finish = currentFinish();
+            return !finish.isEmpty() && comparator.compare(name, finish) > 0;
+        }
+    }
+
+    private class IndexedBlockFetcher extends BlockFetcher
+    {
+        // where this row starts
+        private final long columnsStart;
+
+        // the index entry for the next block to deserialize
+        private int nextIndexIdx = -1;
+
+        // index of the last block we've read from disk;
+        private int lastDeserializedBlock = -1;
+
+        // For reversed, keep columns at the beginning of the last 
deserialized block that
+        // may still match a slice
+        private final Deque<OnDiskAtom> prefetched;
+
+        public IndexedBlockFetcher(long columnsStart)
+        {
+            super(-1);
+            this.columnsStart = columnsStart;
+            this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null;
+            setNextSlice();
+        }
+
+        protected boolean setNextSlice()
+        {
+            while (++currentSliceIdx < slices.length)
+            {
+                nextIndexIdx = 
IndexHelper.indexFor(slices[currentSliceIdx].start, indexes, comparator, 
reversed, nextIndexIdx);
+                if (nextIndexIdx < 0 || nextIndexIdx >= indexes.size())
+                    // no index block for that slice
+                    continue;
+
+                // Check if we can exclude this slice entirely from the index
+                IndexInfo info = indexes.get(nextIndexIdx);
+                if (reversed)
+                {
+                    if (!isBeforeSliceStart(info.lastName))
+                        return true;
+                }
+                else
+                {
+                    if (!isAfterSliceFinish(info.firstName))
+                        return true;
+                }
+            }
+            nextIndexIdx = -1;
+            return false;
+        }
+
+        protected boolean hasMoreSlice()
+        {
+            return currentSliceIdx < slices.length;
+        }
+
+        protected boolean fetchMoreData()
+        {
+            if (!hasMoreSlice())
+                return false;
+
+            // If we read blocks in reversed disk order, we may have columns 
from the previous block to handle.
+            // Note that prefetched keeps columns in reversed disk order.
+            if (reversed && !prefetched.isEmpty())
+            {
+                boolean gotSome = false;
+                // Avoids some comparisons when we know they're not useful
+                boolean inSlice = false;
+
+                OnDiskAtom prefetchedCol;
+                while ((prefetchedCol = prefetched.peek()) != null)
+                {
+                    // col is before slice, we update the slice
+                    if (isColumnBeforeSliceStart(prefetchedCol))
+                    {
+                        inSlice = false;
+                        if (!setNextSlice())
+                            return false;
+                    }
+                    // col is within the slice, so add it
+                    // (we go in reverse, so as soon as we are in a slice, no 
need to check
+                    // we're after the slice until we change slice)
+                    else if (inSlice || 
isColumnBeforeSliceFinish(prefetchedCol))
+                    {
+                        blockColumns.addLast(prefetched.poll());
+                        gotSome = true;
+                        inSlice = true;
+                    }
+                    // if col is after slice, ignore
+                    else
+                    {
+                        prefetched.poll();
+                    }
+                }
+                if (gotSome)
+                    return true;
+            }
+            try
+            {
+                return getNextBlock();
+            }
+            catch (IOException e)
+            {
+                throw new CorruptSSTableException(e, file.getPath());
+            }
+        }
+
+        private boolean getNextBlock() throws IOException
+        {
+            if (lastDeserializedBlock == nextIndexIdx)
+            {
+                if (reversed)
+                    nextIndexIdx--;
+                else
+                    nextIndexIdx++;
+            }
+            lastDeserializedBlock = nextIndexIdx;
+
+            // Are we done?
+            if (lastDeserializedBlock < 0 || lastDeserializedBlock >= 
indexes.size())
+                return false;
+
+            IndexInfo currentIndex = indexes.get(lastDeserializedBlock);
+
+            /* seek to the correct offset in the data for this index block */
+            long positionToSeek = columnsStart + currentIndex.offset;
+
+            // With new promoted indexes, our first seek in the data file will 
happen at that point.
+            if (file == null)
+                file = originalInput == null ? 
sstable.getFileDataInput(positionToSeek) : originalInput;
+
+            AtomDeserializer deserializer = 
emptyColumnFamily.metadata().getOnDiskDeserializer(file, 
sstable.descriptor.version);
+
+            file.seek(positionToSeek);
+            FileMark mark = file.mark();
+
+            // We remember when we are within a slice to avoid some comparisons
+            boolean inSlice = false;
+
+            // scan from index start
+            while (file.bytesPastMark(mark) < currentIndex.width || 
deserializer.hasUnprocessed())
+            {
+                // col is before slice
+                // (If in slice, don't bother checking that until we change 
slice)
+                Composite start = currentStart();
+                if (!inSlice && !start.isEmpty() && 
deserializer.compareNextTo(start) < 0)
+                {
+                    if (reversed)
+                    {
+                        // the next slice selects columns that are before the current one, so it may
+                        // match this column; keep it around.
+                        prefetched.addFirst(deserializer.readNext());
+                    }
+                    else
+                    {
+                        deserializer.skipNext();
+                    }
+                }
+                // col is within slice
+                else
+                {
+                    Composite finish = currentFinish();
+                    if (finish.isEmpty() || deserializer.compareNextTo(finish) 
<= 0)
+                    {
+                        inSlice = true;
+                        addColumn(deserializer.readNext());
+                    }
+                    // col is after slice.
+                    else
+                    {
+                        // When reading forward, if we hit a column that sorts 
after the current slice, it means we're done with this slice.
+                        // For reversed, this may either mean that we're done 
with the current slice, or that we need to read the previous
+                        // index block. However, we can be sure that we are in the first case (the current slice is done) if the first
+                        // columns of the block were not part of the current slice, i.e. if we have columns in prefetched.
+                        if (reversed && prefetched.isEmpty())
+                            break;
+
+                        if (!setNextSlice())
+                            break;
+
+                        inSlice = false;
+
+                        // The next index block now corresponds to the first 
block that may have columns for the newly set slice.
+                        // So if it's different from the current block, we're 
done with this block. And in that case, we know
+                        // that our prefetched columns won't match.
+                        if (nextIndexIdx != lastDeserializedBlock)
+                        {
+                            if (reversed)
+                                prefetched.clear();
+                            break;
+                        }
+
+                        // Even if the next slice may have columns in this block, if we're reversed, those columns have been
+                        // prefetched and we're done with that block.
+                        if (reversed)
+                            break;
+
+                        // otherwise, we will deal with that column at the 
next iteration
+                    }
+                }
+            }
+            return true;
+        }
+    }
+
+    private class SimpleBlockFetcher extends BlockFetcher
+    {
+        public SimpleBlockFetcher() throws IOException
+        {
+            // Since we have to deserialize in order and will read all slices, we might as well reverse the slices and
+            // behave as if the query were not reversed
+            super(reversed ? slices.length - 1 : 0);
+
+            // We remember when we are within a slice to avoid some comparisons
+            boolean inSlice = false;
+
+            AtomDeserializer deserializer = 
emptyColumnFamily.metadata().getOnDiskDeserializer(file, 
sstable.descriptor.version);
+            while (deserializer.hasNext())
+            {
+                // col is before slice
+                // (If in slice, don't bother checking that until we change 
slice)
+                Composite start = currentStart();
+                if (!inSlice && !start.isEmpty() && 
deserializer.compareNextTo(start) < 0)
+                {
+                    deserializer.skipNext();
+                    continue;
+                }
+
+                // col is within slice
+                Composite finish = currentFinish();
+                if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 
0)
+                {
+                    inSlice = true;
+                    addColumn(deserializer.readNext());
+                }
+                // col is after slice. more slices?
+                else
+                {
+                    inSlice = false;
+                    if (!setNextSlice())
+                        break;
+                }
+            }
+        }
+
+        protected boolean setNextSlice()
+        {
+            if (reversed)
+            {
+                if (currentSliceIdx <= 0)
+                    return false;
+
+                currentSliceIdx--;
+            }
+            else
+            {
+                if (currentSliceIdx >= slices.length - 1)
+                    return false;
+
+                currentSliceIdx++;
+            }
+            return true;
+        }
+
+        protected boolean fetchMoreData()
+        {
+            return false;
+        }
+    }
+}
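
As an illustration (not part of this patch): the IndexedSliceReader javadoc above requires slices to be sorted (ascending by start for forward reads; for reversed reads each slice is given as [high, low] and slices are ordered descending) and non-overlapping. The sketch below checks that invariant over a hypothetical Slice type with String bounds, standing in for ColumnSlice with Composite bounds.

import java.util.Arrays;
import java.util.List;

public class SliceOrderCheck
{
    // Hypothetical stand-in for ColumnSlice: bounds are plain Strings instead of Composites.
    static final class Slice
    {
        final String start;
        final String finish;
        Slice(String start, String finish) { this.start = start; this.finish = finish; }
    }

    // Returns true if the slices satisfy the ordering the reader assumes:
    // forward: ascending and non-overlapping by [start, finish];
    // reversed: each slice is [start=high, finish=low], ordered descending and non-overlapping.
    static boolean isWellOrdered(List<Slice> slices, boolean reversed)
    {
        for (int i = 1; i < slices.size(); i++)
        {
            Slice prev = slices.get(i - 1);
            Slice cur = slices.get(i);
            if (reversed)
            {
                // the previous slice's low end must still be above the current slice's high end
                if (prev.finish.compareTo(cur.start) <= 0)
                    return false;
            }
            else
            {
                // the previous slice must end strictly before the current one starts
                if (prev.finish.compareTo(cur.start) >= 0)
                    return false;
            }
        }
        return true;
    }

    public static void main(String[] args)
    {
        List<Slice> forward = Arrays.asList(new Slice("a", "b"), new Slice("d", "e"), new Slice("g", "h"));
        List<Slice> reverse = Arrays.asList(new Slice("h", "g"), new Slice("e", "d"), new Slice("b", "a"));
        System.out.println(isWellOrdered(forward, false)); // true
        System.out.println(isWellOrdered(reverse, true));  // true
    }
}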

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
new file mode 100644
index 0000000..07dc59a
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements 
OnDiskAtomIterator
+{
+    private ColumnFamily cf;
+    private final SSTableReader sstable;
+    private FileDataInput fileToClose;
+    private Iterator<OnDiskAtom> iter;
+    public final SortedSet<CellName> columns;
+    public final DecoratedKey key;
+
+    public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, 
SortedSet<CellName> columns)
+    {
+        assert columns != null;
+        this.sstable = sstable;
+        this.columns = columns;
+        this.key = key;
+
+        RowIndexEntry indexEntry = sstable.getPosition(key, 
SSTableReader.Operator.EQ);
+        if (indexEntry == null)
+            return;
+
+        try
+        {
+            read(sstable, null, indexEntry);
+        }
+        catch (IOException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, sstable.getFilename());
+        }
+        finally
+        {
+            if (fileToClose != null)
+                FileUtils.closeQuietly(fileToClose);
+        }
+    }
+
+    public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, 
DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry)
+    {
+        assert columns != null;
+        this.sstable = sstable;
+        this.columns = columns;
+        this.key = key;
+
+        try
+        {
+            read(sstable, file, indexEntry);
+        }
+        catch (IOException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, sstable.getFilename());
+        }
+    }
+
+    private FileDataInput createFileDataInput(long position)
+    {
+        fileToClose = sstable.getFileDataInput(position);
+        return fileToClose;
+    }
+
+    private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry 
indexEntry)
+    throws IOException
+    {
+        List<IndexHelper.IndexInfo> indexList;
+
+        // If the entry is not indexed or the index is not promoted, read from 
the row start
+        if (!indexEntry.isIndexed())
+        {
+            if (file == null)
+                file = createFileDataInput(indexEntry.position);
+            else
+                file.seek(indexEntry.position);
+
+            DecoratedKey keyInDisk = 
sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
+            assert keyInDisk.equals(key) : String.format("%s != %s in %s", 
keyInDisk, key, file.getPath());
+        }
+
+        indexList = indexEntry.columnsIndex();
+
+        if (!indexEntry.isIndexed())
+        {
+            ColumnFamilySerializer serializer = ColumnFamily.serializer;
+            try
+            {
+                cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
+                cf.delete(DeletionTime.serializer.deserialize(file));
+            }
+            catch (Exception e)
+            {
+                throw new IOException(serializer + " failed to deserialize " + 
sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, 
e);
+            }
+        }
+        else
+        {
+            cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
+            cf.delete(indexEntry.deletionTime());
+        }
+
+        List<OnDiskAtom> result = new ArrayList<OnDiskAtom>();
+        if (indexList.isEmpty())
+        {
+            readSimpleColumns(file, columns, result);
+        }
+        else
+        {
+            readIndexedColumns(sstable.metadata, file, columns, indexList, 
indexEntry.position, result);
+        }
+
+        // create an iterator view of the columns we read
+        iter = result.iterator();
+    }
+
+    private void readSimpleColumns(FileDataInput file, SortedSet<CellName> 
columnNames, List<OnDiskAtom> result)
+    {
+        Iterator<OnDiskAtom> atomIterator = 
cf.metadata().getOnDiskIterator(file, sstable.descriptor.version);
+        int n = 0;
+        while (atomIterator.hasNext())
+        {
+            OnDiskAtom column = atomIterator.next();
+            if (column instanceof Cell)
+            {
+                if (columnNames.contains(column.name()))
+                {
+                    result.add(column);
+                    if (++n >= columns.size())
+                        break;
+                }
+            }
+            else
+            {
+                result.add(column);
+            }
+        }
+    }
+
+    private void readIndexedColumns(CFMetaData metadata,
+                                    FileDataInput file,
+                                    SortedSet<CellName> columnNames,
+                                    List<IndexHelper.IndexInfo> indexList,
+                                    long basePosition,
+                                    List<OnDiskAtom> result)
+    throws IOException
+    {
+        /* get the various column ranges we have to read */
+        CellNameType comparator = metadata.comparator;
+        List<IndexHelper.IndexInfo> ranges = new 
ArrayList<IndexHelper.IndexInfo>();
+        int lastIndexIdx = -1;
+        for (CellName name : columnNames)
+        {
+            int index = IndexHelper.indexFor(name, indexList, comparator, 
false, lastIndexIdx);
+            if (index < 0 || index == indexList.size())
+                continue;
+            IndexHelper.IndexInfo indexInfo = indexList.get(index);
+            // Check that the index block does contain the column name and that we haven't added this block yet.
+            if (comparator.compare(name, indexInfo.firstName) < 0 || index == 
lastIndexIdx)
+                continue;
+
+            ranges.add(indexInfo);
+            lastIndexIdx = index;
+        }
+
+        if (ranges.isEmpty())
+            return;
+
+        Iterator<CellName> toFetch = columnNames.iterator();
+        CellName nextToFetch = toFetch.next();
+        for (IndexHelper.IndexInfo indexInfo : ranges)
+        {
+            long positionToSeek = basePosition + indexInfo.offset;
+
+            // With new promoted indexes, our first seek in the data file will 
happen at that point.
+            if (file == null)
+                file = createFileDataInput(positionToSeek);
+
+            AtomDeserializer deserializer = 
cf.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
+            file.seek(positionToSeek);
+            FileMark mark = file.mark();
+            while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch 
!= null)
+            {
+                int cmp = deserializer.compareNextTo(nextToFetch);
+                if (cmp == 0)
+                {
+                    nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
+                    result.add(deserializer.readNext());
+                    continue;
+                }
+
+                deserializer.skipNext();
+                if (cmp > 0)
+                    nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
+            }
+        }
+    }
+
+    public DecoratedKey getKey()
+    {
+        return key;
+    }
+
+    public ColumnFamily getColumnFamily()
+    {
+        return cf;
+    }
+
+    protected OnDiskAtom computeNext()
+    {
+        if (iter == null || !iter.hasNext())
+            return endOfData();
+        return iter.next();
+    }
+
+    public void close() throws IOException { }
+}
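
As an illustration (not part of this patch): readIndexedColumns above first maps the sorted column names onto index blocks and deduplicates them, so each selected block is seeked and scanned at most once. The sketch below shows that pruning step with a hypothetical Block type and a linear indexFor standing in for IndexHelper.IndexInfo and IndexHelper.indexFor.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

public class BlockSelection
{
    // Hypothetical stand-in for IndexHelper.IndexInfo: a block covers names in [firstName, lastName].
    static final class Block
    {
        final String firstName;
        final String lastName;
        Block(String firstName, String lastName) { this.firstName = firstName; this.lastName = lastName; }
        public String toString() { return "[" + firstName + ".." + lastName + "]"; }
    }

    // Linear scan standing in for IndexHelper.indexFor: index of the first block (at or after
    // fromIdx) whose lastName is >= name, or blocks.size() if the name sorts after every block.
    static int indexFor(String name, List<Block> blocks, int fromIdx)
    {
        for (int i = Math.max(fromIdx, 0); i < blocks.size(); i++)
            if (blocks.get(i).lastName.compareTo(name) >= 0)
                return i;
        return blocks.size();
    }

    // For sorted column names, keep only the blocks that can contain at least one requested
    // name, in order and without duplicates: the same pruning readIndexedColumns performs
    // before seeking into the data file.
    static List<Block> blocksToRead(TreeSet<String> names, List<Block> blocks)
    {
        List<Block> ranges = new ArrayList<Block>();
        int lastIdx = -1;
        for (String name : names)
        {
            int idx = indexFor(name, blocks, lastIdx);
            if (idx == blocks.size())
                continue; // name sorts after every block
            Block b = blocks.get(idx);
            if (name.compareTo(b.firstName) < 0 || idx == lastIdx)
                continue; // name falls in a gap between blocks, or block already selected
            ranges.add(b);
            lastIdx = idx;
        }
        return ranges;
    }

    public static void main(String[] args)
    {
        List<Block> blocks = Arrays.asList(new Block("a", "c"), new Block("d", "f"), new Block("g", "k"));
        TreeSet<String> names = new TreeSet<String>(Arrays.asList("b", "e", "ee", "z"));
        System.out.println(blocksToRead(names, blocks)); // [[a..c], [d..f]]
    }
}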
