Extend Descriptor to include a format value and refactor reader/writer APIs

patch by tjake; reviewed by Marcus Eriksson for CASSANDRA-7443


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0368e97e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0368e97e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0368e97e

Branch: refs/heads/trunk
Commit: 0368e97ee4a807cb832a90c590ae5c65a98730c1
Parents: 7e53db0
Author: Jake Luciani <[email protected]>
Authored: Tue Sep 2 12:49:01 2014 -0400
Committer: T Jake Luciani <[email protected]>
Committed: Thu Oct 23 11:10:55 2014 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 build.xml                                       |   12 +-
 .../org/apache/cassandra/config/CFMetaData.java |   10 +-
 .../cassandra/config/DatabaseDescriptor.java    |    9 +
 .../org/apache/cassandra/db/AbstractCell.java   |    3 +-
 .../apache/cassandra/db/AtomDeserializer.java   |    5 +-
 .../apache/cassandra/db/BatchlogManager.java    |    2 +-
 .../cassandra/db/CollationController.java       |    2 +-
 .../cassandra/db/ColumnFamilySerializer.java    |    3 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   31 +-
 .../org/apache/cassandra/db/DataTracker.java    |    2 +-
 .../org/apache/cassandra/db/DeletionInfo.java   |    2 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |    4 +-
 src/java/org/apache/cassandra/db/Memtable.java  |   13 +-
 .../org/apache/cassandra/db/OnDiskAtom.java     |    5 +-
 .../org/apache/cassandra/db/RangeTombstone.java |    7 +-
 .../org/apache/cassandra/db/RowIndexEntry.java  |   60 +-
 .../apache/cassandra/db/RowIteratorFactory.java |    2 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |    2 +-
 .../db/columniterator/IndexedSliceReader.java   |  499 -----
 .../db/columniterator/SSTableNamesIterator.java |  249 ---
 .../db/columniterator/SSTableSliceIterator.java |  101 -
 .../db/columniterator/SimpleSliceReader.java    |  107 -
 .../db/commitlog/CommitLogDescriptor.java       |    7 +-
 .../cassandra/db/commitlog/ReplayPosition.java  |    2 +-
 .../db/compaction/AbstractCompactedRow.java     |    3 +-
 .../compaction/AbstractCompactionStrategy.java  |    4 +-
 .../db/compaction/AbstractCompactionTask.java   |    2 +-
 .../db/compaction/CompactionController.java     |    4 +-
 .../db/compaction/CompactionIterable.java       |    8 +-
 .../db/compaction/CompactionManager.java        |   29 +-
 .../cassandra/db/compaction/CompactionTask.java |   37 +-
 .../DateTieredCompactionStrategy.java           |    2 +-
 .../db/compaction/LazilyCompactedRow.java       |   45 +-
 .../compaction/LeveledCompactionStrategy.java   |    9 +-
 .../db/compaction/LeveledCompactionTask.java    |    4 +-
 .../db/compaction/LeveledManifest.java          |    9 +-
 .../db/compaction/SSTableSplitter.java          |    3 +-
 .../cassandra/db/compaction/Scrubber.java       |   13 +-
 .../SizeTieredCompactionStrategy.java           |    5 +-
 .../cassandra/db/compaction/Upgrader.java       |    7 +-
 .../cassandra/db/composites/AbstractCType.java  |   14 -
 .../apache/cassandra/db/composites/CType.java   |    2 -
 .../cassandra/db/filter/IDiskAtomFilter.java    |    2 +-
 .../cassandra/db/filter/NamesQueryFilter.java   |    7 +-
 .../apache/cassandra/db/filter/QueryFilter.java |    2 +-
 .../cassandra/db/filter/SliceQueryFilter.java   |    7 +-
 .../cassandra/db/index/SecondaryIndex.java      |    2 +-
 .../db/index/SecondaryIndexManager.java         |    2 +-
 .../org/apache/cassandra/dht/BytesToken.java    |    1 -
 .../apache/cassandra/io/ISSTableSerializer.java |    3 +-
 .../io/compress/CompressionMetadata.java        |   11 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |   22 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  |    9 +-
 .../apache/cassandra/io/sstable/Descriptor.java |  172 +-
 .../cassandra/io/sstable/IndexHelper.java       |   26 -
 .../io/sstable/IndexSummaryManager.java         |    1 +
 .../io/sstable/ReducingKeyIterator.java         |    1 +
 .../apache/cassandra/io/sstable/SSTable.java    |    2 +-
 .../io/sstable/SSTableDeletingTask.java         |    1 +
 .../io/sstable/SSTableIdentityIterator.java     |   23 +-
 .../cassandra/io/sstable/SSTableLoader.java     |    1 +
 .../cassandra/io/sstable/SSTableReader.java     | 2055 ------------------
 .../cassandra/io/sstable/SSTableRewriter.java   |    2 +
 .../cassandra/io/sstable/SSTableScanner.java    |  294 ---
 .../io/sstable/SSTableSimpleUnsortedWriter.java |    7 +-
 .../io/sstable/SSTableSimpleWriter.java         |    1 +
 .../cassandra/io/sstable/SSTableWriter.java     |  641 ------
 .../io/sstable/format/SSTableFormat.java        |   90 +
 .../io/sstable/format/SSTableReader.java        | 1881 ++++++++++++++++
 .../io/sstable/format/SSTableWriter.java        |  202 ++
 .../cassandra/io/sstable/format/Version.java    |  104 +
 .../io/sstable/format/big/BigFormat.java        |  224 ++
 .../io/sstable/format/big/BigTableReader.java   |  256 +++
 .../io/sstable/format/big/BigTableScanner.java  |  299 +++
 .../io/sstable/format/big/BigTableWriter.java   |  541 +++++
 .../sstable/format/big/IndexedSliceReader.java  |  500 +++++
 .../format/big/SSTableNamesIterator.java        |  250 +++
 .../format/big/SSTableSliceIterator.java        |  102 +
 .../sstable/format/big/SimpleSliceReader.java   |  108 +
 .../io/sstable/metadata/CompactionMetadata.java |    3 +-
 .../metadata/IMetadataComponentSerializer.java  |    3 +-
 .../io/sstable/metadata/MetadataCollector.java  |    2 +-
 .../io/sstable/metadata/StatsMetadata.java      |    7 +-
 .../io/sstable/metadata/ValidationMetadata.java |    3 +-
 .../cassandra/io/util/AbstractDataInput.java    |    6 +-
 .../io/util/DataIntegrityMetadata.java          |    2 +-
 .../apache/cassandra/io/util/FileDataInput.java |    1 +
 .../org/apache/cassandra/io/util/FileUtils.java |   29 +-
 .../cassandra/io/util/MappedFileDataInput.java  |    4 +-
 .../cassandra/io/util/MemoryInputStream.java    |    6 +-
 .../cassandra/metrics/ColumnFamilyMetrics.java  |    2 +-
 .../apache/cassandra/net/MessagingService.java  |    3 +-
 .../notifications/SSTableAddedNotification.java |    2 +-
 .../SSTableDeletingNotification.java            |    2 +-
 .../SSTableListChangedNotification.java         |    3 +-
 .../SSTableRepairStatusChanged.java             |    2 +-
 .../repair/RepairMessageVerbHandler.java        |    2 +-
 .../cassandra/service/ActiveRepairService.java  |    3 +-
 .../apache/cassandra/service/CacheService.java  |    7 +-
 .../cassandra/streaming/StreamLockfile.java     |    2 +-
 .../cassandra/streaming/StreamReader.java       |   33 +-
 .../cassandra/streaming/StreamReceiveTask.java  |    6 +-
 .../cassandra/streaming/StreamSession.java      |    2 +-
 .../cassandra/streaming/StreamTransferTask.java |    2 +-
 .../cassandra/streaming/StreamWriter.java       |    2 +-
 .../compress/CompressedStreamReader.java        |   16 +-
 .../compress/CompressedStreamWriter.java        |    2 +-
 .../streaming/messages/FileMessageHeader.java   |   27 +-
 .../streaming/messages/IncomingFileMessage.java |    2 +-
 .../streaming/messages/OutgoingFileMessage.java |    3 +-
 .../streaming/messages/StreamMessage.java       |    4 +-
 .../apache/cassandra/tools/SSTableExport.java   |    8 +-
 .../apache/cassandra/tools/SSTableImport.java   |    7 +-
 .../tools/SSTableRepairedAtSetter.java          |    2 +-
 .../cassandra/tools/StandaloneScrubber.java     |    3 +-
 .../cassandra/tools/StandaloneSplitter.java     |    3 +-
 .../cassandra/tools/StandaloneUpgrader.java     |    5 +-
 .../utils/vint/EncodedDataInputStream.java      |    6 +-
 .../db/compaction/LongCompactionsTest.java      |    4 +-
 .../LongLeveledCompactionStrategyTest.java      |    2 +-
 test/unit/org/apache/cassandra/Util.java        |    2 +-
 .../cassandra/cache/AutoSavingCacheTest.java    |    2 +-
 .../org/apache/cassandra/db/CleanupTest.java    |    2 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   33 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |    5 +-
 .../org/apache/cassandra/db/KeyspaceTest.java   |    2 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |    3 +-
 .../apache/cassandra/db/RowIndexEntryTest.java  |    7 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |    5 +-
 .../db/compaction/AntiCompactionTest.java       |   15 +-
 .../compaction/BlacklistingCompactionsTest.java |    2 +-
 .../db/compaction/CompactionsPurgeTest.java     |    2 +-
 .../db/compaction/CompactionsTest.java          |   26 +-
 .../DateTieredCompactionStrategyTest.java       |    3 +-
 .../LeveledCompactionStrategyTest.java          |    2 +-
 .../SizeTieredCompactionStrategyTest.java       |    2 +-
 .../cassandra/db/compaction/TTLExpiryTest.java  |    5 +-
 .../cassandra/io/sstable/DescriptorTest.java    |   52 +-
 .../io/sstable/IndexSummaryManagerTest.java     |    1 +
 .../cassandra/io/sstable/LegacySSTableTest.java |   15 +-
 .../io/sstable/SSTableMetadataTest.java         |    1 +
 .../cassandra/io/sstable/SSTableReaderTest.java |    1 +
 .../io/sstable/SSTableScannerTest.java          |   11 +-
 .../cassandra/io/sstable/SSTableUtils.java      |    9 +-
 .../metadata/MetadataSerializerTest.java        |    2 +-
 .../apache/cassandra/repair/ValidatorTest.java  |    3 +-
 .../streaming/StreamTransferTaskTest.java       |    2 +-
 .../streaming/StreamingTransferTest.java        |    2 +-
 .../cassandra/tools/SSTableExportTest.java      |   20 +-
 .../cassandra/tools/SSTableImportTest.java      |    2 +-
 tools/cqlstress-example.yaml                    |    9 +-
 tools/cqlstress-insanity-example.yaml           |    2 -
 153 files changed, 5202 insertions(+), 4473 deletions(-)
----------------------------------------------------------------------
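For orientation, a minimal sketch (not part of the patch) of how the refactored writer API is used once this change lands: SSTableReader/SSTableWriter move under org.apache.cassandra.io.sstable.format, and writers come from a static factory keyed off the Descriptor rather than from a constructor. It follows the Memtable hunk further down; the wrapper class and parameter names here are illustrative stand-ins.

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.dht.IPartitioner;
    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.format.SSTableWriter;
    import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
    import org.apache.cassandra.service.ActiveRepairService;

    class FlushWriterSketch
    {
        static SSTableWriter createFlushWriter(String filename,
                                               long estimatedKeys,
                                               CFMetaData metadata,
                                               IPartitioner partitioner,
                                               MetadataCollector collector)
        {
            // "new SSTableWriter(...)" is gone; the factory resolves the concrete writer
            // (presumably BigTableWriter for the default BIG format) from the format
            // recorded in the Descriptor parsed out of the filename.
            return SSTableWriter.create(Descriptor.fromFilename(filename),
                                        estimatedKeys,
                                        ActiveRepairService.UNREPAIRED_SSTABLE,
                                        metadata,
                                        partitioner,
                                        collector);
        }
    }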


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0dae098..524e776 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Extend Descriptor to include a format value and refactor reader/writer apis 
(CASSANDRA-7443)
  * Integrate JMH for microbenchmarks (CASSANDRA-8151)
  * Keep sstable levels when bootstrapping (CASSANDRA-7460)
  * Add Sigar library and perform basic OS settings check on startup 
(CASSANDRA-7838)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 9cd80c2..7a6476c 100644
--- a/build.xml
+++ b/build.xml
@@ -334,7 +334,7 @@
           <dependency groupId="com.googlecode.json-simple" 
artifactId="json-simple" version="1.1"/>
           <dependency groupId="com.boundary" artifactId="high-scale-lib" 
version="1.0.6"/>
           <dependency groupId="com.github.jbellis" artifactId="jamm" 
version="0.2.6"/>
-          <dependency groupId="com.thinkaurelius.thrift" 
artifactId="thrift-server" version="0.3.7">
+          <dependency groupId="com.thinkaurelius.thrift" 
artifactId="thrift-server" version="0.3.5">
                <exclusion groupId="org.slf4j" artifactId="slf4j-log4j12"/>
           </dependency>
           <dependency groupId="org.yaml" artifactId="snakeyaml" 
version="1.11"/>
@@ -418,10 +418,11 @@
        <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
         <dependency groupId="org.antlr" artifactId="antlr"/>
         <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core"/>
-        <dependency groupId="net.ju-n.compile-command-annotations" 
artifactId="compile-command-annotations"/>
         <dependency groupId="org.javassist" artifactId="javassist"/>
         <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/>
         <dependency groupId="org.openjdk.jmh" 
artifactId="jmh-generator-annprocess"/>
+           <dependency groupId="net.ju-n.compile-command-annotations" 
artifactId="compile-command-annotations"/>
+        <dependency groupId="org.javassist" artifactId="javassist" />
       </artifact:pom>
 
       <artifact:pom id="coverage-deps-pom"
@@ -475,10 +476,9 @@
         <dependency groupId="org.mindrot" artifactId="jbcrypt"/>
         <dependency groupId="com.yammer.metrics" artifactId="metrics-core"/>
         <dependency groupId="com.addthis.metrics" 
artifactId="reporter-config"/>
-        <dependency groupId="com.thinkaurelius.thrift" 
artifactId="thrift-server" version="0.3.7"/>
+        <dependency groupId="com.thinkaurelius.thrift" 
artifactId="thrift-server" version="0.3.5"/>
         <dependency groupId="com.clearspring.analytics" artifactId="stream" 
version="2.5.2" />
         <dependency groupId="net.sf.supercsv" artifactId="super-csv" 
version="2.1.0" />
-        <dependency groupId="org.javassist" artifactId="javassist"/>
 
         <dependency groupId="ch.qos.logback" artifactId="logback-core"/>
         <dependency groupId="ch.qos.logback" artifactId="logback-classic"/>
@@ -492,6 +492,7 @@
         <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
        <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" optional="true"/>
 
+
         <!-- don't need jna to run, but nice to have -->
         <dependency groupId="net.java.dev.jna" artifactId="jna" 
version="4.0.0"/>
         
@@ -1110,7 +1111,8 @@
         <jvmarg value="-Xss256k"/>
         <jvmarg 
value="-Dcassandra.memtable_row_overhead_computation_step=100"/>
         <jvmarg 
value="-Dcassandra.test.use_prepared=${cassandra.test.use_prepared}"/>
-       <jvmarg value="-Dcassandra.test.offsetseed=@{poffset}"/>        
+           <jvmarg value="-Dcassandra.test.offsetseed=@{poffset}"/>
+        <jvmarg value="-Dcassandra.test.sstableformatdevelopment=true"/>
        <optjvmargs/>
         <classpath>
           <path refid="cassandra.classpath" />

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java 
b/src/java/org/apache/cassandra/config/CFMetaData.java
index 1e2a2e1..cf4d761 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -44,6 +44,8 @@ import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -1460,17 +1462,17 @@ public final class CFMetaData
         return (cfName + "_" + columnName + "_idx").replaceAll("\\W", "");
     }
 
-    public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, 
Descriptor.Version version)
+    public Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, Version 
version)
     {
         return getOnDiskIterator(in, ColumnSerializer.Flag.LOCAL, 
Integer.MIN_VALUE, version);
     }
 
-    public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, 
ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
+    public Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, 
ColumnSerializer.Flag flag, int expireBefore, Version version)
     {
-        return AbstractCell.onDiskIterator(in, flag, expireBefore, version, 
comparator);
+        return version.getSSTableFormat().getOnDiskIterator(in, flag, 
expireBefore, this, version);
     }
 
-    public AtomDeserializer getOnDiskDeserializer(DataInput in, 
Descriptor.Version version)
+    public AtomDeserializer getOnDiskDeserializer(DataInput in, Version 
version)
     {
         return new AtomDeserializer(comparator, in, 
ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 8659c94..00e875b 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -38,6 +38,7 @@ import java.util.UUID;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.primitives.Longs;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.auth.AllowAllAuthenticator;
@@ -98,6 +99,8 @@ public class DatabaseDescriptor
 
     private static Config conf;
 
+    private static SSTableFormat.Type sstable_format = SSTableFormat.Type.BIG;
+
     private static IAuthenticator authenticator = new AllowAllAuthenticator();
     private static IAuthorizer authorizer = new AllowAllAuthorizer();
 
@@ -1544,6 +1547,12 @@ public class DatabaseDescriptor
         return conf.inter_dc_tcp_nodelay;
     }
 
+
+    public static SSTableFormat.Type getSSTableFormat()
+    {
+        return sstable_format;
+    }
+
     public static MemtablePool getMemtableAllocatorPool()
     {
         long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractCell.java 
b/src/java/org/apache/cassandra/db/AbstractCell.java
index f27871f..de86126 100644
--- a/src/java/org/apache/cassandra/db/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/AbstractCell.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -39,7 +40,7 @@ public abstract class AbstractCell implements Cell
     public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in,
                                                       final 
ColumnSerializer.Flag flag,
                                                       final int expireBefore,
-                                                      final Descriptor.Version 
version,
+                                                      final Version version,
                                                       final CellNameType type)
     {
         return new AbstractIterator<OnDiskAtom>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/AtomDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomDeserializer.java 
b/src/java/org/apache/cassandra/db/AtomDeserializer.java
index 799ed0e..0c43422 100644
--- a/src/java/org/apache/cassandra/db/AtomDeserializer.java
+++ b/src/java/org/apache/cassandra/db/AtomDeserializer.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
 
 /**
  * Helper class to deserialize OnDiskAtom efficiently.
@@ -40,9 +41,9 @@ public class AtomDeserializer
     private final DataInput in;
     private final ColumnSerializer.Flag flag;
     private final int expireBefore;
-    private final Descriptor.Version version;
+    private final Version version;
 
-    public AtomDeserializer(CellNameType type, DataInput in, 
ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
+    public AtomDeserializer(CellNameType type, DataInput in, 
ColumnSerializer.Flag flag, int expireBefore, Version version)
     {
         this.type = type;
         this.nameDeserializer = type.newDeserializer(in);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java 
b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 18d9a17..7f04cba 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -31,6 +31,7 @@ import javax.management.ObjectName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.RateLimiter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +45,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java 
b/src/java/org/apache/cassandra/db/CollationController.java
index ff5fe88..922cbfe 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.CounterColumnType;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.SearchIterator;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java 
b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index e2aeb6c..29866d6 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.io.ISSTableSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -146,7 +147,7 @@ public class ColumnFamilySerializer implements 
IVersionedSerializer<ColumnFamily
         throw new UnsupportedOperationException();
     }
 
-    public ColumnFamily deserializeFromSSTable(DataInput in, 
Descriptor.Version version)
+    public ColumnFamily deserializeFromSSTable(DataInput in, Version version)
     {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index aa266be..a23b2ad 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -34,6 +34,10 @@ import com.google.common.util.concurrent.*;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.json.simple.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -441,7 +445,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             generations.add(desc.generation);
             if (!desc.isCompatible())
                 throw new RuntimeException(String.format("Incompatible SSTable 
found. Current version %s is unable to read file: %s. Please run 
upgradesstables.",
-                                                          
Descriptor.Version.CURRENT, desc));
+                        desc.getFormat().getLatestVersion(), desc));
         }
         Collections.sort(generations);
         int value = (generations.size() > 0) ? 
(generations.get(generations.size() - 1)) : 0;
@@ -672,7 +676,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         Set<Descriptor> currentDescriptors = new HashSet<Descriptor>();
         for (SSTableReader sstable : data.getView().sstables)
             currentDescriptors.add(sstable.descriptor);
-        Set<SSTableReader> newSSTables = new HashSet<SSTableReader>();
+        Set<SSTableReader> newSSTables = new HashSet<>();
 
         Directories.SSTableLister lister = 
directories.sstableLister().skipTemporary(true);
         for (Map.Entry<Descriptor, Set<Component>> entry : 
lister.list().entrySet())
@@ -686,8 +690,8 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
             if (!descriptor.isCompatible())
                 throw new RuntimeException(String.format("Can't open 
incompatible SSTable! Current version %s, found file: %s",
-                                                         
Descriptor.Version.CURRENT,
-                                                         descriptor));
+                        descriptor.getFormat().getLatestVersion(),
+                        descriptor));
 
             // force foreign sstables to level 0
             try
@@ -711,7 +715,8 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                                                descriptor.ksname,
                                                descriptor.cfname,
                                                
fileIndexGenerator.incrementAndGet(),
-                                               Descriptor.Type.FINAL);
+                                               Descriptor.Type.FINAL,
+                                               descriptor.formatType);
             }
             while (new 
File(newDescriptor.filenameFor(Component.DATA)).exists());
 
@@ -780,17 +785,23 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public String getTempSSTablePath(File directory)
     {
-        return getTempSSTablePath(directory, Descriptor.Version.CURRENT);
+        return getTempSSTablePath(directory, 
DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), 
DatabaseDescriptor.getSSTableFormat());
     }
 
-    private String getTempSSTablePath(File directory, Descriptor.Version 
version)
+    public String getTempSSTablePath(File directory, SSTableFormat.Type format)
+    {
+        return getTempSSTablePath(directory, format.info.getLatestVersion(), 
format);
+    }
+
+    private String getTempSSTablePath(File directory, Version version, 
SSTableFormat.Type format)
     {
         Descriptor desc = new Descriptor(version,
                                          directory,
                                          keyspace.getName(),
                                          name,
                                          fileIndexGenerator.incrementAndGet(),
-                                         Descriptor.Type.TEMP);
+                                         Descriptor.Type.TEMP,
+                                         format);
         return desc.filenameFor(Component.DATA);
     }
 
@@ -2193,7 +2204,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     public List<SSTableReader> getSnapshotSSTableReader(String tag) throws 
IOException
     {
         Map<Descriptor, Set<Component>> snapshots = 
directories.sstableLister().snapshots(tag).list();
-        List<SSTableReader> readers = new 
ArrayList<SSTableReader>(snapshots.size());
+        List<SSTableReader> readers = new ArrayList<>(snapshots.size());
         for (Map.Entry<Descriptor, Set<Component>> entries : 
snapshots.entrySet())
             readers.add(SSTableReader.open(entries.getKey(), 
entries.getValue(), metadata, partitioner));
         return readers;
@@ -2779,7 +2790,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     {
         assert data.getCompacting().isEmpty() : data.getCompacting();
 
-        List<SSTableReader> truncatedSSTables = new ArrayList<SSTableReader>();
+        List<SSTableReader> truncatedSSTables = new ArrayList<>();
 
         for (SSTableReader sstable : getSSTables())
         {

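A minimal sketch (not part of the patch) of the Descriptor change visible in the hunk above: the sstable format type is now an explicit constructor argument, the version comes from that format (format.info.getLatestVersion()) instead of the global Descriptor.Version.CURRENT, and the node-wide default is read from DatabaseDescriptor.getSSTableFormat(). The helper below is an illustrative stand-in, assuming the constructor shape shown above.

    import java.io.File;
    import org.apache.cassandra.config.DatabaseDescriptor;
    import org.apache.cassandra.io.sstable.Component;
    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.format.SSTableFormat;

    class DescriptorFormatSketch
    {
        // Mirrors ColumnFamilyStore.getTempSSTablePath: the format travels with the
        // Descriptor instead of being implied by a single global version constant.
        static String tempDataFile(File directory, String keyspace, String table, int generation)
        {
            SSTableFormat.Type format = DatabaseDescriptor.getSSTableFormat(); // defaults to SSTableFormat.Type.BIG
            Descriptor desc = new Descriptor(format.info.getLatestVersion(),
                                             directory,
                                             keyspace,
                                             table,
                                             generation,
                                             Descriptor.Type.TEMP,
                                             format);
            return desc.filenameFor(Component.DATA);
        }
    }
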
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java 
b/src/java/org/apache/cassandra/db/DataTracker.java
index 7393323..d106190 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -24,13 +24,13 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.notifications.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java 
b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 3e5e845..048324a 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -147,7 +147,7 @@ public class DeletionInfo implements IMeasurableMemory
     /**
      * Returns a new {@link InOrderTester} in forward order.
      */
-    InOrderTester inOrderTester()
+    public InOrderTester inOrderTester()
     {
         return inOrderTester(false);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java 
b/src/java/org/apache/cassandra/db/Keyspace.java
index ca43df6..8986154 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Future;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +45,6 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.pager.QueryPagers;
@@ -260,7 +260,7 @@ public class Keyspace
      */
     public List<SSTableReader> getAllSSTables()
     {
-        List<SSTableReader> list = new 
ArrayList<SSTableReader>(columnFamilyStores.size());
+        List<SSTableReader> list = new ArrayList<>(columnFamilyStores.size());
         for (ColumnFamilyStore cfStore : columnFamilyStores.values())
             list.addAll(cfStore.getSSTables());
         return list;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java 
b/src/java/org/apache/cassandra/db/Memtable.java
index a711833..80376f7 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -28,6 +28,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Throwables;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,8 +40,6 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.dht.LongToken;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -384,12 +385,8 @@ public class Memtable
         public SSTableWriter createFlushWriter(String filename)
         {
             MetadataCollector sstableMetadataCollector = new 
MetadataCollector(cfs.metadata.comparator).replayPosition(context);
-            return new SSTableWriter(filename,
-                                     rows.size(),
-                                     ActiveRepairService.UNREPAIRED_SSTABLE,
-                                     cfs.metadata,
-                                     cfs.partitioner,
-                                     sstableMetadataCollector);
+
+            return SSTableWriter.create(Descriptor.fromFilename(filename), 
(long) rows.size(), ActiveRepairService.UNREPAIRED_SSTABLE, cfs.metadata, 
cfs.partitioner, sstableMetadataCollector);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java 
b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index b53e43b..f97ca42 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.ISSTableSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.serializers.MarshalException;
 
@@ -65,12 +66,12 @@ public interface OnDiskAtom
             }
         }
 
-        public OnDiskAtom deserializeFromSSTable(DataInput in, 
Descriptor.Version version) throws IOException
+        public OnDiskAtom deserializeFromSSTable(DataInput in, Version 
version) throws IOException
         {
             return deserializeFromSSTable(in, ColumnSerializer.Flag.LOCAL, 
Integer.MIN_VALUE, version);
         }
 
-        public OnDiskAtom deserializeFromSSTable(DataInput in, 
ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) 
throws IOException
+        public OnDiskAtom deserializeFromSSTable(DataInput in, 
ColumnSerializer.Flag flag, int expireBefore, Version version) throws 
IOException
         {
             Composite name = type.serializer().deserialize(in);
             if (name.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java 
b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 6a246f9..4a0037b 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.composites.CType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.ISSTableSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.serializers.MarshalException;
@@ -259,7 +260,7 @@ public class RangeTombstone extends Interval<Composite, 
DeletionTime> implements
             DeletionTime.serializer.serialize(t.data, out);
         }
 
-        public RangeTombstone deserializeFromSSTable(DataInput in, 
Descriptor.Version version) throws IOException
+        public RangeTombstone deserializeFromSSTable(DataInput in, Version 
version) throws IOException
         {
             Composite min = type.serializer().deserialize(in);
 
@@ -268,14 +269,14 @@ public class RangeTombstone extends Interval<Composite, 
DeletionTime> implements
             return deserializeBody(in, min, version);
         }
 
-        public RangeTombstone deserializeBody(DataInput in, Composite min, 
Descriptor.Version version) throws IOException
+        public RangeTombstone deserializeBody(DataInput in, Composite min, 
Version version) throws IOException
         {
             Composite max = type.serializer().deserialize(in);
             DeletionTime dt = DeletionTime.serializer.deserialize(in);
             return new RangeTombstone(min, max, dt);
         }
 
-        public void skipBody(DataInput in, Descriptor.Version version) throws 
IOException
+        public void skipBody(DataInput in, Version version) throws IOException
         {
             type.serializer().skip(in);
             DeletionTime.serializer.skip(in);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java 
b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 01035c4..4ff61ce 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -27,15 +27,14 @@ import java.util.List;
 import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.cache.IMeasurableMemory;
-import org.apache.cassandra.db.composites.CType;
 import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ObjectSizes;
 
-public class RowIndexEntry implements IMeasurableMemory
+public class RowIndexEntry<T> implements IMeasurableMemory
 {
     private static final long EMPTY_SIZE = ObjectSizes.measure(new 
RowIndexEntry(0));
 
@@ -46,12 +45,12 @@ public class RowIndexEntry implements IMeasurableMemory
         this.position = position;
     }
 
-    protected int promotedSize(CType type)
+    public int promotedSize(ISerializer<T> idxSerializer)
     {
         return 0;
     }
 
-    public static RowIndexEntry create(long position, DeletionTime 
deletionTime, ColumnIndex index)
+    public static RowIndexEntry<IndexHelper.IndexInfo> create(long position, 
DeletionTime deletionTime, ColumnIndex index)
     {
         assert index != null;
         assert deletionTime != null;
@@ -62,7 +61,7 @@ public class RowIndexEntry implements IMeasurableMemory
         if (index.columnsIndex.size() > 1)
             return new IndexedEntry(position, deletionTime, 
index.columnsIndex);
         else
-            return new RowIndexEntry(position);
+            return new RowIndexEntry<>(position);
     }
 
     /**
@@ -79,7 +78,16 @@ public class RowIndexEntry implements IMeasurableMemory
         throw new UnsupportedOperationException();
     }
 
-    public List<IndexHelper.IndexInfo> columnsIndex()
+    /**
+     * @return the offset to the start of the header information for this row.
+     * For some formats this may not be the start of the row.
+     */
+    public long headerOffset()
+    {
+        return 0;
+    }
+
+    public List<T> columnsIndex()
     {
         return Collections.emptyList();
     }
@@ -89,31 +97,37 @@ public class RowIndexEntry implements IMeasurableMemory
         return EMPTY_SIZE;
     }
 
-    public static class Serializer
+    public static interface IndexSerializer<T>
     {
-        private final CType type;
+        void serialize(RowIndexEntry<T> rie, DataOutputPlus out) throws 
IOException;
+        RowIndexEntry<T> deserialize(DataInput in, Version version) throws 
IOException;
+        public int serializedSize(RowIndexEntry<T> rie);
+    }
 
-        public Serializer(CType type)
+    public static class Serializer implements 
IndexSerializer<IndexHelper.IndexInfo>
+    {
+        private final ISerializer<IndexHelper.IndexInfo> idxSerializer;
+
+        public Serializer(ISerializer<IndexHelper.IndexInfo> idxSerializer)
         {
-            this.type = type;
+            this.idxSerializer = idxSerializer;
         }
 
-        public void serialize(RowIndexEntry rie, DataOutputPlus out) throws 
IOException
+        public void serialize(RowIndexEntry<IndexHelper.IndexInfo> rie, 
DataOutputPlus out) throws IOException
         {
             out.writeLong(rie.position);
-            out.writeInt(rie.promotedSize(type));
+            out.writeInt(rie.promotedSize(idxSerializer));
 
             if (rie.isIndexed())
             {
                 DeletionTime.serializer.serialize(rie.deletionTime(), out);
                 out.writeInt(rie.columnsIndex().size());
-                ISerializer<IndexHelper.IndexInfo> idxSerializer = 
type.indexSerializer();
                 for (IndexHelper.IndexInfo info : rie.columnsIndex())
                     idxSerializer.serialize(info, out);
             }
         }
 
-        public RowIndexEntry deserialize(DataInput in, Descriptor.Version 
version) throws IOException
+        public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInput in, 
Version version) throws IOException
         {
             long position = in.readLong();
 
@@ -123,8 +137,7 @@ public class RowIndexEntry implements IMeasurableMemory
                 DeletionTime deletionTime = 
DeletionTime.serializer.deserialize(in);
 
                 int entries = in.readInt();
-                ISerializer<IndexHelper.IndexInfo> idxSerializer = 
type.indexSerializer();
-                List<IndexHelper.IndexInfo> columnsIndex = new 
ArrayList<IndexHelper.IndexInfo>(entries);
+                List<IndexHelper.IndexInfo> columnsIndex = new 
ArrayList<>(entries);
                 for (int i = 0; i < entries; i++)
                     columnsIndex.add(idxSerializer.deserialize(in));
 
@@ -132,7 +145,7 @@ public class RowIndexEntry implements IMeasurableMemory
             }
             else
             {
-                return new RowIndexEntry(position);
+                return new RowIndexEntry<>(position);
             }
         }
 
@@ -151,9 +164,9 @@ public class RowIndexEntry implements IMeasurableMemory
             FileUtils.skipBytesFully(in, size);
         }
 
-        public int serializedSize(RowIndexEntry rie)
+        public int serializedSize(RowIndexEntry<IndexHelper.IndexInfo> rie)
         {
-            int size = TypeSizes.NATIVE.sizeof(rie.position) + 
TypeSizes.NATIVE.sizeof(rie.promotedSize(type));
+            int size = TypeSizes.NATIVE.sizeof(rie.position) + 
TypeSizes.NATIVE.sizeof(rie.promotedSize(idxSerializer));
 
             if (rie.isIndexed())
             {
@@ -162,11 +175,11 @@ public class RowIndexEntry implements IMeasurableMemory
                 size += 
DeletionTime.serializer.serializedSize(rie.deletionTime(), TypeSizes.NATIVE);
                 size += TypeSizes.NATIVE.sizeof(index.size());
 
-                ISerializer<IndexHelper.IndexInfo> idxSerializer = 
type.indexSerializer();
                 for (IndexHelper.IndexInfo info : index)
                     size += idxSerializer.serializedSize(info, 
TypeSizes.NATIVE);
             }
 
+
             return size;
         }
     }
@@ -174,7 +187,7 @@ public class RowIndexEntry implements IMeasurableMemory
     /**
      * An entry in the row index for a row whose columns are indexed.
      */
-    private static class IndexedEntry extends RowIndexEntry
+    private static class IndexedEntry extends 
RowIndexEntry<IndexHelper.IndexInfo>
     {
         private final DeletionTime deletionTime;
         private final List<IndexHelper.IndexInfo> columnsIndex;
@@ -204,12 +217,11 @@ public class RowIndexEntry implements IMeasurableMemory
         }
 
         @Override
-        public int promotedSize(CType type)
+        public int promotedSize(ISerializer<IndexHelper.IndexInfo> 
idxSerializer)
         {
             TypeSizes typeSizes = TypeSizes.NATIVE;
             long size = DeletionTime.serializer.serializedSize(deletionTime, 
typeSizes);
             size += typeSizes.sizeof(columnsIndex.size()); // number of entries
-            ISerializer<IndexHelper.IndexInfo> idxSerializer = 
type.indexSerializer();
             for (IndexHelper.IndexInfo info : columnsIndex)
                 size += idxSerializer.serializedSize(info, typeSizes);
 

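A minimal sketch (not part of the patch) of the RowIndexEntry change above: the row-index serializer is now built from an ISerializer<IndexHelper.IndexInfo> supplied by the caller (previously pulled from the CType) and implements the new IndexSerializer interface, so an sstable format can plug in its own index handling. The wiring below is illustrative.

    import java.io.DataInput;
    import java.io.IOException;
    import org.apache.cassandra.db.RowIndexEntry;
    import org.apache.cassandra.io.ISerializer;
    import org.apache.cassandra.io.sstable.IndexHelper;
    import org.apache.cassandra.io.sstable.format.Version;

    class RowIndexSerializerSketch
    {
        static RowIndexEntry<IndexHelper.IndexInfo> readEntry(ISerializer<IndexHelper.IndexInfo> idxSerializer,
                                                              DataInput in,
                                                              Version version) throws IOException
        {
            // The serializer takes the index-info serializer directly, and deserialize()
            // now takes io.sstable.format.Version instead of Descriptor.Version.
            RowIndexEntry.IndexSerializer<IndexHelper.IndexInfo> serializer = new RowIndexEntry.Serializer(idxSerializer);
            return serializer.deserialize(in, version);
        }
    }
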
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java 
b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 5bd2d9b..6ac74ae 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -26,7 +26,7 @@ import 
org.apache.cassandra.db.columniterator.LazyColumnIterator;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.MergeIterator;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java 
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index bfd92e9..5c0d935 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -29,6 +29,7 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +49,6 @@ import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.metrics.RestorableMeter;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java 
b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
deleted file mode 100644
index 7012321..0000000
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.columniterator;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.List;
-
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.IndexHelper;
-import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/**
- * This is a reader that finds the block for a starting column and returns 
blocks before/after it for each next call.
- * This function assumes that the CF is sorted by name and exploits the name 
index.
- */
-class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements 
OnDiskAtomIterator
-{
-    private final ColumnFamily emptyColumnFamily;
-
-    private final SSTableReader sstable;
-    private final List<IndexHelper.IndexInfo> indexes;
-    private final FileDataInput originalInput;
-    private FileDataInput file;
-    private final boolean reversed;
-    private final ColumnSlice[] slices;
-    private final BlockFetcher fetcher;
-    private final Deque<OnDiskAtom> blockColumns = new 
ArrayDeque<OnDiskAtom>();
-    private final CellNameType comparator;
-
-    // Holds range tombstone in reverse queries. See addColumn()
-    private final Deque<OnDiskAtom> rangeTombstonesReversed;
-
-    /**
-     * This slice reader assumes that slices are sorted correctly, e.g. that 
for forward lookup slices are in
-     * lexicographic order of start elements and that for reverse lookup they 
are in reverse lexicographic order of
-     * finish (reverse start) elements. i.e. forward: [a,b],[d,e],[g,h] 
reverse: [h,g],[e,d],[b,a]. This reader also
-     * assumes that validation has been performed in terms of intervals (no 
overlapping intervals).
-     */
-    public IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, 
FileDataInput input, ColumnSlice[] slices, boolean reversed)
-    {
-        Tracing.trace("Seeking to partition indexed section in data file");
-        this.sstable = sstable;
-        this.originalInput = input;
-        this.reversed = reversed;
-        this.slices = slices;
-        this.comparator = sstable.metadata.comparator;
-        this.rangeTombstonesReversed = reversed ? new ArrayDeque<OnDiskAtom>() 
: null;
-
-        try
-        {
-            this.indexes = indexEntry.columnsIndex();
-            emptyColumnFamily = 
ArrayBackedSortedColumns.factory.create(sstable.metadata);
-            if (indexes.isEmpty())
-            {
-                setToRowStart(indexEntry, input);
-                
emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file));
-                fetcher = new SimpleBlockFetcher();
-            }
-            else
-            {
-                emptyColumnFamily.delete(indexEntry.deletionTime());
-                fetcher = new IndexedBlockFetcher(indexEntry.position);
-            }
-        }
-        catch (IOException e)
-        {
-            sstable.markSuspect();
-            throw new CorruptSSTableException(e, file.getPath());
-        }
-    }
-
-    /**
-     * Sets the seek position to the start of the row for column scanning.
-     */
-    private void setToRowStart(RowIndexEntry rowEntry, FileDataInput in) 
throws IOException
-    {
-        if (in == null)
-        {
-            this.file = sstable.getFileDataInput(rowEntry.position);
-        }
-        else
-        {
-            this.file = in;
-            in.seek(rowEntry.position);
-        }
-        
sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
-    }
-
-    public ColumnFamily getColumnFamily()
-    {
-        return emptyColumnFamily;
-    }
-
-    public DecoratedKey getKey()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    protected OnDiskAtom computeNext()
-    {
-        while (true)
-        {
-            if (reversed)
-            {
-                // Return all tombstone for the block first (see addColumn() 
below)
-                OnDiskAtom column = rangeTombstonesReversed.poll();
-                if (column != null)
-                    return column;
-            }
-
-            OnDiskAtom column = blockColumns.poll();
-            if (column == null)
-            {
-                if (!fetcher.fetchMoreData())
-                    return endOfData();
-            }
-            else
-            {
-                return column;
-            }
-        }
-    }
-
-    public void close() throws IOException
-    {
-        if (originalInput == null && file != null)
-            file.close();
-    }
-
-    protected void addColumn(OnDiskAtom col)
-    {
-        if (reversed)
-        {
-            /*
-             * We put range tomstone markers at the beginning of the range 
they delete. But for reversed queries,
-             * the caller still need to know about a RangeTombstone before it 
sees any column that it covers.
-             * To make that simple, we keep said tombstones separate and 
return them all before any column for
-             * a given block.
-             */
-            if (col instanceof RangeTombstone)
-                rangeTombstonesReversed.addFirst(col);
-            else
-                blockColumns.addFirst(col);
-        }
-        else
-        {
-            blockColumns.addLast(col);
-        }
-    }
-
-    private abstract class BlockFetcher
-    {
-        protected int currentSliceIdx;
-
-        protected BlockFetcher(int sliceIdx)
-        {
-            this.currentSliceIdx = sliceIdx;
-        }
-
-        /*
-         * Return the smallest key selected by the current ColumnSlice.
-         */
-        protected Composite currentStart()
-        {
-            return reversed ? slices[currentSliceIdx].finish : slices[currentSliceIdx].start;
-        }
-
-        /*
-         * Return the biggest key selected by the current ColumnSlice.
-         */
-        protected Composite currentFinish()
-        {
-            return reversed ? slices[currentSliceIdx].start : slices[currentSliceIdx].finish;
-        }
-
-        protected abstract boolean setNextSlice();
-
-        protected abstract boolean fetchMoreData();
-
-        protected boolean isColumnBeforeSliceStart(OnDiskAtom column)
-        {
-            return isBeforeSliceStart(column.name());
-        }
-
-        protected boolean isBeforeSliceStart(Composite name)
-        {
-            Composite start = currentStart();
-            return !start.isEmpty() && comparator.compare(name, start) < 0;
-        }
-
-        protected boolean isColumnBeforeSliceFinish(OnDiskAtom column)
-        {
-            Composite finish = currentFinish();
-            return finish.isEmpty() || comparator.compare(column.name(), finish) <= 0;
-        }
-
-        protected boolean isAfterSliceFinish(Composite name)
-        {
-            Composite finish = currentFinish();
-            return !finish.isEmpty() && comparator.compare(name, finish) > 0;
-        }
-    }
-
-    private class IndexedBlockFetcher extends BlockFetcher
-    {
-        // where this row starts
-        private final long columnsStart;
-
-        // the index entry for the next block to deserialize
-        private int nextIndexIdx = -1;
-
-        // index of the last block we've read from disk;
-        private int lastDeserializedBlock = -1;
-
-        // For reversed, keep columns at the beginning of the last deserialized block that
-        // may still match a slice
-        private final Deque<OnDiskAtom> prefetched;
-
-        public IndexedBlockFetcher(long columnsStart)
-        {
-            super(-1);
-            this.columnsStart = columnsStart;
-            this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null;
-            setNextSlice();
-        }
-
-        protected boolean setNextSlice()
-        {
-            while (++currentSliceIdx < slices.length)
-            {
-                nextIndexIdx = IndexHelper.indexFor(slices[currentSliceIdx].start, indexes, comparator, reversed, nextIndexIdx);
-                if (nextIndexIdx < 0 || nextIndexIdx >= indexes.size())
-                    // no index block for that slice
-                    continue;
-
-                // Check if we can exclude this slice entirely from the index
-                IndexInfo info = indexes.get(nextIndexIdx);
-                if (reversed)
-                {
-                    if (!isBeforeSliceStart(info.lastName))
-                        return true;
-                }
-                else
-                {
-                    if (!isAfterSliceFinish(info.firstName))
-                        return true;
-                }
-            }
-            nextIndexIdx = -1;
-            return false;
-        }
-
-        protected boolean hasMoreSlice()
-        {
-            return currentSliceIdx < slices.length;
-        }
-
-        protected boolean fetchMoreData()
-        {
-            if (!hasMoreSlice())
-                return false;
-
-            // If we read blocks in reversed disk order, we may have columns from the previous block to handle.
-            // Note that prefetched keeps columns in reversed disk order.
-            if (reversed && !prefetched.isEmpty())
-            {
-                boolean gotSome = false;
-                // Avoids some comparison when we know it's not useful
-                boolean inSlice = false;
-
-                OnDiskAtom prefetchedCol;
-                while ((prefetchedCol = prefetched.peek() ) != null)
-                {
-                    // col is before slice, we update the slice
-                    if (isColumnBeforeSliceStart(prefetchedCol))
-                    {
-                        inSlice = false;
-                        if (!setNextSlice())
-                            return false;
-                    }
-                    // col is within slice, all columns
-                    // (we go in reverse, so as soon as we are in a slice, no need to check
-                    // we're after the slice until we change slice)
-                    else if (inSlice || isColumnBeforeSliceFinish(prefetchedCol))
-                    {
-                        blockColumns.addLast(prefetched.poll());
-                        gotSome = true;
-                        inSlice = true;
-                    }
-                    // if col is after slice, ignore
-                    else
-                    {
-                        prefetched.poll();
-                    }
-                }
-                if (gotSome)
-                    return true;
-            }
-            try
-            {
-                return getNextBlock();
-            }
-            catch (IOException e)
-            {
-                throw new CorruptSSTableException(e, file.getPath());
-            }
-        }
-
-        private boolean getNextBlock() throws IOException
-        {
-            if (lastDeserializedBlock == nextIndexIdx)
-            {
-                if (reversed)
-                    nextIndexIdx--;
-                else
-                    nextIndexIdx++;
-            }
-            lastDeserializedBlock = nextIndexIdx;
-
-            // Are we done?
-            if (lastDeserializedBlock < 0 || lastDeserializedBlock >= indexes.size())
-                return false;
-
-            IndexInfo currentIndex = indexes.get(lastDeserializedBlock);
-
-            /* seek to the correct offset to the data, and calculate the data size */
-            long positionToSeek = columnsStart + currentIndex.offset;
-
-            // With new promoted indexes, our first seek in the data file will happen at that point.
-            if (file == null)
-                file = originalInput == null ? sstable.getFileDataInput(positionToSeek) : originalInput;
-
-            AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
-
-            file.seek(positionToSeek);
-            FileMark mark = file.mark();
-
-            // We remember when we are within a slice to avoid some comparisons
-            boolean inSlice = false;
-
-            // scan from index start
-            while (file.bytesPastMark(mark) < currentIndex.width || deserializer.hasUnprocessed())
-            {
-                // col is before slice
-                // (If in slice, don't bother checking that until we change slice)
-                Composite start = currentStart();
-                if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
-                {
-                    if (reversed)
-                    {
-                        // the next slice selects columns that are before the current one, so it may
-                        // match this column, so keep it around.
-                        prefetched.addFirst(deserializer.readNext());
-                    }
-                    else
-                    {
-                        deserializer.skipNext();
-                    }
-                }
-                // col is within slice
-                else
-                {
-                    Composite finish = currentFinish();
-                    if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
-                    {
-                        inSlice = true;
-                        addColumn(deserializer.readNext());
-                    }
-                    // col is after slice.
-                    else
-                    {
-                        // When reading forward, if we hit a column that sorts after the current slice, it means we're done with this slice.
-                        // For reversed, this may either mean that we're done with the current slice, or that we need to read the previous
-                        // index block. However, we can be sure that we are in the first case (the current slice is done) if the first
-                        // columns of the block were not part of the current slice, i.e. if we have columns in prefetched.
-                        if (reversed && prefetched.isEmpty())
-                            break;
-
-                        if (!setNextSlice())
-                            break;
-
-                        inSlice = false;
-
-                        // The next index block now corresponds to the first block that may have columns for the newly set slice.
-                        // So if it's different from the current block, we're done with this block. And in that case, we know
-                        // that our prefetched columns won't match.
-                        if (nextIndexIdx != lastDeserializedBlock)
-                        {
-                            if (reversed)
-                                prefetched.clear();
-                            break;
-                        }
-
-                        // Even if the next slice may have columns in this block, if we're reversed, those columns have been
-                        // prefetched and we're done with that block
-                        if (reversed)
-                            break;
-
-                        // otherwise, we will deal with that column at the next iteration
-                    }
-                }
-            }
-            return true;
-        }
-    }
-
-    private class SimpleBlockFetcher extends BlockFetcher
-    {
-        public SimpleBlockFetcher() throws IOException
-        {
-            // Since we have to deserialize in order and will read all slices, might as well reverse the slices and
-            // behave as if it was not reversed
-            super(reversed ? slices.length - 1 : 0);
-
-            // We remember when we are within a slice to avoid some comparisons
-            boolean inSlice = false;
-
-            AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
-            while (deserializer.hasNext())
-            {
-                // col is before slice
-                // (If in slice, don't bother checking that until we change slice)
-                Composite start = currentStart();
-                if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
-                {
-                    deserializer.skipNext();
-                    continue;
-                }
-
-                // col is within slice
-                Composite finish = currentFinish();
-                if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
-                {
-                    inSlice = true;
-                    addColumn(deserializer.readNext());
-                }
-                // col is after slice. more slices?
-                else
-                {
-                    inSlice = false;
-                    if (!setNextSlice())
-                        break;
-                }
-            }
-        }
-
-        protected boolean setNextSlice()
-        {
-            if (reversed)
-            {
-                if (currentSliceIdx <= 0)
-                    return false;
-
-                currentSliceIdx--;
-            }
-            else
-            {
-                if (currentSliceIdx >= slices.length - 1)
-                    return false;
-
-                currentSliceIdx++;
-            }
-            return true;
-        }
-
-        protected boolean fetchMoreData()
-        {
-            return false;
-        }
-    }
-}
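
For context on the reader removed above: its BlockFetcher comments describe the slice-bound tests that drive the whole iteration, where an empty Composite bound means "unbounded" and a reversed read simply swaps the roles of start and finish. The snippet below is a minimal standalone sketch of those two tests, using plain strings instead of Composite names; it is illustrative only and is not part of this commit or of the Cassandra code base.

import java.util.Comparator;

// Standalone sketch of the removed BlockFetcher bound checks: an empty bound is
// treated as "unbounded", and a reversed read would simply swap start and finish.
public class SliceBoundsSketch
{
    private static final Comparator<String> comparator = Comparator.naturalOrder();

    static boolean isBeforeSliceStart(String name, String start)
    {
        return !start.isEmpty() && comparator.compare(name, start) < 0;   // empty start = no lower bound
    }

    static boolean isAfterSliceFinish(String name, String finish)
    {
        return !finish.isEmpty() && comparator.compare(name, finish) > 0; // empty finish = no upper bound
    }

    public static void main(String[] args)
    {
        // slice ["b", "d"]: "a" falls before it, "c" inside it, "e" after it
        System.out.println(isBeforeSliceStart("a", "b")); // true
        System.out.println(isAfterSliceFinish("c", "d")); // false
        System.out.println(isAfterSliceFinish("e", "d")); // true
    }
}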

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
deleted file mode 100644
index 224b63f..0000000
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.columniterator;
-
-import java.io.IOException;
-import java.util.*;
-
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.IndexHelper;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
-{
-    private ColumnFamily cf;
-    private final SSTableReader sstable;
-    private FileDataInput fileToClose;
-    private Iterator<OnDiskAtom> iter;
-    public final SortedSet<CellName> columns;
-    public final DecoratedKey key;
-
-    public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<CellName> columns)
-    {
-        assert columns != null;
-        this.sstable = sstable;
-        this.columns = columns;
-        this.key = key;
-
-        RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
-        if (indexEntry == null)
-            return;
-
-        try
-        {
-            read(sstable, null, indexEntry);
-        }
-        catch (IOException e)
-        {
-            sstable.markSuspect();
-            throw new CorruptSSTableException(e, sstable.getFilename());
-        }
-        finally
-        {
-            if (fileToClose != null)
-                FileUtils.closeQuietly(fileToClose);
-        }
-    }
-
-    public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry)
-    {
-        assert columns != null;
-        this.sstable = sstable;
-        this.columns = columns;
-        this.key = key;
-
-        try
-        {
-            read(sstable, file, indexEntry);
-        }
-        catch (IOException e)
-        {
-            sstable.markSuspect();
-            throw new CorruptSSTableException(e, sstable.getFilename());
-        }
-    }
-
-    private FileDataInput createFileDataInput(long position)
-    {
-        fileToClose = sstable.getFileDataInput(position);
-        return fileToClose;
-    }
-
-    private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry)
-    throws IOException
-    {
-        List<IndexHelper.IndexInfo> indexList;
-
-        // If the entry is not indexed or the index is not promoted, read from the row start
-        if (!indexEntry.isIndexed())
-        {
-            if (file == null)
-                file = createFileDataInput(indexEntry.position);
-            else
-                file.seek(indexEntry.position);
-
-            DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
-            assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath());
-        }
-
-        indexList = indexEntry.columnsIndex();
-
-        if (!indexEntry.isIndexed())
-        {
-            ColumnFamilySerializer serializer = ColumnFamily.serializer;
-            try
-            {
-                cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
-                cf.delete(DeletionTime.serializer.deserialize(file));
-            }
-            catch (Exception e)
-            {
-                throw new IOException(serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e);
-            }
-        }
-        else
-        {
-            cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
-            cf.delete(indexEntry.deletionTime());
-        }
-
-        List<OnDiskAtom> result = new ArrayList<OnDiskAtom>();
-        if (indexList.isEmpty())
-        {
-            readSimpleColumns(file, columns, result);
-        }
-        else
-        {
-            readIndexedColumns(sstable.metadata, file, columns, indexList, indexEntry.position, result);
-        }
-
-        // create an iterator view of the columns we read
-        iter = result.iterator();
-    }
-
-    private void readSimpleColumns(FileDataInput file, SortedSet<CellName> columnNames, List<OnDiskAtom> result)
-    {
-        Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, sstable.descriptor.version);
-        int n = 0;
-        while (atomIterator.hasNext())
-        {
-            OnDiskAtom column = atomIterator.next();
-            if (column instanceof Cell)
-            {
-                if (columnNames.contains(column.name()))
-                {
-                    result.add(column);
-                    if (++n >= columns.size())
-                        break;
-                }
-            }
-            else
-            {
-                result.add(column);
-            }
-        }
-    }
-
-    private void readIndexedColumns(CFMetaData metadata,
-                                    FileDataInput file,
-                                    SortedSet<CellName> columnNames,
-                                    List<IndexHelper.IndexInfo> indexList,
-                                    long basePosition,
-                                    List<OnDiskAtom> result)
-    throws IOException
-    {
-        /* get the various column ranges we have to read */
-        CellNameType comparator = metadata.comparator;
-        List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>();
-        int lastIndexIdx = -1;
-        for (CellName name : columnNames)
-        {
-            int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx);
-            if (index < 0 || index == indexList.size())
-                continue;
-            IndexHelper.IndexInfo indexInfo = indexList.get(index);
-            // Check the index block does contain the column names and that we haven't inserted this block yet.
-            if (comparator.compare(name, indexInfo.firstName) < 0 || index == lastIndexIdx)
-                continue;
-
-            ranges.add(indexInfo);
-            lastIndexIdx = index;
-        }
-
-        if (ranges.isEmpty())
-            return;
-
-        Iterator<CellName> toFetch = columnNames.iterator();
-        CellName nextToFetch = toFetch.next();
-        for (IndexHelper.IndexInfo indexInfo : ranges)
-        {
-            long positionToSeek = basePosition + indexInfo.offset;
-
-            // With new promoted indexes, our first seek in the data file will happen at that point.
-            if (file == null)
-                file = createFileDataInput(positionToSeek);
-
-            AtomDeserializer deserializer = cf.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
-            file.seek(positionToSeek);
-            FileMark mark = file.mark();
-            while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch != null)
-            {
-                int cmp = deserializer.compareNextTo(nextToFetch);
-                if (cmp == 0)
-                {
-                    nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
-                    result.add(deserializer.readNext());
-                    continue;
-                }
-
-                deserializer.skipNext();
-                if (cmp > 0)
-                    nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
-            }
-        }
-    }
-
-    public DecoratedKey getKey()
-    {
-        return key;
-    }
-
-    public ColumnFamily getColumnFamily()
-    {
-        return cf;
-    }
-
-    protected OnDiskAtom computeNext()
-    {
-        if (iter == null || !iter.hasNext())
-            return endOfData();
-        return iter.next();
-    }
-
-    public void close() throws IOException { }
-}
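
The names iterator removed above pairs cells read in on-disk order with a sorted set of requested names, advancing both sides at most once. Below is a minimal standalone sketch of that merge, assuming plain strings in place of CellName and dropping the index-block seeking; it is an illustration of the idea, not the deleted loop verbatim.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Standalone sketch: keep only the cells whose names were explicitly requested,
// walking the on-disk cells and the sorted requested names in a single pass.
public class NamesFilterSketch
{
    static List<String> filter(List<String> cellsInDiskOrder, SortedSet<String> requested)
    {
        List<String> result = new ArrayList<>();
        Iterator<String> toFetch = requested.iterator();
        String next = toFetch.hasNext() ? toFetch.next() : null;
        for (String cell : cellsInDiskOrder)
        {
            // drop requested names that are not present on disk
            while (next != null && next.compareTo(cell) < 0)
                next = toFetch.hasNext() ? toFetch.next() : null;
            if (next == null)
                break;                      // nothing left to fetch
            if (next.equals(cell))
            {
                result.add(cell);           // requested name found
                next = toFetch.hasNext() ? toFetch.next() : null;
            }
            // otherwise the cell was not requested; skip it
        }
        return result;
    }

    public static void main(String[] args)
    {
        List<String> cells = Arrays.asList("a", "b", "c", "d");
        SortedSet<String> wanted = new TreeSet<>(Arrays.asList("b", "d", "x"));
        System.out.println(filter(cells, wanted)); // prints [b, d]
    }
}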

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
deleted file mode 100644
index 0057d52..0000000
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.columniterator;
-
-import java.io.IOException;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.OnDiskAtom;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.util.FileDataInput;
-
-/**
- *  A Cell Iterator over SSTable
- */
-public class SSTableSliceIterator implements OnDiskAtomIterator
-{
-    private final OnDiskAtomIterator reader;
-    private final DecoratedKey key;
-
-    public SSTableSliceIterator(SSTableReader sstable, DecoratedKey key, ColumnSlice[] slices, boolean reversed)
-    {
-        this.key = key;
-        RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
-        this.reader = indexEntry == null ? null : createReader(sstable, indexEntry, null, slices, reversed);
-    }
-
-    /**
-     * An iterator for a slice within an SSTable
-     * @param sstable the SSTable we are reading from
-     * @param file Optional parameter that input is read from.  If null is passed, this class creates an appropriate one automatically.
-     * If this class creates it, it will close the underlying file when #close() is called.
-     * If a caller passes a non-null argument, this class will NOT close the underlying file when the iterator is closed (i.e. the caller is responsible for closing the file).
-     * In all cases the caller should explicitly #close() this iterator.
-     * @param key The key the requested slice resides under
-     * @param slices the column slices
-     * @param reversed Results are returned in reverse order iff reversed is true.
-     * @param indexEntry position of the row
-     */
-    public SSTableSliceIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry)
-    {
-        this.key = key;
-        reader = createReader(sstable, indexEntry, file, slices, reversed);
-    }
-
-    private static OnDiskAtomIterator createReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput file, ColumnSlice[] slices, boolean reversed)
-    {
-        return slices.length == 1 && slices[0].start.isEmpty() && !reversed
-             ? new SimpleSliceReader(sstable, indexEntry, file, slices[0].finish)
-             : new IndexedSliceReader(sstable, indexEntry, file, slices, reversed);
-    }
-
-    public DecoratedKey getKey()
-    {
-        return key;
-    }
-
-    public ColumnFamily getColumnFamily()
-    {
-        return reader == null ? null : reader.getColumnFamily();
-    }
-
-    public boolean hasNext()
-    {
-        return reader != null && reader.hasNext();
-    }
-
-    public OnDiskAtom next()
-    {
-        return reader.next();
-    }
-
-    public void remove()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public void close() throws IOException
-    {
-        if (reader != null)
-            reader.close();
-    }
-
-}
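
One behavior the deleted javadoc spells out is worth noting for any replacement reader: the iterator closes the underlying FileDataInput only when it opened that input itself, while a caller-supplied input stays open and remains the caller's responsibility (the same rule implemented by needsClosing in SimpleSliceReader below and the originalInput == null check in IndexedSliceReader above). A generic sketch of that ownership pattern follows; it uses only java.io.Closeable, the class name is made up, and the lambda is a hypothetical stand-in for opening a real file.

import java.io.Closeable;
import java.io.IOException;

// Standalone sketch of the ownership rule from the removed iterator javadoc:
// close the underlying resource only if this reader opened it itself.
public class OwnedOrBorrowedReader implements Closeable
{
    private final Closeable input;
    private final boolean ownsInput;

    // Pass null to let the reader open (and later close) its own input;
    // pass an existing input to keep responsibility for closing it yourself.
    public OwnedOrBorrowedReader(Closeable externalInput)
    {
        if (externalInput == null)
        {
            this.input = () -> {};   // stand-in for opening a real file
            this.ownsInput = true;
        }
        else
        {
            this.input = externalInput;
            this.ownsInput = false;
        }
    }

    @Override
    public void close() throws IOException
    {
        if (ownsInput)
            input.close();
    }
}

In both cases the caller should still call close() on the reader itself, exactly as the deleted javadoc required.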

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
deleted file mode 100644
index bdbf4bd..0000000
--- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.columniterator;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import com.google.common.collect.AbstractIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
-{
-    private static final Logger logger = LoggerFactory.getLogger(SimpleSliceReader.class);
-
-    private final FileDataInput file;
-    private final boolean needsClosing;
-    private final Composite finishColumn;
-    private final CellNameType comparator;
-    private final ColumnFamily emptyColumnFamily;
-    private final Iterator<OnDiskAtom> atomIterator;
-
-    public SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, Composite finishColumn)
-    {
-        Tracing.trace("Seeking to partition beginning in data file");
-        this.finishColumn = finishColumn;
-        this.comparator = sstable.metadata.comparator;
-        try
-        {
-            if (input == null)
-            {
-                this.file = sstable.getFileDataInput(indexEntry.position);
-                this.needsClosing = true;
-            }
-            else
-            {
-                this.file = input;
-                input.seek(indexEntry.position);
-                this.needsClosing = false;
-            }
-
-            // Skip key and data size
-            ByteBufferUtil.skipShortLength(file);
-
-            emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata);
-            emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file));
-            atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, sstable.descriptor.version);
-        }
-        catch (IOException e)
-        {
-            sstable.markSuspect();
-            throw new CorruptSSTableException(e, sstable.getFilename());
-        }
-    }
-
-    protected OnDiskAtom computeNext()
-    {
-        if (!atomIterator.hasNext())
-            return endOfData();
-
-        OnDiskAtom column = atomIterator.next();
-        if (!finishColumn.isEmpty() && comparator.compare(column.name(), finishColumn) > 0)
-            return endOfData();
-
-        return column;
-    }
-
-    public ColumnFamily getColumnFamily()
-    {
-        return emptyColumnFamily;
-    }
-
-    public void close() throws IOException
-    {
-        if (needsClosing)
-            file.close();
-    }
-
-    public DecoratedKey getKey()
-    {
-        throw new UnsupportedOperationException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index e50a585..f914c2c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.PureJavaCrc32;
@@ -43,11 +44,13 @@ public class CommitLogDescriptor
     public static final int VERSION_12 = 2;
     public static final int VERSION_20 = 3;
     public static final int VERSION_21 = 4;
+    public static final int VERSION_30 = 5;
     /**
     * Increment this number if there is a change in the commit log disc layout or MessagingVersion changes.
      * Note: make sure to handle {@link #getMessagingVersion()}
      */
-    public static final int current_version = VERSION_21;
+    @VisibleForTesting
+    public static final int current_version = VERSION_30;
 
     // [version, id, checksum]
     static final int HEADER_SIZE = 4 + 8 + 4;
@@ -126,6 +129,8 @@ public class CommitLogDescriptor
                 return MessagingService.VERSION_20;
             case VERSION_21:
                 return MessagingService.VERSION_21;
+            case VERSION_30:
+                return MessagingService.VERSION_30;
             default:
                 throw new IllegalStateException("Unknown commitlog version " + version);
         }
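
For readers tracking the version bump in the hunks above: adding VERSION_30 means touching both current_version and the commit-log-to-messaging version mapping in getMessagingVersion(). The sketch below shows that shape in isolation; the class and the MESSAGING_* constants are hypothetical stand-ins, not the real Cassandra MessagingService values.

// Standalone sketch of the wiring a new commit log version needs: a constant,
// the current_version bump, and a branch in the version mapping.
public class CommitLogVersionSketch
{
    public static final int VERSION_21 = 4;
    public static final int VERSION_30 = 5;                 // new in this commit
    public static final int CURRENT_VERSION = VERSION_30;

    // hypothetical stand-ins for MessagingService.VERSION_21 / VERSION_30
    public static final int MESSAGING_21 = 21;
    public static final int MESSAGING_30 = 30;

    public static int getMessagingVersion(int commitLogVersion)
    {
        switch (commitLogVersion)
        {
            case VERSION_21: return MESSAGING_21;
            case VERSION_30: return MESSAGING_30;
            default: throw new IllegalStateException("Unknown commitlog version " + commitLogVersion);
        }
    }

    public static void main(String[] args)
    {
        System.out.println(getMessagingVersion(CURRENT_VERSION)); // 30
    }
}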

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
index 31fc28e..ca1969f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
@@ -27,7 +27,7 @@ import com.google.common.collect.Ordering;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 public class ReplayPosition implements Comparable<ReplayPosition>
