Repository: cassandra
Updated Branches:
  refs/heads/trunk 5bc2f0130 -> fb221095c


Remove DatabaseDescriptor dependency from SequentialWriter

patch by yukim; reviewed by snazy for CASSANDRA-11579


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fb221095
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fb221095
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fb221095

Branch: refs/heads/trunk
Commit: fb221095cb2a18cf8f027a8a084700d606bb9ca3
Parents: 5bc2f01
Author: Yuki Morishita <[email protected]>
Authored: Mon Jun 27 10:51:10 2016 +0900
Committer: Yuki Morishita <[email protected]>
Committed: Mon Jun 27 10:55:47 2016 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java |   7 +-
 .../index/sasi/disk/OnDiskIndexBuilder.java     |   7 +-
 .../io/compress/CompressedSequentialWriter.java |  39 +++--
 .../io/sstable/format/big/BigTableWriter.java   |  62 ++++----
 .../cassandra/io/util/ChecksumWriter.java       | 103 +++++++++++++
 .../io/util/ChecksummedSequentialWriter.java    |  24 +--
 .../io/util/DataIntegrityMetadata.java          |  86 -----------
 .../cassandra/io/util/SequentialWriter.java     |  82 ++++------
 .../io/util/SequentialWriterOption.java         | 154 +++++++++++++++++++
 .../apache/cassandra/db/RowIndexEntryTest.java  |  14 +-
 .../hints/ChecksummedDataInputTest.java         |   6 +-
 .../index/sasi/disk/TokenTreeTest.java          |  16 +-
 .../CompressedRandomAccessReaderTest.java       |  21 +--
 .../CompressedSequentialWriterTest.java         |  58 ++++++-
 .../cassandra/io/sstable/DescriptorTest.java    |   5 +-
 .../io/util/BufferedRandomAccessFileTest.java   |   8 +-
 .../util/ChecksummedRandomAccessReaderTest.java |  28 ++--
 .../util/ChecksummedSequentialWriterTest.java   |   4 +-
 .../cassandra/io/util/DataOutputTest.java       |   6 +-
 .../cassandra/io/util/MmappedRegionsTest.java   |   9 +-
 .../io/util/RandomAccessReaderTest.java         |   8 +-
 .../cassandra/io/util/SequentialWriterTest.java |  50 +++++-
 .../compression/CompressedInputStreamTest.java  |   9 +-
 24 files changed, 539 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d40cab4..5b72016 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.8
+ * Remove DatabaseDescriptor dependencies from SequentialWriter 
(CASSANDRA-11579)
  * Move skip_stop_words filter before stemming (CASSANDRA-12078)
  * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
  * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java 
b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 1b48d4f..cb2ad8a 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -84,6 +84,11 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
 
     private static volatile IStreamFactory streamFactory = new IStreamFactory()
     {
+        private final SequentialWriterOption writerOption = 
SequentialWriterOption.newBuilder()
+                                                                    
.trickleFsync(DatabaseDescriptor.getTrickleFsync())
+                                                                    
.trickleFsyncByteInterval(DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 
1024)
+                                                                    
.finishOnClose(true).build();
+
         public InputStream getInputStream(File dataPath, File crcPath) throws 
IOException
         {
             return new ChecksummedRandomAccessReader.Builder(dataPath, 
crcPath).build();
@@ -91,7 +96,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
 
         public OutputStream getOutputStream(File dataPath, File crcPath)
         {
-            return SequentialWriter.open(dataPath, crcPath).finishOnClose();
+            return new ChecksummedSequentialWriter(dataPath, crcPath, null, 
writerOption);
         }
     };
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java 
b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
index 8acbb05..4946f06 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.index.sasi.sa.TermIterator;
 import org.apache.cassandra.index.sasi.sa.SuffixSA;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -131,6 +130,10 @@ public class OnDiskIndexBuilder
     public static final int SUPER_BLOCK_SIZE = 64;
     public static final int IS_PARTIAL_BIT = 15;
 
+    private static final SequentialWriterOption WRITER_OPTION = 
SequentialWriterOption.newBuilder()
+                                                                               
       .bufferSize(BLOCK_SIZE)
+                                                                               
       .build();
+
     private final List<MutableLevel<InMemoryPointerTerm>> levels = new 
ArrayList<>();
     private MutableLevel<InMemoryDataTerm> dataLevel;
 
@@ -263,7 +266,7 @@ public class OnDiskIndexBuilder
 
         try
         {
-            out = new SequentialWriter(file, BLOCK_SIZE, BufferType.ON_HEAP);
+            out = new SequentialWriter(file, WRITER_OPTION);
 
             out.writeUTF(descriptor.version.toString());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java 
b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 1f33d53..9bdb1b4 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -17,29 +17,27 @@
  */
 package org.apache.cassandra.io.compress;
 
-import static org.apache.cassandra.utils.Throwables.merge;
-
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
+import java.util.Optional;
 import java.util.zip.CRC32;
 
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.DataIntegrityMetadata;
-import org.apache.cassandra.io.util.DataPosition;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.schema.CompressionParams;
 
+import static org.apache.cassandra.utils.Throwables.merge;
+
 public class CompressedSequentialWriter extends SequentialWriter
 {
-    private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
+    private final ChecksumWriter crcMetadata;
 
     // holds offset in the file where current chunk should be written
     // changed only by flush() method where data buffer gets compressed and 
stored to the file
@@ -60,14 +58,34 @@ public class CompressedSequentialWriter extends 
SequentialWriter
     private final MetadataCollector sstableMetadataCollector;
 
     private final ByteBuffer crcCheckBuffer = ByteBuffer.allocate(4);
+    private final Optional<File> digestFile;
 
+    /**
+     * Create CompressedSequentialWriter without digest file.
+     *
+     * @param file File to write
+     * @param offsetsPath File name to write compression metadata
+     * @param digestFile File to write digest
+     * @param option Write option (buffer size and type will be set the same 
as compression params)
+     * @param parameters Compression parameters
+     * @param sstableMetadataCollector Metadata collector
+     */
     public CompressedSequentialWriter(File file,
                                       String offsetsPath,
+                                      File digestFile,
+                                      SequentialWriterOption option,
                                       CompressionParams parameters,
                                       MetadataCollector 
sstableMetadataCollector)
     {
-        super(file, parameters.chunkLength(), 
parameters.getSstableCompressor().preferredBufferType());
+        super(file, SequentialWriterOption.newBuilder()
+                            .bufferSize(option.bufferSize())
+                            .bufferType(option.bufferType())
+                            .bufferSize(parameters.chunkLength())
+                            
.bufferType(parameters.getSstableCompressor().preferredBufferType())
+                            .finishOnClose(option.finishOnClose())
+                            .build());
         this.compressor = parameters.getSstableCompressor();
+        this.digestFile = Optional.ofNullable(digestFile);
 
         // buffer for compression should be the same size as buffer itself
         compressed = 
compressor.preferredBufferType().allocate(compressor.initialCompressedBufferLength(buffer.capacity()));
@@ -76,7 +94,7 @@ public class CompressedSequentialWriter extends 
SequentialWriter
         metadataWriter = CompressionMetadata.Writer.open(parameters, 
offsetsPath);
 
         this.sstableMetadataCollector = sstableMetadataCollector;
-        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(new 
DataOutputStream(Channels.newOutputStream(channel)));
+        crcMetadata = new ChecksumWriter(new 
DataOutputStream(Channels.newOutputStream(channel)));
     }
 
     @Override
@@ -287,8 +305,7 @@ public class CompressedSequentialWriter extends 
SequentialWriter
         protected void doPrepare()
         {
             syncInternal();
-            if (descriptor != null)
-                crcMetadata.writeFullChecksum(descriptor);
+            digestFile.ifPresent(crcMetadata::writeFullChecksum);
             sstableMetadataCollector.addCompressionRatio(compressedSize, 
uncompressedSize);
             metadataWriter.finalizeLength(current(), 
chunkCount).prepareToCommit();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 8645158..c1d9bbc 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -60,10 +60,15 @@ public class BigTableWriter extends SSTableWriter
     private DataPosition dataMark;
     private long lastEarlyOpenLength = 0;
 
-    public BigTableWriter(Descriptor descriptor, 
-                          Long keyCount, 
-                          Long repairedAt, 
-                          CFMetaData metadata, 
+    private final SequentialWriterOption writerOption = 
SequentialWriterOption.newBuilder()
+                                                        
.trickleFsync(DatabaseDescriptor.getTrickleFsync())
+                                                        
.trickleFsyncByteInterval(DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 
1024)
+                                                        .build();
+
+    public BigTableWriter(Descriptor descriptor,
+                          long keyCount,
+                          long repairedAt,
+                          CFMetaData metadata,
                           MetadataCollector metadataCollector, 
                           SerializationHeader header,
                           Collection<SSTableFlushObserver> observers,
@@ -74,18 +79,23 @@ public class BigTableWriter extends SSTableWriter
 
         if (compression)
         {
-            dataFile = SequentialWriter.open(getFilename(),
+            dataFile = new CompressedSequentialWriter(new File(getFilename()),
                                              
descriptor.filenameFor(Component.COMPRESSION_INFO),
+                                             new 
File(descriptor.filenameFor(descriptor.digestComponent)),
+                                             writerOption,
                                              metadata.params.compression,
                                              metadataCollector);
             dbuilder = 
SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
         }
         else
         {
-            dataFile = SequentialWriter.open(new File(getFilename()), new 
File(descriptor.filenameFor(Component.CRC)));
+            dataFile = new ChecksummedSequentialWriter(new File(getFilename()),
+                                                       new 
File(descriptor.filenameFor(Component.CRC)),
+                                                       new 
File(descriptor.filenameFor(descriptor.digestComponent)),
+                                                       writerOption);
             dbuilder = 
SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false);
         }
-        iwriter = new IndexWriter(keyCount, dataFile);
+        iwriter = new IndexWriter(keyCount);
 
         columnIndexWriter = new ColumnIndex(this.header, dataFile, 
descriptor.version, this.observers, 
getRowIndexEntrySerializer().indexInfoSerializer());
     }
@@ -338,7 +348,7 @@ public class BigTableWriter extends SSTableWriter
             iwriter.prepareToCommit();
 
             // write sstable statistics
-            dataFile.setDescriptor(descriptor).prepareToCommit();
+            dataFile.prepareToCommit();
             writeMetadata(descriptor, finalizeMetadata());
 
             // save the table of components
@@ -370,13 +380,13 @@ public class BigTableWriter extends SSTableWriter
         }
     }
 
-    private static void writeMetadata(Descriptor desc, Map<MetadataType, 
MetadataComponent> components)
+    private void writeMetadata(Descriptor desc, Map<MetadataType, 
MetadataComponent> components)
     {
         File file = new File(desc.filenameFor(Component.STATS));
-        try (SequentialWriter out = SequentialWriter.open(file))
+        try (SequentialWriter out = new SequentialWriter(file, writerOption))
         {
             desc.getMetadataSerializer().serialize(components, out, 
desc.version);
-            out.setDescriptor(desc).finish();
+            out.finish();
         }
         catch (IOException e)
         {
@@ -410,27 +420,15 @@ public class BigTableWriter extends SSTableWriter
         public final IFilter bf;
         private DataPosition mark;
 
-        IndexWriter(long keyCount, final SequentialWriter dataFile)
+        IndexWriter(long keyCount)
         {
-            indexFile = SequentialWriter.open(new 
File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+            indexFile = new SequentialWriter(new 
File(descriptor.filenameFor(Component.PRIMARY_INDEX)), writerOption);
             builder = 
SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
             summary = new IndexSummaryBuilder(keyCount, 
metadata.params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL);
             bf = FilterFactory.getFilter(keyCount, 
metadata.params.bloomFilterFpChance, true, 
descriptor.version.hasOldBfHashOrder());
             // register listeners to be alerted when the data files are flushed
-            indexFile.setPostFlushListener(new Runnable()
-            {
-                public void run()
-                {
-                    summary.markIndexSynced(indexFile.getLastFlushOffset());
-                }
-            });
-            dataFile.setPostFlushListener(new Runnable()
-            {
-                public void run()
-                {
-                    summary.markDataSynced(dataFile.getLastFlushOffset());
-                }
-            });
+            indexFile.setPostFlushListener(() -> 
summary.markIndexSynced(indexFile.getLastFlushOffset()));
+            dataFile.setPostFlushListener(() -> 
summary.markDataSynced(dataFile.getLastFlushOffset()));
         }
 
         // finds the last (-offset) decorated key that can be guaranteed to 
occur fully in the flushed portion of the index file
@@ -501,15 +499,15 @@ public class BigTableWriter extends SSTableWriter
             flushBf();
 
             // truncate index file
-            long position = iwriter.indexFile.position();
-            iwriter.indexFile.setDescriptor(descriptor).prepareToCommit();
-            FileUtils.truncate(iwriter.indexFile.getPath(), position);
+            long position = indexFile.position();
+            indexFile.prepareToCommit();
+            FileUtils.truncate(indexFile.getPath(), position);
 
             // save summary
             summary.prepareToCommit();
-            try (IndexSummary summary = 
iwriter.summary.build(getPartitioner()))
+            try (IndexSummary indexSummary = summary.build(getPartitioner()))
             {
-                SSTableReader.saveSummary(descriptor, first, last, 
iwriter.builder, dbuilder, summary);
+                SSTableReader.saveSummary(descriptor, first, last, builder, 
dbuilder, indexSummary);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/util/ChecksumWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksumWriter.java 
b/src/java/org/apache/cassandra/io/util/ChecksumWriter.java
new file mode 100644
index 0000000..dc5eaea
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ChecksumWriter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.zip.CRC32;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Charsets;
+
+import org.apache.cassandra.io.FSWriteError;
+
+public class ChecksumWriter
+{
+    private final CRC32 incrementalChecksum = new CRC32();
+    private final DataOutput incrementalOut;
+    private final CRC32 fullChecksum = new CRC32();
+
+    public ChecksumWriter(DataOutput incrementalOut)
+    {
+        this.incrementalOut = incrementalOut;
+    }
+
+    public void writeChunkSize(int length)
+    {
+        try
+        {
+            incrementalOut.writeInt(length);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    // checksumIncrementalResult indicates if the checksum we compute for this 
buffer should itself be
+    // included in the full checksum, translating to if the partial checksum 
is serialized along with the
+    // data it checksums (in which case the file checksum as calculated by 
external tools would mismatch if
+    // we did not include it), or independently.
+
+    // CompressedSequentialWriters serialize the partial checksums inline with 
the compressed data chunks they
+    // corroborate, whereas ChecksummedSequentialWriters serialize them to a 
different file.
+    public void appendDirect(ByteBuffer bb, boolean checksumIncrementalResult)
+    {
+        try
+        {
+            ByteBuffer toAppend = bb.duplicate();
+            toAppend.mark();
+            incrementalChecksum.update(toAppend);
+            toAppend.reset();
+
+            int incrementalChecksumValue = (int) 
incrementalChecksum.getValue();
+            incrementalOut.writeInt(incrementalChecksumValue);
+
+            fullChecksum.update(toAppend);
+            if (checksumIncrementalResult)
+            {
+                ByteBuffer byteBuffer = ByteBuffer.allocate(4);
+                byteBuffer.putInt(incrementalChecksumValue);
+                assert byteBuffer.arrayOffset() == 0;
+                fullChecksum.update(byteBuffer.array(), 0, 
byteBuffer.array().length);
+            }
+            incrementalChecksum.reset();
+
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    public void writeFullChecksum(@Nonnull File digestFile)
+    {
+        try (BufferedWriter out = Files.newBufferedWriter(digestFile.toPath(), 
Charsets.UTF_8))
+        {
+            out.write(String.valueOf(fullChecksum.getValue()));
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, digestFile);
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java 
b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index fd88151..f89e7cc 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -19,20 +19,25 @@ package org.apache.cassandra.io.util;
 
 import java.io.File;
 import java.nio.ByteBuffer;
-
-import org.apache.cassandra.io.compress.BufferType;
+import java.util.Optional;
 
 public class ChecksummedSequentialWriter extends SequentialWriter
 {
+    private static final SequentialWriterOption CRC_WRITER_OPTION = 
SequentialWriterOption.newBuilder()
+                                                                               
           .bufferSize(8 * 1024)
+                                                                               
           .build();
+
     private final SequentialWriter crcWriter;
-    private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
+    private final ChecksumWriter crcMetadata;
+    private final Optional<File> digestFile;
 
-    public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
+    public ChecksummedSequentialWriter(File file, File crcPath, File 
digestFile, SequentialWriterOption option)
     {
-        super(file, bufferSize, BufferType.ON_HEAP);
-        crcWriter = new SequentialWriter(crcPath, 8 * 1024, 
BufferType.ON_HEAP);
-        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter);
+        super(file, option);
+        crcWriter = new SequentialWriter(crcPath, CRC_WRITER_OPTION);
+        crcMetadata = new ChecksumWriter(crcWriter);
         crcMetadata.writeChunkSize(buffer.capacity());
+        this.digestFile = Optional.ofNullable(digestFile);
     }
 
     @Override
@@ -63,9 +68,8 @@ public class ChecksummedSequentialWriter extends 
SequentialWriter
         protected void doPrepare()
         {
             syncInternal();
-            if (descriptor != null)
-                crcMetadata.writeFullChecksum(descriptor);
-            crcWriter.setDescriptor(descriptor).prepareToCommit();
+            digestFile.ifPresent(crcMetadata::writeFullChecksum);
+            crcWriter.prepareToCommit();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java 
b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 0c48d13..0eecef3 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -17,21 +17,12 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.io.BufferedWriter;
 import java.io.Closeable;
-import java.io.DataOutput;
 import java.io.File;
-import java.io.IOError;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
-import com.google.common.base.Charsets;
-
-import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.utils.ChecksumType;
@@ -142,81 +133,4 @@ public class DataIntegrityMetadata
                                dataReader::close);
         }
     }
-
-
-    public static class ChecksumWriter
-    {
-        private final CRC32 incrementalChecksum = new CRC32();
-        private final DataOutput incrementalOut;
-        private final CRC32 fullChecksum = new CRC32();
-
-        public ChecksumWriter(DataOutput incrementalOut)
-        {
-            this.incrementalOut = incrementalOut;
-        }
-
-        public void writeChunkSize(int length)
-        {
-            try
-            {
-                incrementalOut.writeInt(length);
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-        }
-
-        // checksumIncrementalResult indicates if the checksum we compute for 
this buffer should itself be
-        // included in the full checksum, translating to if the partial 
checksum is serialized along with the
-        // data it checksums (in which case the file checksum as calculated by 
external tools would mismatch if
-        // we did not include it), or independently.
-
-        // CompressedSequentialWriters serialize the partial checksums inline 
with the compressed data chunks they
-        // corroborate, whereas ChecksummedSequentialWriters serialize them to 
a different file.
-        public void appendDirect(ByteBuffer bb, boolean 
checksumIncrementalResult)
-        {
-            try
-            {
-
-                ByteBuffer toAppend = bb.duplicate();
-                toAppend.mark();
-                incrementalChecksum.update(toAppend);
-                toAppend.reset();
-
-                int incrementalChecksumValue = (int) 
incrementalChecksum.getValue();
-                incrementalOut.writeInt(incrementalChecksumValue);
-
-                fullChecksum.update(toAppend);
-                if (checksumIncrementalResult)
-                {
-                    ByteBuffer byteBuffer = ByteBuffer.allocate(4);
-                    byteBuffer.putInt(incrementalChecksumValue);
-                    assert byteBuffer.arrayOffset() == 0;
-                    fullChecksum.update(byteBuffer.array(), 0, 
byteBuffer.array().length);
-                }
-                incrementalChecksum.reset();
-
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-        }
-
-        public void writeFullChecksum(Descriptor descriptor)
-        {
-            if (descriptor.digestComponent == null)
-                throw new NullPointerException("Null digest component for " + 
descriptor.ksname + '.' + descriptor.cfname + " file " + 
descriptor.baseFilename());
-            File outFile = new 
File(descriptor.filenameFor(descriptor.digestComponent));
-            try (BufferedWriter out =Files.newBufferedWriter(outFile.toPath(), 
Charsets.UTF_8))
-            {
-                out.write(String.valueOf(fullChecksum.getValue()));
-            }
-            catch (IOException e)
-            {
-                throw new FSWriteError(e, outFile);
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java 
b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 45e4cfa..e71f2fa 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -17,32 +17,24 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.compress.CompressedSequentialWriter;
-import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.utils.SyncUtil;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
 import static org.apache.cassandra.utils.Throwables.merge;
 
-import org.apache.cassandra.utils.SyncUtil;
-
 /**
  * Adds buffering, mark, and fsyncing to OutputStream.  We always fsync on 
close; we may also
  * fsync incrementally if Config.trickle_fsync is enabled.
  */
 public class SequentialWriter extends BufferedDataOutputStreamPlus implements 
Transactional
 {
-    private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-
     // absolute path to the given file
     private final String filePath;
 
@@ -53,8 +45,7 @@ public class SequentialWriter extends 
BufferedDataOutputStreamPlus implements Tr
 
     // whether to do trickling fsync() to avoid sudden bursts of dirty buffer 
flushing by kernel causing read
     // latency spikes
-    private boolean trickleFsync;
-    private int trickleFsyncByteInterval;
+    private final SequentialWriterOption option;
     private int bytesSinceTrickleFsync = 0;
 
     protected long lastFlushOffset;
@@ -62,8 +53,6 @@ public class SequentialWriter extends 
BufferedDataOutputStreamPlus implements Tr
     protected Runnable runPostFlush;
 
     private final TransactionalProxy txnProxy = txnProxy();
-    private boolean finishOnClose;
-    protected Descriptor descriptor;
 
     // due to lack of multiple-inheritance, we proxy our transactional 
implementation
     protected class TransactionalProxy extends AbstractTransactional
@@ -102,7 +91,8 @@ public class SequentialWriter extends 
BufferedDataOutputStreamPlus implements Tr
     }
 
     // TODO: we should specify as a parameter if we permit an existing file or 
not
-    private static FileChannel openChannel(File file) {
+    private static FileChannel openChannel(File file)
+    {
         try
         {
             if (file.exists())
@@ -130,43 +120,31 @@ public class SequentialWriter extends 
BufferedDataOutputStreamPlus implements Tr
         }
     }
 
-    public SequentialWriter(File file, int bufferSize, BufferType bufferType)
-    {
-        super(openChannel(file), bufferType.allocate(bufferSize));
-        strictFlushing = true;
-        fchannel = (FileChannel)channel;
-
-        filePath = file.getAbsolutePath();
-
-        this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
-        this.trickleFsyncByteInterval = 
DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
-    }
-
     /**
-     * Open a heap-based, non-compressed SequentialWriter
+     * Create a heap-based, non-compressed SequentialWriter with the default buffer 
size (64k).
+     *
+     * @param file File to write
      */
-    public static SequentialWriter open(File file)
+    public SequentialWriter(File file)
     {
-        return new SequentialWriter(file, DEFAULT_BUFFER_SIZE, 
BufferType.ON_HEAP);
+       this(file, SequentialWriterOption.DEFAULT);
     }
 
-    public static ChecksummedSequentialWriter open(File file, File crcPath)
+    /**
+     * Create SequentialWriter for given file with specific writer option.
+     *
+     * @param file File to write
+     * @param option Writer option
+     */
+    public SequentialWriter(File file, SequentialWriterOption option)
     {
-        return new ChecksummedSequentialWriter(file, DEFAULT_BUFFER_SIZE, 
crcPath);
-    }
+        super(openChannel(file), option.allocateBuffer());
+        strictFlushing = true;
+        fchannel = (FileChannel)channel;
 
-    public static CompressedSequentialWriter open(String dataFilePath,
-                                                  String offsetsPath,
-                                                  CompressionParams parameters,
-                                                  MetadataCollector 
sstableMetadataCollector)
-    {
-        return new CompressedSequentialWriter(new File(dataFilePath), 
offsetsPath, parameters, sstableMetadataCollector);
-    }
+        filePath = file.getAbsolutePath();
 
-    public SequentialWriter finishOnClose()
-    {
-        finishOnClose = true;
-        return this;
+        this.option = option;
     }
 
     public void skipBytes(int numBytes) throws IOException
@@ -212,10 +190,10 @@ public class SequentialWriter extends 
BufferedDataOutputStreamPlus implements Tr
     {
         flushData();
 
-        if (trickleFsync)
+        if (option.trickleFsync())
         {
             bytesSinceTrickleFsync += buffer.position();
-            if (bytesSinceTrickleFsync >= trickleFsyncByteInterval)
+            if (bytesSinceTrickleFsync >= option.trickleFsyncByteInterval())
             {
                 syncDataOnlyInternal();
                 bytesSinceTrickleFsync = 0;
@@ -348,6 +326,7 @@ public class SequentialWriter extends 
BufferedDataOutputStreamPlus implements Tr
             throw new FSReadError(e, getPath());
         }
 
+        bufferOffset = truncateTarget;
         resetBuffer();
     }
 
@@ -361,6 +340,7 @@ public class SequentialWriter extends 
BufferedDataOutputStreamPlus implements Tr
         try
         {
             fchannel.truncate(toSize);
+            lastFlushOffset = toSize;
         }
         catch (IOException e)
         {
@@ -373,12 +353,6 @@ public class SequentialWriter extends 
BufferedDataOutputStreamPlus implements Tr
         return channel.isOpen();
     }
 
-    public SequentialWriter setDescriptor(Descriptor descriptor)
-    {
-        this.descriptor = descriptor;
-        return this;
-    }
-
     public final void prepareToCommit()
     {
         txnProxy.prepareToCommit();
@@ -397,7 +371,7 @@ public class SequentialWriter extends 
BufferedDataOutputStreamPlus implements Tr
     @Override
     public final void close()
     {
-        if (finishOnClose)
+        if (option.finishOnClose())
             txnProxy.finish();
         else
             txnProxy.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/util/SequentialWriterOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriterOption.java 
b/src/java/org/apache/cassandra/io/util/SequentialWriterOption.java
new file mode 100644
index 0000000..61f375b
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriterOption.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import org.apache.cassandra.io.compress.BufferType;
+
+/**
+ * SequentialWriter option
+ */
+public class SequentialWriterOption
+{
+    /**
+     * Default write option.
+     *
+     * <ul>
+     *   <li>buffer size: 64 KB
+     *   <li>buffer type: on heap
+     *   <li>trickle fsync: false
+     *   <li>trickle fsync byte interval: 10 MB
+     *   <li>finish on close: false
+     * </ul>
+     */
+    public static final SequentialWriterOption DEFAULT = 
SequentialWriterOption.newBuilder().build();
+
+    private final int bufferSize;
+    private final BufferType bufferType;
+    private final boolean trickleFsync;
+    private final int trickleFsyncByteInterval;
+    private final boolean finishOnClose;
+
+    private SequentialWriterOption(int bufferSize,
+                                   BufferType bufferType,
+                                   boolean trickleFsync,
+                                   int trickleFsyncByteInterval,
+                                   boolean finishOnClose)
+    {
+        this.bufferSize = bufferSize;
+        this.bufferType = bufferType;
+        this.trickleFsync = trickleFsync;
+        this.trickleFsyncByteInterval = trickleFsyncByteInterval;
+        this.finishOnClose = finishOnClose;
+    }
+
+    public static Builder newBuilder()
+    {
+        return new Builder();
+    }
+
+    public int bufferSize()
+    {
+        return bufferSize;
+    }
+
+    public BufferType bufferType()
+    {
+        return bufferType;
+    }
+
+    public boolean trickleFsync()
+    {
+        return trickleFsync;
+    }
+
+    public int trickleFsyncByteInterval()
+    {
+        return trickleFsyncByteInterval;
+    }
+
+    public boolean finishOnClose()
+    {
+        return finishOnClose;
+    }
+
+    /**
+     * Allocate a buffer using the configured buffer type and buffer size.
+     *
+     * @return allocated ByteBuffer
+     */
+    public ByteBuffer allocateBuffer()
+    {
+        return bufferType.allocate(bufferSize);
+    }
+
+    public static class Builder
+    {
+        /* default buffer size: 64k */
+        private int bufferSize = 64 * 1024;
+        /* default buffer type: on heap */
+        private BufferType bufferType = BufferType.ON_HEAP;
+        /* default: no trickle fsync */
+        private boolean trickleFsync = false;
+        /* default trickle fsync byte interval: 10MB */
+        private int trickleFsyncByteInterval = 10 * 1024 * 1024;
+        private boolean finishOnClose = false;
+
+        /* construct through SequentialWriterOption.newBuilder */
+        private Builder() {}
+
+        public SequentialWriterOption build()
+        {
+            return new SequentialWriterOption(bufferSize, bufferType, 
trickleFsync,
+                                   trickleFsyncByteInterval, finishOnClose);
+        }
+
+        public Builder bufferSize(int bufferSize)
+        {
+            this.bufferSize = bufferSize;
+            return this;
+        }
+
+        public Builder bufferType(BufferType bufferType)
+        {
+            this.bufferType = Objects.requireNonNull(bufferType);
+            return this;
+        }
+
+        public Builder trickleFsync(boolean trickleFsync)
+        {
+            this.trickleFsync = trickleFsync;
+            return this;
+        }
+
+        public Builder trickleFsyncByteInterval(int trickleFsyncByteInterval)
+        {
+            this.trickleFsyncByteInterval = trickleFsyncByteInterval;
+            return this;
+        }
+
+        public Builder finishOnClose(boolean finishOnClose)
+        {
+            this.finishOnClose = finishOnClose;
+            return this;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java 
b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 244018e..79eb449 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -57,12 +57,7 @@ import org.apache.cassandra.io.sstable.IndexInfo;
 import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.SegmentedFile;
-import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.serializers.LongSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -166,13 +161,14 @@ public class RowIndexEntryTest extends CQLTester
 
         DoubleSerializer() throws IOException
         {
+            SequentialWriterOption option = 
SequentialWriterOption.newBuilder().bufferSize(1024).build();
             File f = File.createTempFile("RowIndexEntryTest-", "db");
-            dataWriterNew = new SequentialWriter(f, 1024, BufferType.ON_HEAP);
+            dataWriterNew = new SequentialWriter(f, option);
             columnIndex = new org.apache.cassandra.db.ColumnIndex(header, 
dataWriterNew, version, Collections.emptyList(),
                                                                   
rieSerializer.indexInfoSerializer());
 
             f = File.createTempFile("RowIndexEntryTest-", "db");
-            dataWriterOld = new SequentialWriter(f, 1024, BufferType.ON_HEAP);
+            dataWriterOld = new SequentialWriter(f, option);
         }
 
         public void close() throws Exception
@@ -424,7 +420,7 @@ public class RowIndexEntryTest extends CQLTester
 
         File tempFile = File.createTempFile("row_index_entry_test", null);
         tempFile.deleteOnExit();
-        SequentialWriter writer = SequentialWriter.open(tempFile);
+        SequentialWriter writer = new SequentialWriter(tempFile);
         ColumnIndex columnIndex = 
RowIndexEntryTest.ColumnIndex.writeAndBuildIndex(partition.unfilteredIterator(),
 writer, header, Collections.emptySet(), BigFormat.latestVersion);
         Pre_C_11206_RowIndexEntry withIndex = 
Pre_C_11206_RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, columnIndex);
         IndexInfo.Serializer indexSerializer = 
cfs.metadata.serializers().indexInfoSerializer(BigFormat.latestVersion, header);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java 
b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
index 6b3b5c8..5a48b21 100644
--- a/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
+++ b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
@@ -77,7 +77,7 @@ public class ChecksummedDataInputTest
         // save the buffer to file to create a RAR
         File file = File.createTempFile("testReadMethods", "1");
         file.deleteOnExit();
-        try (SequentialWriter writer = SequentialWriter.open(file))
+        try (SequentialWriter writer = new SequentialWriter(file))
         {
             writer.write(buffer);
             writer.writeInt((int) crc.getValue());
@@ -152,7 +152,7 @@ public class ChecksummedDataInputTest
         // save the buffer to file to create a RAR
         File file = File.createTempFile("testResetCrc", "1");
         file.deleteOnExit();
-        try (SequentialWriter writer = SequentialWriter.open(file))
+        try (SequentialWriter writer = new SequentialWriter(file))
         {
             writer.write(buffer);
             writer.finish();
@@ -208,7 +208,7 @@ public class ChecksummedDataInputTest
         // save the buffer to file to create a RAR
         File file = File.createTempFile("testFailedCrc", "1");
         file.deleteOnExit();
-        try (SequentialWriter writer = SequentialWriter.open(file))
+        try (SequentialWriter writer = new SequentialWriter(file))
         {
             writer.write(buffer);
             writer.finish();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java 
b/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
index 67e54f4..b26bb44 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
@@ -34,8 +34,8 @@ import org.apache.cassandra.index.sasi.utils.MappedBuffer;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
-import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SequentialWriterOption;
 import org.apache.cassandra.utils.MurmurHash;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.SequentialWriter;
@@ -84,6 +84,8 @@ public class TokenTreeTest
 
     final static SortedMap<Long, LongSet> tokens = bigTokensMap;
 
+    final static SequentialWriterOption DEFAULT_OPT = 
SequentialWriterOption.newBuilder().bufferSize(4096).build();
+
     @Test
     public void testSerializedSizeDynamic() throws Exception
     {
@@ -103,7 +105,7 @@ public class TokenTreeTest
         final File treeFile = File.createTempFile("token-tree-size-test", 
"tt");
         treeFile.deleteOnExit();
 
-        try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, 
BufferType.ON_HEAP))
+        try (SequentialWriter writer = new SequentialWriter(treeFile, 
DEFAULT_OPT))
         {
             builder.write(writer);
             writer.sync();
@@ -134,7 +136,7 @@ public class TokenTreeTest
         final File treeFile = File.createTempFile("token-tree-iterate-test1", 
"tt");
         treeFile.deleteOnExit();
 
-        try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, 
BufferType.ON_HEAP))
+        try (SequentialWriter writer = new SequentialWriter(treeFile, 
DEFAULT_OPT))
         {
             builder.write(writer);
             writer.sync();
@@ -210,7 +212,7 @@ public class TokenTreeTest
         final File treeFile = File.createTempFile("token-tree-iterate-test2", 
"tt");
         treeFile.deleteOnExit();
 
-        try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, 
BufferType.ON_HEAP))
+        try (SequentialWriter writer = new SequentialWriter(treeFile, 
DEFAULT_OPT))
         {
             builder.write(writer);
             writer.sync();
@@ -269,7 +271,7 @@ public class TokenTreeTest
         final File treeFile = File.createTempFile("token-tree-skip-past-test", 
"tt");
         treeFile.deleteOnExit();
 
-        try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, 
BufferType.ON_HEAP))
+        try (SequentialWriter writer = new SequentialWriter(treeFile, 
DEFAULT_OPT))
         {
             builder.write(writer);
             writer.sync();
@@ -413,7 +415,7 @@ public class TokenTreeTest
         final File treeFile = File.createTempFile("token-tree-", "db");
         treeFile.deleteOnExit();
 
-        try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, 
BufferType.ON_HEAP))
+        try (SequentialWriter writer = new SequentialWriter(treeFile, 
DEFAULT_OPT))
         {
             builder.write(writer);
             writer.sync();
@@ -631,7 +633,7 @@ public class TokenTreeTest
         final File treeFile = File.createTempFile("token-tree-get-test", "tt");
         treeFile.deleteOnExit();
 
-        try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, 
BufferType.ON_HEAP))
+        try (SequentialWriter writer = new SequentialWriter(treeFile, 
DEFAULT_OPT))
         {
             builder.write(writer);
             writer.sync();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
 
b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index a9a0cb0..309083b 100644
--- 
a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ 
b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -29,11 +29,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.DataPosition;
-import org.apache.cassandra.io.util.MmappedRegions;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.SyncUtil;
@@ -78,7 +74,10 @@ public class CompressedRandomAccessReaderTest
         {
 
             MetadataCollector sstableMetadataCollector = new 
MetadataCollector(new ClusteringComparator(BytesType.instance));
-            try(CompressedSequentialWriter writer = new 
CompressedSequentialWriter(f, filename + ".metadata", 
CompressionParams.snappy(32), sstableMetadataCollector))
+            try(CompressedSequentialWriter writer = new 
CompressedSequentialWriter(f, filename + ".metadata",
+                                                                               
    null, SequentialWriterOption.DEFAULT,
+                                                                               
    CompressionParams.snappy(32),
+                                                                               
    sstableMetadataCollector))
             {
 
                 for (int i = 0; i < 20; i++)
@@ -122,8 +121,10 @@ public class CompressedRandomAccessReaderTest
         {
             MetadataCollector sstableMetadataCollector = new 
MetadataCollector(new ClusteringComparator(BytesType.instance));
             try(SequentialWriter writer = compressed
-                ? new CompressedSequentialWriter(f, filename + ".metadata", 
CompressionParams.snappy(), sstableMetadataCollector)
-                : SequentialWriter.open(f))
+                ? new CompressedSequentialWriter(f, filename + ".metadata",
+                                                 null, 
SequentialWriterOption.DEFAULT,
+                                                 CompressionParams.snappy(), 
sstableMetadataCollector)
+                : new SequentialWriter(f))
             {
                 writer.write("The quick ".getBytes());
                 DataPosition mark = writer.mark();
@@ -192,7 +193,9 @@ public class CompressedRandomAccessReaderTest
         assertTrue(metadata.createNewFile());
 
         MetadataCollector sstableMetadataCollector = new MetadataCollector(new 
ClusteringComparator(BytesType.instance));
-        try (SequentialWriter writer = new CompressedSequentialWriter(file, 
metadata.getPath(), CompressionParams.snappy(), sstableMetadataCollector))
+        try (SequentialWriter writer = new CompressedSequentialWriter(file, 
metadata.getPath(),
+                                                                      null, 
SequentialWriterOption.DEFAULT,
+                                                                      
CompressionParams.snappy(), sstableMetadataCollector))
         {
             writer.write(CONTENT.getBytes());
             writer.finish();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
 
b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 7af84f0..9959c7b 100644
--- 
a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ 
b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -27,6 +27,7 @@ import java.util.*;
 import static org.apache.commons.io.FileUtils.readFileToByteArray;
 import static org.junit.Assert.assertEquals;
 
+import com.google.common.io.Files;
 import org.junit.After;
 import org.junit.Test;
 
@@ -37,10 +38,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.DataPosition;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SequentialWriterTest;
+import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.utils.ChecksumType;
 
@@ -92,7 +90,10 @@ public class CompressedSequentialWriterTest extends 
SequentialWriterTest
 
             byte[] dataPre = new byte[bytesToTest];
             byte[] rawPost = new byte[bytesToTest];
-            try (CompressedSequentialWriter writer = new 
CompressedSequentialWriter(f, filename + ".metadata", compressionParameters, 
sstableMetadataCollector);)
+            try (CompressedSequentialWriter writer = new 
CompressedSequentialWriter(f, filename + ".metadata",
+                                                                               
     null, SequentialWriterOption.DEFAULT,
+                                                                               
     compressionParameters,
+                                                                               
     sstableMetadataCollector))
             {
                 Random r = new Random(42);
 
@@ -159,6 +160,49 @@ public class CompressedSequentialWriterTest extends 
SequentialWriterTest
         writers.clear();
     }
 
+    @Test
+    @Override
+    public void resetAndTruncateTest()
+    {
+        File tempFile = new File(Files.createTempDir(), "reset.txt");
+        File offsetsFile = 
FileUtils.createTempFile("compressedsequentialwriter.offset", "test");
+        final int bufferSize = 48;
+        final int writeSize = 64;
+        byte[] toWrite = new byte[writeSize];
+        try (SequentialWriter writer = new 
CompressedSequentialWriter(tempFile, offsetsFile.getPath(),
+                                                                      null, 
SequentialWriterOption.DEFAULT,
+                                                                      
CompressionParams.lz4(bufferSize),
+                                                                      new 
MetadataCollector(new ClusteringComparator(UTF8Type.instance))))
+        {
+            // write bytes greater than buffer
+            writer.write(toWrite);
+            long flushedOffset = writer.getLastFlushOffset();
+            assertEquals(writeSize, writer.position());
+            // mark this position
+            DataPosition pos = writer.mark();
+            // write another
+            writer.write(toWrite);
+            // another buffer should be flushed
+            assertEquals(flushedOffset * 2, writer.getLastFlushOffset());
+            assertEquals(writeSize * 2, writer.position());
+            // reset writer
+            writer.resetAndTruncate(pos);
+            // current position and flushed size should be changed
+            assertEquals(writeSize, writer.position());
+            assertEquals(flushedOffset, writer.getLastFlushOffset());
+            // write another byte less than buffer
+            writer.write(new byte[]{0});
+            assertEquals(writeSize + 1, writer.position());
+            // flushed offset should not increase
+            assertEquals(flushedOffset, writer.getLastFlushOffset());
+            writer.finish();
+        }
+        catch (IOException e)
+        {
+            Assert.fail();
+        }
+    }
+
     protected TestableTransaction newTest() throws IOException
     {
         TestableCSW sw = new TestableCSW();
@@ -178,8 +222,8 @@ public class CompressedSequentialWriterTest extends 
SequentialWriterTest
 
         private TestableCSW(File file, File offsetsFile) throws IOException
         {
-            this(file, offsetsFile, new CompressedSequentialWriter(file,
-                                                                   
offsetsFile.getPath(),
+            this(file, offsetsFile, new CompressedSequentialWriter(file, 
offsetsFile.getPath(),
+                                                                   null, 
SequentialWriterOption.DEFAULT,
                                                                    
CompressionParams.lz4(BUFFER_SIZE),
                                                                    new 
MetadataCollector(new ClusteringComparator(UTF8Type.instance))));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java 
b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
index 184d637..f769293 100644
--- a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
@@ -119,8 +120,8 @@ public class DescriptorTest
     {
         // Descriptor should be equal when parent directory points to the same 
directory
         File dir = new File(".");
-        Descriptor desc1 = new Descriptor(dir, "ks", "cf", 1);
-        Descriptor desc2 = new Descriptor(dir.getAbsoluteFile(), "ks", "cf", 
1);
+        Descriptor desc1 = new Descriptor(dir, "ks", "cf", 1, 
SSTableFormat.Type.BIG);
+        Descriptor desc2 = new Descriptor(dir.getAbsoluteFile(), "ks", "cf", 
1, SSTableFormat.Type.BIG);
         assertEquals(desc1, desc2);
         assertEquals(desc1.hashCode(), desc2.hashCode());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java 
b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index 4ecbdcc..8cdd4ea 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -131,7 +131,7 @@ public class BufferedRandomAccessFileTest
     public void testReadAndWriteOnCapacity() throws IOException
     {
         File tmpFile = File.createTempFile("readtest", "bin");
-        SequentialWriter w = SequentialWriter.open(tmpFile);
+        SequentialWriter w = new SequentialWriter(tmpFile);
 
         // Fully write the file and sync..
         byte[] in = generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE);
@@ -157,7 +157,7 @@ public class BufferedRandomAccessFileTest
     public void testLength() throws IOException
     {
         File tmpFile = File.createTempFile("lengthtest", "bin");
-        SequentialWriter w = SequentialWriter.open(tmpFile);
+        SequentialWriter w = new SequentialWriter(tmpFile);
         assertEquals(0, w.length());
 
         // write a chunk smaller then our buffer, so will not be flushed
@@ -562,7 +562,7 @@ public class BufferedRandomAccessFileTest
     public void testSetNegativeLength() throws IOException, 
IllegalArgumentException
     {
         File tmpFile = File.createTempFile("set_negative_length", "bin");
-        try (SequentialWriter file = SequentialWriter.open(tmpFile))
+        try (SequentialWriter file = new SequentialWriter(tmpFile))
         {
             file.truncate(-8L);
         }
@@ -573,7 +573,7 @@ public class BufferedRandomAccessFileTest
         File tempFile = File.createTempFile(name, null);
         tempFile.deleteOnExit();
 
-        return SequentialWriter.open(tempFile);
+        return new SequentialWriter(tempFile);
     }
 
     private File writeTemporaryFile(byte[] data) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java 
b/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java
index 57428af..0657f7f 100644
--- 
a/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java
+++ 
b/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java
@@ -27,10 +27,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
-import org.apache.cassandra.io.util.ChecksummedRandomAccessReader;
-import org.apache.cassandra.io.util.ChecksummedSequentialWriter;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SequentialWriter;
 
 public class ChecksummedRandomAccessReaderTest
 {
@@ -43,9 +39,11 @@ public class ChecksummedRandomAccessReaderTest
         final byte[] expected = new byte[70 * 1024];   // bit more than crc 
chunk size, so we can test rebuffering.
         ThreadLocalRandom.current().nextBytes(expected);
 
-        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
-        writer.write(expected);
-        writer.finish();
+        try (SequentialWriter writer = new ChecksummedSequentialWriter(data, 
crc, null, SequentialWriterOption.DEFAULT))
+        {
+            writer.write(expected);
+            writer.finish();
+        }
 
         assert data.exists();
 
@@ -69,9 +67,11 @@ public class ChecksummedRandomAccessReaderTest
         final byte[] dataBytes = new byte[70 * 1024];   // bit more than crc 
chunk size
         ThreadLocalRandom.current().nextBytes(dataBytes);
 
-        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
-        writer.write(dataBytes);
-        writer.finish();
+        try (SequentialWriter writer = new ChecksummedSequentialWriter(data, 
crc, null, SequentialWriterOption.DEFAULT))
+        {
+            writer.write(dataBytes);
+            writer.finish();
+        }
 
         assert data.exists();
 
@@ -101,9 +101,11 @@ public class ChecksummedRandomAccessReaderTest
         final byte[] expected = new byte[5 * 1024];
         Arrays.fill(expected, (byte) 0);
 
-        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
-        writer.write(expected);
-        writer.finish();
+        try (SequentialWriter writer = new ChecksummedSequentialWriter(data, 
crc, null, SequentialWriterOption.DEFAULT))
+        {
+            writer.write(expected);
+            writer.finish();
+        }
 
         assert data.exists();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java 
b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
index bea3aac..65ffdba 100644
--- 
a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
+++ 
b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
@@ -59,7 +59,9 @@ public class ChecksummedSequentialWriterTest extends 
SequentialWriterTest
 
         private TestableCSW(File file, File crcFile) throws IOException
         {
-            this(file, crcFile, new ChecksummedSequentialWriter(file, 
BUFFER_SIZE, crcFile));
+            this(file, crcFile, new ChecksummedSequentialWriter(file, crcFile, 
null, SequentialWriterOption.newBuilder()
+                                                                               
                            .bufferSize(BUFFER_SIZE)
+                                                                               
                            .build()));
         }
 
         private TestableCSW(File file, File crcFile, SequentialWriter sw) 
throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java 
b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 1fb5597..e082b19 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -40,7 +40,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class DataOutputTest
@@ -380,8 +379,9 @@ public class DataOutputTest
     public void testSequentialWriter() throws IOException
     {
         File file = FileUtils.createTempFile("dataoutput", "test");
-        final SequentialWriter writer = new SequentialWriter(file, 32, 
BufferType.ON_HEAP);
-        DataOutputStreamPlus write = new 
WrappedDataOutputStreamPlus(writer.finishOnClose());
+        SequentialWriterOption option = 
SequentialWriterOption.newBuilder().bufferSize(32).finishOnClose(true).build();
+        final SequentialWriter writer = new SequentialWriter(file, option);
+        DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
         DataInput canon = testWrite(write);
         write.flush();
         write.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java 
b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
index 2394ad5..f34c00f 100644
--- a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
+++ b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
@@ -63,7 +63,7 @@ public class MmappedRegionsTest
         File ret = File.createTempFile(fileName, "1");
         ret.deleteOnExit();
 
-        try (SequentialWriter writer = SequentialWriter.open(ret))
+        try (SequentialWriter writer = new SequentialWriter(ret))
         {
             writer.write(buffer);
             writer.finish();
@@ -298,10 +298,9 @@ public class MmappedRegionsTest
         cf.deleteOnExit();
 
         MetadataCollector sstableMetadataCollector = new MetadataCollector(new 
ClusteringComparator(BytesType.instance));
-        try(SequentialWriter writer = new CompressedSequentialWriter(f,
-                                                                     
cf.getAbsolutePath(),
-                                                                     
CompressionParams.snappy(),
-                                                                     
sstableMetadataCollector))
+        try(SequentialWriter writer = new CompressedSequentialWriter(f, 
cf.getAbsolutePath(),
+                                                                     null, 
SequentialWriterOption.DEFAULT,
+                                                                     
CompressionParams.snappy(), sstableMetadataCollector))
         {
             writer.write(buffer);
             writer.finish();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java 
b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
index c5073c0..32ce554 100644
--- a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
@@ -263,7 +263,7 @@ public class RandomAccessReaderTest
         final File f = File.createTempFile("testReadFully", "1");
         f.deleteOnExit();
 
-        try(SequentialWriter writer = SequentialWriter.open(f))
+        try(SequentialWriter writer = new SequentialWriter(f))
         {
             long numWritten = 0;
             while (numWritten < params.fileLength)
@@ -326,7 +326,7 @@ public class RandomAccessReaderTest
         File f = File.createTempFile("testReadBytes", "1");
         final String expected = "The quick brown fox jumps over the lazy dog";
 
-        try(SequentialWriter writer = SequentialWriter.open(f))
+        try(SequentialWriter writer = new SequentialWriter(f))
         {
             writer.write(expected.getBytes());
             writer.finish();
@@ -355,7 +355,7 @@ public class RandomAccessReaderTest
         final String expected = "The quick brown fox jumps over the lazy dog";
         final int numIterations = 10;
 
-        try(SequentialWriter writer = SequentialWriter.open(f))
+        try(SequentialWriter writer = new SequentialWriter(f))
         {
             for (int i = 0; i < numIterations; i++)
                 writer.write(expected.getBytes());
@@ -435,7 +435,7 @@ public class RandomAccessReaderTest
         Random r = new Random(seed);
         r.nextBytes(expected);
 
-        try(SequentialWriter writer = SequentialWriter.open(f))
+        try(SequentialWriter writer = new SequentialWriter(f))
         {
             writer.write(expected);
             writer.finish();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java 
b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index f5a366e..2797384 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
 
 import static org.apache.commons.io.FileUtils.*;
+import static org.junit.Assert.assertEquals;
 
 public class SequentialWriterTest extends AbstractTransactionalTest
 {
@@ -71,7 +72,10 @@ public class SequentialWriterTest extends 
AbstractTransactionalTest
 
         protected TestableSW(File file) throws IOException
         {
-            this(file, new SequentialWriter(file, 8 << 10, 
BufferType.OFF_HEAP));
+            this(file, new SequentialWriter(file, 
SequentialWriterOption.newBuilder()
+                                                                        
.bufferSize(8 << 10)
+                                                                        
.bufferType(BufferType.OFF_HEAP)
+                                                                        
.build()));
         }
 
         protected TestableSW(File file, SequentialWriter sw) throws IOException
@@ -118,6 +122,47 @@ public class SequentialWriterTest extends 
AbstractTransactionalTest
         }
     }
 
+    @Test
+    public void resetAndTruncateTest()
+    {
+        File tempFile = new File(Files.createTempDir(), "reset.txt");
+        final int bufferSize = 48;
+        final int writeSize = 64;
+        byte[] toWrite = new byte[writeSize];
+        SequentialWriterOption option = 
SequentialWriterOption.newBuilder().bufferSize(bufferSize).build();
+        try (SequentialWriter writer = new SequentialWriter(tempFile, option))
+        {
+            // write bytes greater than buffer
+            writer.write(toWrite);
+            assertEquals(bufferSize, writer.getLastFlushOffset());
+            assertEquals(writeSize, writer.position());
+            // mark this position
+            DataPosition pos = writer.mark();
+            // write another
+            writer.write(toWrite);
+            // another buffer should be flushed
+            assertEquals(bufferSize * 2, writer.getLastFlushOffset());
+            assertEquals(writeSize * 2, writer.position());
+            // reset writer
+            writer.resetAndTruncate(pos);
+            // current position and flushed size should be changed
+            assertEquals(writeSize, writer.position());
+            assertEquals(writeSize, writer.getLastFlushOffset());
+            // write another byte less than buffer
+            writer.write(new byte[]{0});
+            assertEquals(writeSize + 1, writer.position());
+            // flush offset should not increase
+            assertEquals(writeSize, writer.getLastFlushOffset());
+            writer.finish();
+        }
+        catch (IOException e)
+        {
+            Assert.fail();
+        }
+        // final file size check
+        assertEquals(writeSize + 1, tempFile.length());
+    }
+
     /**
      * Tests that the output stream exposed by SequentialWriter behaves as 
expected
      */
@@ -127,7 +172,8 @@ public class SequentialWriterTest extends 
AbstractTransactionalTest
         File tempFile = new File(Files.createTempDir(), "test.txt");
         Assert.assertFalse("temp file shouldn't exist yet", tempFile.exists());
 
-        try (DataOutputStream os = new 
DataOutputStream(SequentialWriter.open(tempFile).finishOnClose()))
+        SequentialWriterOption option = 
SequentialWriterOption.newBuilder().finishOnClose(true).build();
+        try (DataOutputStream os = new DataOutputStream(new 
SequentialWriter(tempFile, option)))
         {
             os.writeUTF("123");
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
 
b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index a3300ac..562416e 100644
--- 
a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ 
b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@ -19,14 +19,13 @@ package org.apache.cassandra.streaming.compression;
 
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.util.SequentialWriterOption;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -72,7 +71,11 @@ public class CompressedInputStreamTest
         MetadataCollector collector = new MetadataCollector(new 
ClusteringComparator(BytesType.instance));
         CompressionParams param = CompressionParams.snappy(32);
         Map<Long, Long> index = new HashMap<Long, Long>();
-        try (CompressedSequentialWriter writer = new 
CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), 
param, collector))
+        try (CompressedSequentialWriter writer = new 
CompressedSequentialWriter(tmp,
+                                                                               
 desc.filenameFor(Component.COMPRESSION_INFO),
+                                                                               
 null,
+                                                                               
 SequentialWriterOption.DEFAULT,
+                                                                               
 param, collector))
         {
             for (long l = 0L; l < 1000; l++)
             {

Reply via email to