Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/afe3fe3d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/afe3fe3d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/afe3fe3d Branch: refs/heads/cassandra-3.0 Commit: afe3fe3df98f3439bca05cbe132a0e97b66945e4 Parents: a68f8bd 9b97766 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Tue Nov 17 18:56:02 2015 +0000 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Tue Nov 17 18:56:02 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/io/util/RandomAccessReader.java | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/afe3fe3d/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index fb8f89a,08db386..4cb9275 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,17 -1,5 +1,18 @@@ -2.1.12 +2.2.4 + * Don't do anticompaction after subrange repair (CASSANDRA-10422) + * Fix SimpleDateType type compatibility (CASSANDRA-10027) + * (Hadoop) fix splits calculation (CASSANDRA-10640) + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058) + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645) + * Use most up-to-date version of schema for system tables (CASSANDRA-10652) + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628) + * Expose phi values from failure detector via JMX and tweak debug + and trace logging (CASSANDRA-9526) + * Fix RangeNamesQueryPager (CASSANDRA-10509) + * Deprecate Pig support (CASSANDRA-10542) + * Reduce contention getting instances of CompositeType (CASSANDRA-10433) +Merged from 2.1: + * Make buffered read size configurable (CASSANDRA-10249) * Forbid compact clustering column type changes in ALTER TABLE (CASSANDRA-8879) * Reject incremental repair with subrange repair (CASSANDRA-10422) * Add a nodetool command to refresh size_estimates (CASSANDRA-9579) http://git-wip-us.apache.org/repos/asf/cassandra/blob/afe3fe3d/src/java/org/apache/cassandra/io/util/RandomAccessReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/util/RandomAccessReader.java index 278f55c,d15fe46..751269b --- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java @@@ -23,22 -24,31 +23,23 @@@ import java.nio.ByteBuffer import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.utils.ByteBufferUtil; -public class RandomAccessReader extends RandomAccessFile implements FileDataInput +public class RandomAccessReader extends AbstractDataInput implements FileDataInput { - public static final long CACHE_FLUSH_INTERVAL_IN_BYTES = (long) Math.pow(2, 27); // 128mb - // default buffer size, 64Kb public static final int DEFAULT_BUFFER_SIZE = 65536; + public static final int BUFFER_SIZE = Integer.getInteger("cassandra.rar_buffer_size", DEFAULT_BUFFER_SIZE); - // absolute filesystem path to the file - private final String filePath; - // buffer which will cache file blocks - protected byte[] buffer; + protected ByteBuffer buffer; - // `current` as current position in file // `bufferOffset` is the offset of the beginning of the buffer // `markedPointer` folds the offset of the last file mark - protected long bufferOffset, current = 0, markedPointer; - // `validBufferBytes` is the number of bytes in the buffer that are actually valid; - // this will be LESS than buffer capacity if buffer is not full! - protected int validBufferBytes = 0; + protected long bufferOffset, markedPointer; - // channel liked with the file, used to retrieve data and force updates. - protected final FileChannel channel; + protected final ChannelProxy channel; // this can be overridden at construction to a value shorter than the true length of the file; // if so, it acts as an imposed limit on reads, rather than a convenience property @@@ -55,40 -73,39 +56,40 @@@ if (bufferSize <= 0) throw new IllegalArgumentException("bufferSize must be positive"); - buffer = new byte[bufferSize]; - // we can cache file length in read-only mode - long fileLength = overrideLength; - if (fileLength <= 0) - { - try - { - fileLength = channel.size(); - } - catch (IOException e) - { - throw new FSReadError(e, filePath); - } - } + fileLength = overrideLength <= 0 ? channel.size() : overrideLength; + + buffer = allocateBuffer(bufferSize, bufferType); + buffer.limit(0); + } - this.fileLength = fileLength; - validBufferBytes = -1; // that will trigger reBuffer() on demand by read/seek operations + protected ByteBuffer allocateBuffer(int bufferSize, BufferType bufferType) + { + int size = (int) Math.min(fileLength, bufferSize); + return bufferType.allocate(size); } - public static RandomAccessReader open(File file, long overrideSize, PoolingSegmentedFile owner) + public static RandomAccessReader open(ChannelProxy channel, long overrideSize, PoolingSegmentedFile owner) { - return open(channel, DEFAULT_BUFFER_SIZE, overrideSize, owner); - return open(file, BUFFER_SIZE, overrideSize, owner); ++ return open(channel, BUFFER_SIZE, overrideSize, owner); } public static RandomAccessReader open(File file) { - return open(file, -1L); + try (ChannelProxy channel = new ChannelProxy(file)) + { + return open(channel); + } } - public static RandomAccessReader open(File file, long overrideSize) + public static RandomAccessReader open(ChannelProxy channel) { - return open(file, BUFFER_SIZE, overrideSize, null); + return open(channel, -1L); + } + + public static RandomAccessReader open(ChannelProxy channel, long overrideSize) + { - return open(channel, DEFAULT_BUFFER_SIZE, overrideSize, null); ++ return open(channel, BUFFER_SIZE, overrideSize, null); } @VisibleForTesting @@@ -105,15 -129,7 +106,15 @@@ @VisibleForTesting static RandomAccessReader open(SequentialWriter writer) { - return open(new File(writer.getPath()), BUFFER_SIZE, null); + try (ChannelProxy channel = new ChannelProxy(writer.getPath())) + { - return open(channel, DEFAULT_BUFFER_SIZE, null); ++ return open(channel, BUFFER_SIZE, null); + } + } + + public ChannelProxy getChannel() + { + return channel; } /**