Repository: cassandra Updated Branches: refs/heads/trunk 5bc2f0130 -> fb221095c
Remove DatabaseDescriptor dependency from Sequentialwriter patch by yukim; reviewed by snazy for CASSANDRA-11579 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fb221095 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fb221095 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fb221095 Branch: refs/heads/trunk Commit: fb221095cb2a18cf8f027a8a084700d606bb9ca3 Parents: 5bc2f01 Author: Yuki Morishita <[email protected]> Authored: Mon Jun 27 10:51:10 2016 +0900 Committer: Yuki Morishita <[email protected]> Committed: Mon Jun 27 10:55:47 2016 +0900 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cache/AutoSavingCache.java | 7 +- .../index/sasi/disk/OnDiskIndexBuilder.java | 7 +- .../io/compress/CompressedSequentialWriter.java | 39 +++-- .../io/sstable/format/big/BigTableWriter.java | 62 ++++---- .../cassandra/io/util/ChecksumWriter.java | 103 +++++++++++++ .../io/util/ChecksummedSequentialWriter.java | 24 +-- .../io/util/DataIntegrityMetadata.java | 86 ----------- .../cassandra/io/util/SequentialWriter.java | 82 ++++------ .../io/util/SequentialWriterOption.java | 154 +++++++++++++++++++ .../apache/cassandra/db/RowIndexEntryTest.java | 14 +- .../hints/ChecksummedDataInputTest.java | 6 +- .../index/sasi/disk/TokenTreeTest.java | 16 +- .../CompressedRandomAccessReaderTest.java | 21 +-- .../CompressedSequentialWriterTest.java | 58 ++++++- .../cassandra/io/sstable/DescriptorTest.java | 5 +- .../io/util/BufferedRandomAccessFileTest.java | 8 +- .../util/ChecksummedRandomAccessReaderTest.java | 28 ++-- .../util/ChecksummedSequentialWriterTest.java | 4 +- .../cassandra/io/util/DataOutputTest.java | 6 +- .../cassandra/io/util/MmappedRegionsTest.java | 9 +- .../io/util/RandomAccessReaderTest.java | 8 +- .../cassandra/io/util/SequentialWriterTest.java | 50 +++++- .../compression/CompressedInputStreamTest.java | 9 +- 24 
files changed, 539 insertions(+), 268 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d40cab4..5b72016 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.8 + * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579) * Move skip_stop_words filter before stemming (CASSANDRA-12078) * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957) * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 1b48d4f..cb2ad8a 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -84,6 +84,11 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K private static volatile IStreamFactory streamFactory = new IStreamFactory() { + private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(DatabaseDescriptor.getTrickleFsync()) + .trickleFsyncByteInterval(DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024) + .finishOnClose(true).build(); + public InputStream getInputStream(File dataPath, File crcPath) throws IOException { return new ChecksummedRandomAccessReader.Builder(dataPath, crcPath).build(); @@ -91,7 +96,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K public OutputStream getOutputStream(File dataPath, File crcPath) { - return SequentialWriter.open(dataPath, crcPath).finishOnClose(); + return new 
ChecksummedSequentialWriter(dataPath, crcPath, null, writerOption); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java index 8acbb05..4946f06 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java @@ -31,7 +31,6 @@ import org.apache.cassandra.index.sasi.sa.TermIterator; import org.apache.cassandra.index.sasi.sa.SuffixSA; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.util.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -131,6 +130,10 @@ public class OnDiskIndexBuilder public static final int SUPER_BLOCK_SIZE = 64; public static final int IS_PARTIAL_BIT = 15; + private static final SequentialWriterOption WRITER_OPTION = SequentialWriterOption.newBuilder() + .bufferSize(BLOCK_SIZE) + .build(); + private final List<MutableLevel<InMemoryPointerTerm>> levels = new ArrayList<>(); private MutableLevel<InMemoryDataTerm> dataLevel; @@ -263,7 +266,7 @@ public class OnDiskIndexBuilder try { - out = new SequentialWriter(file, BLOCK_SIZE, BufferType.ON_HEAP); + out = new SequentialWriter(file, WRITER_OPTION); out.writeUTF(descriptor.version.toString()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java 
b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index 1f33d53..9bdb1b4 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -17,29 +17,27 @@ */ package org.apache.cassandra.io.compress; -import static org.apache.cassandra.utils.Throwables.merge; - import java.io.DataOutputStream; import java.io.EOFException; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channels; +import java.util.Optional; import java.util.zip.CRC32; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; -import org.apache.cassandra.io.util.DataIntegrityMetadata; -import org.apache.cassandra.io.util.DataPosition; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.*; import org.apache.cassandra.schema.CompressionParams; +import static org.apache.cassandra.utils.Throwables.merge; + public class CompressedSequentialWriter extends SequentialWriter { - private final DataIntegrityMetadata.ChecksumWriter crcMetadata; + private final ChecksumWriter crcMetadata; // holds offset in the file where current chunk should be written // changed only by flush() method where data buffer gets compressed and stored to the file @@ -60,14 +58,34 @@ public class CompressedSequentialWriter extends SequentialWriter private final MetadataCollector sstableMetadataCollector; private final ByteBuffer crcCheckBuffer = ByteBuffer.allocate(4); + private final Optional<File> digestFile; + /** + * Create CompressedSequentialWriter; the digest file is optional (pass null to skip writing a digest).
+ * + * @param file File to write + * @param offsetsPath File name to write compression metadata + * @param digestFile File to write digest + * @param option Write option (buffer size and type will be set the same as compression params) + * @param parameters Compression parameters + * @param sstableMetadataCollector Metadata collector + */ public CompressedSequentialWriter(File file, String offsetsPath, + File digestFile, + SequentialWriterOption option, CompressionParams parameters, MetadataCollector sstableMetadataCollector) { - super(file, parameters.chunkLength(), parameters.getSstableCompressor().preferredBufferType()); + super(file, SequentialWriterOption.newBuilder() + .bufferSize(option.bufferSize()) + .bufferType(option.bufferType()) + .bufferSize(parameters.chunkLength()) + .bufferType(parameters.getSstableCompressor().preferredBufferType()) + .finishOnClose(option.finishOnClose()) + .build()); this.compressor = parameters.getSstableCompressor(); + this.digestFile = Optional.ofNullable(digestFile); // buffer for compression should be the same size as buffer itself compressed = compressor.preferredBufferType().allocate(compressor.initialCompressedBufferLength(buffer.capacity())); @@ -76,7 +94,7 @@ metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath); this.sstableMetadataCollector = sstableMetadataCollector; - crcMetadata = new DataIntegrityMetadata.ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel))); + crcMetadata = new ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel))); } @Override @@ -287,8 +305,7 @@ protected void doPrepare() { syncInternal(); - if (descriptor != null) - crcMetadata.writeFullChecksum(descriptor); + digestFile.ifPresent(crcMetadata::writeFullChecksum); sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 8645158..c1d9bbc 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -60,10 +60,15 @@ public class BigTableWriter extends SSTableWriter private DataPosition dataMark; private long lastEarlyOpenLength = 0; - public BigTableWriter(Descriptor descriptor, - Long keyCount, - Long repairedAt, - CFMetaData metadata, + private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(DatabaseDescriptor.getTrickleFsync()) + .trickleFsyncByteInterval(DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024) + .build(); + + public BigTableWriter(Descriptor descriptor, + long keyCount, + long repairedAt, + CFMetaData metadata, MetadataCollector metadataCollector, SerializationHeader header, Collection<SSTableFlushObserver> observers, @@ -74,18 +79,23 @@ public class BigTableWriter extends SSTableWriter if (compression) { - dataFile = SequentialWriter.open(getFilename(), + dataFile = new CompressedSequentialWriter(new File(getFilename()), descriptor.filenameFor(Component.COMPRESSION_INFO), + new File(descriptor.filenameFor(descriptor.digestComponent)), + writerOption, metadata.params.compression, metadataCollector); dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile); } else { - dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC))); + dataFile = new ChecksummedSequentialWriter(new File(getFilename()), + new 
File(descriptor.filenameFor(Component.CRC)), + new File(descriptor.filenameFor(descriptor.digestComponent)), + writerOption); dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false); } - iwriter = new IndexWriter(keyCount, dataFile); + iwriter = new IndexWriter(keyCount); columnIndexWriter = new ColumnIndex(this.header, dataFile, descriptor.version, this.observers, getRowIndexEntrySerializer().indexInfoSerializer()); } @@ -338,7 +348,7 @@ public class BigTableWriter extends SSTableWriter iwriter.prepareToCommit(); // write sstable statistics - dataFile.setDescriptor(descriptor).prepareToCommit(); + dataFile.prepareToCommit(); writeMetadata(descriptor, finalizeMetadata()); // save the table of components @@ -370,13 +380,13 @@ public class BigTableWriter extends SSTableWriter } } - private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components) + private void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components) { File file = new File(desc.filenameFor(Component.STATS)); - try (SequentialWriter out = SequentialWriter.open(file)) + try (SequentialWriter out = new SequentialWriter(file, writerOption)) { desc.getMetadataSerializer().serialize(components, out, desc.version); - out.setDescriptor(desc).finish(); + out.finish(); } catch (IOException e) { @@ -410,27 +420,15 @@ public class BigTableWriter extends SSTableWriter public final IFilter bf; private DataPosition mark; - IndexWriter(long keyCount, final SequentialWriter dataFile) + IndexWriter(long keyCount) { - indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); + indexFile = new SequentialWriter(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), writerOption); builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false); summary = new IndexSummaryBuilder(keyCount, metadata.params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL); bf = 
FilterFactory.getFilter(keyCount, metadata.params.bloomFilterFpChance, true, descriptor.version.hasOldBfHashOrder()); // register listeners to be alerted when the data files are flushed - indexFile.setPostFlushListener(new Runnable() - { - public void run() - { - summary.markIndexSynced(indexFile.getLastFlushOffset()); - } - }); - dataFile.setPostFlushListener(new Runnable() - { - public void run() - { - summary.markDataSynced(dataFile.getLastFlushOffset()); - } - }); + indexFile.setPostFlushListener(() -> summary.markIndexSynced(indexFile.getLastFlushOffset())); + dataFile.setPostFlushListener(() -> summary.markDataSynced(dataFile.getLastFlushOffset())); } // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file @@ -501,15 +499,15 @@ public class BigTableWriter extends SSTableWriter flushBf(); // truncate index file - long position = iwriter.indexFile.position(); - iwriter.indexFile.setDescriptor(descriptor).prepareToCommit(); - FileUtils.truncate(iwriter.indexFile.getPath(), position); + long position = indexFile.position(); + indexFile.prepareToCommit(); + FileUtils.truncate(indexFile.getPath(), position); // save summary summary.prepareToCommit(); - try (IndexSummary summary = iwriter.summary.build(getPartitioner())) + try (IndexSummary indexSummary = summary.build(getPartitioner())) { - SSTableReader.saveSummary(descriptor, first, last, iwriter.builder, dbuilder, summary); + SSTableReader.saveSummary(descriptor, first, last, builder, dbuilder, indexSummary); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/util/ChecksumWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ChecksumWriter.java b/src/java/org/apache/cassandra/io/util/ChecksumWriter.java new file mode 100644 index 0000000..dc5eaea --- /dev/null +++ 
b/src/java/org/apache/cassandra/io/util/ChecksumWriter.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.util; + +import java.io.*; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.zip.CRC32; + +import javax.annotation.Nonnull; + +import com.google.common.base.Charsets; + +import org.apache.cassandra.io.FSWriteError; + +public class ChecksumWriter +{ + private final CRC32 incrementalChecksum = new CRC32(); + private final DataOutput incrementalOut; + private final CRC32 fullChecksum = new CRC32(); + + public ChecksumWriter(DataOutput incrementalOut) + { + this.incrementalOut = incrementalOut; + } + + public void writeChunkSize(int length) + { + try + { + incrementalOut.writeInt(length); + } + catch (IOException e) + { + throw new IOError(e); + } + } + + // checksumIncrementalResult indicates if the checksum we compute for this buffer should itself be + // included in the full checksum, translating to if the partial checksum is serialized along with the + // data it checksums (in which case the file checksum as calculated by external tools would mismatch if + // we did not include it), or independently. 
+ + // CompressedSequentialWriters serialize the partial checksums inline with the compressed data chunks they + // corroborate, whereas ChecksummedSequentialWriters serialize them to a different file. + public void appendDirect(ByteBuffer bb, boolean checksumIncrementalResult) + { + try + { + ByteBuffer toAppend = bb.duplicate(); + toAppend.mark(); + incrementalChecksum.update(toAppend); + toAppend.reset(); + + int incrementalChecksumValue = (int) incrementalChecksum.getValue(); + incrementalOut.writeInt(incrementalChecksumValue); + + fullChecksum.update(toAppend); + if (checksumIncrementalResult) + { + ByteBuffer byteBuffer = ByteBuffer.allocate(4); + byteBuffer.putInt(incrementalChecksumValue); + assert byteBuffer.arrayOffset() == 0; + fullChecksum.update(byteBuffer.array(), 0, byteBuffer.array().length); + } + incrementalChecksum.reset(); + + } + catch (IOException e) + { + throw new IOError(e); + } + } + + public void writeFullChecksum(@Nonnull File digestFile) + { + try (BufferedWriter out = Files.newBufferedWriter(digestFile.toPath(), Charsets.UTF_8)) + { + out.write(String.valueOf(fullChecksum.getValue())); + } + catch (IOException e) + { + throw new FSWriteError(e, digestFile); + } + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java index fd88151..f89e7cc 100644 --- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java @@ -19,20 +19,25 @@ package org.apache.cassandra.io.util; import java.io.File; import java.nio.ByteBuffer; - -import org.apache.cassandra.io.compress.BufferType; +import java.util.Optional; public class ChecksummedSequentialWriter 
extends SequentialWriter { + private static final SequentialWriterOption CRC_WRITER_OPTION = SequentialWriterOption.newBuilder() + .bufferSize(8 * 1024) + .build(); + private final SequentialWriter crcWriter; - private final DataIntegrityMetadata.ChecksumWriter crcMetadata; + private final ChecksumWriter crcMetadata; + private final Optional<File> digestFile; - public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath) + public ChecksummedSequentialWriter(File file, File crcPath, File digestFile, SequentialWriterOption option) { - super(file, bufferSize, BufferType.ON_HEAP); - crcWriter = new SequentialWriter(crcPath, 8 * 1024, BufferType.ON_HEAP); - crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter); + super(file, option); + crcWriter = new SequentialWriter(crcPath, CRC_WRITER_OPTION); + crcMetadata = new ChecksumWriter(crcWriter); crcMetadata.writeChunkSize(buffer.capacity()); + this.digestFile = Optional.ofNullable(digestFile); } @Override @@ -63,9 +68,8 @@ public class ChecksummedSequentialWriter extends SequentialWriter protected void doPrepare() { syncInternal(); - if (descriptor != null) - crcMetadata.writeFullChecksum(descriptor); - crcWriter.setDescriptor(descriptor).prepareToCommit(); + digestFile.ifPresent(crcMetadata::writeFullChecksum); + crcWriter.prepareToCommit(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java index 0c48d13..0eecef3 100644 --- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java +++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java @@ -17,21 +17,12 @@ */ package org.apache.cassandra.io.util; -import java.io.BufferedWriter; import java.io.Closeable; -import java.io.DataOutput; 
import java.io.File; -import java.io.IOError; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.util.zip.CRC32; import java.util.zip.CheckedInputStream; import java.util.zip.Checksum; -import com.google.common.base.Charsets; - -import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.utils.ChecksumType; @@ -142,81 +133,4 @@ public class DataIntegrityMetadata dataReader::close); } } - - - public static class ChecksumWriter - { - private final CRC32 incrementalChecksum = new CRC32(); - private final DataOutput incrementalOut; - private final CRC32 fullChecksum = new CRC32(); - - public ChecksumWriter(DataOutput incrementalOut) - { - this.incrementalOut = incrementalOut; - } - - public void writeChunkSize(int length) - { - try - { - incrementalOut.writeInt(length); - } - catch (IOException e) - { - throw new IOError(e); - } - } - - // checksumIncrementalResult indicates if the checksum we compute for this buffer should itself be - // included in the full checksum, translating to if the partial checksum is serialized along with the - // data it checksums (in which case the file checksum as calculated by external tools would mismatch if - // we did not include it), or independently. - - // CompressedSequentialWriters serialize the partial checksums inline with the compressed data chunks they - // corroborate, whereas ChecksummedSequentialWriters serialize them to a different file. 
- public void appendDirect(ByteBuffer bb, boolean checksumIncrementalResult) - { - try - { - - ByteBuffer toAppend = bb.duplicate(); - toAppend.mark(); - incrementalChecksum.update(toAppend); - toAppend.reset(); - - int incrementalChecksumValue = (int) incrementalChecksum.getValue(); - incrementalOut.writeInt(incrementalChecksumValue); - - fullChecksum.update(toAppend); - if (checksumIncrementalResult) - { - ByteBuffer byteBuffer = ByteBuffer.allocate(4); - byteBuffer.putInt(incrementalChecksumValue); - assert byteBuffer.arrayOffset() == 0; - fullChecksum.update(byteBuffer.array(), 0, byteBuffer.array().length); - } - incrementalChecksum.reset(); - - } - catch (IOException e) - { - throw new IOError(e); - } - } - - public void writeFullChecksum(Descriptor descriptor) - { - if (descriptor.digestComponent == null) - throw new NullPointerException("Null digest component for " + descriptor.ksname + '.' + descriptor.cfname + " file " + descriptor.baseFilename()); - File outFile = new File(descriptor.filenameFor(descriptor.digestComponent)); - try (BufferedWriter out =Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8)) - { - out.write(String.valueOf(fullChecksum.getValue())); - } - catch (IOException e) - { - throw new FSWriteError(e, outFile); - } - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index 45e4cfa..e71f2fa 100644 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@ -17,32 +17,24 @@ */ package org.apache.cassandra.io.util; -import java.io.*; +import java.io.File; +import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; -import 
org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.compress.BufferType; -import org.apache.cassandra.io.compress.CompressedSequentialWriter; -import org.apache.cassandra.schema.CompressionParams; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.utils.SyncUtil; import org.apache.cassandra.utils.concurrent.Transactional; import static org.apache.cassandra.utils.Throwables.merge; -import org.apache.cassandra.utils.SyncUtil; - /** * Adds buffering, mark, and fsyncing to OutputStream. We always fsync on close; we may also * fsync incrementally if Config.trickle_fsync is enabled. */ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Transactional { - private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; - // absolute path to the given file private final String filePath; @@ -53,8 +45,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr // whether to do trickling fsync() to avoid sudden bursts of dirty buffer flushing by kernel causing read // latency spikes - private boolean trickleFsync; - private int trickleFsyncByteInterval; + private final SequentialWriterOption option; private int bytesSinceTrickleFsync = 0; protected long lastFlushOffset; @@ -62,8 +53,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr protected Runnable runPostFlush; private final TransactionalProxy txnProxy = txnProxy(); - private boolean finishOnClose; - protected Descriptor descriptor; // due to lack of multiple-inheritance, we proxy our transactional implementation protected class TransactionalProxy extends AbstractTransactional @@ -102,7 +91,8 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr } // TODO: we should specify as a parameter if we permit 
an existing file or not - private static FileChannel openChannel(File file) { + private static FileChannel openChannel(File file) + { try { if (file.exists()) @@ -130,43 +120,31 @@ } } - public SequentialWriter(File file, int bufferSize, BufferType bufferType) - { - super(openChannel(file), bufferType.allocate(bufferSize)); - strictFlushing = true; - fchannel = (FileChannel)channel; - - filePath = file.getAbsolutePath(); - - this.trickleFsync = DatabaseDescriptor.getTrickleFsync(); - this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024; - } - /** - * Open a heap-based, non-compressed SequentialWriter + * Create heap-based, non-compressed SequentialWriter with default buffer size (64k). + * + * @param file File to write */ - public static SequentialWriter open(File file) + public SequentialWriter(File file) { - return new SequentialWriter(file, DEFAULT_BUFFER_SIZE, BufferType.ON_HEAP); + this(file, SequentialWriterOption.DEFAULT); } - public static ChecksummedSequentialWriter open(File file, File crcPath) + /** + * Create SequentialWriter for given file with specific writer option.
+ * + * @param file File to write + * @param option Writer option + */ + public SequentialWriter(File file, SequentialWriterOption option) { - return new ChecksummedSequentialWriter(file, DEFAULT_BUFFER_SIZE, crcPath); - } + super(openChannel(file), option.allocateBuffer()); + strictFlushing = true; + fchannel = (FileChannel)channel; - public static CompressedSequentialWriter open(String dataFilePath, - String offsetsPath, - CompressionParams parameters, - MetadataCollector sstableMetadataCollector) - { - return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector); - } + filePath = file.getAbsolutePath(); - public SequentialWriter finishOnClose() - { - finishOnClose = true; - return this; + this.option = option; } public void skipBytes(int numBytes) throws IOException @@ -212,10 +190,10 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr { flushData(); - if (trickleFsync) + if (option.trickleFsync()) { bytesSinceTrickleFsync += buffer.position(); - if (bytesSinceTrickleFsync >= trickleFsyncByteInterval) + if (bytesSinceTrickleFsync >= option.trickleFsyncByteInterval()) { syncDataOnlyInternal(); bytesSinceTrickleFsync = 0; @@ -348,6 +326,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr throw new FSReadError(e, getPath()); } + bufferOffset = truncateTarget; resetBuffer(); } @@ -361,6 +340,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr try { fchannel.truncate(toSize); + lastFlushOffset = toSize; } catch (IOException e) { @@ -373,12 +353,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr return channel.isOpen(); } - public SequentialWriter setDescriptor(Descriptor descriptor) - { - this.descriptor = descriptor; - return this; - } - public final void prepareToCommit() { txnProxy.prepareToCommit(); @@ -397,7 +371,7 @@ public class SequentialWriter extends 
BufferedDataOutputStreamPlus implements Tr @Override public final void close() { - if (finishOnClose) + if (option.finishOnClose()) txnProxy.finish(); else txnProxy.close(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/src/java/org/apache/cassandra/io/util/SequentialWriterOption.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriterOption.java b/src/java/org/apache/cassandra/io/util/SequentialWriterOption.java new file mode 100644 index 0000000..61f375b --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/SequentialWriterOption.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.util; + +import java.nio.ByteBuffer; +import java.util.Objects; + +import org.apache.cassandra.io.compress.BufferType; + +/** + * SequentialWriter option + */ +public class SequentialWriterOption +{ + /** + * Default write option. 
+ * + * <ul> + * <li>buffer size: 64 KB + * <li>buffer type: on heap + * <li>trickle fsync: false + * <li>trickle fsync byte interval: 10 MB + * <li>finish on close: false + * </ul> + */ + public static final SequentialWriterOption DEFAULT = SequentialWriterOption.newBuilder().build(); + + private final int bufferSize; + private final BufferType bufferType; + private final boolean trickleFsync; + private final int trickleFsyncByteInterval; + private final boolean finishOnClose; + + private SequentialWriterOption(int bufferSize, + BufferType bufferType, + boolean trickleFsync, + int trickleFsyncByteInterval, + boolean finishOnClose) + { + this.bufferSize = bufferSize; + this.bufferType = bufferType; + this.trickleFsync = trickleFsync; + this.trickleFsyncByteInterval = trickleFsyncByteInterval; + this.finishOnClose = finishOnClose; + } + + public static Builder newBuilder() + { + return new Builder(); + } + + public int bufferSize() + { + return bufferSize; + } + + public BufferType bufferType() + { + return bufferType; + } + + public boolean trickleFsync() + { + return trickleFsync; + } + + public int trickleFsyncByteInterval() + { + return trickleFsyncByteInterval; + } + + public boolean finishOnClose() + { + return finishOnClose; + } + + /** + * Allocate buffer using set buffer type and buffer size. 
+ * + * @return allocated ByteBuffer + */ + public ByteBuffer allocateBuffer() + { + return bufferType.allocate(bufferSize); + } + + public static class Builder + { + /* default buffer size: 64k */ + private int bufferSize = 64 * 1024; + /* default buffer type: on heap */ + private BufferType bufferType = BufferType.ON_HEAP; + /* default: no trickle fsync */ + private boolean trickleFsync = false; + /* default trickle fsync byte interval: 10MB */ + private int trickleFsyncByteInterval = 10 * 1024 * 1024; + private boolean finishOnClose = false; + + /* construct through SequentialWriterOption.newBuilder */ + private Builder() {} + + public SequentialWriterOption build() + { + return new SequentialWriterOption(bufferSize, bufferType, trickleFsync, + trickleFsyncByteInterval, finishOnClose); + } + + public Builder bufferSize(int bufferSize) + { + this.bufferSize = bufferSize; + return this; + } + + public Builder bufferType(BufferType bufferType) + { + this.bufferType = Objects.requireNonNull(bufferType); + return this; + } + + public Builder trickleFsync(boolean trickleFsync) + { + this.trickleFsync = trickleFsync; + return this; + } + + public Builder trickleFsyncByteInterval(int trickleFsyncByteInterval) + { + this.trickleFsyncByteInterval = trickleFsyncByteInterval; + return this; + } + + public Builder finishOnClose(boolean finishOnClose) + { + this.finishOnClose = finishOnClose; + return this; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java index 244018e..79eb449 100644 --- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java +++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java @@ -57,12 +57,7 @@ import org.apache.cassandra.io.sstable.IndexInfo; import 
org.apache.cassandra.io.sstable.format.SSTableFlushObserver; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.sstable.format.big.BigFormat; -import org.apache.cassandra.io.util.DataInputBuffer; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.SegmentedFile; -import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.*; import org.apache.cassandra.serializers.LongSerializer; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -166,13 +161,14 @@ public class RowIndexEntryTest extends CQLTester DoubleSerializer() throws IOException { + SequentialWriterOption option = SequentialWriterOption.newBuilder().bufferSize(1024).build(); File f = File.createTempFile("RowIndexEntryTest-", "db"); - dataWriterNew = new SequentialWriter(f, 1024, BufferType.ON_HEAP); + dataWriterNew = new SequentialWriter(f, option); columnIndex = new org.apache.cassandra.db.ColumnIndex(header, dataWriterNew, version, Collections.emptyList(), rieSerializer.indexInfoSerializer()); f = File.createTempFile("RowIndexEntryTest-", "db"); - dataWriterOld = new SequentialWriter(f, 1024, BufferType.ON_HEAP); + dataWriterOld = new SequentialWriter(f, option); } public void close() throws Exception @@ -424,7 +420,7 @@ public class RowIndexEntryTest extends CQLTester File tempFile = File.createTempFile("row_index_entry_test", null); tempFile.deleteOnExit(); - SequentialWriter writer = SequentialWriter.open(tempFile); + SequentialWriter writer = new SequentialWriter(tempFile); ColumnIndex columnIndex = RowIndexEntryTest.ColumnIndex.writeAndBuildIndex(partition.unfilteredIterator(), writer, header, Collections.emptySet(), BigFormat.latestVersion); Pre_C_11206_RowIndexEntry withIndex = Pre_C_11206_RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, 
columnIndex); IndexInfo.Serializer indexSerializer = cfs.metadata.serializers().indexInfoSerializer(BigFormat.latestVersion, header); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java index 6b3b5c8..5a48b21 100644 --- a/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java +++ b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java @@ -77,7 +77,7 @@ public class ChecksummedDataInputTest // save the buffer to file to create a RAR File file = File.createTempFile("testReadMethods", "1"); file.deleteOnExit(); - try (SequentialWriter writer = SequentialWriter.open(file)) + try (SequentialWriter writer = new SequentialWriter(file)) { writer.write(buffer); writer.writeInt((int) crc.getValue()); @@ -152,7 +152,7 @@ public class ChecksummedDataInputTest // save the buffer to file to create a RAR File file = File.createTempFile("testResetCrc", "1"); file.deleteOnExit(); - try (SequentialWriter writer = SequentialWriter.open(file)) + try (SequentialWriter writer = new SequentialWriter(file)) { writer.write(buffer); writer.finish(); @@ -208,7 +208,7 @@ public class ChecksummedDataInputTest // save the buffer to file to create a RAR File file = File.createTempFile("testFailedCrc", "1"); file.deleteOnExit(); - try (SequentialWriter writer = SequentialWriter.open(file)) + try (SequentialWriter writer = new SequentialWriter(file)) { writer.write(buffer); writer.finish(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java 
b/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java index 67e54f4..b26bb44 100644 --- a/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java +++ b/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java @@ -34,8 +34,8 @@ import org.apache.cassandra.index.sasi.utils.MappedBuffer; import org.apache.cassandra.index.sasi.utils.RangeIterator; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.index.sasi.utils.RangeUnionIterator; -import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.SequentialWriterOption; import org.apache.cassandra.utils.MurmurHash; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.io.util.SequentialWriter; @@ -84,6 +84,8 @@ public class TokenTreeTest final static SortedMap<Long, LongSet> tokens = bigTokensMap; + final static SequentialWriterOption DEFAULT_OPT = SequentialWriterOption.newBuilder().bufferSize(4096).build(); + @Test public void testSerializedSizeDynamic() throws Exception { @@ -103,7 +105,7 @@ public class TokenTreeTest final File treeFile = File.createTempFile("token-tree-size-test", "tt"); treeFile.deleteOnExit(); - try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, BufferType.ON_HEAP)) + try (SequentialWriter writer = new SequentialWriter(treeFile, DEFAULT_OPT)) { builder.write(writer); writer.sync(); @@ -134,7 +136,7 @@ public class TokenTreeTest final File treeFile = File.createTempFile("token-tree-iterate-test1", "tt"); treeFile.deleteOnExit(); - try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, BufferType.ON_HEAP)) + try (SequentialWriter writer = new SequentialWriter(treeFile, DEFAULT_OPT)) { builder.write(writer); writer.sync(); @@ -210,7 +212,7 @@ public class TokenTreeTest final File treeFile = File.createTempFile("token-tree-iterate-test2", "tt"); treeFile.deleteOnExit(); - try (SequentialWriter writer = new 
SequentialWriter(treeFile, 4096, BufferType.ON_HEAP)) + try (SequentialWriter writer = new SequentialWriter(treeFile, DEFAULT_OPT)) { builder.write(writer); writer.sync(); @@ -269,7 +271,7 @@ public class TokenTreeTest final File treeFile = File.createTempFile("token-tree-skip-past-test", "tt"); treeFile.deleteOnExit(); - try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, BufferType.ON_HEAP)) + try (SequentialWriter writer = new SequentialWriter(treeFile, DEFAULT_OPT)) { builder.write(writer); writer.sync(); @@ -413,7 +415,7 @@ public class TokenTreeTest final File treeFile = File.createTempFile("token-tree-", "db"); treeFile.deleteOnExit(); - try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, BufferType.ON_HEAP)) + try (SequentialWriter writer = new SequentialWriter(treeFile, DEFAULT_OPT)) { builder.write(writer); writer.sync(); @@ -631,7 +633,7 @@ public class TokenTreeTest final File treeFile = File.createTempFile("token-tree-get-test", "tt"); treeFile.deleteOnExit(); - try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, BufferType.ON_HEAP)) + try (SequentialWriter writer = new SequentialWriter(treeFile, DEFAULT_OPT)) { builder.write(writer); writer.sync(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java index a9a0cb0..309083b 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java @@ -29,11 +29,7 @@ import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.exceptions.ConfigurationException; import 
org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; -import org.apache.cassandra.io.util.ChannelProxy; -import org.apache.cassandra.io.util.DataPosition; -import org.apache.cassandra.io.util.MmappedRegions; -import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.*; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.utils.ChecksumType; import org.apache.cassandra.utils.SyncUtil; @@ -78,7 +74,10 @@ public class CompressedRandomAccessReaderTest { MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)); - try(CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", CompressionParams.snappy(32), sstableMetadataCollector)) + try(CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", + null, SequentialWriterOption.DEFAULT, + CompressionParams.snappy(32), + sstableMetadataCollector)) { for (int i = 0; i < 20; i++) @@ -122,8 +121,10 @@ public class CompressedRandomAccessReaderTest { MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)); try(SequentialWriter writer = compressed - ? new CompressedSequentialWriter(f, filename + ".metadata", CompressionParams.snappy(), sstableMetadataCollector) - : SequentialWriter.open(f)) + ? 
new CompressedSequentialWriter(f, filename + ".metadata", + null, SequentialWriterOption.DEFAULT, + CompressionParams.snappy(), sstableMetadataCollector) + : new SequentialWriter(f)) { writer.write("The quick ".getBytes()); DataPosition mark = writer.mark(); @@ -192,7 +193,9 @@ public class CompressedRandomAccessReaderTest assertTrue(metadata.createNewFile()); MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)); - try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), CompressionParams.snappy(), sstableMetadataCollector)) + try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), + null, SequentialWriterOption.DEFAULT, + CompressionParams.snappy(), sstableMetadataCollector)) { writer.write(CONTENT.getBytes()); writer.finish(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java index 7af84f0..9959c7b 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java @@ -27,6 +27,7 @@ import java.util.*; import static org.apache.commons.io.FileUtils.readFileToByteArray; import static org.junit.Assert.assertEquals; +import com.google.common.io.Files; import org.junit.After; import org.junit.Test; @@ -37,10 +38,7 @@ import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; -import org.apache.cassandra.io.util.ChannelProxy; -import 
org.apache.cassandra.io.util.DataPosition; -import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.io.util.SequentialWriterTest; +import org.apache.cassandra.io.util.*; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.utils.ChecksumType; @@ -92,7 +90,10 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest byte[] dataPre = new byte[bytesToTest]; byte[] rawPost = new byte[bytesToTest]; - try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", compressionParameters, sstableMetadataCollector);) + try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", + null, SequentialWriterOption.DEFAULT, + compressionParameters, + sstableMetadataCollector)) { Random r = new Random(42); @@ -159,6 +160,49 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest writers.clear(); } + @Test + @Override + public void resetAndTruncateTest() + { + File tempFile = new File(Files.createTempDir(), "reset.txt"); + File offsetsFile = FileUtils.createTempFile("compressedsequentialwriter.offset", "test"); + final int bufferSize = 48; + final int writeSize = 64; + byte[] toWrite = new byte[writeSize]; + try (SequentialWriter writer = new CompressedSequentialWriter(tempFile, offsetsFile.getPath(), + null, SequentialWriterOption.DEFAULT, + CompressionParams.lz4(bufferSize), + new MetadataCollector(new ClusteringComparator(UTF8Type.instance)))) + { + // write bytes greater than buffer + writer.write(toWrite); + long flushedOffset = writer.getLastFlushOffset(); + assertEquals(writeSize, writer.position()); + // mark this position + DataPosition pos = writer.mark(); + // write another + writer.write(toWrite); + // another buffer should be flushed + assertEquals(flushedOffset * 2, writer.getLastFlushOffset()); + assertEquals(writeSize * 2, writer.position()); + // reset writer + writer.resetAndTruncate(pos); + // 
current position and flushed size should be changed + assertEquals(writeSize, writer.position()); + assertEquals(flushedOffset, writer.getLastFlushOffset()); + // write another byte less than buffer + writer.write(new byte[]{0}); + assertEquals(writeSize + 1, writer.position()); + // flush offset should not increase + assertEquals(flushedOffset, writer.getLastFlushOffset()); + writer.finish(); + } + catch (IOException e) + { + Assert.fail(); + } + } + protected TestableTransaction newTest() throws IOException { TestableCSW sw = new TestableCSW(); @@ -178,8 +222,8 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest private TestableCSW(File file, File offsetsFile) throws IOException { - this(file, offsetsFile, new CompressedSequentialWriter(file, - offsetsFile.getPath(), + this(file, offsetsFile, new CompressedSequentialWriter(file, offsetsFile.getPath(), + null, SequentialWriterOption.DEFAULT, CompressionParams.lz4(BUFFER_SIZE), new MetadataCollector(new ClusteringComparator(UTF8Type.instance)))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java index 184d637..f769293 100644 --- a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java @@ -27,6 +27,7 @@ import org.junit.Test; import org.apache.cassandra.db.Directories; import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.big.BigFormat; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; @@ -119,8 +120,8 @@ public class DescriptorTest { // Descriptor should be equal when parent directory points to the same directory File dir = new File("."); - 
Descriptor desc1 = new Descriptor(dir, "ks", "cf", 1); - Descriptor desc2 = new Descriptor(dir.getAbsoluteFile(), "ks", "cf", 1); + Descriptor desc1 = new Descriptor(dir, "ks", "cf", 1, SSTableFormat.Type.BIG); + Descriptor desc2 = new Descriptor(dir.getAbsoluteFile(), "ks", "cf", 1, SSTableFormat.Type.BIG); assertEquals(desc1, desc2); assertEquals(desc1.hashCode(), desc2.hashCode()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java index 4ecbdcc..8cdd4ea 100644 --- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java +++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java @@ -131,7 +131,7 @@ public class BufferedRandomAccessFileTest public void testReadAndWriteOnCapacity() throws IOException { File tmpFile = File.createTempFile("readtest", "bin"); - SequentialWriter w = SequentialWriter.open(tmpFile); + SequentialWriter w = new SequentialWriter(tmpFile); // Fully write the file and sync.. 
byte[] in = generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE); @@ -157,7 +157,7 @@ public class BufferedRandomAccessFileTest public void testLength() throws IOException { File tmpFile = File.createTempFile("lengthtest", "bin"); - SequentialWriter w = SequentialWriter.open(tmpFile); + SequentialWriter w = new SequentialWriter(tmpFile); assertEquals(0, w.length()); // write a chunk smaller then our buffer, so will not be flushed @@ -562,7 +562,7 @@ public class BufferedRandomAccessFileTest public void testSetNegativeLength() throws IOException, IllegalArgumentException { File tmpFile = File.createTempFile("set_negative_length", "bin"); - try (SequentialWriter file = SequentialWriter.open(tmpFile)) + try (SequentialWriter file = new SequentialWriter(tmpFile)) { file.truncate(-8L); } @@ -573,7 +573,7 @@ public class BufferedRandomAccessFileTest File tempFile = File.createTempFile(name, null); tempFile.deleteOnExit(); - return SequentialWriter.open(tempFile); + return new SequentialWriter(tempFile); } private File writeTemporaryFile(byte[] data) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java index 57428af..0657f7f 100644 --- a/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java @@ -27,10 +27,6 @@ import java.util.concurrent.ThreadLocalRandom; import org.junit.Test; import static org.junit.Assert.*; -import org.apache.cassandra.io.util.ChecksummedRandomAccessReader; -import org.apache.cassandra.io.util.ChecksummedSequentialWriter; -import org.apache.cassandra.io.util.RandomAccessReader; -import 
org.apache.cassandra.io.util.SequentialWriter; public class ChecksummedRandomAccessReaderTest { @@ -43,9 +39,11 @@ public class ChecksummedRandomAccessReaderTest final byte[] expected = new byte[70 * 1024]; // bit more than crc chunk size, so we can test rebuffering. ThreadLocalRandom.current().nextBytes(expected); - SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc); - writer.write(expected); - writer.finish(); + try (SequentialWriter writer = new ChecksummedSequentialWriter(data, crc, null, SequentialWriterOption.DEFAULT)) + { + writer.write(expected); + writer.finish(); + } assert data.exists(); @@ -69,9 +67,11 @@ public class ChecksummedRandomAccessReaderTest final byte[] dataBytes = new byte[70 * 1024]; // bit more than crc chunk size ThreadLocalRandom.current().nextBytes(dataBytes); - SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc); - writer.write(dataBytes); - writer.finish(); + try (SequentialWriter writer = new ChecksummedSequentialWriter(data, crc, null, SequentialWriterOption.DEFAULT)) + { + writer.write(dataBytes); + writer.finish(); + } assert data.exists(); @@ -101,9 +101,11 @@ public class ChecksummedRandomAccessReaderTest final byte[] expected = new byte[5 * 1024]; Arrays.fill(expected, (byte) 0); - SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc); - writer.write(expected); - writer.finish(); + try (SequentialWriter writer = new ChecksummedSequentialWriter(data, crc, null, SequentialWriterOption.DEFAULT)) + { + writer.write(expected); + writer.finish(); + } assert data.exists(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java index bea3aac..65ffdba 100644 --- 
a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java +++ b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java @@ -59,7 +59,9 @@ public class ChecksummedSequentialWriterTest extends SequentialWriterTest private TestableCSW(File file, File crcFile) throws IOException { - this(file, crcFile, new ChecksummedSequentialWriter(file, BUFFER_SIZE, crcFile)); + this(file, crcFile, new ChecksummedSequentialWriter(file, crcFile, null, SequentialWriterOption.newBuilder() + .bufferSize(BUFFER_SIZE) + .build())); } private TestableCSW(File file, File crcFile, SequentialWriter sw) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/DataOutputTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java index 1fb5597..e082b19 100644 --- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java +++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java @@ -40,7 +40,6 @@ import java.util.concurrent.ThreadLocalRandom; import org.junit.Assert; import org.junit.Test; -import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.utils.ByteBufferUtil; public class DataOutputTest @@ -380,8 +379,9 @@ public class DataOutputTest public void testSequentialWriter() throws IOException { File file = FileUtils.createTempFile("dataoutput", "test"); - final SequentialWriter writer = new SequentialWriter(file, 32, BufferType.ON_HEAP); - DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer.finishOnClose()); + SequentialWriterOption option = SequentialWriterOption.newBuilder().bufferSize(32).finishOnClose(true).build(); + final SequentialWriter writer = new SequentialWriter(file, option); + DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer); DataInput canon = 
testWrite(write); write.flush(); write.close(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java index 2394ad5..f34c00f 100644 --- a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java +++ b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java @@ -63,7 +63,7 @@ public class MmappedRegionsTest File ret = File.createTempFile(fileName, "1"); ret.deleteOnExit(); - try (SequentialWriter writer = SequentialWriter.open(ret)) + try (SequentialWriter writer = new SequentialWriter(ret)) { writer.write(buffer); writer.finish(); @@ -298,10 +298,9 @@ public class MmappedRegionsTest cf.deleteOnExit(); MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)); - try(SequentialWriter writer = new CompressedSequentialWriter(f, - cf.getAbsolutePath(), - CompressionParams.snappy(), - sstableMetadataCollector)) + try(SequentialWriter writer = new CompressedSequentialWriter(f, cf.getAbsolutePath(), + null, SequentialWriterOption.DEFAULT, + CompressionParams.snappy(), sstableMetadataCollector)) { writer.write(buffer); writer.finish(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java index c5073c0..32ce554 100644 --- a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java @@ -263,7 +263,7 @@ public class RandomAccessReaderTest final File f = 
File.createTempFile("testReadFully", "1"); f.deleteOnExit(); - try(SequentialWriter writer = SequentialWriter.open(f)) + try(SequentialWriter writer = new SequentialWriter(f)) { long numWritten = 0; while (numWritten < params.fileLength) @@ -326,7 +326,7 @@ public class RandomAccessReaderTest File f = File.createTempFile("testReadBytes", "1"); final String expected = "The quick brown fox jumps over the lazy dog"; - try(SequentialWriter writer = SequentialWriter.open(f)) + try(SequentialWriter writer = new SequentialWriter(f)) { writer.write(expected.getBytes()); writer.finish(); @@ -355,7 +355,7 @@ public class RandomAccessReaderTest final String expected = "The quick brown fox jumps over the lazy dog"; final int numIterations = 10; - try(SequentialWriter writer = SequentialWriter.open(f)) + try(SequentialWriter writer = new SequentialWriter(f)) { for (int i = 0; i < numIterations; i++) writer.write(expected.getBytes()); @@ -435,7 +435,7 @@ public class RandomAccessReaderTest Random r = new Random(seed); r.nextBytes(expected); - try(SequentialWriter writer = SequentialWriter.open(f)) + try(SequentialWriter writer = new SequentialWriter(f)) { writer.write(expected); writer.finish(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java index f5a366e..2797384 100644 --- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java +++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java @@ -36,6 +36,7 @@ import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest; import static org.apache.commons.io.FileUtils.*; +import static org.junit.Assert.assertEquals; public class SequentialWriterTest extends 
AbstractTransactionalTest { @@ -71,7 +72,10 @@ public class SequentialWriterTest extends AbstractTransactionalTest protected TestableSW(File file) throws IOException { - this(file, new SequentialWriter(file, 8 << 10, BufferType.OFF_HEAP)); + this(file, new SequentialWriter(file, SequentialWriterOption.newBuilder() + .bufferSize(8 << 10) + .bufferType(BufferType.OFF_HEAP) + .build())); } protected TestableSW(File file, SequentialWriter sw) throws IOException @@ -118,6 +122,47 @@ public class SequentialWriterTest extends AbstractTransactionalTest } } + @Test + public void resetAndTruncateTest() + { + File tempFile = new File(Files.createTempDir(), "reset.txt"); + final int bufferSize = 48; + final int writeSize = 64; + byte[] toWrite = new byte[writeSize]; + SequentialWriterOption option = SequentialWriterOption.newBuilder().bufferSize(bufferSize).build(); + try (SequentialWriter writer = new SequentialWriter(tempFile, option)) + { + // write bytes greater than buffer + writer.write(toWrite); + assertEquals(bufferSize, writer.getLastFlushOffset()); + assertEquals(writeSize, writer.position()); + // mark this position + DataPosition pos = writer.mark(); + // write another + writer.write(toWrite); + // another buffer should be flushed + assertEquals(bufferSize * 2, writer.getLastFlushOffset()); + assertEquals(writeSize * 2, writer.position()); + // reset writer + writer.resetAndTruncate(pos); + // current position and flushed size should be changed + assertEquals(writeSize, writer.position()); + assertEquals(writeSize, writer.getLastFlushOffset()); + // write another byte less than buffer + writer.write(new byte[]{0}); + assertEquals(writeSize + 1, writer.position()); + // flush offset should not increase + assertEquals(writeSize, writer.getLastFlushOffset()); + writer.finish(); + } + catch (IOException e) + { + Assert.fail(); + } + // final file size check + assertEquals(writeSize + 1, tempFile.length()); + } + /** * Tests that the output stream exposed by 
SequentialWriter behaves as expected */ @@ -127,7 +172,8 @@ public class SequentialWriterTest extends AbstractTransactionalTest File tempFile = new File(Files.createTempDir(), "test.txt"); Assert.assertFalse("temp file shouldn't exist yet", tempFile.exists()); - try (DataOutputStream os = new DataOutputStream(SequentialWriter.open(tempFile).finishOnClose())) + SequentialWriterOption option = SequentialWriterOption.newBuilder().finishOnClose(true).build(); + try (DataOutputStream os = new DataOutputStream(new SequentialWriter(tempFile, option))) { os.writeUTF("123"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb221095/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java index a3300ac..562416e 100644 --- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java +++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java @@ -19,14 +19,13 @@ package org.apache.cassandra.streaming.compression; import java.io.*; import java.util.*; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; import org.junit.Test; import org.apache.cassandra.db.ClusteringComparator; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.io.compress.CompressedSequentialWriter; import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.util.SequentialWriterOption; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; @@ -72,7 +71,11 @@ public class CompressedInputStreamTest MetadataCollector collector = new MetadataCollector(new 
ClusteringComparator(BytesType.instance)); CompressionParams param = CompressionParams.snappy(32); Map<Long, Long> index = new HashMap<Long, Long>(); - try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector)) + try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, + desc.filenameFor(Component.COMPRESSION_INFO), + null, + SequentialWriterOption.DEFAULT, + param, collector)) { for (long l = 0L; l < 1000; l++) {
