Constrain internode message buffer sizes, and improve IO class hierarchy patch by ariel and benedict for CASSANDRA-8670
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/16499ca9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/16499ca9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/16499ca9 Branch: refs/heads/trunk Commit: 16499ca9b0080ea4d3c4ed3bc55c753bacc3c24e Parents: dbe909e Author: Ariel Weisberg <[email protected]> Authored: Tue Mar 31 17:28:15 2015 +0100 Committer: Benedict Elliott Smith <[email protected]> Committed: Tue Mar 31 17:28:15 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cache/AutoSavingCache.java | 4 +- .../org/apache/cassandra/cache/OHCProvider.java | 8 + .../apache/cassandra/db/BatchlogManager.java | 2 +- .../org/apache/cassandra/db/SuperColumns.java | 2 +- .../org/apache/cassandra/db/SystemKeyspace.java | 2 +- .../cassandra/db/commitlog/CommitLog.java | 4 +- .../io/compress/CompressedSequentialWriter.java | 5 +- .../io/sstable/IndexSummaryBuilder.java | 5 +- .../io/sstable/format/SSTableReader.java | 5 +- .../io/sstable/format/big/BigTableWriter.java | 15 +- .../io/sstable/metadata/MetadataSerializer.java | 7 +- .../cassandra/io/util/AbstractDataOutput.java | 329 --------- .../io/util/BufferedDataOutputStreamPlus.java | 301 +++++++++ .../cassandra/io/util/ByteBufferDataInput.java | 1 - .../cassandra/io/util/DataOutputBuffer.java | 95 ++- .../cassandra/io/util/DataOutputByteBuffer.java | 59 -- .../cassandra/io/util/DataOutputPlus.java | 14 +- .../io/util/DataOutputStreamAndChannel.java | 55 -- .../cassandra/io/util/DataOutputStreamPlus.java | 111 ++- .../io/util/FastByteArrayOutputStream.java | 266 -------- .../org/apache/cassandra/io/util/Memory.java | 1 + .../cassandra/io/util/NIODataInputStream.java | 312 +++++++++ .../cassandra/io/util/SafeMemoryWriter.java | 117 +--- .../cassandra/io/util/SequentialWriter.java | 3 +- .../io/util/UnbufferedDataOutputStreamPlus.java | 374 +++++++++++ 
.../io/util/WrappedDataOutputStreamPlus.java | 68 ++ .../cassandra/net/IncomingTcpConnection.java | 9 +- .../cassandra/net/OutboundTcpConnection.java | 10 +- .../apache/cassandra/service/GCInspector.java | 46 +- .../cassandra/service/pager/PagingState.java | 2 +- .../cassandra/streaming/ConnectionHandler.java | 22 +- .../cassandra/streaming/StreamWriter.java | 11 +- .../compress/CompressedStreamWriter.java | 28 +- .../streaming/messages/CompleteMessage.java | 4 +- .../streaming/messages/IncomingFileMessage.java | 4 +- .../streaming/messages/OutgoingFileMessage.java | 7 +- .../streaming/messages/PrepareMessage.java | 4 +- .../streaming/messages/ReceivedMessage.java | 4 +- .../streaming/messages/RetryMessage.java | 4 +- .../messages/SessionFailedMessage.java | 4 +- .../streaming/messages/StreamMessage.java | 6 +- .../cassandra/thrift/CassandraServer.java | 4 + .../org/apache/cassandra/tools/NodeProbe.java | 2 +- .../org/apache/cassandra/tools/NodeTool.java | 4 +- .../org/apache/cassandra/transport/CBUtil.java | 4 +- .../cassandra/utils/memory/MemoryUtil.java | 47 ++ .../utils/vint/EncodedDataOutputStream.java | 4 +- .../cassandra/AbstractSerializationsTester.java | 8 +- .../apache/cassandra/db/SerializationsTest.java | 19 +- .../cassandra/gms/SerializationsTest.java | 6 +- .../cassandra/io/sstable/IndexSummaryTest.java | 4 + .../metadata/MetadataSerializerTest.java | 7 +- .../io/util/BufferedDataOutputStreamTest.java | 391 +++++++++++ .../cassandra/io/util/DataOutputTest.java | 50 +- .../io/util/NIODataInputStreamTest.java | 667 +++++++++++++++++++ .../cassandra/service/SerializationsTest.java | 5 +- .../apache/cassandra/utils/BloomFilterTest.java | 11 +- .../cassandra/utils/SerializationsTest.java | 6 +- 59 files changed, 2605 insertions(+), 965 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/CHANGES.txt 
---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index beb05ab..22bdc5e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Constrain internode message buffer sizes, and improve IO class hierarchy (CASSANDRA-8670) * New tool added to validate all sstables in a node (CASSANDRA-5791) * Push notification when tracing completes for an operation (CASSANDRA-7807) * Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236) http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 7c7e06a..7a9c3da 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -259,7 +259,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K try { stream = streamFactory.getOutputStream(writerPath); - writer = new DataOutputStreamPlus(stream); + writer = new WrappedDataOutputStreamPlus(stream); } catch (FileNotFoundException e) { @@ -334,7 +334,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K if (!file.isFile()) continue; // someone's been messing with our directory. naughty! 
- if (file.getName().endsWith(cacheNameFormat) + if (file.getName().endsWith(cacheNameFormat) || file.getName().endsWith(cacheType.toString())) { if (!file.delete()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/cache/OHCProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java index 720121c..95c323a 100644 --- a/src/java/org/apache/cassandra/cache/OHCProvider.java +++ b/src/java/org/apache/cassandra/cache/OHCProvider.java @@ -21,9 +21,12 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; import java.util.Iterator; import java.util.UUID; +import com.google.common.base.Function; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.TypeSizes; @@ -270,5 +273,10 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry> { throw new UnsupportedOperationException("IMPLEMENT ME"); } + + public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException + { + throw new UnsupportedOperationException("IMPLEMENT ME"); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index 8eaea52..f5137fd 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -161,7 +161,7 @@ public class BatchlogManager implements BatchlogManagerMBean throw new AssertionError(); // cannot happen. 
} - return buf.asByteBuffer(); + return buf.buffer(); } private void replayAllFailedBatches() throws ExecutionException, InterruptedException http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/SuperColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SuperColumns.java b/src/java/org/apache/cassandra/db/SuperColumns.java index 2006cbd..65e153f 100644 --- a/src/java/org/apache/cassandra/db/SuperColumns.java +++ b/src/java/org/apache/cassandra/db/SuperColumns.java @@ -186,7 +186,7 @@ public class SuperColumns { // Note that, because the filter in argument is the one from thrift, 'name' are SimpleDenseCellName. // So calling name.slice() would be incorrect, as simple cell names don't handle the EOC properly. - // This is why we call toByteBuffer() and rebuild a Composite of the right type before call slice(). + // This is why we call buffer() and rebuild a Composite of the right type before call slice(). 
slices[i++] = type.make(name.toByteBuffer()).slice(); } return new SliceQueryFilter(slices, false, slices.length, 1); http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 9fa3c6b..af18b20 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -1008,7 +1008,7 @@ public final class SystemKeyspace { DataOutputBuffer out = new DataOutputBuffer(); Range.tokenSerializer.serialize(range, out, MessagingService.VERSION_30); - return out.asByteBuffer(); + return out.buffer(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 664e38e..7fa7575 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -40,7 +40,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.compress.ICompressor; -import org.apache.cassandra.io.util.DataOutputByteBuffer; +import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; import org.apache.cassandra.metrics.CommitLogMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; @@ -251,7 +251,7 @@ public class CommitLog implements CommitLogMBean { ICRC32 checksum = CRC32Factory.instance.create(); final ByteBuffer buffer = alloc.getBuffer(); - DataOutputByteBuffer dos 
= new DataOutputByteBuffer(buffer); + BufferedDataOutputStreamPlus dos = new BufferedDataOutputStreamPlus(null, buffer); // checksummed length dos.writeInt((int) size); http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index fc679d5..eb9dcf8 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -17,10 +17,12 @@ */ package org.apache.cassandra.io.compress; +import java.io.DataOutputStream; import java.io.EOFException; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.Channels; import java.util.zip.Adler32; import org.apache.cassandra.io.FSReadError; @@ -29,7 +31,6 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.DataIntegrityMetadata; -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; import org.apache.cassandra.io.util.FileMark; import org.apache.cassandra.io.util.SequentialWriter; import org.apache.cassandra.utils.FBUtilities; @@ -79,7 +80,7 @@ public class CompressedSequentialWriter extends SequentialWriter metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath); this.sstableMetadataCollector = sstableMetadataCollector; - crcMetadata = new DataIntegrityMetadata.ChecksumWriter(new DataOutputStreamAndChannel(channel)); + crcMetadata = new DataIntegrityMetadata.ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel))); } @Override 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java index 696bbf8..c7c51e5 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.io.sstable; +import java.io.IOException; import java.nio.ByteOrder; import java.util.Map; import java.util.TreeMap; @@ -151,7 +152,7 @@ public class IndexSummaryBuilder implements AutoCloseable return lastReadableBoundary; } - public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart) + public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart) throws IOException { return maybeAddEntry(decoratedKey, indexStart, 0, 0); } @@ -164,7 +165,7 @@ public class IndexSummaryBuilder implements AutoCloseable * @param dataEnd the position in the data file we need to be able to read to (exclusive) to read this record * a value of 0 indicates we are not tracking readable boundaries */ - public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart, long indexEnd, long dataEnd) + public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart, long indexEnd, long dataEnd) throws IOException { if (keysWritten == nextSamplePosition) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index a27adf6..f6cd9b5 100644 --- 
a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -61,7 +61,6 @@ import org.apache.cassandra.utils.concurrent.OpOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.utils.concurrent.Ref; -import org.apache.cassandra.utils.concurrent.RefCounted; import org.apache.cassandra.utils.concurrent.SelfRefCounted; import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR; @@ -863,10 +862,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS if (summariesFile.exists()) FileUtils.deleteWithConfirm(summariesFile); - DataOutputStreamAndChannel oStream = null; + DataOutputStreamPlus oStream = null; try { - oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile)); + oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile)); IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel()); ByteBufferUtil.writeWithLength(first.getKey(), oStream); ByteBufferUtil.writeWithLength(last.getKey(), oStream); http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 4a981ce..88cb067 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -32,7 +32,6 @@ import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.format.Version; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.CFMetaData; import 
org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.compaction.AbstractCompactedRow; @@ -107,7 +106,7 @@ public class BigTableWriter extends SSTableWriter return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer(); } - private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index) + private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index) throws IOException { metadataCollector.addKey(decoratedKey.getKey()); lastWrittenKey = decoratedKey; @@ -134,15 +133,15 @@ public class BigTableWriter extends SSTableWriter entry = row.write(startPosition, dataFile); if (entry == null) return null; + long endPosition = dataFile.getFilePointer(); + metadataCollector.update(endPosition - startPosition, row.columnStats()); + afterAppend(row.key, endPosition, entry); + return entry; } catch (IOException e) { throw new FSWriteError(e, dataFile.getPath()); } - long endPosition = dataFile.getFilePointer(); - metadataCollector.update(endPosition - startPosition, row.columnStats()); - afterAppend(row.key, endPosition, entry); - return entry; } public void append(DecoratedKey decoratedKey, ColumnFamily cf) @@ -504,7 +503,7 @@ public class BigTableWriter extends SSTableWriter return summary.getLastReadableBoundary(); } - public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd) + public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd) throws IOException { bf.add(key); long indexStart = indexFile.getFilePointer(); @@ -545,7 +544,7 @@ public class BigTableWriter extends SSTableWriter { // bloom filter FileOutputStream fos = new FileOutputStream(path); - DataOutputStreamPlus stream = new DataOutputStreamPlus(new BufferedOutputStream(fos)); + DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos); FilterFactory.serialize(bf, stream); stream.flush(); fos.getFD().sync(); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java index 0dcd981..2be69ab 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java @@ -22,15 +22,16 @@ import java.util.*; import com.google.common.collect.Lists; import com.google.common.collect.Maps; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.FBUtilities; @@ -148,7 +149,7 @@ public class MetadataSerializer implements IMetadataSerializer { Descriptor tmpDescriptor = descriptor.asType(Descriptor.Type.TEMP); - try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS)))) + try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS)))) { serialize(currentComponents, out); out.flush(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java deleted file mode 100644 index 8f4bed8..0000000 --- a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java +++ /dev/null @@ -1,329 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.io.util; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.UTFDataFormatException; -import java.nio.ByteBuffer; - -import org.apache.cassandra.utils.ByteBufferUtil; - -public abstract class AbstractDataOutput extends OutputStream implements DataOutputPlus -{ - /* - !! DataOutput methods below are copied from the implementation in Apache Harmony RandomAccessFile. - */ - - /** - * Writes the entire contents of the byte array <code>buffer</code> to - * this RandomAccessFile starting at the current file pointer. - * - * @param buffer - * the buffer to be written. - * - * @throws IOException - * If an error occurs trying to write to this RandomAccessFile. 
- */ - public void write(byte[] buffer) throws IOException { - write(buffer, 0, buffer.length); - } - - /** - * Writes <code>count</code> bytes from the byte array <code>buffer</code> - * starting at <code>offset</code> to this RandomAccessFile starting at - * the current file pointer.. - * - * @param buffer - * the bytes to be written - * @param offset - * offset in buffer to get bytes - * @param count - * number of bytes in buffer to write - * - * @throws IOException - * If an error occurs attempting to write to this - * RandomAccessFile. - * @throws IndexOutOfBoundsException - * If offset or count are outside of bounds. - */ - public abstract void write(byte[] buffer, int offset, int count) throws IOException; - - /** - * Writes the specified byte <code>oneByte</code> to this RandomAccessFile - * starting at the current file pointer. Only the low order byte of - * <code>oneByte</code> is written. - * - * @param oneByte - * the byte to be written - * - * @throws IOException - * If an error occurs attempting to write to this - * RandomAccessFile. - */ - public abstract void write(int oneByte) throws IOException; - - /** - * Writes a boolean to this output stream. - * - * @param val - * the boolean value to write to the OutputStream - * - * @throws IOException - * If an error occurs attempting to write to this - * DataOutputStream. - */ - public final void writeBoolean(boolean val) throws IOException { - write(val ? 1 : 0); - } - - /** - * Writes a 8-bit byte to this output stream. - * - * @param val - * the byte value to write to the OutputStream - * - * @throws java.io.IOException - * If an error occurs attempting to write to this - * DataOutputStream. - */ - public final void writeByte(int val) throws IOException { - write(val & 0xFF); - } - - /** - * Writes the low order 8-bit bytes from a String to this output stream. 
- * - * @param str - * the String containing the bytes to write to the OutputStream - * - * @throws IOException - * If an error occurs attempting to write to this - * DataOutputStream. - */ - public final void writeBytes(String str) throws IOException { - byte bytes[] = new byte[str.length()]; - for (int index = 0; index < str.length(); index++) { - bytes[index] = (byte) (str.charAt(index) & 0xFF); - } - write(bytes); - } - - /** - * Writes the specified 16-bit character to the OutputStream. Only the lower - * 2 bytes are written with the higher of the 2 bytes written first. This - * represents the Unicode value of val. - * - * @param val - * the character to be written - * - * @throws IOException - * If an error occurs attempting to write to this - * DataOutputStream. - */ - public final void writeChar(int val) throws IOException { - write((val >>> 8) & 0xFF); - write((val >>> 0) & 0xFF); - } - - /** - * Writes the specified 16-bit characters contained in str to the - * OutputStream. Only the lower 2 bytes of each character are written with - * the higher of the 2 bytes written first. This represents the Unicode - * value of each character in str. - * - * @param str - * the String whose characters are to be written. - * - * @throws IOException - * If an error occurs attempting to write to this - * DataOutputStream. - */ - public final void writeChars(String str) throws IOException { - byte newBytes[] = new byte[str.length() * 2]; - for (int index = 0; index < str.length(); index++) { - int newIndex = index == 0 ? index : index * 2; - newBytes[newIndex] = (byte) ((str.charAt(index) >> 8) & 0xFF); - newBytes[newIndex + 1] = (byte) (str.charAt(index) & 0xFF); - } - write(newBytes); - } - - /** - * Writes a 64-bit double to this output stream. The resulting output is the - * 8 bytes resulting from calling Double.doubleToLongBits(). - * - * @param val - * the double to be written. 
- * - * @throws IOException - * If an error occurs attempting to write to this - * DataOutputStream. - */ - public final void writeDouble(double val) throws IOException { - writeLong(Double.doubleToLongBits(val)); - } - - /** - * Writes a 32-bit float to this output stream. The resulting output is the - * 4 bytes resulting from calling Float.floatToIntBits(). - * - * @param val - * the float to be written. - * - * @throws IOException - * If an error occurs attempting to write to this - * DataOutputStream. - */ - public final void writeFloat(float val) throws IOException { - writeInt(Float.floatToIntBits(val)); - } - - /** - * Writes a 32-bit int to this output stream. The resulting output is the 4 - * bytes, highest order first, of val. - * - * @param val - * the int to be written. - * - * @throws IOException - * If an error occurs attempting to write to this - * DataOutputStream. - */ - public void writeInt(int val) throws IOException { - write((val >>> 24) & 0xFF); - write((val >>> 16) & 0xFF); - write((val >>> 8) & 0xFF); - write((val >>> 0) & 0xFF); - } - - /** - * Writes a 64-bit long to this output stream. The resulting output is the 8 - * bytes, highest order first, of val. - * - * @param val - * the long to be written. - * - * @throws IOException - * If an error occurs attempting to write to this - * DataOutputStream. - */ - public void writeLong(long val) throws IOException { - write((int)(val >>> 56) & 0xFF); - write((int)(val >>> 48) & 0xFF); - write((int)(val >>> 40) & 0xFF); - write((int)(val >>> 32) & 0xFF); - write((int)(val >>> 24) & 0xFF); - write((int)(val >>> 16) & 0xFF); - write((int)(val >>> 8) & 0xFF); - write((int) (val >>> 0) & 0xFF); - } - - /** - * Writes the specified 16-bit short to the OutputStream. Only the lower 2 - * bytes are written with the higher of the 2 bytes written first. - * - * @param val - * the short to be written - * - * @throws IOException - * If an error occurs attempting to write to this - * DataOutputStream. 
- */ - public void writeShort(int val) throws IOException { - writeChar(val); - } - - /** - * Writes the specified String out in UTF format. - * - * @param str - * the String to be written in UTF format. - * - * @throws IOException - * If an error occurs attempting to write to this - * DataOutputStream. - */ - public final void writeUTF(String str) throws IOException { - int utfCount = 0, length = str.length(); - for (int i = 0; i < length; i++) { - int charValue = str.charAt(i); - if (charValue > 0 && charValue <= 127) { - utfCount++; - } else if (charValue <= 2047) { - utfCount += 2; - } else { - utfCount += 3; - } - } - if (utfCount > 65535) { - throw new UTFDataFormatException(); //$NON-NLS-1$ - } - byte utfBytes[] = new byte[utfCount + 2]; - int utfIndex = 2; - for (int i = 0; i < length; i++) { - int charValue = str.charAt(i); - if (charValue > 0 && charValue <= 127) { - utfBytes[utfIndex++] = (byte) charValue; - } else if (charValue <= 2047) { - utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (charValue >> 6))); - utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue)); - } else { - utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (charValue >> 12))); - utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (charValue >> 6))); - utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue)); - } - } - utfBytes[0] = (byte) (utfCount >> 8); - utfBytes[1] = (byte) utfCount; - write(utfBytes); - } - - private byte[] buf; - public synchronized void write(ByteBuffer buffer) throws IOException - { - int len = buffer.remaining(); - if (len < 16) - { - int offset = buffer.position(); - for (int i = 0 ; i < len ; i++) - write(buffer.get(i + offset)); - return; - } - - byte[] buf = this.buf; - if (buf == null) - this.buf = buf = new byte[256]; - - int offset = 0; - while (len > 0) - { - int sublen = Math.min(buf.length, len); - ByteBufferUtil.arrayCopy(buffer, buffer.position() + offset, buf, 0, sublen); - write(buf, 0, sublen); - offset += sublen; - len -= sublen; - } - } - - public 
void write(Memory memory, long offset, long length) throws IOException - { - for (ByteBuffer buffer : memory.asByteBuffers(offset, length)) - write(buffer); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java new file mode 100644 index 0000000..f4f46a1 --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.cassandra.io.util;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;

import org.apache.cassandra.config.Config;
import org.apache.cassandra.utils.memory.MemoryUtil;

/**
 * An implementation of the DataOutputStreamPlus interface using a ByteBuffer to stage writes
 * before flushing them to an underlying channel.
 *
 * This class is completely thread unsafe.
 */
public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
{
    private static final int DEFAULT_BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + "nio_data_output_stream_plus_buffer_size", 1024 * 32);

    // Staging buffer for writes; flushed to the channel whenever it fills up
    ByteBuffer buffer;

    // ByteBuffer to use for defensive copies of incoming direct buffers
    private final ByteBuffer hollowBuffer = MemoryUtil.getHollowDirectByteBuffer();

    public BufferedDataOutputStreamPlus(RandomAccessFile ras)
    {
        this(ras.getChannel());
    }

    public BufferedDataOutputStreamPlus(RandomAccessFile ras, int bufferSize)
    {
        this(ras.getChannel(), bufferSize);
    }

    public BufferedDataOutputStreamPlus(FileOutputStream fos)
    {
        this(fos.getChannel());
    }

    public BufferedDataOutputStreamPlus(FileOutputStream fos, int bufferSize)
    {
        this(fos.getChannel(), bufferSize);
    }

    public BufferedDataOutputStreamPlus(WritableByteChannel wbc)
    {
        this(wbc, DEFAULT_BUFFER_SIZE);
    }

    public BufferedDataOutputStreamPlus(WritableByteChannel wbc, int bufferSize)
    {
        // Validate the arguments *before* allocating the direct buffer. The original ran the
        // Preconditions checks only after delegating to this(...), by which point a bad size
        // had already thrown from allocateDirect with a less useful message, and a native
        // allocation could be performed for a null channel.
        this(Preconditions.checkNotNull(wbc), ByteBuffer.allocateDirect(checkBufferSize(bufferSize)));
    }

    // Helper so the size precondition can run before the this(...) delegation above
    // (Java requires the delegating constructor call to be the first statement)
    private static int checkBufferSize(int bufferSize)
    {
        Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accommodate a long/double");
        return bufferSize;
    }

    public BufferedDataOutputStreamPlus(WritableByteChannel channel, ByteBuffer buffer)
    {
        super(channel);
        this.buffer = buffer;
    }

    public BufferedDataOutputStreamPlus(ByteBuffer buffer)
    {
        super();
        this.buffer = buffer;
    }

    @Override
    public void write(byte[] b) throws IOException
    {
        write(b, 0, b.length);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException
    {
        if (b == null)
            throw new NullPointerException();

        // avoid int overflow
        if (off < 0 || off > b.length || len < 0
            || len > b.length - off)
            throw new IndexOutOfBoundsException();

        if (len == 0)
            return;

        int copied = 0;
        while (copied < len)
        {
            if (buffer.hasRemaining())
            {
                int toCopy = Math.min(len - copied, buffer.remaining());
                buffer.put(b, off + copied, toCopy);
                copied += toCopy;
            }
            else
            {
                doFlush();
            }
        }
    }

    /*
     * Takes a defensive copy of the incoming ByteBuffer and never modifies its position or
     * limit, even temporarily, so this method is thread-safe with respect to the incoming buffer.
     * (non-Javadoc)
     * @see org.apache.cassandra.io.util.DataOutputPlus#write(java.nio.ByteBuffer)
     */
    @Override
    public void write(ByteBuffer toWrite) throws IOException
    {
        if (toWrite.hasArray())
        {
            // Heap buffer: copy straight from the backing array
            write(toWrite.array(), toWrite.arrayOffset() + toWrite.position(), toWrite.remaining());
        }
        else
        {
            assert toWrite.isDirect();
            if (toWrite.remaining() > buffer.remaining())
            {
                doFlush();
                MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
                if (toWrite.remaining() > buffer.remaining())
                {
                    // Still larger than the (now empty) staging buffer: bypass it and
                    // write the copy directly; loop because channel writes may be partial
                    while (hollowBuffer.hasRemaining())
                        channel.write(hollowBuffer);
                }
                else
                {
                    buffer.put(hollowBuffer);
                }
            }
            else
            {
                MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
                buffer.put(hollowBuffer);
            }
        }
    }

    @Override
    public void write(int b) throws IOException
    {
        ensureRemaining(1);
        buffer.put((byte) (b & 0xFF));
    }

    @Override
    public void writeBoolean(boolean v) throws IOException
    {
        ensureRemaining(1);
        buffer.put(v ? (byte)1 : (byte)0);
    }

    @Override
    public void writeByte(int v) throws IOException
    {
        write(v);
    }

    @Override
    public void writeShort(int v) throws IOException
    {
        ensureRemaining(2);
        buffer.putShort((short) v);
    }

    @Override
    public void writeChar(int v) throws IOException
    {
        ensureRemaining(2);
        buffer.putChar((char) v);
    }

    @Override
    public void writeInt(int v) throws IOException
    {
        ensureRemaining(4);
        buffer.putInt(v);
    }

    @Override
    public void writeLong(long v) throws IOException
    {
        ensureRemaining(8);
        buffer.putLong(v);
    }

    @Override
    public void writeFloat(float v) throws IOException
    {
        ensureRemaining(4);
        buffer.putFloat(v);
    }

    @Override
    public void writeDouble(double v) throws IOException
    {
        ensureRemaining(8);
        buffer.putDouble(v);
    }

    // Per the DataOutput contract: write only the low-order byte of each character
    @Override
    public void writeBytes(String s) throws IOException
    {
        for (int index = 0; index < s.length(); index++)
            writeByte(s.charAt(index));
    }

    @Override
    public void writeChars(String s) throws IOException
    {
        for (int index = 0; index < s.length(); index++)
            writeChar(s.charAt(index));
    }

    @Override
    public void writeUTF(String s) throws IOException
    {
        UnbufferedDataOutputStreamPlus.writeUTF(s, this);
    }

    @Override
    public void write(Memory memory, long offset, long length) throws IOException
    {
        for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
            write(buffer);
    }

    // Drain the staging buffer to the channel; loops because channel writes may be partial
    protected void doFlush() throws IOException
    {
        buffer.flip();

        while (buffer.hasRemaining())
            channel.write(buffer);

        buffer.clear();
    }

    @Override
    public void flush() throws IOException
    {
        doFlush();
    }

    @Override
    public void close() throws IOException
    {
        doFlush();
        channel.close();
        // Direct buffers are not promptly reclaimed by GC; free the native memory explicitly
        FileUtils.clean(buffer);
        buffer = null;
    }

    // Flush if fewer than 'minimum' bytes remain; the constructors guarantee capacity >= 8,
    // enough for the largest primitive, so a single flush always makes room
    protected void ensureRemaining(int minimum) throws IOException
    {
        if (buffer.remaining() < minimum)
            doFlush();
    }

    @Override
    public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
    {
        // Don't allow writes to the underlying channel while data is buffered
        flush();
        return f.apply(channel);
    }

    /** Sets the byte order used by the staging buffer's primitive writes. Returns this. */
    public BufferedDataOutputStreamPlus order(ByteOrder order)
    {
        this.buffer.order(order);
        return this;
    }
}
*/ -public final class DataOutputBuffer extends DataOutputStreamPlus +public class DataOutputBuffer extends BufferedDataOutputStreamPlus { public DataOutputBuffer() { @@ -37,67 +37,88 @@ public final class DataOutputBuffer extends DataOutputStreamPlus public DataOutputBuffer(int size) { - super(new FastByteArrayOutputStream(size)); + super(ByteBuffer.allocate(size)); + } + + protected DataOutputBuffer(ByteBuffer buffer) + { + super(buffer); } @Override - public void write(int b) + public void flush() throws IOException { - try - { - super.write(b); - } - catch (IOException e) - { - throw new AssertionError(e); // FBOS does not throw IOE - } + throw new UnsupportedOperationException(); } @Override - public void write(byte[] b, int off, int len) + protected void doFlush() throws IOException { - try + reallocate(buffer.capacity() * 2); + } + + protected void reallocate(long newSize) + { + assert newSize <= Integer.MAX_VALUE; + ByteBuffer newBuffer = ByteBuffer.allocate((int) newSize); + buffer.flip(); + newBuffer.put(buffer); + buffer = newBuffer; + } + + @Override + protected WritableByteChannel newDefaultChannel() + { + return new GrowingChannel(); + } + + private final class GrowingChannel implements WritableByteChannel + { + public int write(ByteBuffer src) throws IOException { - super.write(b, off, len); + int count = src.remaining(); + reallocate(Math.max((buffer.capacity() * 3) / 2, buffer.capacity() + count)); + buffer.put(src); + return count; } - catch (IOException e) + + public boolean isOpen() + { + return true; + } + + public void close() throws IOException { - throw new AssertionError(e); // FBOS does not throw IOE } } - public void write(ByteBuffer buffer) throws IOException + @Override + public void close() throws IOException { - ((FastByteArrayOutputStream) out).write(buffer); } - /** - * Returns the current contents of the buffer. Data is only valid to - * {@link #getLength()}. 
- * - * @return the buffer contents - */ - public byte[] getData() + public ByteBuffer buffer() { - return ((FastByteArrayOutputStream) out).buf; + ByteBuffer result = buffer.duplicate(); + result.flip(); + return result; } - public byte[] toByteArray() + public byte[] getData() { - FastByteArrayOutputStream out = (FastByteArrayOutputStream) this.out; - return Arrays.copyOfRange(out.buf, 0, out.count); - + return buffer.array(); } - public ByteBuffer asByteBuffer() + public int getLength() { - FastByteArrayOutputStream out = (FastByteArrayOutputStream) this.out; - return ByteBuffer.wrap(out.buf, 0, out.count); + return buffer.position(); } - /** @return the length of the valid data currently in the buffer. */ - public int getLength() + public byte[] toByteArray() { - return ((FastByteArrayOutputStream) out).count; + ByteBuffer buffer = buffer(); + byte[] result = new byte[buffer.remaining()]; + buffer.get(result); + return result; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java deleted file mode 100644 index b40d30e..0000000 --- a/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.io.util; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.cassandra.utils.ByteBufferUtil; - - -/** - * An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing - * its buffer so copies can be avoided. - * - * This class is completely thread unsafe. - */ -public final class DataOutputByteBuffer extends AbstractDataOutput -{ - - final ByteBuffer buffer; - public DataOutputByteBuffer(ByteBuffer buffer) - { - this.buffer = buffer; - } - - @Override - public void write(int b) - { - buffer.put((byte) b); - } - - @Override - public void write(byte[] b, int off, int len) - { - buffer.put(b, off, len); - } - - public void write(ByteBuffer buffer) throws IOException - { - int len = buffer.remaining(); - ByteBufferUtil.arrayCopy(buffer, buffer.position(), this.buffer, this.buffer.position(), len); - this.buffer.position(this.buffer.position() + len); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java index c2901e1..f63c1e5 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java @@ -20,12 +20,24 @@ package org.apache.cassandra.io.util; import java.io.DataOutput; import java.io.IOException; import 
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

import com.google.common.base.Function;

/**
 * Extension to DataOutput that provides for writing ByteBuffer and Memory, potentially with an efficient
 * implementation that is zero copy or at least has reduced bounds checking overhead.
 */
public interface DataOutputPlus extends DataOutput
{
    /**
     * Writes the remaining bytes of the given buffer.
     * NOTE(review): the pre-patch declaration documented that implementations must not modify
     * the buffer's position; the buffered implementation still takes a defensive copy, so that
     * contract appears intact — confirm for all implementers before relying on it.
     */
    void write(ByteBuffer buffer) throws IOException;

    /** Writes {@code length} bytes of the given Memory region starting at {@code offset}. */
    void write(Memory memory, long offset, long length) throws IOException;

    /**
     * Safe way to operate against the underlying channel. Impossible to stash a reference to the channel
     * and forget to flush
     */
    <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException;
}
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.io.util; - -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; - -public class DataOutputStreamAndChannel extends DataOutputStreamPlus -{ - private final WritableByteChannel channel; - public DataOutputStreamAndChannel(OutputStream os, WritableByteChannel channel) - { - super(os); - this.channel = channel; - } - public DataOutputStreamAndChannel(WritableByteChannel channel) - { - this(Channels.newOutputStream(channel), channel); - } - public DataOutputStreamAndChannel(FileOutputStream fos) - { - this(fos, fos.getChannel()); - } - - public void write(ByteBuffer buffer) throws IOException - { - buffer = buffer.duplicate(); - while (buffer.remaining() > 0) - channel.write(buffer); - } - - public WritableByteChannel getChannel() - { - return channel; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java index 7c1c9d8..6de2879 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java @@ -19,36 +19,117 @@ package org.apache.cassandra.io.util; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.utils.ByteBufferUtil; /** - * When possible use {@link DataOutputStreamAndChannel} instead of this class, as it will - * be more efficient. 
This class is only for situations where it cannot be used + * Abstract base class for DataOutputStreams that accept writes from ByteBuffer or Memory and also provide + * access to the underlying WritableByteChannel associated with their output stream. + * + * If no channel is provided by derived classes then a wrapper channel is provided. */ -public class DataOutputStreamPlus extends AbstractDataOutput implements DataOutputPlus +public abstract class DataOutputStreamPlus extends OutputStream implements DataOutputPlus { - protected final OutputStream out; - public DataOutputStreamPlus(OutputStream out) + //Dummy wrapper channel for derived implementations that don't have a channel + protected final WritableByteChannel channel; + + protected DataOutputStreamPlus() { - this.out = out; + this.channel = newDefaultChannel(); } - public void write(byte[] buffer, int offset, int count) throws IOException + protected DataOutputStreamPlus(WritableByteChannel channel) { - out.write(buffer, offset, count); + this.channel = channel; } - public void write(int oneByte) throws IOException + private static int MAX_BUFFER_SIZE = + Integer.getInteger(Config.PROPERTY_PREFIX + "data_output_stream_plus_temp_buffer_size", 8192); + + /* + * Factored out into separate method to create more flexibility around inlining + */ + protected static byte[] retrieveTemporaryBuffer(int minSize) { - out.write(oneByte); + byte[] bytes = tempBuffer.get(); + if (bytes.length < minSize) + { + // increase in powers of 2, to avoid wasted repeat allocations + bytes = new byte[Math.min(MAX_BUFFER_SIZE, 2 * Integer.highestOneBit(minSize))]; + tempBuffer.set(bytes); + } + return bytes; } - public void close() throws IOException + private static final ThreadLocal<byte[]> tempBuffer = new ThreadLocal<byte[]>() { - out.close(); - } + @Override + public byte[] initialValue() + { + return new byte[16]; + } + }; - public void flush() throws IOException + // Derived classes can override and *construct* a real channel, 
if it is not possible to provide one to the constructor + protected WritableByteChannel newDefaultChannel() { - out.flush(); + return new WritableByteChannel() + { + + @Override + public boolean isOpen() + { + return true; + } + + @Override + public void close() throws IOException + { + } + + @Override + public int write(ByteBuffer src) throws IOException + { + int toWrite = src.remaining(); + + if (src.hasArray()) + { + DataOutputStreamPlus.this.write(src.array(), src.arrayOffset() + src.position(), src.remaining()); + src.position(src.limit()); + return toWrite; + } + + if (toWrite < 16) + { + int offset = src.position(); + for (int i = 0 ; i < toWrite ; i++) + DataOutputStreamPlus.this.write(src.get(i + offset)); + src.position(src.limit()); + return toWrite; + } + + byte[] buf = retrieveTemporaryBuffer(toWrite); + + int totalWritten = 0; + while (totalWritten < toWrite) + { + int toWriteThisTime = Math.min(buf.length, toWrite - totalWritten); + + ByteBufferUtil.arrayCopy(src, src.position() + totalWritten, buf, 0, toWriteThisTime); + + DataOutputStreamPlus.this.write(buf, 0, toWriteThisTime); + + totalWritten += toWriteThisTime; + } + + src.position(src.limit()); + return totalWritten; + } + + }; } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java b/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java deleted file mode 100644 index 0e509b3..0000000 --- a/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.io.util; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; - -import org.apache.cassandra.utils.ByteBufferUtil; - -/* - * This file has been modified from Apache Harmony's ByteArrayOutputStream - * implementation. The synchronized methods of the original have been - * replaced by non-synchronized methods. This makes certain operations - * much FASTer, but also *not thread-safe*. - * - * This file remains formatted the same as the Apache Harmony original to - * make patching easier if any bug fixes are made to the Harmony version. - */ - -/** - * A specialized {@link OutputStream} for class for writing content to an - * (internal) byte array. As bytes are written to this stream, the byte array - * may be expanded to hold more bytes. When the writing is considered to be - * finished, a copy of the byte array can be requested from the class. - * - * @see ByteArrayOutputStream - */ -public class FastByteArrayOutputStream extends OutputStream { - /** - * The byte array containing the bytes written. - */ - protected byte[] buf; - - /** - * The number of bytes written. - */ - protected int count; - - /** - * Constructs a new ByteArrayOutputStream with a default size of 32 bytes. 
- * If more than 32 bytes are written to this instance, the underlying byte - * array will expand. - */ - public FastByteArrayOutputStream() { - buf = new byte[32]; - } - - /** - * Constructs a new {@code ByteArrayOutputStream} with a default size of - * {@code size} bytes. If more than {@code size} bytes are written to this - * instance, the underlying byte array will expand. - * - * @param size - * initial size for the underlying byte array, must be - * non-negative. - * @throws IllegalArgumentException - * if {@code size < 0}. - */ - public FastByteArrayOutputStream(int size) { - if (size >= 0) { - buf = new byte[size]; - } else { - throw new IllegalArgumentException(); - } - } - - /** - * Closes this stream. This releases system resources used for this stream. - * - * @throws IOException - * if an error occurs while attempting to close this stream. - */ - @Override - public void close() throws IOException { - /** - * Although the spec claims "A closed stream cannot perform output - * operations and cannot be reopened.", this implementation must do - * nothing. - */ - super.close(); - } - - private void expand(int i) { - /* Can the buffer handle @i more bytes, if not expand it */ - if (count + i <= buf.length) { - return; - } - - long expectedExtent = (count + i) * 2L; //long to deal with possible int overflow - int newSize = (int) Math.min(Integer.MAX_VALUE - 8, expectedExtent); // MAX_ARRAY_SIZE - byte[] newbuf = new byte[newSize]; - System.arraycopy(buf, 0, newbuf, 0, count); - buf = newbuf; - } - - /** - * Resets this stream to the beginning of the underlying byte array. All - * subsequent writes will overwrite any bytes previously stored in this - * stream. - */ - public void reset() { - count = 0; - } - - /** - * Returns the total number of bytes written to this stream so far. - * - * @return the number of bytes written to this stream. 
- */ - public int size() { - return count; - } - - /** - * Returns the contents of this ByteArrayOutputStream as a byte array. Any - * changes made to the receiver after returning will not be reflected in the - * byte array returned to the caller. - * - * @return this stream's current contents as a byte array. - */ - public byte[] toByteArray() { - byte[] newArray = new byte[count]; - System.arraycopy(buf, 0, newArray, 0, count); - return newArray; - } - - /** - * Returns the contents of this ByteArrayOutputStream as a string. Any - * changes made to the receiver after returning will not be reflected in the - * string returned to the caller. - * - * @return this stream's current contents as a string. - */ - - @Override - public String toString() { - return new String(buf, 0, count); - } - - /** - * Returns the contents of this ByteArrayOutputStream as a string. Each byte - * {@code b} in this stream is converted to a character {@code c} using the - * following function: - * {@code c == (char)(((hibyte & 0xff) << 8) | (b & 0xff))}. This method is - * deprecated and either {@link #toString()} or {@link #toString(String)} - * should be used. - * - * @param hibyte - * the high byte of each resulting Unicode character. - * @return this stream's current contents as a string with the high byte set - * to {@code hibyte}. - * @deprecated Use {@link #toString()}. - */ - @Deprecated - public String toString(int hibyte) { - char[] newBuf = new char[size()]; - for (int i = 0; i < newBuf.length; i++) { - newBuf[i] = (char) (((hibyte & 0xff) << 8) | (buf[i] & 0xff)); - } - return new String(newBuf); - } - - /** - * Returns the contents of this ByteArrayOutputStream as a string converted - * according to the encoding declared in {@code enc}. - * - * @param enc - * a string representing the encoding to use when translating - * this stream to a string. - * @return this stream's current contents as an encoded string. 
- * @throws UnsupportedEncodingException - * if the provided encoding is not supported. - */ - public String toString(String enc) throws UnsupportedEncodingException { - return new String(buf, 0, count, enc); - } - - /** - * Writes {@code count} bytes from the byte array {@code buffer} starting at - * offset {@code index} to this stream. - * - * @param buffer - * the buffer to be written. - * @param offset - * the initial position in {@code buffer} to retrieve bytes. - * @param len - * the number of bytes of {@code buffer} to write. - * @throws NullPointerException - * if {@code buffer} is {@code null}. - * @throws IndexOutOfBoundsException - * if {@code offset < 0} or {@code len < 0}, or if - * {@code offset + len} is greater than the length of - * {@code buffer}. - */ - @Override - public void write(byte[] buffer, int offset, int len) { - // avoid int overflow - if (offset < 0 || offset > buffer.length || len < 0 - || len > buffer.length - offset - || this.count + len < 0) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return; - } - - /* Expand if necessary */ - expand(len); - System.arraycopy(buffer, offset, buf, this.count, len); - this.count += len; - } - - public void write(ByteBuffer buffer) - { - int len = buffer.remaining(); - expand(len); - ByteBufferUtil.arrayCopy(buffer, buffer.position(), buf, this.count, len); - this.count += len; - } - - /** - * Writes the specified byte {@code oneByte} to the OutputStream. Only the - * low order byte of {@code oneByte} is written. - * - * @param oneByte - * the byte to be written. - */ - @Override - public void write(int oneByte) { - if (count == buf.length) { - expand(1); - } - buf[count++] = (byte) oneByte; - } - - /** - * Takes the contents of this stream and writes it to the output stream - * {@code out}. - * - * @param out - * an OutputStream on which to write the contents of this stream. - * @throws IOException - * if an error occurs while writing to {@code out}. 
- */ - public void writeTo(OutputStream out) throws IOException { - out.write(buf, 0, count); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/Memory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java index 78a3ea5..07d3ca3 100644 --- a/src/java/org/apache/cassandra/io/util/Memory.java +++ b/src/java/org/apache/cassandra/io/util/Memory.java @@ -386,6 +386,7 @@ public class Memory implements AutoCloseable public ByteBuffer[] asByteBuffers(long offset, long length) { + checkBounds(offset, offset + length); if (size() == 0) return NO_BYTE_BUFFERS; http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/NIODataInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java new file mode 100644 index 0000000..94ba9ed --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.util; + +import java.io.Closeable; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; + +import com.google.common.base.Preconditions; + +/** + * Rough equivalent of BufferedInputStream and DataInputStream wrapping the input stream of a File or Socket + * Created to work around the fact that when BIS + DIS delegate to NIO for socket IO they will allocate large + * thread local direct byte buffers when a large array is used to read. + * + * There may also be some performance improvement due to using a DBB as the underlying buffer for IO and the removal + * of some indirection and delegation when it comes to reading out individual values, but that is not the goal. + * + * Closing NIODataInputStream will invoke close on the ReadableByteChannel provided at construction. + * + * NIODataInputStream is not thread safe. 
+ */ +public class NIODataInputStream extends InputStream implements DataInput, Closeable +{ + private final ReadableByteChannel rbc; + private final ByteBuffer buf; + + + public NIODataInputStream(ReadableByteChannel rbc, int bufferSize) + { + Preconditions.checkNotNull(rbc); + Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accomadate a long/double"); + this.rbc = rbc; + buf = ByteBuffer.allocateDirect(bufferSize); + buf.position(0); + buf.limit(0); + } + + @Override + public void readFully(byte[] b) throws IOException + { + readFully(b, 0, b.length); + } + + + @Override + public void readFully(byte[] b, int off, int len) throws IOException + { + int copied = 0; + while (copied < len) + { + int read = read(b, off + copied, len - copied); + if (read < 0) + throw new EOFException(); + copied += read; + } + } + + @Override + public int read(byte b[], int off, int len) throws IOException { + if (b == null) + throw new NullPointerException(); + + // avoid int overflow + if (off < 0 || off > b.length || len < 0 + || len > b.length - off) + throw new IndexOutOfBoundsException(); + + if (len == 0) + return 0; + + int copied = 0; + while (copied < len) + { + if (buf.hasRemaining()) + { + int toCopy = Math.min(len - copied, buf.remaining()); + buf.get(b, off + copied, toCopy); + copied += toCopy; + } + else + { + int read = readNext(); + if (read < 0 && copied == 0) return -1; + if (read <= 0) return copied; + } + } + + return copied; + } + + /* + * Refill the buffer, preserving any unread bytes remaining in the buffer + */ + private int readNext() throws IOException + { + Preconditions.checkState(buf.remaining() != buf.capacity()); + assert(buf.remaining() < 8); + + /* + * If there is data already at the start of the buffer, move the position to the end + * If there is data but not at the start, move it to the start + * Otherwise move the position to 0 so writes start at the beginning of the buffer + * + * We go to the trouble of 
shuffling the bytes remaining for cases where the buffer isn't fully drained + * while retrieving a multi-byte value while the position is in the middle. + */ + if (buf.position() == 0 && buf.hasRemaining()) + { + buf.position(buf.limit()); + } + else if (buf.hasRemaining()) + { + ByteBuffer dup = buf.duplicate(); + buf.clear(); + buf.put(dup); + } + else + { + buf.position(0); + } + + buf.limit(buf.capacity()); + + int read = 0; + while ((read = rbc.read(buf)) == 0) {} + + buf.flip(); + + return read; + } + + /* + * Read at least minimum bytes and throw EOF if that fails + */ + private void readMinimum(int minimum) throws IOException + { + assert(buf.remaining() < 8); + while (buf.remaining() < minimum) + { + int read = readNext(); + if (read == -1) + { + //DataInputStream consumes the bytes even if it doesn't get the entire value, match the behavior here + buf.position(0); + buf.limit(0); + throw new EOFException(); + } + } + } + + /* + * Ensure the buffer contains the minimum number of readable bytes + */ + private void prepareReadPrimitive(int minimum) throws IOException + { + if (buf.remaining() < minimum) readMinimum(minimum); + } + + @Override + public int skipBytes(int n) throws IOException + { + int skipped = 0; + + while (skipped < n) + { + int skippedThisTime = (int)skip(n - skipped); + if (skippedThisTime <= 0) break; + skipped += skippedThisTime; + } + + return skipped; + } + + @Override + public boolean readBoolean() throws IOException + { + prepareReadPrimitive(1); + return buf.get() != 0; + } + + @Override + public byte readByte() throws IOException + { + prepareReadPrimitive(1); + return buf.get(); + } + + @Override + public int readUnsignedByte() throws IOException + { + prepareReadPrimitive(1); + return buf.get() & 0xff; + } + + @Override + public short readShort() throws IOException + { + prepareReadPrimitive(2); + return buf.getShort(); + } + + @Override + public int readUnsignedShort() throws IOException + { + return readShort() & 0xFFFF; + } 
+ + @Override + public char readChar() throws IOException + { + prepareReadPrimitive(2); + return buf.getChar(); + } + + @Override + public int readInt() throws IOException + { + prepareReadPrimitive(4); + return buf.getInt(); + } + + @Override + public long readLong() throws IOException + { + prepareReadPrimitive(8); + return buf.getLong(); + } + + @Override + public float readFloat() throws IOException + { + prepareReadPrimitive(4); + return buf.getFloat(); + } + + @Override + public double readDouble() throws IOException + { + prepareReadPrimitive(8); + return buf.getDouble(); + } + + @Override + public String readLine() throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public String readUTF() throws IOException + { + return DataInputStream.readUTF(this); + } + + @Override + public void close() throws IOException + { + rbc.close(); + } + + @Override + public int read() throws IOException + { + return readUnsignedByte(); + } + + @Override + public int available() throws IOException + { + if (rbc instanceof SeekableByteChannel) + { + SeekableByteChannel sbc = (SeekableByteChannel)rbc; + long remainder = Math.max(0, sbc.size() - sbc.position()); + return (remainder > Integer.MAX_VALUE) ? 
Integer.MAX_VALUE : (int)(remainder + buf.remaining()); + } + return buf.remaining(); + } + + @Override + public void reset() throws IOException + { + throw new IOException("mark/reset not supported"); + } + + @Override + public boolean markSupported() + { + return false; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java index 6c87cf9..1fc374f 100644 --- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java +++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java @@ -22,122 +22,79 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -public class SafeMemoryWriter extends AbstractDataOutput implements DataOutputPlus +public class SafeMemoryWriter extends DataOutputBuffer { - private ByteOrder order = ByteOrder.BIG_ENDIAN; - private SafeMemory buffer; - private long length; + private SafeMemory memory; public SafeMemoryWriter(long initialCapacity) { - buffer = new SafeMemory(initialCapacity); + this(new SafeMemory(initialCapacity)); } - @Override - public void write(byte[] buffer, int offset, int count) - { - long newLength = ensureCapacity(count); - this.buffer.setBytes(this.length, buffer, offset, count); - this.length = newLength; - } - - @Override - public void write(int oneByte) + private SafeMemoryWriter(SafeMemory memory) { - long newLength = ensureCapacity(1); - buffer.setByte(length++, (byte) oneByte); - length = newLength; + super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN)); + this.memory = memory; } - @Override - public void writeShort(int val) throws IOException + public SafeMemory currentBuffer() { - if (order != ByteOrder.nativeOrder()) - val = Short.reverseBytes((short) val); - long newLength = ensureCapacity(2); - 
buffer.setShort(length, (short) val); - length = newLength; + return memory; } - @Override - public void writeInt(int val) + protected void reallocate(long newCapacity) { - if (order != ByteOrder.nativeOrder()) - val = Integer.reverseBytes(val); - long newLength = ensureCapacity(4); - buffer.setInt(length, val); - length = newLength; - } + if (newCapacity != capacity()) + { + long position = length(); + ByteOrder order = buffer.order(); - @Override - public void writeLong(long val) - { - if (order != ByteOrder.nativeOrder()) - val = Long.reverseBytes(val); - long newLength = ensureCapacity(8); - buffer.setLong(length, val); - length = newLength; - } + SafeMemory oldBuffer = memory; + memory = this.memory.copy(newCapacity); + buffer = tailBuffer(memory); - @Override - public void write(ByteBuffer buffer) - { - long newLength = ensureCapacity(buffer.remaining()); - this.buffer.setBytes(length, buffer); - length = newLength; - } + int newPosition = (int) (position - tailOffset(memory)); + buffer.position(newPosition); + buffer.order(order); - @Override - public void write(Memory memory, long offset, long size) - { - long newLength = ensureCapacity(size); - buffer.put(length, memory, offset, size); - length = newLength; - } - - private long ensureCapacity(long size) - { - long newLength = this.length + size; - if (newLength > buffer.size()) - setCapacity(Math.max(newLength, buffer.size() + (buffer.size() / 2))); - return newLength; - } - - public SafeMemory currentBuffer() - { - return buffer; + oldBuffer.free(); + } } public void setCapacity(long newCapacity) { - if (newCapacity != capacity()) - { - SafeMemory oldBuffer = buffer; - buffer = this.buffer.copy(newCapacity); - oldBuffer.free(); - } + reallocate(newCapacity); } public void close() { - buffer.close(); + memory.close(); } public long length() { - return length; + return tailOffset(memory) + buffer.position(); } public long capacity() { - return buffer.size(); + return memory.size(); } - // TODO: consider 
hoisting this into DataOutputPlus, since most implementations can copy with this gracefully - // this would simplify IndexSummary.IndexSummarySerializer.serialize() + @Override public SafeMemoryWriter order(ByteOrder order) { - this.order = order; + super.order(order); return this; } + + private static long tailOffset(Memory memory) + { + return Math.max(0, memory.size - Integer.MAX_VALUE); + } + + private static ByteBuffer tailBuffer(Memory memory) + { + return memory.asByteBuffer(tailOffset(memory), (int) Math.min(memory.size, Integer.MAX_VALUE)); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index f8ea92f..c4fef07 100644 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@ -26,7 +26,6 @@ import java.nio.file.StandardOpenOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.FSWriteError; @@ -97,7 +96,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne fd = CLibrary.getfd(channel); directoryFD = CLibrary.tryOpenDirectory(file.getParent()); - stream = new DataOutputStreamAndChannel(this, this); + stream = new WrappedDataOutputStreamPlus(this, this); } /**
