8630: fixed coverity defects
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/967a2cfe Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/967a2cfe Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/967a2cfe Branch: refs/heads/trunk Commit: 967a2cfe179548835d5e8c1640889420ce0d40ce Parents: 9f0509e Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Mon Sep 7 09:44:46 2015 +0800 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Thu Sep 10 13:55:06 2015 +0100 ---------------------------------------------------------------------- .../cassandra/hints/ChecksummedDataInput.java | 2 +- .../compress/CompressedRandomAccessReader.java | 30 +++---- .../cassandra/io/util/RandomAccessReader.java | 91 +++++++++----------- .../cassandra/utils/memory/BufferPool.java | 2 +- 4 files changed, 56 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/967a2cfe/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java index 543f14e..95ea256 100644 --- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java @@ -137,7 +137,7 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR private void updateCrc() { - if (crcPosition == buffer.position() | crcUpdateDisabled) + if (crcPosition == buffer.position() || crcUpdateDisabled) return; assert crcPosition >= 0 && crcPosition < buffer.position(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/967a2cfe/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index 0242871..7294923 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@ -48,29 +48,13 @@ public class CompressedRandomAccessReader extends RandomAccessReader protected CompressedRandomAccessReader(Builder builder) { - super(builder.initializeBuffers(false)); + super(builder); this.metadata = builder.metadata; this.checksum = metadata.checksumType.newInstance(); - initializeBuffer(); - } - - @Override - protected int getBufferSize(RandomAccessReader.Builder builder) - { - // this is the chunk data length, throttling is OK with this - return builder.bufferSize; - } - - @Override - protected void initializeBuffer() - { - buffer = allocateBuffer(bufferSize); - buffer.limit(0); - if (regions == null) { - compressed = allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())); + compressed = allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()), bufferType); checksumBytes = ByteBuffer.wrap(new byte[4]); } } @@ -110,7 +94,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader if (compressed.capacity() < chunk.length) { BufferPool.put(compressed); - compressed = allocateBuffer(chunk.length); + compressed = allocateBuffer(chunk.length, bufferType); } else { @@ -278,6 +262,14 @@ public class CompressedRandomAccessReader extends RandomAccessReader } @Override + protected ByteBuffer createBuffer() + { + buffer = allocateBuffer(bufferSize, bufferType); + buffer.limit(0); + return buffer; + } + + @Override public RandomAccessReader build() { return new CompressedRandomAccessReader(this); http://git-wip-us.apache.org/repos/asf/cassandra/blob/967a2cfe/src/java/org/apache/cassandra/io/util/RandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java index e9b0ee4..43589d0 100644 --- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java @@ -33,9 +33,13 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa // The default buffer size when the client doesn't specify it public static final int DEFAULT_BUFFER_SIZE = 4096; - // The maximum buffer size when the limiter is not null, i.e. when throttling - // is enabled. This is required to avoid aquiring permits that are too large. - public static final int MAX_THROTTLED_BUFFER_SIZE = 1 << 16; // 64k + // The maximum buffer size, we will never buffer more than this size. Further, + // when the limiter is not null, i.e. when throttling is enabled, we read exactly + // this size, since when throttling the intention is to eventually read everything, + // see CASSANDRA-8630 + // NOTE: this size is chosen both for historical consistency, as a reasonable upper bound, + // and because our BufferPool currently has a maximum allocation size of this. + public static final int MAX_BUFFER_SIZE = 1 << 16; // 64k // the IO channel to the file, we do not own a reference to this due to // performance reasons (CASSANDRA-9379) so it's up to the owner of the RAR to @@ -57,7 +61,7 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa protected final int bufferSize; // the buffer type for buffered readers - private final BufferType bufferType; + protected final BufferType bufferType; // offset from the beginning of the file protected long bufferOffset; @@ -67,42 +71,18 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa protected RandomAccessReader(Builder builder) { - super(null); + super(builder.createBuffer()); this.channel = builder.channel; this.regions = builder.regions; this.limiter = builder.limiter; this.fileLength = builder.overrideLength <= 0 ? builder.channel.size() : builder.overrideLength; - this.bufferSize = getBufferSize(builder); + this.bufferSize = builder.bufferSize; this.bufferType = builder.bufferType; - - if (builder.bufferSize <= 0) - throw new IllegalArgumentException("bufferSize must be positive"); - - if (builder.initializeBuffers) - initializeBuffer(); - } - - protected int getBufferSize(Builder builder) - { - if (builder.limiter == null) - return builder.bufferSize; - - // limit to ensure more accurate throttling - return Math.min(MAX_THROTTLED_BUFFER_SIZE, builder.bufferSize); - } - - protected void initializeBuffer() - { - if (regions == null) - buffer = allocateBuffer(bufferSize); - else - buffer = regions.floor(0).buffer.duplicate(); - - buffer.limit(0); + this.buffer = builder.buffer; } - protected ByteBuffer allocateBuffer(int size) + protected static ByteBuffer allocateBuffer(int size, BufferType bufferType) { return BufferPool.get(size, bufferType).order(ByteOrder.BIG_ENDIAN); } @@ -396,35 +376,56 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa // The type of the buffer for buffered readers public BufferType bufferType; + // The buffer + public ByteBuffer buffer; + // The mmap segments for mmap readers public MmappedRegions regions; // An optional limiter that will throttle the amount of data we read public RateLimiter limiter; - public boolean initializeBuffers; - public Builder(ChannelProxy channel) { this.channel = channel; this.overrideLength = -1L; - this.bufferSize = getBufferSize(DEFAULT_BUFFER_SIZE); + this.bufferSize = DEFAULT_BUFFER_SIZE; this.bufferType = BufferType.OFF_HEAP; this.regions = null; this.limiter = null; - this.initializeBuffers = true; } /** The buffer size is typically already page aligned but if that is not the case - * make sure that it is a multiple of the page size, 4096. + * make sure that it is a multiple of the page size, 4096. Also limit it to the maximum + * buffer size unless we are throttling, in which case we may as well read the maximum + * directly since the intention is to read the full file, see CASSANDRA-8630. * */ - private static int getBufferSize(int size) + private void setBufferSize() { - if ((size & ~4095) != size) + if (limiter != null) + { + bufferSize = MAX_BUFFER_SIZE; + return; + } + + if ((bufferSize & ~4095) != bufferSize) { // should already be a page size multiple but if that's not case round it up - size = (size + 4095) & ~4095; + bufferSize = (bufferSize + 4095) & ~4095; } - return size; + + bufferSize = Math.min(MAX_BUFFER_SIZE, bufferSize); + } + + protected ByteBuffer createBuffer() + { + setBufferSize(); + + buffer = regions == null + ? allocateBuffer(bufferSize, bufferType) + : regions.floor(0).buffer.duplicate(); + + buffer.limit(0); + return buffer; } public Builder overrideLength(long overrideLength) @@ -441,7 +442,7 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa if (bufferSize <= 0) throw new IllegalArgumentException("bufferSize must be positive"); - this.bufferSize = getBufferSize(bufferSize); + this.bufferSize = bufferSize; return this; } @@ -463,12 +464,6 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa return this; } - public Builder initializeBuffers(boolean initializeBuffers) - { - this.initializeBuffers = initializeBuffers; - return this; - } - public RandomAccessReader build() { return new RandomAccessReader(this); http://git-wip-us.apache.org/repos/asf/cassandra/blob/967a2cfe/src/java/org/apache/cassandra/utils/memory/BufferPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java index a64cfb0..f972059 100644 --- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java +++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java @@ -44,7 +44,7 @@ import org.apache.cassandra.utils.concurrent.Ref; */ public class BufferPool { - /** The size of a page aligned buffer, 64kbit */ + /** The size of a page aligned buffer, 64KiB */ static final int CHUNK_SIZE = 64 << 10; @VisibleForTesting