http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
deleted file mode 100644
index 62ac175..0000000
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.db.DataRange;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
-import org.apache.cassandra.db.columniterator.LazyColumnIterator;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.compaction.ICompactionScanner;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class SSTableScanner implements ICompactionScanner
-{
-    protected final RandomAccessReader dfile;
-    protected final RandomAccessReader ifile;
-    public final SSTableReader sstable;
-
-    private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
-    private AbstractBounds<RowPosition> currentRange;
-
-    private final DataRange dataRange;
-
-    protected Iterator<OnDiskAtomIterator> iterator;
-
-    /**
-     * @param sstable SSTable to scan; must not be null
-     * @param dataRange a single range to scan; must not be null
-     * @param limiter background i/o RateLimiter; may be null
-     */
-    SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
-    {
-        assert sstable != null;
-
-        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
-        this.ifile = sstable.openIndexReader();
-        this.sstable = sstable;
-        this.dataRange = dataRange;
-
-        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
-        if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum(sstable.partitioner))
-        {
-            // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and
-            // 2) the part that comes before the wrap-around
-            boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey(), sstable.partitioner));
-            boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound(), sstable.partitioner));
-        }
-        else
-        {
-            boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey(), sstable.partitioner));
-        }
-        this.rangeIterator = boundsList.iterator();
-    }
-
-    /**
-     * @param sstable SSTable to scan; must not be null
-     * @param tokenRanges A set of token ranges to scan
-     * @param limiter background i/o RateLimiter; may be null
-     */
-    SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
-    {
-        assert sstable != null;
-
-        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
-        this.ifile = sstable.openIndexReader();
-        this.sstable = sstable;
-        this.dataRange = null;
-
-        List<Range<Token>> normalized = Range.normalize(tokenRanges);
-        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size());
-        for (Range<Token> range : normalized)
-            boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(sstable.partitioner),
-                                                  range.right.maxKeyBound(sstable.partitioner),
-                                                  sstable.partitioner));
-
-        this.rangeIterator = boundsList.iterator();
-    }
-
-    private void seekToCurrentRangeStart()
-    {
-        if (currentRange.left.isMinimum(sstable.partitioner))
-            return;
-
-        long indexPosition = sstable.getIndexScanPosition(currentRange.left);
-        // -1 means the key is before everything in the sstable. So just start from the beginning.
-        if (indexPosition == -1)
-        {
-            // Note: this method shouldn't assume we're at the start of the sstable already (see #6638) and
-            // the seeks are no-op anyway if we are.
-            ifile.seek(0);
-            dfile.seek(0);
-            return;
-        }
-
-        ifile.seek(indexPosition);
-        try
-        {
-
-            while (!ifile.isEOF())
-            {
-                indexPosition = ifile.getFilePointer();
-                DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
-                int comparison = indexDecoratedKey.compareTo(currentRange.left);
-                // because our range start may be inclusive or exclusive, we need to also contains()
-                // instead of just checking (comparison >= 0)
-                if (comparison > 0 || currentRange.contains(indexDecoratedKey))
-                {
-                    // Found, just read the dataPosition and seek into index and data files
-                    long dataPosition = ifile.readLong();
-                    ifile.seek(indexPosition);
-                    dfile.seek(dataPosition);
-                    break;
-                }
-                else
-                {
-                    RowIndexEntry.Serializer.skip(ifile);
-                }
-            }
-        }
-        catch (IOException e)
-        {
-            sstable.markSuspect();
-            throw new CorruptSSTableException(e, sstable.getFilename());
-        }
-    }
-
-    public void close() throws IOException
-    {
-        FileUtils.close(dfile, ifile);
-    }
-
-    public long getLengthInBytes()
-    {
-        return dfile.length();
-    }
-
-    public long getCurrentPosition()
-    {
-        return dfile.getFilePointer();
-    }
-
-    public String getBackingFiles()
-    {
-        return sstable.toString();
-    }
-
-    public boolean hasNext()
-    {
-        if (iterator == null)
-            iterator = createIterator();
-        return iterator.hasNext();
-    }
-
-    public OnDiskAtomIterator next()
-    {
-        if (iterator == null)
-            iterator = createIterator();
-        return iterator.next();
-    }
-
-    public void remove()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    private Iterator<OnDiskAtomIterator> createIterator()
-    {
-        return new KeyScanningIterator();
-    }
-
-    protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
-    {
-        private DecoratedKey nextKey;
-        private RowIndexEntry nextEntry;
-        private DecoratedKey currentKey;
-        private RowIndexEntry currentEntry;
-
-        protected OnDiskAtomIterator computeNext()
-        {
-            try
-            {
-                if (nextEntry == null)
-                {
-                    do
-                    {
-                        // we're starting the first range or we just passed the end of the previous range
-                        if (!rangeIterator.hasNext())
-                            return endOfData();
-
-                        currentRange = rangeIterator.next();
-                        seekToCurrentRangeStart();
-
-                        if (ifile.isEOF())
-                            return endOfData();
-
-                        currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
-                        currentEntry = sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(ifile, sstable.descriptor.version);
-                    } while (!currentRange.contains(currentKey));
-                }
-                else
-                {
-                    // we're in the middle of a range
-                    currentKey = nextKey;
-                    currentEntry = nextEntry;
-                }
-
-                long readEnd;
-                if (ifile.isEOF())
-                {
-                    nextEntry = null;
-                    nextKey = null;
-                    readEnd = dfile.length();
-                }
-                else
-                {
-                    // we need the position of the start of the next key, regardless of whether it falls in the current range
-                    nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
-                    nextEntry = sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(ifile, sstable.descriptor.version);
-                    readEnd = nextEntry.position;
-
-                    if (!currentRange.contains(nextKey))
-                    {
-                        nextKey = null;
-                        nextEntry = null;
-                    }
-                }
-
-                if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey()))
-                {
-                    dfile.seek(currentEntry.position);
-                    ByteBufferUtil.readWithShortLength(dfile); // key
-                    long dataSize = readEnd - dfile.getFilePointer();
-                    return new SSTableIdentityIterator(sstable, dfile, currentKey, dataSize);
-                }
-
-                return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
-                {
-                    public OnDiskAtomIterator create()
-                    {
-                        return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
-                    }
-                });
-
-            }
-            catch (IOException e)
-            {
-                sstable.markSuspect();
-                throw new CorruptSSTableException(e, sstable.getFilename());
-            }
-        }
-    }
-
-    @Override
-    public String toString()
-    {
-        return getClass().getSimpleName() + "(" +
-               "dfile=" + dfile +
-               " ifile=" + ifile +
-               " sstable=" + sstable +
-               ")";
-    }
-}
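
[Editor's note] The constructor deleted above splits a wrap-around DataRange into two non-wrapping bounds: one piece from the minimum token to the stop key and one from the start key to the maximum token. The following standalone sketch illustrates that split with plain longs standing in for Cassandra's Token/RowPosition types; the class and method names are illustrative only, not the project's API.

    import java.util.ArrayList;
    import java.util.List;

    public class WrapAroundSplitSketch
    {
        // A wrapping range (start > stop) is covered by two pieces:
        // [minToken, stop] at the beginning of the sstable and [start, maxToken] before the wrap.
        static List<long[]> toBoundsList(long start, long stop, long minToken, long maxToken)
        {
            List<long[]> bounds = new ArrayList<>(2);
            if (start > stop && stop != minToken)
            {
                bounds.add(new long[]{ minToken, stop });
                bounds.add(new long[]{ start, maxToken });
            }
            else
            {
                bounds.add(new long[]{ start, stop });
            }
            return bounds;
        }

        public static void main(String[] args)
        {
            for (long[] b : toBoundsList(80, 20, 0, 100))
                System.out.println(b[0] + " .. " + b[1]);   // prints "0 .. 20" then "80 .. 100"
        }
    }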

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 3cfdc7b..bc9f2ca 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -27,6 +27,7 @@ import java.util.concurrent.SynchronousQueue;
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ArrayBackedSortedColumns;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.ColumnFamily;
@@ -35,6 +36,8 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
@@ -96,8 +99,8 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     public SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, long bufferSizeInMB)
     {
         super(directory, metadata, partitioner);
-        this.bufferSize = bufferSizeInMB * 1024L * 1024L;
-        this.diskWriter.start();
+        bufferSize = bufferSizeInMB * 1024L * 1024L;
+        diskWriter.start();
     }
 
     protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 87c8e33..3417d68 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 
 /**
  * A SSTable writer that assumes rows are in (partitioner) sorted order.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
deleted file mode 100644
index 4da967e..0000000
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ /dev/null
@@ -1,641 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.io.Closeable;
-import java.io.DataInput;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ArrayBackedSortedColumns;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnIndex;
-import org.apache.cassandra.db.ColumnSerializer;
-import org.apache.cassandra.db.CounterCell;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.OnDiskAtom;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.compress.CompressedSequentialWriter;
-import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
-import org.apache.cassandra.io.sstable.metadata.MetadataType;
-import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
-import org.apache.cassandra.io.util.FileMark;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.SegmentedFile;
-import org.apache.cassandra.io.util.SequentialWriter;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FilterFactory;
-import org.apache.cassandra.utils.IFilter;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.StreamingHistogram;
-
-public class SSTableWriter extends SSTable
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(SSTableWriter.class);
-
-    // not very random, but the only value that can't be mistaken for a legal 
column-name length
-    public static final int END_OF_ROW = 0x0000;
-
-    private IndexWriter iwriter;
-    private SegmentedFile.Builder dbuilder;
-    private final SequentialWriter dataFile;
-    private DecoratedKey lastWrittenKey;
-    private FileMark dataMark;
-    private final MetadataCollector sstableMetadataCollector;
-    private final long repairedAt;
-
-    public SSTableWriter(String filename, long keyCount, long repairedAt, int 
sstableLevel)
-    {
-        this(filename,
-             keyCount,
-             repairedAt,
-             Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)),
-             StorageService.getPartitioner(),
-             new 
MetadataCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator).sstableLevel(sstableLevel));
-    }
-
-    public SSTableWriter(String filename, long keyCount)
-    {
-        this(filename, keyCount, ActiveRepairService.UNREPAIRED_SSTABLE, 0);
-    }
-
-    private static Set<Component> components(CFMetaData metadata)
-    {
-        Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA,
-                                                                         Component.PRIMARY_INDEX,
-                                                                         Component.STATS,
-                                                                         Component.SUMMARY,
-                                                                         Component.TOC,
-                                                                         Component.DIGEST));
-
-        if (metadata.getBloomFilterFpChance() < 1.0)
-            components.add(Component.FILTER);
-
-        if (metadata.compressionParameters().sstableCompressor != null)
-        {
-            components.add(Component.COMPRESSION_INFO);
-        }
-        else
-        {
-            // it would feel safer to actually add this component later in 
maybeWriteDigest(),
-            // but the components are unmodifiable after construction
-            components.add(Component.CRC);
-        }
-        return components;
-    }
-
-    public SSTableWriter(String filename,
-                         long keyCount,
-                         long repairedAt,
-                         CFMetaData metadata,
-                         IPartitioner<?> partitioner,
-                         MetadataCollector sstableMetadataCollector)
-    {
-        super(Descriptor.fromFilename(filename),
-              components(metadata),
-              metadata,
-              partitioner);
-        this.repairedAt = repairedAt;
-        iwriter = new IndexWriter(keyCount);
-
-        if (compression)
-        {
-            dataFile = SequentialWriter.open(getFilename(),
-                                             
descriptor.filenameFor(Component.COMPRESSION_INFO),
-                                             metadata.compressionParameters(),
-                                             sstableMetadataCollector);
-            dbuilder = 
SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
-        }
-        else
-        {
-            dataFile = SequentialWriter.open(new File(getFilename()), new 
File(descriptor.filenameFor(Component.CRC)));
-            dbuilder = 
SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-        }
-
-        this.sstableMetadataCollector = sstableMetadataCollector;
-    }
-
-    public void mark()
-    {
-        dataMark = dataFile.mark();
-        iwriter.mark();
-    }
-
-    public void resetAndTruncate()
-    {
-        dataFile.resetAndTruncate(dataMark);
-        iwriter.resetAndTruncate();
-    }
-
-    /**
-     * Perform sanity checks on @param decoratedKey and @return the position 
in the data file before any data is written
-     */
-    private long beforeAppend(DecoratedKey decoratedKey)
-    {
-        assert decoratedKey != null : "Keys must not be null"; // empty keys 
ARE allowed b/c of indexed column values
-        if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) 
>= 0)
-            throw new RuntimeException("Last written key " + lastWrittenKey + 
" >= current key " + decoratedKey + " writing into " + getFilename());
-        return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
-    }
-
-    private void afterAppend(DecoratedKey decoratedKey, long dataPosition, 
RowIndexEntry index)
-    {
-        sstableMetadataCollector.addKey(decoratedKey.getKey());
-        lastWrittenKey = decoratedKey;
-        last = lastWrittenKey;
-        if (first == null)
-            first = lastWrittenKey;
-
-        if (logger.isTraceEnabled())
-            logger.trace("wrote {} at {}", decoratedKey, dataPosition);
-        iwriter.append(decoratedKey, index);
-        dbuilder.addPotentialBoundary(dataPosition);
-    }
-
-    /**
-     * @param row
-     * @return null if the row was compacted away entirely; otherwise, the PK 
index entry for this row
-     */
-    public RowIndexEntry append(AbstractCompactedRow row)
-    {
-        long currentPosition = beforeAppend(row.key);
-        RowIndexEntry entry;
-        try
-        {
-            entry = row.write(currentPosition, dataFile.stream);
-            if (entry == null)
-                return null;
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, dataFile.getPath());
-        }
-        sstableMetadataCollector.update(dataFile.getFilePointer() - 
currentPosition, row.columnStats());
-        afterAppend(row.key, currentPosition, entry);
-        return entry;
-    }
-
-    public void append(DecoratedKey decoratedKey, ColumnFamily cf)
-    {
-        long startPosition = beforeAppend(decoratedKey);
-        try
-        {
-            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, 
dataFile.stream);
-            afterAppend(decoratedKey, startPosition, entry);
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, dataFile.getPath());
-        }
-        sstableMetadataCollector.update(dataFile.getFilePointer() - 
startPosition, cf.getColumnStats());
-    }
-
-    public static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, 
DecoratedKey key, DataOutputPlus out) throws IOException
-    {
-        assert cf.hasColumns() || cf.isMarkedForDelete();
-
-        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, 
key.getKey(), out);
-        ColumnIndex index = builder.build(cf);
-
-        out.writeShort(END_OF_ROW);
-        return RowIndexEntry.create(startPosition, 
cf.deletionInfo().getTopLevelDeletion(), index);
-    }
-
-    /**
-     * @throws IOException if a read from the DataInput fails
-     * @throws FSWriteError if a write to the dataFile fails
-     */
-    public long appendFromStream(DecoratedKey key, CFMetaData metadata, 
DataInput in, Descriptor.Version version) throws IOException
-    {
-        long currentPosition = beforeAppend(key);
-
-        ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
-        ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
-        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
-        List<ByteBuffer> minColumnNames = Collections.emptyList();
-        List<ByteBuffer> maxColumnNames = Collections.emptyList();
-        StreamingHistogram tombstones = new 
StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
-        boolean hasLegacyCounterShards = false;
-
-        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
-        cf.delete(DeletionTime.serializer.deserialize(in));
-
-        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
-
-        if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
-        {
-            tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
-            maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
-            minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
-            maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
-        }
-
-        Iterator<RangeTombstone> rangeTombstoneIterator = 
cf.deletionInfo().rangeIterator();
-        while (rangeTombstoneIterator.hasNext())
-        {
-            RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
-            tombstones.update(rangeTombstone.getLocalDeletionTime());
-            minTimestampTracker.update(rangeTombstone.timestamp());
-            maxTimestampTracker.update(rangeTombstone.timestamp());
-            
maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
-            minColumnNames = ColumnNameHelper.minComponents(minColumnNames, 
rangeTombstone.min, metadata.comparator);
-            maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, 
rangeTombstone.max, metadata.comparator);
-        }
-
-        Iterator<OnDiskAtom> iter = metadata.getOnDiskIterator(in, 
ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version);
-        try
-        {
-            while (iter.hasNext())
-            {
-                OnDiskAtom atom = iter.next();
-                if (atom == null)
-                    break;
-
-                if (atom instanceof CounterCell)
-                {
-                    atom = ((CounterCell) atom).markLocalToBeCleared();
-                    hasLegacyCounterShards = hasLegacyCounterShards || 
((CounterCell) atom).hasLegacyShards();
-                }
-
-                int deletionTime = atom.getLocalDeletionTime();
-                if (deletionTime < Integer.MAX_VALUE)
-                    tombstones.update(deletionTime);
-                minTimestampTracker.update(atom.timestamp());
-                maxTimestampTracker.update(atom.timestamp());
-                minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
-                maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
-                maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
-
-                columnIndexer.add(atom); // This write the atom on disk too
-            }
-
-            columnIndexer.maybeWriteEmptyRowHeader();
-            dataFile.stream.writeShort(END_OF_ROW);
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, dataFile.getPath());
-        }
-
-        sstableMetadataCollector.updateMinTimestamp(minTimestampTracker.get())
-                                .updateMaxTimestamp(maxTimestampTracker.get())
-                                .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
-                                .addRowSize(dataFile.getFilePointer() - currentPosition)
-                                .addColumnCount(columnIndexer.writtenAtomCount())
-                                .mergeTombstoneHistogram(tombstones)
-                                .updateMinColumnNames(minColumnNames)
-                                .updateMaxColumnNames(maxColumnNames)
-                                .updateHasLegacyCounterShards(hasLegacyCounterShards);
-        afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
-        return currentPosition;
-    }
-
-    /**
-     * After failure, attempt to close the index writer and data file before 
deleting all temp components for the sstable
-     */
-    public void abort()
-    {
-        abort(true);
-    }
-    public void abort(boolean closeBf)
-    {
-        assert descriptor.type.isTemporary;
-        if (iwriter == null && dataFile == null)
-            return;
-        if (iwriter != null)
-        {
-            FileUtils.closeQuietly(iwriter.indexFile);
-            if (closeBf)
-            {
-                iwriter.bf.close();
-            }
-        }
-        if (dataFile!= null)
-            FileUtils.closeQuietly(dataFile);
-
-        Set<Component> components = SSTable.componentsFor(descriptor);
-        try
-        {
-            if (!components.isEmpty())
-                SSTable.delete(descriptor, components);
-        }
-        catch (FSWriteError e)
-        {
-            logger.error(String.format("Failed deleting temp components for 
%s", descriptor), e);
-            throw e;
-        }
-    }
-
-    // we use this method to ensure any managed data we may have retained 
references to during the write are no
-    // longer referenced, so that we do not need to enclose the expensive call 
to closeAndOpenReader() in a transaction
-    public void isolateReferences()
-    {
-        // currently we only maintain references to first/last/lastWrittenKey 
from the data provided; all other
-        // data retention is done through copying
-        first = getMinimalKey(first);
-        last = lastWrittenKey = getMinimalKey(last);
-    }
-
-    public SSTableReader openEarly(long maxDataAge)
-    {
-        StatsMetadata sstableMetadata = (StatsMetadata) sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
-                                                  metadata.getBloomFilterFpChance(),
-                                                  repairedAt).get(MetadataType.STATS);
-
-        // find the max (exclusive) readable key
-        DecoratedKey exclusiveUpperBoundOfReadableIndex = 
iwriter.getMaxReadableKey(0);
-        if (exclusiveUpperBoundOfReadableIndex == null)
-            return null;
-
-        // create temp links if they don't already exist
-        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
-        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
-        {
-            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
-            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
-        }
-
-        // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
-        SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX));
-        SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA));
-        SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
-                                                           components, metadata,
-                                                           partitioner, ifile,
-                                                           dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
-                                                           iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
-
-        // now it's open, find the ACTUAL last readable key (i.e. for which 
the data file has also been flushed)
-        sstable.first = getMinimalKey(first);
-        sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
-        DecoratedKey inclusiveUpperBoundOfReadableData = 
iwriter.getMaxReadableKey(1);
-        if (inclusiveUpperBoundOfReadableData == null)
-            return null;
-        int offset = 2;
-        while (true)
-        {
-            RowIndexEntry indexEntry = 
sstable.getPosition(inclusiveUpperBoundOfReadableData, 
SSTableReader.Operator.GT);
-            if (indexEntry != null && indexEntry.position <= 
dataFile.getLastFlushOffset())
-                break;
-            inclusiveUpperBoundOfReadableData = 
iwriter.getMaxReadableKey(offset++);
-            if (inclusiveUpperBoundOfReadableData == null)
-                return null;
-        }
-        sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
-        return sstable;
-    }
-
-    public SSTableReader closeAndOpenReader()
-    {
-        return closeAndOpenReader(System.currentTimeMillis());
-    }
-
-    public SSTableReader closeAndOpenReader(long maxDataAge)
-    {
-        return closeAndOpenReader(maxDataAge, this.repairedAt);
-    }
-
-    public SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt)
-    {
-        Pair<Descriptor, StatsMetadata> p = close(repairedAt);
-        Descriptor newdesc = p.left;
-        StatsMetadata sstableMetadata = p.right;
-
-        // finalize in-memory state for the reader
-        SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
-        SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA));
-        SSTableReader sstable = SSTableReader.internalOpen(newdesc,
-                                                           components,
-                                                           metadata,
-                                                           partitioner,
-                                                           ifile,
-                                                           dfile,
-                                                           iwriter.summary.build(partitioner),
-                                                           iwriter.bf,
-                                                           maxDataAge,
-                                                           sstableMetadata,
-                                                           SSTableReader.OpenReason.NORMAL);
-        sstable.first = getMinimalKey(first);
-        sstable.last = getMinimalKey(last);
-        // try to save the summaries to disk
-        sstable.saveSummary(iwriter.builder, dbuilder);
-        iwriter = null;
-        dbuilder = null;
-        return sstable;
-    }
-
-    // Close the writer and return the descriptor to the new sstable and it's 
metadata
-    public Pair<Descriptor, StatsMetadata> close()
-    {
-        return close(this.repairedAt);
-    }
-
-    private Pair<Descriptor, StatsMetadata> close(long repairedAt)
-    {
-
-        // index and filter
-        iwriter.close();
-        // main data, close will truncate if necessary
-        dataFile.close();
-        dataFile.writeFullChecksum(descriptor);
-        // write sstable statistics
-        Map<MetadataType, MetadataComponent> metadataComponents = sstableMetadataCollector.finalizeMetadata(
-                                                                                    partitioner.getClass().getCanonicalName(),
-                                                                                    metadata.getBloomFilterFpChance(),
-                                                                                    repairedAt);
-        writeMetadata(descriptor, metadataComponents);
-
-        // save the table of components
-        SSTable.appendTOC(descriptor, components);
-
-        // remove the 'tmp' marker from all components
-        return Pair.create(rename(descriptor, components), (StatsMetadata) 
metadataComponents.get(MetadataType.STATS));
-
-    }
-
-
-    private static void writeMetadata(Descriptor desc, Map<MetadataType, 
MetadataComponent> components)
-    {
-        SequentialWriter out = SequentialWriter.open(new 
File(desc.filenameFor(Component.STATS)));
-        try
-        {
-            desc.getMetadataSerializer().serialize(components, out.stream);
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, out.getPath());
-        }
-        finally
-        {
-            out.close();
-        }
-    }
-
-    static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
-    {
-        Descriptor newdesc = tmpdesc.asType(Descriptor.Type.FINAL);
-        rename(tmpdesc, newdesc, components);
-        return newdesc;
-    }
-
-    public static void rename(Descriptor tmpdesc, Descriptor newdesc, 
Set<Component> components)
-    {
-        for (Component component : Sets.difference(components, 
Sets.newHashSet(Component.DATA, Component.SUMMARY)))
-        {
-            FileUtils.renameWithConfirm(tmpdesc.filenameFor(component), 
newdesc.filenameFor(component));
-        }
-
-        // do -Data last because -Data present should mean the sstable was 
completely renamed before crash
-        FileUtils.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), 
newdesc.filenameFor(Component.DATA));
-
-        // rename it without confirmation because summary can be available for 
loadNewSSTables but not for closeAndOpenReader
-        FileUtils.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), 
newdesc.filenameFor(Component.SUMMARY));
-    }
-
-    public long getFilePointer()
-    {
-        return dataFile.getFilePointer();
-    }
-
-    public long getOnDiskFilePointer()
-    {
-        return dataFile.getOnDiskFilePointer();
-    }
-
-    /**
-     * Encapsulates writing the index and filter for an SSTable. The state of 
this object is not valid until it has been closed.
-     */
-    class IndexWriter implements Closeable
-    {
-        private final SequentialWriter indexFile;
-        public final SegmentedFile.Builder builder;
-        public final IndexSummaryBuilder summary;
-        public final IFilter bf;
-        private FileMark mark;
-
-        IndexWriter(long keyCount)
-        {
-            indexFile = SequentialWriter.open(new 
File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
-            builder = 
SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
-            summary = new IndexSummaryBuilder(keyCount, 
metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
-            bf = FilterFactory.getFilter(keyCount, 
metadata.getBloomFilterFpChance(), true);
-        }
-
-        // finds the last (-offset) decorated key that can be guaranteed to 
occur fully in the flushed portion of the index file
-        DecoratedKey getMaxReadableKey(int offset)
-        {
-            long maxIndexLength = indexFile.getLastFlushOffset();
-            return summary.getMaxReadableKey(maxIndexLength, offset);
-        }
-
-        public void append(DecoratedKey key, RowIndexEntry indexEntry)
-        {
-            bf.add(key.getKey());
-            long indexPosition = indexFile.getFilePointer();
-            try
-            {
-                ByteBufferUtil.writeWithShortLength(key.getKey(), 
indexFile.stream);
-                
metadata.comparator.rowIndexEntrySerializer().serialize(indexEntry, 
indexFile.stream);
-            }
-            catch (IOException e)
-            {
-                throw new FSWriteError(e, indexFile.getPath());
-            }
-
-            if (logger.isTraceEnabled())
-                logger.trace("wrote index entry: {} at {}", indexEntry, 
indexPosition);
-
-            summary.maybeAddEntry(key, indexPosition);
-            builder.addPotentialBoundary(indexPosition);
-        }
-
-        /**
-         * Closes the index and bloomfilter, making the public state of this 
writer valid for consumption.
-         */
-        public void close()
-        {
-            if (components.contains(Component.FILTER))
-            {
-                String path = descriptor.filenameFor(Component.FILTER);
-                try
-                {
-                    // bloom filter
-                    FileOutputStream fos = new FileOutputStream(path);
-                    DataOutputStreamAndChannel stream = new 
DataOutputStreamAndChannel(fos);
-                    FilterFactory.serialize(bf, stream);
-                    stream.flush();
-                    fos.getFD().sync();
-                    stream.close();
-                }
-                catch (IOException e)
-                {
-                    throw new FSWriteError(e, path);
-                }
-            }
-
-            // index
-            long position = indexFile.getFilePointer();
-            indexFile.close(); // calls force
-            FileUtils.truncate(indexFile.getPath(), position);
-        }
-
-        public void mark()
-        {
-            mark = indexFile.mark();
-        }
-
-        public void resetAndTruncate()
-        {
-            // we can't un-set the bloom filter addition, but extra keys in 
there are harmless.
-            // we can't reset dbuilder either, but that is the last thing 
called in afterappend so
-            // we assume that if that worked then we won't be trying to reset.
-            indexFile.resetAndTruncate(mark);
-        }
-
-        @Override
-        public String toString()
-        {
-            return "IndexWriter(" + descriptor + ")";
-        }
-    }
-}
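
[Editor's note] The rename() helpers deleted above move every component off its temp name first and the -Data file last, so the presence of a final -Data file implies the whole sstable finished renaming before any crash. Below is a self-contained sketch of that ordering using plain java.nio and hypothetical file names; it is an illustration of the design choice, not the project's FileUtils API.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;

    public class RenameOrderSketch
    {
        // Move every component except the data file, then the data file last,
        // mirroring the ordering comment in the deleted SSTableWriter.rename().
        static void renameAll(Path dir, List<String> components, String tmpPrefix) throws IOException
        {
            for (String component : components)
                if (!component.equals("Data.db"))
                    Files.move(dir.resolve(tmpPrefix + component), dir.resolve(component));
            Files.move(dir.resolve(tmpPrefix + "Data.db"), dir.resolve("Data.db"));
        }

        public static void main(String[] args) throws IOException
        {
            Path dir = Files.createTempDirectory("rename-sketch");
            List<String> components = List.of("Index.db", "Statistics.db", "TOC.txt", "Data.db");
            for (String component : components)
                Files.createFile(dir.resolve("tmp-" + component));
            renameAll(dir, components, "tmp-");
            System.out.println(Files.exists(dir.resolve("Data.db")));   // true once everything has moved
        }
    }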

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
new file mode 100644
index 0000000..ca003b6
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.collect.ImmutableList;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.io.util.FileDataInput;
+
+import java.util.Iterator;
+
+/**
+ * Provides the accessors to data on disk.
+ */
+public interface SSTableFormat
+{
+    static boolean enableSSTableDevelopmentTestMode = Boolean.valueOf(System.getProperty("cassandra.test.sstableformatdevelopment","false"));
+
+
+    Version getLatestVersion();
+    Version getVersion(String version);
+
+    SSTableWriter.Factory getWriterFactory();
+    SSTableReader.Factory getReaderFactory();
+
+    Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, ColumnSerializer.Flag flag, int expireBefore, CFMetaData cfm, Version version);
+
+    AbstractCompactedRow getCompactedRowWriter(CompactionController controller, ImmutableList<OnDiskAtomIterator> onDiskAtomIterators);
+
+    RowIndexEntry.IndexSerializer<?> getIndexSerializer(CFMetaData cfm);
+
+    public static enum Type
+    {
+        //Used internally to refer to files with no
+        //format flag in the filename
+        LEGACY("big", BigFormat.instance),
+
+        //The original sstable format
+        BIG("big", BigFormat.instance);
+
+        public final SSTableFormat info;
+        public final String name;
+        private Type(String name, SSTableFormat info)
+        {
+            //Since format comes right after generation
+            //we disallow formats with numeric names
+            assert !CharMatcher.DIGIT.matchesAllOf(name);
+
+            this.name = name;
+            this.info = info;
+        }
+
+        public static Type validate(String name)
+        {
+            for (Type valid : Type.values())
+            {
+                //This is used internally for old sstables
+                if (valid == LEGACY)
+                    continue;
+
+                if (valid.name.equalsIgnoreCase(name))
+                    return valid;
+            }
+
+            throw new IllegalArgumentException("No Type constant " + name);
+        }
+    }
+}
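
[Editor's note] For reference, a stripped-down, self-contained version of the Type lookup introduced in SSTableFormat above: validate() skips the internal LEGACY alias and compares names case-insensitively, so "big"/"BIG" resolve to BIG and anything else throws. The real enum also carries an SSTableFormat instance per constant; the class name, field name, and main() here are illustrative only.

    public class FormatTypeSketch
    {
        enum Type
        {
            LEGACY("big"),   // internal alias for filenames with no format flag
            BIG("big");      // the original sstable format

            final String formatName;
            Type(String formatName) { this.formatName = formatName; }

            static Type validate(String name)
            {
                for (Type valid : values())
                {
                    if (valid == LEGACY)                      // never matched from a filename
                        continue;
                    if (valid.formatName.equalsIgnoreCase(name))
                        return valid;
                }
                throw new IllegalArgumentException("No Type constant " + name);
            }
        }

        public static void main(String[] args)
        {
            System.out.println(Type.validate("big"));    // BIG
            System.out.println(Type.validate("BIG"));    // BIG (case-insensitive)
        }
    }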
