http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 1d2bf48..f88fe92 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
 
 /**
  * a CompactedRow is an object that takes a bunch of rows (keys + 
columnfamilies)
@@ -47,7 +48,7 @@ public abstract class AbstractCompactedRow implements 
Closeable
      *
      * @return index information for the written row, or null if the 
compaction resulted in only expired tombstones.
      */
-    public abstract RowIndexEntry write(long currentPosition, DataOutputPlus 
out) throws IOException;
+    public abstract RowIndexEntry write(long currentPosition, SequentialWriter 
out) throws IOException;
 
     /**
      * update @param digest with the data bytes of the row (not including row 
key or row size).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 6a0e0df..d228cba 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.RateLimiter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,7 +34,6 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.SSTableReader;
 
 /**
  * Pluggable compaction strategy determines how SSTables get merged.
@@ -369,7 +369,7 @@ public abstract class AbstractCompactionStrategy
             long keys = sstable.estimatedKeys();
             Set<Range<Token>> ranges = new 
HashSet<Range<Token>>(overlaps.size());
             for (SSTableReader overlap : overlaps)
-                ranges.add(new Range<Token>(overlap.first.getToken(), 
overlap.last.getToken(), overlap.partitioner));
+                ranges.add(new Range<>(overlap.first.getToken(), 
overlap.last.getToken(), overlap.partitioner));
             long remainingKeys = keys - sstable.estimatedKeysForRanges(ranges);
             // next, calculate what percentage of columns we have within those 
keys
             long columns = sstable.getEstimatedColumnCount().mean() * 
remainingKeys;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index 59338f4..7631baa 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -22,7 +22,7 @@ import java.util.Set;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import 
org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 
 public abstract class AbstractCompactionTask extends DiskAwareRunnable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionController.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index ef27805..e57d01d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -24,13 +24,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 
 /**
@@ -112,7 +112,7 @@ public class CompactionController implements AutoCloseable
         if (compacting == null)
             return Collections.<SSTableReader>emptySet();
 
-        List<SSTableReader> candidates = new ArrayList<SSTableReader>();
+        List<SSTableReader> candidates = new ArrayList<>();
 
         long minTimestamp = Long.MAX_VALUE;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 0c9b52a..86918bc 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -24,11 +24,14 @@ import java.util.List;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.MergeIterator;
 
 public class CompactionIterable extends AbstractCompactionIterable
 {
+    final SSTableFormat format;
+
     private static final Comparator<OnDiskAtomIterator> comparator = new 
Comparator<OnDiskAtomIterator>()
     {
         public int compare(OnDiskAtomIterator i1, OnDiskAtomIterator i2)
@@ -37,9 +40,10 @@ public class CompactionIterable extends 
AbstractCompactionIterable
         }
     };
 
-    public CompactionIterable(OperationType type, List<ICompactionScanner> 
scanners, CompactionController controller)
+    public CompactionIterable(OperationType type, List<ICompactionScanner> 
scanners, CompactionController controller, SSTableFormat.Type formatType)
     {
         super(controller, type, scanners);
+        this.format = formatType.info;
     }
 
     public CloseableIterator<AbstractCompactedRow> iterator()
@@ -71,7 +75,7 @@ public class CompactionIterable extends 
AbstractCompactionIterable
                 // create a new container for rows, since we're going to clear 
ours for the next one,
                 // and the AbstractCompactionRow code should be able to assume 
that the collection it receives
                 // won't be pulled out from under it.
-                return new LazilyCompactedRow(controller, 
ImmutableList.copyOf(rows));
+                return format.getCompactedRowWriter(controller, 
ImmutableList.copyOf(rows));
             }
             finally
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b08668e..18ad7ae 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -52,7 +52,9 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
-
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,10 +78,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableRewriter;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
@@ -327,7 +326,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                     @Override
                     public boolean apply(SSTableReader sstable)
                     {
-                        return !(excludeCurrentVersion && 
sstable.descriptor.version.equals(Descriptor.Version.CURRENT));
+                        return !(excludeCurrentVersion && 
sstable.descriptor.version.equals(sstable.descriptor.getFormat().getLatestVersion()));
                     }
                 });
             }
@@ -502,7 +501,7 @@ public class CompactionManager implements 
CompactionManagerMBean
             {
                 // look up the sstables now that we're on the compaction 
executor, so we don't try to re-compact
                 // something that was already being compacted earlier.
-                Collection<SSTableReader> sstables = new 
ArrayList<SSTableReader>(dataFiles.size());
+                Collection<SSTableReader> sstables = new 
ArrayList<>(dataFiles.size());
                 for (Descriptor desc : dataFiles)
                 {
                     // inefficient but not in a performance sensitive path
@@ -845,12 +844,8 @@ public class CompactionManager implements 
CompactionManagerMBean
                                              SSTableReader sstable)
     {
         FileUtils.createDirectory(compactionFileLocation);
-        return new 
SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation),
-                                 expectedBloomFilterSize,
-                                 repairedAt,
-                                 cfs.metadata,
-                                 cfs.partitioner,
-                                 new 
MetadataCollector(Collections.singleton(sstable), cfs.metadata.comparator, 
sstable.getSSTableLevel()));
+
+        return 
SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(compactionFileLocation)),
 expectedBloomFilterSize, repairedAt, sstable.getSSTableLevel());
     }
 
     public static SSTableWriter 
createWriterForAntiCompaction(ColumnFamilyStore cfs,
@@ -875,8 +870,8 @@ public class CompactionManager implements 
CompactionManagerMBean
                 break;
             }
         }
-        return new 
SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation),
-                                 expectedBloomFilterSize,
+        return 
SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(compactionFileLocation)),
+                                 (long)expectedBloomFilterSize,
                                  repairedAt,
                                  cfs.metadata,
                                  cfs.partitioner,
@@ -1007,6 +1002,7 @@ public class CompactionManager implements 
CompactionManagerMBean
     private void doAntiCompaction(ColumnFamilyStore cfs, 
Collection<Range<Token>> ranges,
                                                        
Collection<SSTableReader> repairedSSTables, long repairedAt)
     {
+
         // TODO(5351): we can do better here:
         logger.info("Performing anticompaction on {} sstables", 
repairedSSTables.size());
 
@@ -1044,6 +1040,7 @@ public class CompactionManager implements 
CompactionManagerMBean
                 groupMaxDataAge = sstable.maxDataAge;
         }
 
+     
         if (anticompactionGroup.size() == 0)
         {
             logger.info("No valid anticompactions for this group, All sstables 
were compacted and are no longer available");
@@ -1068,7 +1065,7 @@ public class CompactionManager implements 
CompactionManagerMBean
             
repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
 destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
             
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
 destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, 
sstableAsSet));
 
-            CompactionIterable ci = new 
CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
+            CompactionIterable ci = new 
CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, 
DatabaseDescriptor.getSSTableFormat());
             Iterator<AbstractCompactedRow> iter = ci.iterator();
             while(iter.hasNext())
             {
@@ -1175,7 +1172,7 @@ public class CompactionManager implements 
CompactionManagerMBean
     {
         public ValidationCompactionIterable(ColumnFamilyStore cfs, 
List<ICompactionScanner> scanners, int gcBefore)
         {
-            super(OperationType.VALIDATION, scanners, new 
ValidationCompactionController(cfs, gcBefore));
+            super(OperationType.VALIDATION, scanners, new 
ValidationCompactionController(cfs, gcBefore), 
DatabaseDescriptor.getSSTableFormat());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index bfb86c9..584ff38 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -31,6 +31,10 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,9 +43,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SystemKeyspace;
 import 
org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableRewriter;
-import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.UUIDGen;
@@ -150,11 +152,13 @@ public class CompactionTask extends AbstractCompactionTask
             long estimatedTotalKeys = 
Math.max(cfs.metadata.getMinIndexInterval(), 
SSTableReader.getApproximateKeyCount(actuallyCompact));
             long estimatedSSTables = Math.max(1, 
SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
             long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys 
/ estimatedSSTables);
+            SSTableFormat.Type sstableFormat = getFormatType(sstables);
+
             logger.debug("Expected bloom filter size : {}", keysPerSSTable);
 
             try (AbstractCompactionStrategy.ScannerList scanners = 
strategy.getScanners(actuallyCompact))
             {
-                AbstractCompactionIterable ci = new 
CompactionIterable(compactionType, scanners.scanners, controller);
+                AbstractCompactionIterable ci = new 
CompactionIterable(compactionType, scanners.scanners, controller, 
sstableFormat);
                 Iterator<AbstractCompactedRow> iter = ci.iterator();
 
                 // we can't preheat until the tracker has been set. This 
doesn't happen until we tell the cfs to
@@ -177,7 +181,7 @@ public class CompactionTask extends AbstractCompactionTask
                         return;
                     }
 
-                    
writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, 
minRepairedAt));
+                    
writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, 
minRepairedAt, sstableFormat));
                     while (iter.hasNext())
                     {
                         if (ci.isStopRequested())
@@ -189,7 +193,7 @@ public class CompactionTask extends AbstractCompactionTask
                             totalKeysWritten++;
                             if 
(newSSTableSegmentThresholdReached(writer.currentWriter()))
                             {
-                                
writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, 
minRepairedAt));
+                                
writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, 
minRepairedAt, sstableFormat));
                             }
                         }
 
@@ -271,14 +275,14 @@ public class CompactionTask extends AbstractCompactionTask
         return minRepairedAt;
     }
 
-    private SSTableWriter createCompactionWriter(File sstableDirectory, long 
keysPerSSTable, long repairedAt)
+    private SSTableWriter createCompactionWriter(File sstableDirectory, long 
keysPerSSTable, long repairedAt, SSTableFormat.Type type)
     {
-        return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
-                                 keysPerSSTable,
-                                 repairedAt,
-                                 cfs.metadata,
-                                 cfs.partitioner,
-                                 new MetadataCollector(sstables, 
cfs.metadata.comparator, getLevel()));
+        return 
SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory),
 type),
+                keysPerSSTable,
+                repairedAt,
+                cfs.metadata,
+                cfs.partitioner,
+                new MetadataCollector(sstables, cfs.metadata.comparator, 
getLevel()));
     }
 
     protected int getLevel()
@@ -312,4 +316,13 @@ public class CompactionTask extends AbstractCompactionTask
         }
         return max;
     }
+
+    public static SSTableFormat.Type getFormatType(Collection<SSTableReader> 
sstables)
+    {
+        if (sstables.isEmpty() || 
!SSTableFormat.enableSSTableDevelopmentTestMode)
+            return DatabaseDescriptor.getSSTableFormat();
+
+        //Allows us to test compaction of non-default formats
+        return sstables.iterator().next().descriptor.formatType;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 8c997ed..3f2aed6 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -22,13 +22,13 @@ import java.util.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.statements.CFPropDefs;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.Pair;
 
 public class DateTieredCompactionStrategy extends AbstractCompactionStrategy

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java 
b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index cfdbd17..1b28179 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -30,12 +30,13 @@ import com.google.common.collect.Iterators;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
 import org.apache.cassandra.io.sstable.ColumnNameHelper;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.StreamingHistogram;
 
@@ -48,17 +49,17 @@ import org.apache.cassandra.utils.StreamingHistogram;
  */
 public class LazilyCompactedRow extends AbstractCompactedRow
 {
-    private final List<? extends OnDiskAtomIterator> rows;
-    private final CompactionController controller;
-    private final long maxPurgeableTimestamp;
-    private final ColumnFamily emptyColumnFamily;
-    private ColumnStats columnStats;
-    private boolean closed;
-    private ColumnIndex.Builder indexBuilder;
-    private final SecondaryIndexManager.Updater indexer;
-    private final Reducer reducer;
-    private final Iterator<OnDiskAtom> merger;
-    private DeletionTime maxRowTombstone;
+    protected final List<? extends OnDiskAtomIterator> rows;
+    protected final CompactionController controller;
+    protected final long maxPurgeableTimestamp;
+    protected final ColumnFamily emptyColumnFamily;
+    protected ColumnStats columnStats;
+    protected boolean closed;
+    protected ColumnIndex.Builder indexBuilder;
+    protected final SecondaryIndexManager.Updater indexer;
+    protected final Reducer reducer;
+    protected final Iterator<OnDiskAtom> merger;
+    protected DeletionTime maxRowTombstone;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends 
OnDiskAtomIterator> rows)
     {
@@ -99,10 +100,12 @@ public class LazilyCompactedRow extends 
AbstractCompactedRow
         ColumnFamilyStore.removeDeletedColumnsOnly(cf, overriddenGCBefore, 
controller.cfs.indexManager.gcUpdaterFor(key));
     }
 
-    public RowIndexEntry write(long currentPosition, DataOutputPlus out) 
throws IOException
+    public RowIndexEntry write(long currentPosition, SequentialWriter 
dataFile) throws IOException
     {
         assert !closed;
 
+        DataOutputPlus out = dataFile.stream;
+
         ColumnIndex columnsIndex;
         try
         {
@@ -130,7 +133,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
         // in case no columns were ever written, we may still need to write an 
empty header with a top-level tombstone
         indexBuilder.maybeWriteEmptyRowHeader();
 
-        out.writeShort(SSTableWriter.END_OF_ROW);
+        out.writeShort(BigTableWriter.END_OF_ROW);
 
         close();
 
@@ -183,7 +186,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
         closed = true;
     }
 
-    private class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom>
+    protected class Reducer extends MergeIterator.Reducer<OnDiskAtom, 
OnDiskAtom>
     {
         // all columns reduced together will have the same name, so there will 
only be one column
         // in the container; we just want to leverage the conflict resolution 
code from CF.
@@ -192,9 +195,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow
 
         // tombstone reference; will be reconciled w/ column during 
getReduced.  Note that the top-level (row) tombstone
         // is held by LCR.deletionInfo.
-        RangeTombstone tombstone;
+        public RangeTombstone tombstone;
 
-        int columns = 0;
+        public int columns = 0;
         // if the row tombstone is 'live' we need to set timestamp to 
MAX_VALUE to be able to overwrite it later
         // markedForDeleteAt is MIN_VALUE for 'live' row tombstones (which we 
use to default maxTimestampSeen)
 
@@ -204,10 +207,10 @@ public class LazilyCompactedRow extends 
AbstractCompactedRow
         // we are bound to have either a RangeTombstone or standard cells will 
set this properly:
         ColumnStats.MaxIntTracker maxDeletionTimeTracker = new 
ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
 
-        StreamingHistogram tombstones = new 
StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
-        List<ByteBuffer> minColumnNameSeen = Collections.emptyList();
-        List<ByteBuffer> maxColumnNameSeen = Collections.emptyList();
-        boolean hasLegacyCounterShards = false;
+        public StreamingHistogram tombstones = new 
StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
+        public List<ByteBuffer> minColumnNameSeen = Collections.emptyList();
+        public List<ByteBuffer> maxColumnNameSeen = Collections.emptyList();
+        public boolean hasLegacyCounterShards = false;
 
         public Reducer()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 994e52d..7a1f883 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.collect.*;
 import com.google.common.primitives.Doubles;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,6 @@ import 
org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
 import org.apache.cassandra.notifications.SSTableAddedNotification;
@@ -160,6 +160,7 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy implem
         }
     }
 
+    @Override
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> 
sstables, int gcBefore)
     {
         throw new UnsupportedOperationException("LevelDB compaction strategy 
does not allow user-specified compactions");
@@ -319,7 +320,7 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy implem
             this.range = range;
 
             // add only sstables that intersect our range, and estimate how 
much data that involves
-            this.sstables = new ArrayList<SSTableReader>(sstables.size());
+            this.sstables = new ArrayList<>(sstables.size());
             long length = 0;
             for (SSTableReader sstable : sstables)
             {
@@ -342,10 +343,10 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy implem
 
         public static List<SSTableReader> 
intersecting(Collection<SSTableReader> sstables, Range<Token> range)
         {
-            ArrayList<SSTableReader> filtered = new ArrayList<SSTableReader>();
+            ArrayList<SSTableReader> filtered = new ArrayList<>();
             for (SSTableReader sstable : sstables)
             {
-                Range<Token> sstableRange = new 
Range<Token>(sstable.first.getToken(), sstable.last.getToken(), 
sstable.partitioner);
+                Range<Token> sstableRange = new 
Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner);
                 if (range == null || sstableRange.intersects(range))
                     filtered.add(sstable);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index 2731b6d..9a33b49 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.db.compaction;
 import java.util.Collection;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 
 public class LeveledCompactionTask extends CompactionTask
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 6d3bf69..74be143 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -574,7 +575,7 @@ public class LeveledManifest
     private static Set<SSTableReader> overlapping(Token start, Token end, 
Iterable<SSTableReader> sstables)
     {
         assert start.compareTo(end) <= 0;
-        Set<SSTableReader> overlapped = new HashSet<SSTableReader>();
+        Set<SSTableReader> overlapped = new HashSet<>();
         Bounds<Token> promotedBounds = new Bounds<Token>(start, end);
         for (SSTableReader candidate : sstables)
         {
@@ -622,8 +623,8 @@ public class LeveledManifest
             // Note that we ignore suspect-ness of L1 sstables here, since if 
an L1 sstable is suspect we're
             // basically screwed, since we expect all or most L0 sstables to 
overlap with each L1 sstable.
             // So if an L1 sstable is suspect we can't do much besides try 
anyway and hope for the best.
-            Set<SSTableReader> candidates = new HashSet<SSTableReader>();
-            Set<SSTableReader> remaining = new HashSet<SSTableReader>();
+            Set<SSTableReader> candidates = new HashSet<>();
+            Set<SSTableReader> remaining = new HashSet<>();
             Iterables.addAll(remaining, Iterables.filter(getLevel(0), 
Predicates.not(suspectP)));
             for (SSTableReader sstable : ageSortedSSTables(remaining))
             {
@@ -699,7 +700,7 @@ public class LeveledManifest
 
     private List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> 
candidates)
     {
-        List<SSTableReader> ageSortedCandidates = new ArrayList<SSTableReader>(candidates);
+        List<SSTableReader> ageSortedCandidates = new ArrayList<>(candidates);
         Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampComparator);
         return ageSortedCandidates;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java 
b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index 6b9f161..87c82e0 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -20,7 +20,8 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 
 public class SSTableSplitter {
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java 
b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index b3d098d..1bea188 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -25,6 +25,8 @@ import com.google.common.base.Throwables;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -46,6 +48,7 @@ public class Scrubber implements Closeable
     private final RandomAccessReader dataFile;
     private final RandomAccessReader indexFile;
     private final ScrubInfo scrubInfo;
+    private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 
     private final boolean isOffline;
 
@@ -80,6 +83,8 @@ public class Scrubber implements Closeable
         this.skipCorrupted = skipCorrupted;
         this.isOffline = isOffline;
 
+        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+
         // Calculate the expected compacted filesize
         this.destination = cfs.directories.getDirectoryForNewSSTables();
         if (destination == null)
@@ -113,7 +118,7 @@ public class Scrubber implements Closeable
             ByteBuffer nextIndexKey = 
ByteBufferUtil.readWithShortLength(indexFile);
             {
                 // throw away variable so we don't have a side effect in the 
assert
-                long firstRowPositionFromIndex = sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
+                long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position;
                 assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
             }
 
@@ -147,7 +152,7 @@ public class Scrubber implements Closeable
                     nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
                     nextRowPositionFromIndex = indexFile.isEOF()
                                              ? dataFile.length()
-                                             : sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
+                                             : rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position;
                 }
                 catch (Throwable th)
                 {
@@ -177,7 +182,7 @@ public class Scrubber implements Closeable
                     if (dataSize > dataFile.length())
                         throw new IOError(new IOException("Impossible row size 
" + dataSize));
 
-                    SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+                    SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, true);
                     if (prevKey != null && prevKey.compareTo(key) > 0)
                     {
                         saveOutOfOrderRow(prevKey, key, atoms);
@@ -206,7 +211,7 @@ public class Scrubber implements Closeable
                         key = sstable.partitioner.decorateKey(currentIndexKey);
                         try
                         {
-                            SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+                            SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, true);
                             if (prevKey != null && prevKey.compareTo(key) > 0)
                             {
                                 saveOutOfOrderRow(prevKey, key, atoms);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 2fa188c..0abb68d 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -24,13 +24,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.big.BigTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.statements.CFPropDefs;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.Pair;
 
 public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
@@ -311,7 +312,7 @@ public class SizeTieredCompactionStrategy extends 
AbstractCompactionStrategy
 
     public static List<Pair<SSTableReader, Long>> 
createSSTableAndLengthPairs(Iterable<SSTableReader> sstables)
     {
-        List<Pair<SSTableReader, Long>> sstableLengthPairs = new ArrayList<Pair<SSTableReader, Long>>(Iterables.size(sstables));
+        List<Pair<SSTableReader, Long>> sstableLengthPairs = new ArrayList<>(Iterables.size(sstables));
         for(SSTableReader sstable : sstables)
             sstableLengthPairs.add(Pair.create(sstable, sstable.onDiskLength()));
         return sstableLengthPairs;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java 
b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 2e33ac9..c9e7034 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -22,9 +22,12 @@ import java.util.*;
 
 import com.google.common.base.Throwables;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.OutputHandler;
@@ -78,7 +81,7 @@ public class Upgrader
             sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
         }
 
-        return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
+        return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(directory)), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
     }
 
     public void upgrade()
@@ -88,7 +91,7 @@ public class Upgrader
         SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, 
CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, 
true);
         try (AbstractCompactionStrategy.ScannerList scanners = 
strategy.getScanners(this.toUpgrade))
         {
-            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
+            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat()).iterator();
             
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
             while (iter.hasNext())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/composites/AbstractCType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCType.java 
b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
index 5af7458..1df73fe 100644
--- a/src/java/org/apache/cassandra/db/composites/AbstractCType.java
+++ b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
@@ -80,12 +80,10 @@ public abstract class AbstractCType implements CType
 
     private final Serializer serializer;
 
-    private final ISerializer<IndexInfo> indexSerializer;
     private final IVersionedSerializer<ColumnSlice> sliceSerializer;
     private final IVersionedSerializer<SliceQueryFilter> 
sliceQueryFilterSerializer;
     private final DeletionInfo.Serializer deletionInfoSerializer;
     private final RangeTombstone.Serializer rangeTombstoneSerializer;
-    private final RowIndexEntry.Serializer rowIndexEntrySerializer;
 
     protected final boolean isByteOrderComparable;
 
@@ -115,12 +113,10 @@ public abstract class AbstractCType implements CType
 
         serializer = new Serializer(this);
 
-        indexSerializer = new IndexInfo.Serializer(this);
         sliceSerializer = new ColumnSlice.Serializer(this);
         sliceQueryFilterSerializer = new SliceQueryFilter.Serializer(this);
         deletionInfoSerializer = new DeletionInfo.Serializer(this);
         rangeTombstoneSerializer = new RangeTombstone.Serializer(this);
-        rowIndexEntrySerializer = new RowIndexEntry.Serializer(this);
         this.isByteOrderComparable = isByteOrderComparable;
     }
 
@@ -295,11 +291,6 @@ public abstract class AbstractCType implements CType
         return indexReverseComparator;
     }
 
-    public ISerializer<IndexInfo> indexSerializer()
-    {
-        return indexSerializer;
-    }
-
     public IVersionedSerializer<ColumnSlice> sliceSerializer()
     {
         return sliceSerializer;
@@ -320,11 +311,6 @@ public abstract class AbstractCType implements CType
         return rangeTombstoneSerializer;
     }
 
-    public RowIndexEntry.Serializer rowIndexEntrySerializer()
-    {
-        return rowIndexEntrySerializer;
-    }
-
     @Override
     public boolean equals(Object o)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/composites/CType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/CType.java 
b/src/java/org/apache/cassandra/db/composites/CType.java
index d24277e..3844342 100644
--- a/src/java/org/apache/cassandra/db/composites/CType.java
+++ b/src/java/org/apache/cassandra/db/composites/CType.java
@@ -130,12 +130,10 @@ public interface CType extends Comparator<Composite>
 
     public Serializer serializer();
 
-    public ISerializer<IndexInfo> indexSerializer();
     public IVersionedSerializer<ColumnSlice> sliceSerializer();
     public IVersionedSerializer<SliceQueryFilter> sliceQueryFilterSerializer();
     public DeletionInfo.Serializer deletionInfoSerializer();
     public RangeTombstone.Serializer rangeTombstoneSerializer();
-    public RowIndexEntry.Serializer rowIndexEntrySerializer();
 
     public interface Serializer extends ISerializer<Composite>
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java 
b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index 3750c75..16be34c 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.composites.CType;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java 
b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 0a34dfd..b5515bc 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -24,20 +24,19 @@ import java.util.Iterator;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.lang3.StringUtils;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.columniterator.SSTableNamesIterator;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.composites.CType;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.utils.SearchIterator;
@@ -87,12 +86,12 @@ public class NamesQueryFilter implements IDiskAtomFilter
 
     public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, 
DecoratedKey key)
     {
-        return new SSTableNamesIterator(sstable, key, columns);
+        return sstable.iterator(key, columns);
     }
 
     public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, 
FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
     {
-        return new SSTableNamesIterator(sstable, file, key, columns, 
indexEntry);
+        return sstable.iterator(file, key, columns, indexEntry);
     }
 
     public void collectReducedColumns(ColumnFamily container, Iterator<Cell> 
reducedColumns, int gcBefore, long now)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java 
b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index f58fa9f..1914970 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -33,7 +33,7 @@ import 
org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.MergeIterator;
 
 public class QueryFilter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java 
b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 71d9095..0887cbe 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -24,19 +24,18 @@ import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.columniterator.SSTableSliceIterator;
 import org.apache.cassandra.db.composites.CType;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.tracing.Tracing;
@@ -176,12 +175,12 @@ public class SliceQueryFilter implements IDiskAtomFilter
 
     public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, 
DecoratedKey key)
     {
-        return new SSTableSliceIterator(sstable, key, slices, reversed);
+        return sstable.iterator(key, slices, reversed);
     }
 
     public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, 
FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
     {
-        return new SSTableSliceIterator(sstable, file, key, slices, reversed, 
indexEntry);
+        return sstable.iterator(file, key, slices, reversed, indexEntry);
     }
 
     public Comparator<Cell> getColumnComparator(CellNameType comparator)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java 
b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 529e82c..c50019a 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +50,6 @@ import org.apache.cassandra.db.marshal.LocalByPartionerType;
 import org.apache.cassandra.dht.LocalToken;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java 
b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index c2d481b..5640d23 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Future;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +53,6 @@ import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/dht/BytesToken.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BytesToken.java 
b/src/java/org/apache/cassandra/dht/BytesToken.java
index f2f9eab..da965d2 100644
--- a/src/java/org/apache/cassandra/dht/BytesToken.java
+++ b/src/java/org/apache/cassandra/dht/BytesToken.java
@@ -49,7 +49,6 @@ public class BytesToken extends Token<byte[]>
         return FBUtilities.compareUnsigned(token, o.token, 0, 0, token.length, 
o.token.length);
     }
 
-
     @Override
     public int hashCode()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/ISSTableSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ISSTableSerializer.java 
b/src/java/org/apache/cassandra/io/ISSTableSerializer.java
index 20ee352..2834662 100644
--- a/src/java/org/apache/cassandra/io/ISSTableSerializer.java
+++ b/src/java/org/apache/cassandra/io/ISSTableSerializer.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 public interface ISSTableSerializer<T>
@@ -43,5 +44,5 @@ public interface ISSTableSerializer<T>
      * @throws IOException
      * @return the type that was deserialized
      */
-    public T deserializeFromSSTable(DataInput in, Descriptor.Version version) 
throws IOException;
+    public T deserializeFromSSTable(DataInput in, Version version) throws 
IOException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java 
b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 3d6cb71..75fdf1b 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -39,6 +39,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Longs;
 
 import org.apache.cassandra.cache.RefCountedMemory;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSReadError;
@@ -47,10 +48,11 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.utils.Pair;
 
 /**
  * Holds metadata about compressed file
@@ -79,7 +82,7 @@ public class CompressionMetadata
     public static CompressionMetadata create(String dataFilePath)
     {
         Descriptor desc = Descriptor.fromFilename(dataFilePath);
-        return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length(), desc.version.hasPostCompressionAdlerChecksums);
+        return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length(), desc.version.hasPostCompressionAdlerChecksums());
     }
 
     @VisibleForTesting
@@ -262,6 +265,8 @@ public class CompressionMetadata
         private int maxCount = 100;
         private RefCountedMemory offsets = new RefCountedMemory(maxCount * 8);
         private int count = 0;
+        private Version latestVersion = DatabaseDescriptor.getSSTableFormat().info.getLatestVersion();
+
 
         private Writer(CompressionParameters parameters, String path)
         {
@@ -311,14 +316,14 @@ public class CompressionMetadata
 
         public CompressionMetadata openEarly(long dataLength, long 
compressedLength)
         {
-            return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
+            return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, latestVersion.hasPostCompressionAdlerChecksums());
         }
 
         public CompressionMetadata openAfterClose(long dataLength, long 
compressedLength)
         {
             RefCountedMemory newOffsets = offsets.copy(count * 8L);
             offsets.unreference();
-            return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
+            return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, latestVersion.hasPostCompressionAdlerChecksums());
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java 
b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index ad8fb3e..11d6d5e 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -30,7 +30,8 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.Pair;
@@ -43,6 +44,8 @@ public abstract class AbstractSSTableSimpleWriter implements 
Closeable
     protected ColumnFamily columnFamily;
     protected ByteBuffer currentSuperColumn;
     protected final CounterId counterid = CounterId.generate();
+    private SSTableFormat.Type formatType = DatabaseDescriptor.getSSTableFormat();
+
 
     public AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, 
IPartitioner partitioner)
     {
@@ -51,19 +54,18 @@ public abstract class AbstractSSTableSimpleWriter 
implements Closeable
         DatabaseDescriptor.setPartitioner(partitioner);
     }
 
+    protected void setSSTableFormatType(SSTableFormat.Type type)
+    {
+        this.formatType = type;
+    }
+
     protected SSTableWriter getWriter()
     {
-        return new SSTableWriter(
-            makeFilename(directory, metadata.ksName, metadata.cfName),
-            0, // We don't care about the bloom filter
-            ActiveRepairService.UNREPAIRED_SSTABLE,
-            metadata,
-            DatabaseDescriptor.getPartitioner(),
-            new MetadataCollector(metadata.comparator));
+        return SSTableWriter.create(Descriptor.fromFilename(makeFilename(directory, metadata.ksName, metadata.cfName, formatType)), 0, ActiveRepairService.UNREPAIRED_SSTABLE);
     }
 
     // find available generation and pick up filename from that
-    protected static String makeFilename(File directory, final String 
keyspace, final String columnFamily)
+    protected static String makeFilename(File directory, final String 
keyspace, final String columnFamily, final SSTableFormat.Type fmt)
     {
         final Set<Descriptor> existing = new HashSet<Descriptor>();
         directory.list(new FilenameFilter()
@@ -84,7 +86,7 @@ public abstract class AbstractSSTableSimpleWriter implements 
Closeable
         int maxGen = 0;
         for (Descriptor desc : existing)
             maxGen = Math.max(maxGen, desc.generation);
-        return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, 
Descriptor.Type.TEMP).filenameFor(Component.DATA);
+        return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, 
Descriptor.Type.TEMP, fmt).filenameFor(Component.DATA);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index bf4da24..f7d467e 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.Pair;
@@ -272,6 +273,8 @@ public class CQLSSTableWriter implements Closeable
         private File directory;
         private IPartitioner partitioner = new Murmur3Partitioner();
 
+        protected SSTableFormat.Type formatType = null;
+
         private CFMetaData schema;
         private UpdateStatement insert;
         private List<ColumnSpecification> boundNames;
@@ -279,7 +282,7 @@ public class CQLSSTableWriter implements Closeable
         private boolean sorted = false;
         private long bufferSizeInMB = 128;
 
-        private Builder() {}
+        protected Builder() {}
 
         /**
          * The directory where to write the sstables.
@@ -484,6 +487,10 @@ public class CQLSSTableWriter implements Closeable
             AbstractSSTableSimpleWriter writer = sorted
                                                ? new 
SSTableSimpleWriter(directory, schema, partitioner)
                                                : new BufferedWriter(directory, 
schema, partitioner, bufferSizeInMB);
+
+            if (formatType != null)
+                writer.setSSTableFormatType(formatType);
+
             return new CQLSSTableWriter(writer, insert, boundNames);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java 
b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 6911b48..03522c3 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -22,13 +22,18 @@ import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.StringTokenizer;
 
+import com.google.common.base.CharMatcher;
 import com.google.common.base.Objects;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
 import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
 import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
 import org.apache.cassandra.utils.Pair;
+import org.apache.commons.lang.StringUtils;
 
 import static org.apache.cassandra.io.sstable.Component.separator;
 
@@ -41,92 +46,6 @@ import static 
org.apache.cassandra.io.sstable.Component.separator;
  */
 public class Descriptor
 {
-    // versions are denoted as [major][minor].  Minor versions must be 
forward-compatible:
-    // new fields are allowed in e.g. the metadata component, but fields can't 
be removed
-    // or have their size changed.
-    //
-    // Minor versions were introduced with version "hb" for Cassandra 1.0.3; 
prior to that,
-    // we always incremented the major version.
-    public static class Version
-    {
-        // This needs to be at the begining for initialization sake
-        public static final String current_version = "la";
-
-        // ja (2.0.0): super columns are serialized as composites (note that 
there is no real format change,
-        //               this is mostly a marker to know if we should expect 
super columns or not. We do need
-        //               a major version bump however, because we should not 
allow streaming of super columns
-        //               into this new format)
-        //             tracks max local deletiontime in sstable metadata
-        //             records bloom_filter_fp_chance in metadata component
-        //             remove data size and column count from data file 
(CASSANDRA-4180)
-        //             tracks max/min column values (according to comparator)
-        // jb (2.0.1): switch from crc32 to adler32 for compression checksums
-        //             checksum the compressed data
-        // ka (2.1.0): new Statistics.db file format
-        //             index summaries can be downsampled and the sampling 
level is persisted
-        //             switch uncompressed checksums to adler32
-        //             tracks presense of legacy (local and remote) counter 
shards
-        // la (3.0.0): new file name format
-
-        public static final Version CURRENT = new Version(current_version);
-
-        private final String version;
-
-        public final boolean isLatestVersion;
-        public final boolean hasPostCompressionAdlerChecksums;
-        public final boolean hasSamplingLevel;
-        public final boolean newStatsFile;
-        public final boolean hasAllAdlerChecksums;
-        public final boolean hasRepairedAt;
-        public final boolean tracksLegacyCounterShards;
-        public final boolean newFileName;
-
-        public Version(String version)
-        {
-            this.version = version;
-            isLatestVersion = version.compareTo(current_version) == 0;
-            hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
-            hasSamplingLevel = version.compareTo("ka") >= 0;
-            newStatsFile = version.compareTo("ka") >= 0;
-            hasAllAdlerChecksums = version.compareTo("ka") >= 0;
-            hasRepairedAt = version.compareTo("ka") >= 0;
-            tracksLegacyCounterShards = version.compareTo("ka") >= 0;
-            newFileName = version.compareTo("la") >= 0;
-        }
-
-        /**
-         * @param ver SSTable version
-         * @return True if the given version string matches the format.
-         * @see #version
-         */
-        static boolean validate(String ver)
-        {
-            return ver != null && ver.matches("[a-z]+");
-        }
-
-        public boolean isCompatible()
-        {
-            return version.compareTo("ja") >= 0 && version.charAt(0) <= 
CURRENT.version.charAt(0);
-        }
-
-        @Override
-        public String toString()
-        {
-            return version;
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            return o == this || o instanceof Version && 
version.equals(((Version) o).version);
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return version.hashCode();
-        }
-    }
 
     public static enum Type
     {
@@ -140,6 +59,7 @@ public class Descriptor
         }
     }
 
+
     public final File directory;
     /** version has the following format: <code>[a-z]+</code> */
     public final Version version;
@@ -147,6 +67,7 @@ public class Descriptor
     public final String cfname;
     public final int generation;
     public final Type type;
+    public final SSTableFormat.Type formatType;
     private final int hashCode;
 
     /**
@@ -154,29 +75,41 @@ public class Descriptor
      */
     public Descriptor(File directory, String ksname, String cfname, int 
generation, Type temp)
     {
-        this(Version.CURRENT, directory, ksname, cfname, generation, temp);
+        this(DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), 
directory, ksname, cfname, generation, temp, 
DatabaseDescriptor.getSSTableFormat());
     }
 
-    public Descriptor(String version, File directory, String ksname, String 
cfname, int generation, Type temp)
+    public Descriptor(File directory, String ksname, String cfname, int 
generation, Type temp, SSTableFormat.Type formatType)
     {
-        this(new Version(version), directory, ksname, cfname, generation, 
temp);
+        this(formatType.info.getLatestVersion(), directory, ksname, cfname, 
generation, temp, formatType);
     }
 
-    public Descriptor(Version version, File directory, String ksname, String 
cfname, int generation, Type temp)
+    public Descriptor(String version, File directory, String ksname, String 
cfname, int generation, Type temp, SSTableFormat.Type formatType)
     {
-        assert version != null && directory != null && ksname != null && 
cfname != null;
+        this(formatType.info.getVersion(version), directory, ksname, cfname, 
generation, temp, formatType);
+    }
+
+    public Descriptor(Version version, File directory, String ksname, String 
cfname, int generation, Type temp, SSTableFormat.Type formatType)
+    {
+        assert version != null && directory != null && ksname != null && 
cfname != null && 
formatType.info.getLatestVersion().getClass().equals(version.getClass());
         this.version = version;
         this.directory = directory;
         this.ksname = ksname;
         this.cfname = cfname;
         this.generation = generation;
-        type = temp;
-        hashCode = Objects.hashCode(directory, generation, ksname, cfname, 
temp);
+        this.type = temp;
+        this.formatType = formatType;
+
+        hashCode = Objects.hashCode(version, directory, generation, ksname, 
cfname, temp, formatType);
     }
 
     public Descriptor withGeneration(int newGeneration)
     {
-        return new Descriptor(version, directory, ksname, cfname, 
newGeneration, type);
+        return new Descriptor(version, directory, ksname, cfname, 
newGeneration, type, formatType);
+    }
+
+    public Descriptor withFormatType(SSTableFormat.Type newType)
+    {
+        return new Descriptor(newType.info.getLatestVersion(), directory, 
ksname, cfname, generation, type, newType);
     }
 
     public String filenameFor(Component component)
@@ -194,7 +127,7 @@ public class Descriptor
 
     private void appendFileName(StringBuilder buff)
     {
-        if (!version.newFileName)
+        if (!version.hasNewFileName())
         {
             buff.append(ksname).append(separator);
             buff.append(cfname).append(separator);
@@ -203,6 +136,8 @@ public class Descriptor
             buff.append(type.marker).append(separator);
         buff.append(version).append(separator);
         buff.append(generation);
+        if (formatType != SSTableFormat.Type.LEGACY)
+            buff.append(separator).append(formatType.name);
     }
 
     public String relativeFilenameFor(Component component)
@@ -213,6 +148,11 @@ public class Descriptor
         return buff.toString();
     }
 
+    public SSTableFormat getFormat()
+    {
+        return formatType.info;
+    }
+
     /**
      * @param suffix A component suffix, such as 'Data.db'/'Index.db'/etc
      * @return A filename for this descriptor with the given suffix.
@@ -233,6 +173,11 @@ public class Descriptor
         return fromFilename(file.getParentFile(), file.getName(), false).left;
     }
 
+    public static Descriptor fromFilename(String filename, SSTableFormat.Type 
formatType)
+    {
+        return fromFilename(filename).withFormatType(formatType);
+    }
+
     public static Descriptor fromFilename(String filename, boolean 
skipComponent)
     {
         File file = new File(filename);
@@ -249,7 +194,7 @@ public class Descriptor
      *
      * <ul>
      *     
<li>&lt;ksname&gt;-&lt;cfname&gt;-(tmp-)?&lt;version&gt;-&lt;gen&gt;-&lt;component&gt;
 for cassandra 2.0 and before</li>
-     *     <li>(&lt;tmp 
marker&gt;-)?&lt;version&gt;-&lt;gen&gt;-&lt;component&gt; for cassandra 2.1 
and later</li>
+     *     <li>(&lt;tmp 
marker&gt;-)?&lt;version&gt;-&lt;gen&gt;-&lt;component&gt; for cassandra 3.0 
and later</li>
      * </ul>
      *
      * If this is for SSTable of secondary index, directory should ends with 
index name for 2.1+.
@@ -278,32 +223,42 @@ public class Descriptor
         // component suffix
         String component = skipComponent ? null : tokenStack.pop();
 
+        nexttok = tokenStack.pop();
+        // generation OR Type
+        SSTableFormat.Type fmt = SSTableFormat.Type.LEGACY;
+        if (!CharMatcher.DIGIT.matchesAllOf(nexttok))
+        {
+            fmt = SSTableFormat.Type.validate(nexttok);
+            nexttok = tokenStack.pop();
+        }
+
         // generation
-        int generation = Integer.parseInt(tokenStack.pop());
+        int generation = Integer.parseInt(nexttok);
 
         // version
         nexttok = tokenStack.pop();
-        if (!Version.validate(nexttok))
+        Version version = fmt.info.getVersion(nexttok);
+
+        if (!version.validate(nexttok))
             throw new UnsupportedOperationException("SSTable " + name + " is 
too old to open.  Upgrade to 2.0 first, and run upgradesstables");
-        Version version = new Version(nexttok);
 
         // optional temporary marker
-        Type type = Type.FINAL;
+        Type type = Descriptor.Type.FINAL;
         nexttok = tokenStack.peek();
-        if (Type.TEMP.marker.equals(nexttok))
+        if (Descriptor.Type.TEMP.marker.equals(nexttok))
         {
-            type = Type.TEMP;
+            type = Descriptor.Type.TEMP;
             tokenStack.pop();
         }
-        else if (Type.TEMPLINK.marker.equals(nexttok))
+        else if (Descriptor.Type.TEMPLINK.marker.equals(nexttok))
         {
-            type = Type.TEMPLINK;
+            type = Descriptor.Type.TEMPLINK;
             tokenStack.pop();
         }
 
         // ks/cf names
         String ksname, cfname;
-        if (version.newFileName)
+        if (version.hasNewFileName())
         {
             // for 2.1+ read ks and cf names from directory
             File cfDirectory = parentDirectory;
@@ -332,7 +287,7 @@ public class Descriptor
         }
         assert tokenStack.isEmpty() : "Invalid file name " + name + " in " + 
directory;
 
-        return Pair.create(new Descriptor(version, parentDirectory, ksname, 
cfname, generation, type), component);
+        return Pair.create(new Descriptor(version, parentDirectory, ksname, 
cfname, generation, type, fmt), component);
     }
 
     /**
@@ -341,12 +296,12 @@ public class Descriptor
      */
     public Descriptor asType(Type type)
     {
-        return new Descriptor(version, directory, ksname, cfname, generation, 
type);
+        return new Descriptor(version, directory, ksname, cfname, generation, 
type, formatType);
     }
 
     public IMetadataSerializer getMetadataSerializer()
     {
-        if (version.newStatsFile)
+        if (version.hasNewStatsFile())
             return new MetadataSerializer();
         else
             return new LegacyMetadataSerializer();
@@ -378,6 +333,7 @@ public class Descriptor
                        && that.generation == this.generation
                        && that.ksname.equals(this.ksname)
                        && that.cfname.equals(this.cfname)
+                       && that.formatType == this.formatType
                        && that.type == this.type;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java 
b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
index b0bbfc4..4518e23 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
@@ -67,32 +67,6 @@ public class IndexHelper
     }
 
     /**
-     * Deserialize the index into a structure and return it
-     *
-     * @param in input source
-     * @param type the comparator type for the column family
-     *
-     * @return ArrayList<IndexInfo> - list of de-serialized indexes
-     * @throws IOException if an I/O error occurs.
-     */
-    public static List<IndexInfo> deserializeIndex(FileDataInput in, CType 
type) throws IOException
-    {
-        int columnIndexSize = in.readInt();
-        if (columnIndexSize == 0)
-            return Collections.<IndexInfo>emptyList();
-        ArrayList<IndexInfo> indexList = new ArrayList<IndexInfo>();
-        FileMark mark = in.mark();
-        ISerializer<IndexInfo> serializer = type.indexSerializer();
-        while (in.bytesPastMark(mark) < columnIndexSize)
-        {
-            indexList.add(serializer.deserialize(in));
-        }
-        assert in.bytesPastMark(mark) == columnIndexSize;
-
-        return indexList;
-    }
-
-    /**
      * The index of the IndexInfo in which a scan starting with @name should 
begin.
      *
      * @param name

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java 
b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index e39d75d..468b54c 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java 
b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
index 6cb8653..a1fda57 100644
--- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.IMergeIterator;
 import org.apache.cassandra.utils.MergeIterator;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java 
b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index dee024a..50b9545 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -208,7 +208,7 @@ public abstract class SSTable
     }
 
     /** @return An estimate of the number of keys contained in the given index 
file. */
-    long estimateRowsFromIndex(RandomAccessReader ifile) throws IOException
+    protected long estimateRowsFromIndex(RandomAccessReader ifile) throws 
IOException
     {
         // collect sizes for the first 10000 keys, or first 10 megabytes of 
data
         final int SAMPLES_CAP = 10000, BYTES_CAP = (int)Math.min(10000000, 
ifile.length());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
index 785e23b..bb84e4c 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index b784a7e..6300749 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -21,16 +21,19 @@ import java.io.*;
 import java.util.Iterator;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.serializers.MarshalException;
 
-public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterator>, OnDiskAtomIterator
+public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterator>, OnDiskAtomIterator
 {
     private final DecoratedKey key;
     private final DataInput in;
-    public final long dataSize; // we [still] require this so compaction can 
tell if it's safe to read the row into memory
     public final ColumnSerializer.Flag flag;
 
     private final ColumnFamily columnFamily;
@@ -43,11 +46,10 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
      * @param sstable SSTable we are reading ffrom.
      * @param file Reading using this file.
      * @param key Key of this row.
-     * @param dataSize length of row data
      */
-    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader 
file, DecoratedKey key, long dataSize)
+    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader 
file, DecoratedKey key)
     {
-        this(sstable, file, key, dataSize, false);
+        this(sstable, file, key, false);
     }
 
     /**
@@ -55,21 +57,19 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
      * @param sstable SSTable we are reading ffrom.
      * @param file Reading using this file.
      * @param key Key of this row.
-     * @param dataSize length of row data
      * @param checkData if true, do its best to deserialize and check the 
coherence of row data
      */
-    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader 
file, DecoratedKey key, long dataSize, boolean checkData)
+    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader 
file, DecoratedKey key, boolean checkData)
     {
-        this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, 
sstable, ColumnSerializer.Flag.LOCAL);
+        this(sstable.metadata, file, file.getPath(), key, checkData, sstable, 
ColumnSerializer.Flag.LOCAL);
     }
 
     // sstable may be null *if* checkData is false
     // If it is null, we assume the data is in the current file format
     private SSTableIdentityIterator(CFMetaData metadata,
-                                    DataInput in,
+                                    FileDataInput in,
                                     String filename,
                                     DecoratedKey key,
-                                    long dataSize,
                                     boolean checkData,
                                     SSTableReader sstable,
                                     ColumnSerializer.Flag flag)
@@ -78,11 +78,10 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
         this.in = in;
         this.filename = filename;
         this.key = key;
-        this.dataSize = dataSize;
         this.flag = flag;
         this.validateColumns = checkData;
 
-        Descriptor.Version dataVersion = sstable == null ? 
Descriptor.Version.CURRENT : sstable.descriptor.version;
+        Version dataVersion = sstable == null ? 
DatabaseDescriptor.getSSTableFormat().info.getLatestVersion() : 
sstable.descriptor.version;
         int expireBefore = (int) (System.currentTimeMillis() / 1000);
         columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 6cb7f03..2a1b66f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.utils.FBUtilities;

Reply via email to