http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 16f791a..7365d40 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -17,49 +17,61 @@
  */
 package org.apache.cassandra.io.util;
 
-import com.google.common.util.concurrent.RateLimiter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
 
+import org.apache.cassandra.cache.ChunkCache;
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.io.compress.*;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
-import org.apache.cassandra.io.compress.CompressedSequentialWriter;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.concurrent.Ref;
 
 public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
 {
-    private static final Logger logger = LoggerFactory.getLogger(CompressedSegmentedFile.class);
-    private static final boolean useMmap = DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap;
-
     public final CompressionMetadata metadata;
-    private final MmappedRegions regions;
 
-    public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata)
+    public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, Config.DiskAccessMode mode)
     {
         this(channel,
-             bufferSize,
              metadata,
-             useMmap
+             mode == DiskAccessMode.mmap
              ? MmappedRegions.map(channel, metadata)
              : null);
     }
 
-    public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata, MmappedRegions regions)
+    public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
+    {
+        this(channel, metadata, regions, createRebufferer(channel, metadata, regions));
+    }
+
+    private static RebuffererFactory createRebufferer(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
+    {
+        return ChunkCache.maybeWrap(chunkReader(channel, metadata, regions));
+    }
+
+    public static ChunkReader chunkReader(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
+    {
+        return regions != null
+               ? new Mmap(channel, metadata, regions)
+               : new Standard(channel, metadata);
+    }
+
+    public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions, RebuffererFactory rebufferer)
     {
-        super(new Cleanup(channel, metadata, regions), channel, bufferSize, metadata.dataLength, metadata.compressedFileLength);
+        super(new Cleanup(channel, metadata, regions, rebufferer), channel, rebufferer, metadata.compressedFileLength);
         this.metadata = metadata;
-        this.regions = regions;
     }
 
     private CompressedSegmentedFile(CompressedSegmentedFile copy)
     {
         super(copy);
         this.metadata = copy.metadata;
-        this.regions = copy.regions;
     }
 
     public ChannelProxy channel()
@@ -67,33 +79,21 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
         return channel;
     }
 
-    public MmappedRegions regions()
-    {
-        return regions;
-    }
-
     private static final class Cleanup extends SegmentedFile.Cleanup
     {
         final CompressionMetadata metadata;
-        private final MmappedRegions regions;
 
-        protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
+        protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions, ReaderFileProxy rebufferer)
         {
-            super(channel);
+            super(channel, rebufferer);
             this.metadata = metadata;
-            this.regions = regions;
         }
 
         public void tidy()
         {
-            Throwable err = regions == null ? null : regions.close(null);
-            if (err != null)
+            if (ChunkCache.instance != null)
             {
-                JVMStabilityInspector.inspectThrowable(err);
-
-                // This is not supposed to happen
-                logger.error("Error while closing mmapped regions", err);
+                ChunkCache.instance.invalidateFile(name());
             }
-
             metadata.close();
 
             super.tidy();
@@ -114,9 +114,12 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
     public static class Builder extends SegmentedFile.Builder
     {
         final CompressedSequentialWriter writer;
+        final Config.DiskAccessMode mode;
+
         public Builder(CompressedSequentialWriter writer)
         {
             this.writer = writer;
+            this.mode = DatabaseDescriptor.getDiskAccessMode();
         }
 
         protected CompressionMetadata metadata(String path, long overrideLength)
@@ -129,7 +132,7 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
 
         public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength)
         {
-            return new CompressedSegmentedFile(channel, bufferSize, metadata(channel.filePath(), overrideLength));
+            return new CompressedSegmentedFile(channel, metadata(channel.filePath(), overrideLength), mode);
         }
     }
 
@@ -140,18 +143,216 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
         super.dropPageCache(metadata.chunkFor(before).offset);
     }
 
-    public RandomAccessReader createReader()
+    public CompressionMetadata getMetadata()
     {
-        return new CompressedRandomAccessReader.Builder(this).build();
+        return metadata;
     }
 
-    public RandomAccessReader createReader(RateLimiter limiter)
+    public long dataLength()
     {
-        return new CompressedRandomAccessReader.Builder(this).limiter(limiter).build();
+        return metadata.dataLength;
     }
 
-    public CompressionMetadata getMetadata()
+    @VisibleForTesting
+    public abstract static class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader
     {
-        return metadata;
+        final CompressionMetadata metadata;
+
+        public CompressedChunkReader(ChannelProxy channel, CompressionMetadata metadata)
+        {
+            super(channel, metadata.dataLength);
+            this.metadata = metadata;
+            assert Integer.bitCount(metadata.chunkLength()) == 1; //must be a power of two
+        }
+
+        @VisibleForTesting
+        public double getCrcCheckChance()
+        {
+            return metadata.parameters.getCrcCheckChance();
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("CompressedChunkReader.%s(%s - %s, chunk length %d, data length %d)",
+                                 getClass().getSimpleName(),
+                                 channel.filePath(),
+                                 metadata.compressor().getClass().getSimpleName(),
+                                 metadata.chunkLength(),
+                                 metadata.dataLength);
+        }
+
+        @Override
+        public int chunkSize()
+        {
+            return metadata.chunkLength();
+        }
+
+        @Override
+        public boolean alignmentRequired()
+        {
+            return true;
+        }
+
+        @Override
+        public BufferType preferredBufferType()
+        {
+            return metadata.compressor().preferredBufferType();
+        }
+
+        @Override
+        public Rebufferer instantiateRebufferer()
+        {
+            return BufferManagingRebufferer.on(this);
+        }
+    }
+
+    static class Standard extends CompressedChunkReader
+    {
+        // we read the raw compressed bytes into this buffer, then uncompress them into the provided one.
+        private final ThreadLocal<ByteBuffer> compressedHolder;
+
+        public Standard(ChannelProxy channel, CompressionMetadata metadata)
+        {
+            super(channel, metadata);
+            compressedHolder = ThreadLocal.withInitial(this::allocateBuffer);
+        }
+
+        public ByteBuffer allocateBuffer()
+        {
+            return allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()));
+        }
+
+        public ByteBuffer allocateBuffer(int size)
+        {
+            return metadata.compressor().preferredBufferType().allocate(size);
+        }
+
+        @Override
+        public void readChunk(long position, ByteBuffer uncompressed)
+        {
+            try
+            {
+                // accesses must always be aligned
+                assert (position & -uncompressed.capacity()) == position;
+                assert position <= fileLength;
+
+                CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
+                ByteBuffer compressed = compressedHolder.get();
+
+                if (compressed.capacity() < chunk.length)
+                {
+                    compressed = allocateBuffer(chunk.length);
+                    compressedHolder.set(compressed);
+                }
+                else
+                {
+                    compressed.clear();
+                }
+
+                compressed.limit(chunk.length);
+                if (channel.read(compressed, chunk.offset) != chunk.length)
+                    throw new CorruptBlockException(channel.filePath(), chunk);
+
+                compressed.flip();
+                uncompressed.clear();
+
+                try
+                {
+                    metadata.compressor().uncompress(compressed, uncompressed);
+                }
+                catch (IOException e)
+                {
+                    throw new CorruptBlockException(channel.filePath(), chunk);
+                }
+                finally
+                {
+                    uncompressed.flip();
+                }
+
+                if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+                {
+                    compressed.rewind();
+                    int checksum = (int) metadata.checksumType.of(compressed);
+
+                    compressed.clear().limit(Integer.BYTES);
+                    if (channel.read(compressed, chunk.offset + chunk.length) != Integer.BYTES
+                        || compressed.getInt(0) != checksum)
+                        throw new CorruptBlockException(channel.filePath(), chunk);
+                }
+            }
+            catch (CorruptBlockException e)
+            {
+                throw new CorruptSSTableException(e, channel.filePath());
+            }
+        }
+    }
+
+    static class Mmap extends CompressedChunkReader
+    {
+        protected final MmappedRegions regions;
+
+        public Mmap(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions)
+        {
+            super(channel, metadata);
+            this.regions = regions;
+        }
+
+        @Override
+        public void readChunk(long position, ByteBuffer uncompressed)
+        {
+            try
+            {
+                // accesses must always be aligned
+                assert (position & -uncompressed.capacity()) == position;
+                assert position <= fileLength;
+
+                CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
+
+                MmappedRegions.Region region = regions.floor(chunk.offset);
+                long segmentOffset = region.offset();
+                int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
+                ByteBuffer compressedChunk = region.buffer();
+
+                compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
+
+                uncompressed.clear();
+
+                try
+                {
+                    metadata.compressor().uncompress(compressedChunk, uncompressed);
+                }
+                catch (IOException e)
+                {
+                    throw new CorruptBlockException(channel.filePath(), chunk);
+                }
+                finally
+                {
+                    uncompressed.flip();
+                }
+
+                if (getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+                {
+                    compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
+
+                    int checksum = (int) metadata.checksumType.of(compressedChunk);
+
+                    compressedChunk.limit(compressedChunk.capacity());
+                    if (compressedChunk.getInt() != checksum)
+                        throw new CorruptBlockException(channel.filePath(), chunk);
+                }
+            }
+            catch (CorruptBlockException e)
+            {
+                throw new CorruptSSTableException(e, channel.filePath());
+            }
+        }
+
+        public void close()
+        {
+            regions.closeQuietly();
+            super.close();
+        }
    }
 }
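For orientation, the pieces introduced above compose as follows. This is an illustrative sketch only, not part of the commit; channel, metadata, regions and position are assumed to be in scope:

    // Pick the mmap or standard implementation, optionally wrap it in the chunk cache,
    // then obtain a per-reader rebufferer from the resulting factory.
    ChunkReader reader = CompressedSegmentedFile.chunkReader(channel, metadata, regions);
    RebuffererFactory factory = ChunkCache.maybeWrap(reader);
    Rebufferer rebufferer = factory.instantiateRebufferer();

    Rebufferer.BufferHolder bh = rebufferer.rebuffer(position);
    try
    {
        ByteBuffer uncompressed = bh.buffer(); // one uncompressed chunk containing 'position'
        // ... consume uncompressed ...
    }
    finally
    {
        bh.release(); // mandatory; otherwise the chunk cache cannot free the block
    }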
http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index eb84a89..0c48d13 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -34,6 +34,7 @@ import com.google.common.base.Charsets;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.Throwables;
 
 public class DataIntegrityMetadata
@@ -45,21 +46,21 @@ public class DataIntegrityMetadata
 
     public static class ChecksumValidator implements Closeable
     {
-        private final Checksum checksum;
+        private final ChecksumType checksumType;
         private final RandomAccessReader reader;
         public final int chunkSize;
         private final String dataFilename;
 
         public ChecksumValidator(Descriptor descriptor) throws IOException
         {
-            this(descriptor.version.uncompressedChecksumType().newInstance(),
+            this(descriptor.version.uncompressedChecksumType(),
                  RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC))),
                  descriptor.filenameFor(Component.DATA));
         }
 
-        public ChecksumValidator(Checksum checksum, RandomAccessReader reader, String dataFilename) throws IOException
+        public ChecksumValidator(ChecksumType checksumType, RandomAccessReader reader, String dataFilename) throws IOException
         {
-            this.checksum = checksum;
+            this.checksumType = checksumType;
             this.reader = reader;
             this.dataFilename = dataFilename;
             chunkSize = reader.readInt();
@@ -79,9 +80,7 @@ public class DataIntegrityMetadata
 
         public void validate(byte[] bytes, int start, int end) throws IOException
         {
-            checksum.update(bytes, start, end);
-            int current = (int) checksum.getValue();
-            checksum.reset();
+            int current = (int) checksumType.of(bytes, start, end);
             int actual = reader.readInt();
             if (current != actual)
                 throw new IOException("Corrupted File : " + dataFilename);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/ICompressedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ICompressedFile.java b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
index 43cef8c..e69487c 100644
--- a/src/java/org/apache/cassandra/io/util/ICompressedFile.java
+++ b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
@@ -23,6 +23,4 @@ public interface ICompressedFile
 {
     ChannelProxy channel();
     CompressionMetadata getMetadata();
-    MmappedRegions regions();
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/LimitingRebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/LimitingRebufferer.java b/src/java/org/apache/cassandra/io/util/LimitingRebufferer.java
new file mode 100644
index 0000000..e69da70
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/LimitingRebufferer.java
@@ -0,0 +1,106 @@
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * Rebufferer wrapper that applies rate limiting.
+ *
+ * Instantiated once per RandomAccessReader, thread-unsafe.
+ * The instances reuse themselves as the BufferHolder to avoid having to return a new object for each rebuffer call.
+ */
+public class LimitingRebufferer implements Rebufferer, Rebufferer.BufferHolder
+{
+    final private Rebufferer wrapped;
+    final private RateLimiter limiter;
+    final private int limitQuant;
+
+    private BufferHolder bufferHolder;
+    private ByteBuffer buffer;
+    private long offset;
+
+    public LimitingRebufferer(Rebufferer wrapped, RateLimiter limiter, int limitQuant)
+    {
+        this.wrapped = wrapped;
+        this.limiter = limiter;
+        this.limitQuant = limitQuant;
+    }
+
+    @Override
+    public BufferHolder rebuffer(long position)
+    {
+        bufferHolder = wrapped.rebuffer(position);
+        buffer = bufferHolder.buffer();
+        offset = bufferHolder.offset();
+        int posInBuffer = Ints.checkedCast(position - offset);
+        int remaining = buffer.limit() - posInBuffer;
+        if (remaining == 0)
+            return this;
+
+        if (remaining > limitQuant)
+        {
+            buffer.limit(posInBuffer + limitQuant); // certainly below current limit
+            remaining = limitQuant;
+        }
+        limiter.acquire(remaining);
+        return this;
+    }
+
+    @Override
+    public ChannelProxy channel()
+    {
+        return wrapped.channel();
+    }
+
+    @Override
+    public long fileLength()
+    {
+        return wrapped.fileLength();
+    }
+
+    @Override
+    public double getCrcCheckChance()
+    {
+        return wrapped.getCrcCheckChance();
+    }
+
+    @Override
+    public void close()
+    {
+        wrapped.close();
+    }
+
+    @Override
+    public void closeReader()
+    {
+        wrapped.closeReader();
+    }
+
+    @Override
+    public String toString()
+    {
+        return "LimitingRebufferer[" + limiter.toString() + "]:" + wrapped.toString();
+    }
+
+    // BufferHolder methods
+
+    @Override
+    public ByteBuffer buffer()
+    {
+        return buffer;
+    }
+
+    @Override
+    public long offset()
+    {
+        return offset;
+    }
+
+    @Override
+    public void release()
+    {
+        bufferHolder.release();
+    }
+}
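The wrapper above is wired in by RandomAccessReader.instantiateRebufferer(), further down in this commit. A hedged usage sketch — fileRebufferer is assumed in scope, and the 16 MiB/s rate is an arbitrary example:

    RateLimiter limiter = RateLimiter.create(16 << 20); // permits are bytes per second
    Rebufferer limited = new LimitingRebufferer(fileRebufferer, limiter,
                                                RandomAccessReader.MAX_BUFFER_SIZE);
    // Each rebuffer() now caps the returned buffer at limitQuant bytes and acquires
    // that many permits, keeping throttling accurate even over large mmapped regions.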
http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmapRebufferer.java b/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
new file mode 100644
index 0000000..6c39cb1
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/MmapRebufferer.java
@@ -0,0 +1,49 @@
+package org.apache.cassandra.io.util;
+
+/**
+ * Rebufferer for memory-mapped files. Thread-safe and shared among reader instances.
+ * This is simply a thin wrapper around MmappedRegions as the buffers there can be used directly after duplication.
+ */
+class MmapRebufferer extends AbstractReaderFileProxy implements Rebufferer, RebuffererFactory
+{
+    protected final MmappedRegions regions;
+
+    public MmapRebufferer(ChannelProxy channel, long fileLength, MmappedRegions regions)
+    {
+        super(channel, fileLength);
+        this.regions = regions;
+    }
+
+    @Override
+    public BufferHolder rebuffer(long position)
+    {
+        return regions.floor(position);
+    }
+
+    @Override
+    public Rebufferer instantiateRebufferer()
+    {
+        return this;
+    }
+
+    @Override
+    public void close()
+    {
+        regions.closeQuietly();
+    }
+
+    @Override
+    public void closeReader()
+    {
+        // Instance is shared among readers. Nothing to release.
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("%s(%s - data length %d)",
+                             getClass().getSimpleName(),
+                             channel.filePath(),
+                             fileLength());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/MmappedRegions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedRegions.java b/src/java/org/apache/cassandra/io/util/MmappedRegions.java
index 8f6cd92..f269b84 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedRegions.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedRegions.java
@@ -22,8 +22,11 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Arrays;
 
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.RefCounted;
 import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
@@ -190,8 +193,20 @@ public class MmappedRegions extends SharedCloseableImpl
         assert !isCleanedUp() : "Attempted to use closed region";
         return state.floor(position);
     }
+
+    public void closeQuietly()
+    {
+        Throwable err = close(null);
+        if (err != null)
+        {
+            JVMStabilityInspector.inspectThrowable(err);
+
+            // This is not supposed to happen
+            LoggerFactory.getLogger(getClass()).error("Error while closing mmapped regions", err);
+        }
+    }
 
-    public static final class Region
+    public static final class Region implements Rebufferer.BufferHolder
     {
         public final long offset;
         public final ByteBuffer buffer;
@@ -202,15 +217,25 @@ public class MmappedRegions extends SharedCloseableImpl
             this.buffer = buffer;
         }
 
-        public long bottom()
+        public ByteBuffer buffer()
+        {
+            return buffer.duplicate();
+        }
+
+        public long offset()
         {
             return offset;
         }
 
-        public long top()
+        public long end()
         {
             return offset + buffer.capacity();
         }
+
+        public void release()
+        {
+            // only released after no readers are present
+        }
     }
 
     private static final class State
@@ -260,7 +285,7 @@ public class MmappedRegions extends SharedCloseableImpl
         private Region floor(long position)
         {
-            assert 0 <= position && position < length : String.format("%d >= %d", position, length);
+            assert 0 <= position && position <= length : String.format("%d > %d", position, length);
 
             int idx = Arrays.binarySearch(offsets, 0, last +1, position);
             assert idx != -1 : String.format("Bad position %d for regions %s, last %d in %s", position, Arrays.toString(offsets), last, channel);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 5f56ff6..d514bf8 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -19,30 +19,29 @@ package org.apache.cassandra.io.util;
 
 import java.io.*;
 
-import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.utils.JVMStabilityInspector;
 
 public class MmappedSegmentedFile extends SegmentedFile
 {
     private static final Logger logger = LoggerFactory.getLogger(MmappedSegmentedFile.class);
 
-    private final MmappedRegions regions;
+    public MmappedSegmentedFile(ChannelProxy channel, long length, MmappedRegions regions)
+    {
+        this(channel, new MmapRebufferer(channel, length, regions), length);
+    }
 
-    public MmappedSegmentedFile(ChannelProxy channel, int bufferSize, long length, MmappedRegions regions)
+    public MmappedSegmentedFile(ChannelProxy channel, RebuffererFactory rebufferer, long length)
     {
-        super(new Cleanup(channel, regions), channel, bufferSize, length);
-        this.regions = regions;
+        super(new Cleanup(channel, rebufferer), channel, rebufferer, length);
     }
 
     private MmappedSegmentedFile(MmappedSegmentedFile copy)
     {
         super(copy);
-        this.regions = copy.regions;
     }
 
     public MmappedSegmentedFile sharedCopy()
@@ -50,49 +49,6 @@ public class MmappedSegmentedFile extends SegmentedFile
         return new MmappedSegmentedFile(this);
     }
 
-    public RandomAccessReader createReader()
-    {
-        return new RandomAccessReader.Builder(channel)
-               .overrideLength(length)
-               .regions(regions)
-               .build();
-    }
-
-    public RandomAccessReader createReader(RateLimiter limiter)
-    {
-        return new RandomAccessReader.Builder(channel)
-               .overrideLength(length)
-               .bufferSize(bufferSize)
-               .regions(regions)
-               .limiter(limiter)
-               .build();
-    }
-
-    private static final class Cleanup extends SegmentedFile.Cleanup
-    {
-        private final MmappedRegions regions;
-
-        Cleanup(ChannelProxy channel, MmappedRegions regions)
-        {
-            super(channel);
-            this.regions = regions;
-        }
-
-        public void tidy()
-        {
-            Throwable err = regions.close(null);
-            if (err != null)
-            {
-                JVMStabilityInspector.inspectThrowable(err);
-
-                // This is not supposed to happen
-                logger.error("Error while closing mmapped regions", err);
-            }
-
-            super.tidy();
-        }
-    }
-
     /**
      * Overrides the default behaviour to create segments of a maximum size.
      */
@@ -110,7 +66,7 @@ public class MmappedSegmentedFile extends SegmentedFile
             long length = overrideLength > 0 ? overrideLength : channel.size();
             updateRegions(channel, length);
 
-            return new MmappedSegmentedFile(channel, bufferSize, length, regions.sharedCopy());
+            return new MmappedSegmentedFile(channel, length, regions.sharedCopy());
         }
 
         private void updateRegions(ChannelProxy channel, long length)
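With Region now implementing Rebufferer.BufferHolder (see MmappedRegions above), the whole mmap read path reduces to a lookup. An illustrative sketch, not part of the commit; mmapRebufferer and position are assumed in scope:

    Rebufferer.BufferHolder bh = mmapRebufferer.rebuffer(position); // just regions.floor(position)
    ByteBuffer view = bh.buffer();  // a duplicate of the mapped buffer, safe to reposition
    long start = bh.offset();       // file offset of the buffer's first byte
    // ... read from view ...
    bh.release();                   // no-op: regions are only unmapped when the file is closed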
http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index e6383cf..725b367 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -21,11 +21,13 @@ import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.RateLimiter;
 
-import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.util.Rebufferer.BufferHolder;
 import org.apache.cassandra.utils.memory.BufferPool;
 
 public class RandomAccessReader extends RebufferingInputStream implements FileDataInput
@@ -41,60 +43,22 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
     // and because our BufferPool currently has a maximum allocation size of this.
     public static final int MAX_BUFFER_SIZE = 1 << 16; // 64k
 
-    // the IO channel to the file, we do not own a reference to this due to
-    // performance reasons (CASSANDRA-9379) so it's up to the owner of the RAR to
-    // ensure that the channel stays open and that it is closed afterwards
-    protected final ChannelProxy channel;
-
-    // optional memory mapped regions for the channel
-    protected final MmappedRegions regions;
-
-    // An optional limiter that will throttle the amount of data we read
-    protected final RateLimiter limiter;
-
-    // the file length, this can be overridden at construction to a value shorter
-    // than the true length of the file; if so, it acts as an imposed limit on reads,
-    // required when opening sstables early not to read past the mark
-    private final long fileLength;
-
-    // the buffer size for buffered readers
-    protected final int bufferSize;
-
-    // the buffer type for buffered readers
-    protected final BufferType bufferType;
-
-    // offset from the beginning of the file
-    protected long bufferOffset;
-
     // offset of the last file mark
     protected long markedPointer;
 
-    protected RandomAccessReader(Builder builder)
-    {
-        super(builder.createBuffer());
+    @VisibleForTesting
+    final Rebufferer rebufferer;
+    BufferHolder bufferHolder = Rebufferer.EMPTY;
 
-        this.channel = builder.channel;
-        this.regions = builder.regions;
-        this.limiter = builder.limiter;
-        this.fileLength = builder.overrideLength <= 0 ? builder.channel.size() : builder.overrideLength;
-        this.bufferSize = builder.bufferSize;
-        this.bufferType = builder.bufferType;
-        this.buffer = builder.buffer;
-    }
-
-    protected static ByteBuffer allocateBuffer(int size, BufferType bufferType)
+    protected RandomAccessReader(Rebufferer rebufferer)
     {
-        return BufferPool.get(size, bufferType).order(ByteOrder.BIG_ENDIAN);
+        super(Rebufferer.EMPTY.buffer());
+        this.rebufferer = rebufferer;
     }
 
-    protected void releaseBuffer()
+    public static ByteBuffer allocateBuffer(int size, BufferType bufferType)
     {
-        if (buffer != null)
-        {
-            if (regions == null)
-                BufferPool.put(buffer);
-            buffer = null;
-        }
+        return BufferPool.get(size, bufferType).order(ByteOrder.BIG_ENDIAN);
     }
 
     /**
@@ -105,80 +69,40 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
         if (isEOF())
             return;
 
-        if (regions == null)
-            reBufferStandard();
-        else
-            reBufferMmap();
-
-        if (limiter != null)
-            limiter.acquire(buffer.remaining());
-
-        assert buffer.order() == ByteOrder.BIG_ENDIAN : "Buffer must have BIG ENDIAN byte ordering";
+        reBufferAt(current());
     }
 
-    protected void reBufferStandard()
+    public void reBufferAt(long position)
     {
-        bufferOffset += buffer.position();
-        assert bufferOffset < fileLength;
-
-        buffer.clear();
-        long position = bufferOffset;
-        long limit = bufferOffset;
+        bufferHolder.release();
+        bufferHolder = rebufferer.rebuffer(position);
+        buffer = bufferHolder.buffer();
+        buffer.position(Ints.checkedCast(position - bufferHolder.offset()));
 
-        long pageAligedPos = position & ~4095;
-        // Because the buffer capacity is a multiple of the page size, we read less
-        // the first time and then we should read at page boundaries only,
-        // unless the user seeks elsewhere
-        long upperLimit = Math.min(fileLength, pageAligedPos + buffer.capacity());
-        buffer.limit((int)(upperLimit - position));
-        while (buffer.hasRemaining() && limit < upperLimit)
-        {
-            int n = channel.read(buffer, position);
-            if (n < 0)
-                throw new FSReadError(new IOException("Unexpected end of file"), channel.filePath());
-
-            position += n;
-            limit = bufferOffset + buffer.position();
-        }
-
-        buffer.flip();
-    }
-
-    protected void reBufferMmap()
-    {
-        long position = bufferOffset + buffer.position();
-        assert position < fileLength;
-
-        MmappedRegions.Region region = regions.floor(position);
-        bufferOffset = region.bottom();
-        buffer = region.buffer.duplicate();
-        buffer.position(Ints.checkedCast(position - bufferOffset));
-
-        if (limiter != null && bufferSize < buffer.remaining())
-        { // ensure accurate throttling
-            buffer.limit(buffer.position() + bufferSize);
-        }
+        assert buffer.order() == ByteOrder.BIG_ENDIAN : "Buffer must have BIG ENDIAN byte ordering";
     }
 
     @Override
     public long getFilePointer()
     {
+        if (buffer == null) // closed already
+            return rebufferer.fileLength();
         return current();
     }
 
     protected long current()
     {
-        return bufferOffset + (buffer == null ? 0 : buffer.position());
+        return bufferHolder.offset() + buffer.position();
     }
 
     public String getPath()
     {
-        return channel.filePath();
+        return getChannel().filePath();
     }
 
     public ChannelProxy getChannel()
     {
-        return channel;
+        return rebufferer.channel();
     }
 
     @Override
@@ -242,12 +166,14 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
     @Override
     public void close()
     {
-        //make idempotent
+        // close needs to be idempotent.
         if (buffer == null)
             return;
 
-        bufferOffset += buffer.position();
-        releaseBuffer();
+        bufferHolder.release();
+        rebufferer.closeReader();
+        buffer = null;
+        bufferHolder = null;
 
         //For performance reasons we don't keep a reference to the file
         //channel so we don't close it
@@ -256,7 +182,7 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
     @Override
     public String toString()
     {
-        return getClass().getSimpleName() + "(filePath='" + channel + "')";
+        return getClass().getSimpleName() + ':' + rebufferer.toString();
     }
 
     /**
@@ -281,26 +207,17 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
         if (buffer == null)
             throw new IllegalStateException("Attempted to seek in a closed RAR");
 
-        if (newPosition >= length()) // it is save to call length() in read-only mode
-        {
-            if (newPosition > length())
-                throw new IllegalArgumentException(String.format("Unable to seek to position %d in %s (%d bytes) in read-only mode",
-                                                             newPosition, getPath(), length()));
-            buffer.limit(0);
-            bufferOffset = newPosition;
-            return;
-        }
-
+        long bufferOffset = bufferHolder.offset();
         if (newPosition >= bufferOffset && newPosition < bufferOffset + buffer.limit())
         {
             buffer.position((int) (newPosition - bufferOffset));
             return;
         }
-        // Set current location to newPosition and clear buffer so reBuffer calculates from newPosition
-        bufferOffset = newPosition;
-        buffer.clear();
-        reBuffer();
-        assert current() == newPosition;
+
+        if (newPosition > length())
+            throw new IllegalArgumentException(String.format("Unable to seek to position %d in %s (%d bytes) in read-only mode",
+                                                             newPosition, getPath(), length()));
+        reBufferAt(newPosition);
     }
 
     /**
@@ -353,7 +270,7 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
 
     public long length()
     {
-        return fileLength;
+        return rebufferer.fileLength();
     }
 
     public long getPosition()
@@ -361,17 +278,38 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
     {
         return current();
     }
 
+    public double getCrcCheckChance()
+    {
+        return rebufferer.getCrcCheckChance();
+    }
+
+    protected static Rebufferer instantiateRebufferer(RebuffererFactory fileRebufferer, RateLimiter limiter)
+    {
+        Rebufferer rebufferer = fileRebufferer.instantiateRebufferer();
+
+        if (limiter != null)
+            rebufferer = new LimitingRebufferer(rebufferer, limiter, MAX_BUFFER_SIZE);
+
+        return rebufferer;
+    }
+
+    public static RandomAccessReader build(SegmentedFile file, RateLimiter limiter)
+    {
+        return new RandomAccessReader(instantiateRebufferer(file.rebuffererFactory(), limiter));
+    }
+
+    public static Builder builder(ChannelProxy channel)
+    {
+        return new Builder(channel);
+    }
+
     public static class Builder
     {
         // The NIO file channel or an empty channel
         public final ChannelProxy channel;
 
-        // We override the file length when we open sstables early, so that we do not
-        // read past the early mark
-        public long overrideLength;
-
         // The size of the buffer for buffered readers
-        public int bufferSize;
+        protected int bufferSize;
 
         // The type of the buffer for buffered readers
         public BufferType bufferType;
@@ -379,20 +317,20 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
         // The buffer
         public ByteBuffer buffer;
 
+        // An optional limiter that will throttle the amount of data we read
+        public RateLimiter limiter;
+
         // The mmap segments for mmap readers
         public MmappedRegions regions;
 
-        // An optional limiter that will throttle the amount of data we read
-        public RateLimiter limiter;
+        // Compression for compressed readers
+        public CompressionMetadata compression;
 
         public Builder(ChannelProxy channel)
         {
             this.channel = channel;
-            this.overrideLength = -1L;
             this.bufferSize = DEFAULT_BUFFER_SIZE;
             this.bufferType = BufferType.OFF_HEAP;
-            this.regions = null;
-            this.limiter = null;
         }
 
         /** The buffer size is typically already page aligned but if that is not the case
@@ -400,38 +338,30 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
         * buffer size unless we are throttling, in which case we may as well read the maximum
         * directly since the intention is to read the full file, see CASSANDRA-8630.
         * */
-        private void setBufferSize()
+        private int adjustedBufferSize()
         {
             if (limiter != null)
-            {
-                bufferSize = MAX_BUFFER_SIZE;
-                return;
-            }
-
-            if ((bufferSize & ~4095) != bufferSize)
-            { // should already be a page size multiple but if that's not case round it up
-                bufferSize = (bufferSize + 4095) & ~4095;
-            }
+                return MAX_BUFFER_SIZE;
 
-            bufferSize = Math.min(MAX_BUFFER_SIZE, bufferSize);
+            // should already be a page size multiple but if that's not case round it up
+            int wholePageSize = (bufferSize + 4095) & ~4095;
+            return Math.min(MAX_BUFFER_SIZE, wholePageSize);
         }
 
-        protected ByteBuffer createBuffer()
+        protected Rebufferer createRebufferer()
        {
-            setBufferSize();
-
-            buffer = regions == null
-                     ? allocateBuffer(bufferSize, bufferType)
-                     : regions.floor(0).buffer.duplicate();
-
-            buffer.limit(0);
-            return buffer;
+            return instantiateRebufferer(chunkReader(), limiter);
        }
 
-        public Builder overrideLength(long overrideLength)
+        public RebuffererFactory chunkReader()
        {
-            this.overrideLength = overrideLength;
-            return this;
+            if (compression != null)
+                return CompressedSegmentedFile.chunkReader(channel, compression, regions);
+            if (regions != null)
+                return new MmapRebufferer(channel, -1, regions);
+
+            int adjustedSize = adjustedBufferSize();
+            return new SimpleChunkReader(channel, -1, bufferType, adjustedSize);
        }
 
        public Builder bufferSize(int bufferSize)
@@ -455,6 +385,12 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
            return this;
        }
 
+        public Builder compression(CompressionMetadata metadata)
+        {
+            this.compression = metadata;
+            return this;
+        }
+
        public Builder limiter(RateLimiter limiter)
        {
            this.limiter = limiter;
@@ -463,12 +399,12 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
 
        public RandomAccessReader build()
        {
-            return new RandomAccessReader(this);
+            return new RandomAccessReader(createRebufferer());
        }
 
        public RandomAccessReader buildWithChannel()
        {
-            return new RandomAccessReaderWithOwnChannel(this);
+            return new RandomAccessReaderWithOwnChannel(createRebufferer());
        }
    }
 
@@ -479,9 +415,9 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
     // not have a shared channel.
     public static class RandomAccessReaderWithOwnChannel extends RandomAccessReader
     {
-        protected RandomAccessReaderWithOwnChannel(Builder builder)
+        protected RandomAccessReaderWithOwnChannel(Rebufferer rebufferer)
         {
-            super(builder);
+            super(rebufferer);
         }
 
         @Override
@@ -493,7 +429,14 @@ public class RandomAccessReader extends RebufferingInputStream implements FileDa
             }
             finally
             {
-                channel.close();
+                try
+                {
+                    rebufferer.close();
+                }
+                finally
+                {
+                    getChannel().close();
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/ReaderFileProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ReaderFileProxy.java b/src/java/org/apache/cassandra/io/util/ReaderFileProxy.java
new file mode 100644
index 0000000..3ddb143
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ReaderFileProxy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+/**
+ * Base class for the RandomAccessReader components that implement reading.
+ */
+public interface ReaderFileProxy extends AutoCloseable
+{
+    void close();               // no checked exceptions
+
+    ChannelProxy channel();
+
+    long fileLength();
+
+    /**
+     * Needed for tests. Returns the table's CRC check chance, which is only set for compressed tables.
+     */
+    double getCrcCheckChance();
+}
\ No newline at end of file
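After this change a reader is obtained in one of two ways: via the owning SegmentedFile, or via the builder, which now selects the appropriate RebuffererFactory (compressed, mmapped, or simple buffered). A hedged sketch, assuming segmentedFile and channel are in scope:

    RandomAccessReader fromSSTable = RandomAccessReader.build(segmentedFile, null); // null = unthrottled

    try (RandomAccessReader rar = RandomAccessReader.builder(channel)
                                                    .bufferSize(4096) // rounded up to a page multiple
                                                    .build())
    {
        rar.seek(0);
        // ... read ...
    }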
http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/Rebufferer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Rebufferer.java b/src/java/org/apache/cassandra/io/util/Rebufferer.java
new file mode 100644
index 0000000..e88c7cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/Rebufferer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Rebufferer for reading data by a RandomAccessReader.
+ */
+public interface Rebufferer extends ReaderFileProxy
+{
+    /**
+     * Rebuffer (move on or seek to) a given position, and return a buffer that can be used there.
+     * The only guarantee about the size of the returned data is that unless rebuffering at the end of the file,
+     * the buffer will not be empty and will contain the requested position, i.e.
+     * {@code offset <= position < offset + bh.buffer().limit()}, but the buffer will not be positioned there.
+     */
+    BufferHolder rebuffer(long position);
+
+    /**
+     * Called when a reader is closed. Should clean up reader-specific data.
+     */
+    void closeReader();
+
+    public interface BufferHolder
+    {
+        /**
+         * Returns a useable buffer (i.e. one whose position and limit can be freely modified). Its limit will be set
+         * to the size of the available data in the buffer.
+         * The buffer must be treated as read-only.
+         */
+        ByteBuffer buffer();
+
+        /**
+         * Position in the file of the start of the buffer.
+         */
+        long offset();
+
+        /**
+         * To be called when this buffer is no longer in use. Must be called for all BufferHolders, or ChunkCache
+         * will not be able to free blocks.
+         */
+        void release();
+    }
+
+    static final BufferHolder EMPTY = new BufferHolder()
+    {
+        final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
+        @Override
+        public ByteBuffer buffer()
+        {
+            return EMPTY_BUFFER;
+        }
+
+        @Override
+        public long offset()
+        {
+            return 0;
+        }
+
+        @Override
+        public void release()
+        {
+            // nothing to do
+        }
+    };
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/RebuffererFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RebuffererFactory.java b/src/java/org/apache/cassandra/io/util/RebuffererFactory.java
new file mode 100644
index 0000000..ec35f0b
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/RebuffererFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+/**
+ * Interface for the classes that can be used to instantiate rebufferers over a given file.
+ *
+ * These are one of two types:
+ *  - chunk sources (e.g. SimpleReadRebufferer) which instantiate a buffer managing rebufferer referencing
+ *    themselves.
+ *  - thread-safe shared rebufferers (e.g. MmapRebufferer) which directly return themselves.
+ */
+public interface RebuffererFactory extends ReaderFileProxy
+{
+    Rebufferer instantiateRebufferer();
+}
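The contract spelled out in Rebufferer's javadoc yields a simple canonical scan loop. Illustrative only; rebufferer is assumed in scope and Ints is Guava's, as used throughout the patch:

    long position = 0;
    while (position < rebufferer.fileLength())
    {
        Rebufferer.BufferHolder bh = rebufferer.rebuffer(position);
        ByteBuffer buf = bh.buffer();
        buf.position(Ints.checkedCast(position - bh.offset())); // buffers are not pre-positioned
        // ... consume buf ...
        position = bh.offset() + buf.limit(); // guaranteed progress: the buffer is non-empty
        bh.release();
    }
    rebufferer.closeReader();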
http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index ab2d291..62e14ba 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -21,7 +21,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
-import java.util.function.Supplier;
 
 import com.google.common.util.concurrent.RateLimiter;
 
@@ -52,27 +51,21 @@ import static org.apache.cassandra.utils.Throwables.maybeFail;
 public abstract class SegmentedFile extends SharedCloseableImpl
 {
     public final ChannelProxy channel;
-    public final int bufferSize;
-    public final long length;
 
     // This differs from length for compressed files (but we still need length for
     // SegmentIterator because offsets in the file are relative to the uncompressed size)
     public final long onDiskLength;
 
     /**
-     * Use getBuilder to get a Builder to construct a SegmentedFile.
+     * Rebufferer to use to construct RandomAccessReaders.
      */
-    SegmentedFile(Cleanup cleanup, ChannelProxy channel, int bufferSize, long length)
-    {
-        this(cleanup, channel, bufferSize, length, length);
-    }
+    private final RebuffererFactory rebufferer;
 
-    protected SegmentedFile(Cleanup cleanup, ChannelProxy channel, int bufferSize, long length, long onDiskLength)
+    protected SegmentedFile(Cleanup cleanup, ChannelProxy channel, RebuffererFactory rebufferer, long onDiskLength)
     {
         super(cleanup);
+        this.rebufferer = rebufferer;
         this.channel = channel;
-        this.bufferSize = bufferSize;
-        this.length = length;
         this.onDiskLength = onDiskLength;
     }
 
@@ -80,8 +73,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
     {
         super(copy);
         channel = copy.channel;
-        bufferSize = copy.bufferSize;
-        length = copy.length;
+        rebufferer = copy.rebufferer;
         onDiskLength = copy.onDiskLength;
     }
 
@@ -90,12 +82,24 @@ public abstract class SegmentedFile extends SharedCloseableImpl
         return channel.filePath();
     }
 
+    public long dataLength()
+    {
+        return rebufferer.fileLength();
+    }
+
+    public RebuffererFactory rebuffererFactory()
+    {
+        return rebufferer;
+    }
+
     protected static class Cleanup implements RefCounted.Tidy
     {
         final ChannelProxy channel;
-        protected Cleanup(ChannelProxy channel)
+        final ReaderFileProxy rebufferer;
+        protected Cleanup(ChannelProxy channel, ReaderFileProxy rebufferer)
         {
             this.channel = channel;
+            this.rebufferer = rebufferer;
         }
 
         public String name()
@@ -105,7 +109,14 @@ public abstract class SegmentedFile extends SharedCloseableImpl
 
         public void tidy()
         {
-            channel.close();
+            try
+            {
+                channel.close();
+            }
+            finally
+            {
+                rebufferer.close();
+            }
         }
     }
 
@@ -113,19 +124,12 @@ public abstract class SegmentedFile extends SharedCloseableImpl
 
     public RandomAccessReader createReader()
     {
-        return new RandomAccessReader.Builder(channel)
-               .overrideLength(length)
-               .bufferSize(bufferSize)
-               .build();
+        return RandomAccessReader.build(this, null);
     }
 
     public RandomAccessReader createReader(RateLimiter limiter)
     {
-        return new RandomAccessReader.Builder(channel)
-               .overrideLength(length)
-               .bufferSize(bufferSize)
-               .limiter(limiter)
-               .build();
+        return RandomAccessReader.build(this, limiter);
     }
 
     public FileDataInput createReader(long position)
@@ -308,7 +312,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
     @Override
     public String toString() {
         return getClass().getSimpleName() + "(path='" + path() + '\'' +
-               ", length=" + length +
+               ", length=" + rebufferer.fileLength() +
                ')';
-}
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
new file mode 100644
index 0000000..7bfb57b
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/SimpleChunkReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.compress.BufferType;
+
+class SimpleChunkReader extends AbstractReaderFileProxy implements ChunkReader
+{
+    private final int bufferSize;
+    private final BufferType bufferType;
+
+    public SimpleChunkReader(ChannelProxy channel, long fileLength, BufferType bufferType, int bufferSize)
+    {
+        super(channel, fileLength);
+        this.bufferSize = bufferSize;
+        this.bufferType = bufferType;
+    }
+
+    @Override
+    public void readChunk(long position, ByteBuffer buffer)
+    {
+        buffer.clear();
+        channel.read(buffer, position);
+        buffer.flip();
+    }
+
+    @Override
+    public int chunkSize()
+    {
+        return bufferSize;
+    }
+
+    @Override
+    public BufferType preferredBufferType()
+    {
+        return bufferType;
+    }
+
+    @Override
+    public boolean alignmentRequired()
+    {
+        return false;
+    }
+
+    @Override
+    public Rebufferer instantiateRebufferer()
+    {
+        return BufferManagingRebufferer.on(this);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("%s(%s - chunk length %d, data length %d)",
+                             getClass().getSimpleName(),
+                             channel.filePath(),
+                             bufferSize,
+                             fileLength());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/metrics/CacheMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CacheMetrics.java b/src/java/org/apache/cassandra/metrics/CacheMetrics.java
index 151268b..e623dcb 100644
--- a/src/java/org/apache/cassandra/metrics/CacheMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CacheMetrics.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.metrics;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.RatioGauge;
@@ -56,7 +54,7 @@ public class CacheMetrics
      * @param type Type of Cache to identify metrics.
      * @param cache Cache to measure metrics
      */
-    public CacheMetrics(String type, final ICache cache)
+    public CacheMetrics(String type, final ICache<?, ?> cache)
     {
         MetricNameFactory factory = new DefaultNameFactory("Cache", type);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/metrics/CacheMissMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CacheMissMetrics.java b/src/java/org/apache/cassandra/metrics/CacheMissMetrics.java
new file mode 100644
index 0000000..19d61ef
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/CacheMissMetrics.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.RatioGauge;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.cache.CacheSize;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+/**
+ * Metrics for {@code ICache}.
+ */
+public class CacheMissMetrics
+{
+    /** Cache capacity in bytes */
+    public final Gauge<Long> capacity;
+    /** Total number of cache misses */
+    public final Meter misses;
+    /** Total number of cache requests */
+    public final Meter requests;
+    /** Latency of misses */
+    public final Timer missLatency;
+    /** all time cache hit rate */
+    public final Gauge<Double> hitRate;
+    /** 1m hit rate */
+    public final Gauge<Double> oneMinuteHitRate;
+    /** 5m hit rate */
+    public final Gauge<Double> fiveMinuteHitRate;
+    /** 15m hit rate */
+    public final Gauge<Double> fifteenMinuteHitRate;
+    /** Total size of cache, in bytes */
+    public final Gauge<Long> size;
+    /** Total number of cache entries */
+    public final Gauge<Integer> entries;
+
+    /**
+     * Create metrics for given cache.
+     *
+     * @param type Type of Cache to identify metrics.
+     * @param cache Cache to measure metrics
+     */
+    public CacheMissMetrics(String type, final CacheSize cache)
+    {
+        MetricNameFactory factory = new DefaultNameFactory("Cache", type);
+
+        capacity = Metrics.register(factory.createMetricName("Capacity"), (Gauge<Long>) cache::capacity);
+        misses = Metrics.meter(factory.createMetricName("Misses"));
+        requests = Metrics.meter(factory.createMetricName("Requests"));
+        missLatency = Metrics.timer(factory.createMetricName("MissLatency"));
+        hitRate = Metrics.register(factory.createMetricName("HitRate"), new RatioGauge()
+        {
+            @Override
+            public Ratio getRatio()
+            {
+                long req = requests.getCount();
+                long mis = misses.getCount();
+                return Ratio.of(req - mis, req);
+            }
+        });
+        oneMinuteHitRate = Metrics.register(factory.createMetricName("OneMinuteHitRate"), new RatioGauge()
+        {
+            protected Ratio getRatio()
+            {
+                double req = requests.getOneMinuteRate();
+                double mis = misses.getOneMinuteRate();
+                return Ratio.of(req - mis, req);
+            }
+        });
+        fiveMinuteHitRate = Metrics.register(factory.createMetricName("FiveMinuteHitRate"), new RatioGauge()
+        {
+            protected Ratio getRatio()
+            {
+                double req = requests.getFiveMinuteRate();
+                double mis = misses.getFiveMinuteRate();
+                return Ratio.of(req - mis, req);
+            }
+        });
+        fifteenMinuteHitRate = Metrics.register(factory.createMetricName("FifteenMinuteHitRate"), new RatioGauge()
+        {
+            protected Ratio getRatio()
+            {
+                double req = requests.getFifteenMinuteRate();
+                double mis = misses.getFifteenMinuteRate();
+                return Ratio.of(req - mis, req);
+            }
+        });
+        size = Metrics.register(factory.createMetricName("Size"), (Gauge<Long>) cache::weightedSize);
+        entries = Metrics.register(factory.createMetricName("Entries"), (Gauge<Integer>) cache::size);
+    }
+
+    public void reset()
+    {
+        requests.mark(-requests.getCount());
+        misses.mark(-misses.getCount());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 55ac7ac..c7d5f98 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -60,7 +60,7 @@ public class CompressedInputStream extends InputStream
     // number of bytes in the buffer that are actually valid
     protected int validBufferBytes = -1;
 
-    private final Checksum checksum;
+    private final ChecksumType checksumType;
 
     // raw checksum bytes
     private final byte[] checksumBytes = new byte[4];
@@ -76,11 +76,11 @@ public class CompressedInputStream extends InputStream
     public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
     {
         this.info = info;
-        this.checksum = checksumType.newInstance();
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
         this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
         this.crcCheckChanceSupplier = crcCheckChanceSupplier;
+        this.checksumType = checksumType;
 
         new Thread(new Reader(source, info, dataBuffer)).start();
     }
@@ -122,14 +122,11 @@ public class CompressedInputStream extends InputStream
         // validate crc randomly
         if (this.crcCheckChanceSupplier.get() > ThreadLocalRandom.current().nextDouble())
         {
-            checksum.update(compressed, 0, compressed.length - checksumBytes.length);
+            int checksum = (int) checksumType.of(compressed, 0, compressed.length - checksumBytes.length);
 
             System.arraycopy(compressed, compressed.length - checksumBytes.length, checksumBytes, 0, checksumBytes.length);
-            if (Ints.fromByteArray(checksumBytes) != (int) checksum.getValue())
+            if (Ints.fromByteArray(checksumBytes) != checksum)
                 throw new IOException("CRC unmatched");
-
-            // reset checksum object back to the original (blank) state
-            checksum.reset();
         }
 
         // buffer offset is always aligned

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index fb6e1fb..2118308 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1179,9 +1179,18 @@ public class NodeProbe implements AutoCloseable
                         CassandraMetricsRegistry.JmxGaugeMBean.class).getValue();
             case "Requests":
             case "Hits":
+            case "Misses":
                 return JMX.newMBeanProxy(mbeanServerConn,
                         new ObjectName("org.apache.cassandra.metrics:type=Cache,scope=" + cacheType + ",name=" + metricName),
                         CassandraMetricsRegistry.JmxMeterMBean.class).getCount();
+            case "MissLatency":
+                return JMX.newMBeanProxy(mbeanServerConn,
+                        new ObjectName("org.apache.cassandra.metrics:type=Cache,scope=" + cacheType + ",name=" + metricName),
+                        CassandraMetricsRegistry.JmxTimerMBean.class).getMean();
+            case "MissLatencyUnit":
+                return JMX.newMBeanProxy(mbeanServerConn,
+                        new ObjectName("org.apache.cassandra.metrics:type=Cache,scope=" + cacheType + ",name=MissLatency"),
+                        CassandraMetricsRegistry.JmxTimerMBean.class).getDurationUnit();
             default:
                 throw new RuntimeException("Unknown cache metric name.");
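For the CompressedInputStream change above, the on-wire layout being validated is [compressed bytes][4-byte big-endian CRC]. A self-contained sketch of the same check using java.util.zip.CRC32 directly (the commit also supports Adler32; the chunk parameter is hypothetical):

    static boolean checksumMatches(byte[] chunk)
    {
        int dataLen = chunk.length - 4; // trailing 4 bytes hold the stored checksum
        java.util.zip.CRC32 crc = new java.util.zip.CRC32();
        crc.update(chunk, 0, dataLen);
        int stored = ((chunk[dataLen] & 0xFF) << 24)
                   | ((chunk[dataLen + 1] & 0xFF) << 16)
                   | ((chunk[dataLen + 2] & 0xFF) << 8)
                   |  (chunk[dataLen + 3] & 0xFF); // big-endian, as Ints.fromByteArray reads it
        return (int) crc.getValue() == stored;
    }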
         if (probe.isJoined())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/utils/ChecksumType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ChecksumType.java b/src/java/org/apache/cassandra/utils/ChecksumType.java
index c9a1eb8..3fa245b 100644
--- a/src/java/org/apache/cassandra/utils/ChecksumType.java
+++ b/src/java/org/apache/cassandra/utils/ChecksumType.java
@@ -24,7 +24,7 @@ import java.util.zip.Adler32;
 
 public enum ChecksumType
 {
-    Adler32()
+    Adler32
     {
 
         @Override
@@ -40,7 +40,7 @@ public enum ChecksumType
 
         }
     },
-    CRC32()
+    CRC32
     {
 
         @Override
@@ -58,6 +58,28 @@ public enum ChecksumType
     };
 
     public abstract Checksum newInstance();
-    public abstract void update(Checksum checksum, ByteBuffer buf);
+
+    private ThreadLocal<Checksum> instances = ThreadLocal.withInitial(this::newInstance);
+
+    public Checksum threadLocalInstance()
+    {
+        return instances.get();
+    }
+
+    public long of(ByteBuffer buf)
+    {
+        Checksum checksum = instances.get();
+        checksum.reset();
+        update(checksum, buf);
+        return checksum.getValue();
+    }
+
+    public long of(byte[] data, int off, int len)
+    {
+        Checksum checksum = instances.get();
+        checksum.reset();
+        checksum.update(data, off, len);
+        return checksum.getValue();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/src/java/org/apache/cassandra/utils/memory/BufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index 38c008d..ad2404f 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -46,7 +46,7 @@ import org.apache.cassandra.utils.concurrent.Ref;
 public class BufferPool
 {
     /** The size of a page aligned buffer, 64KiB */
-    static final int CHUNK_SIZE = 64 << 10;
+    public static final int CHUNK_SIZE = 64 << 10;
 
     @VisibleForTesting
     public static long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/test/long/org/apache/cassandra/cql3/CachingBench.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/CachingBench.java b/test/long/org/apache/cassandra/cql3/CachingBench.java
new file mode 100644
index 0000000..370b3ff
--- /dev/null
+++ b/test/long/org/apache/cassandra/cql3/CachingBench.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Iterables;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.Config.DiskAccessMode;
+import org.apache.cassandra.cache.ChunkCache;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CachingBench extends CQLTester
+{
+    private static final String STRATEGY = "LeveledCompactionStrategy";
+
+    private static final int DEL_SECTIONS = 1000;
+    private static final int FLUSH_FREQ = 10000;
+    private static final int SCAN_FREQUENCY_INV = 12000;
+    static final int COUNT = 29000;
+    static final int ITERS = 9;
+
+    static final int KEY_RANGE = 30;
+    static final int CLUSTERING_RANGE = 210000;
+
+    static final int EXTRA_SIZE = 1025;
+    static final boolean CONCURRENT_COMPACTIONS = true;
+
+    // The name of this method is important!
+    // CommitLog settings must be applied before CQLTester sets up; by using the same name as its @BeforeClass method we
+    // are effectively overriding it.
+    @BeforeClass
+    public static void setUpClass()
+    {
+        DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
+        DatabaseDescriptor.setCommitLogSyncPeriod(100);
+        CQLTester.setUpClass();
+    }
+
+    String hashQuery;
+
+    @Before
+    public void before() throws Throwable
+    {
+        createTable("CREATE TABLE %s(" +
+                    "  key int," +
+                    "  column int," +
+                    "  data int," +
+                    "  extra text," +
+                    "  PRIMARY KEY(key, column)" +
+                    ")"
+                   );
+
+        String hashIFunc = parseFunctionName(createFunction(KEYSPACE, "int, int",
+                " CREATE FUNCTION %s (state int, val int)" +
+                " CALLED ON NULL INPUT" +
+                " RETURNS int" +
+                " LANGUAGE java" +
+                " AS 'return val != null ? state * 17 + val : state;'")).name;
+        String hashTFunc = parseFunctionName(createFunction(KEYSPACE, "int, text",
+                " CREATE FUNCTION %s (state int, val text)" +
+                " CALLED ON NULL INPUT" +
+                " RETURNS int" +
+                " LANGUAGE java" +
+                " AS 'return val != null ? state * 17 + val.hashCode() : state;'")).name;
+
+        String hashInt = createAggregate(KEYSPACE, "int",
+                " CREATE AGGREGATE %s (int)" +
+                " SFUNC " + hashIFunc +
+                " STYPE int" +
+                " INITCOND 1");
+        String hashText = createAggregate(KEYSPACE, "text",
+                " CREATE AGGREGATE %s (text)" +
+                " SFUNC " + hashTFunc +
+                " STYPE int" +
+                " INITCOND 1");
+
+        hashQuery = String.format("SELECT count(column), %s(key), %s(column), %s(data), %s(extra), avg(key), avg(column), avg(data) FROM %%s",
+                                  hashInt, hashInt, hashInt, hashText);
+    }
+    AtomicLong id = new AtomicLong();
+    long compactionTimeNanos = 0;
+
+    void pushData(Random rand, int count) throws Throwable
+    {
+        for (int i = 0; i < count; ++i)
+        {
+            long ii = id.incrementAndGet();
+            if (ii % 1000 == 0)
+                System.out.print('.');
+            int key = rand.nextInt(KEY_RANGE);
+            int column = rand.nextInt(CLUSTERING_RANGE);
+            execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", key, column, (int) ii, genExtra(rand));
+            maybeCompact(ii);
+        }
+    }
+
+    private String genExtra(Random rand)
+    {
+        StringBuilder builder = new StringBuilder(EXTRA_SIZE);
+        for (int i = 0; i < EXTRA_SIZE; ++i)
+            builder.append((char) ('a' + rand.nextInt('z' - 'a' + 1)));
+        return builder.toString();
+    }
+
+    void readAndDelete(Random rand, int count) throws Throwable
+    {
+        for (int i = 0; i < count; ++i)
+        {
+            int key;
+            UntypedResultSet res;
+            long ii = id.incrementAndGet();
+            if (ii % 1000 == 0)
+                System.out.print('-');
+            if (rand.nextInt(SCAN_FREQUENCY_INV) != 1)
+            {
+                do
+                {
+                    key = rand.nextInt(KEY_RANGE);
+                    long cid = rand.nextInt(DEL_SECTIONS);
+                    int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS);
+                    int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS);
+                    res = execute("SELECT column FROM %s WHERE key = ? AND column >= ? AND column < ? LIMIT 1", key, cstart, cend);
+                } while (res.size() == 0);
+                UntypedResultSet.Row r = Iterables.get(res, rand.nextInt(res.size()));
+                int clustering = r.getInt("column");
+                execute("DELETE FROM %s WHERE key = ? AND column = ?", key, clustering);
+            }
+            else
+            {
+                execute(hashQuery);
+            }
+            maybeCompact(ii);
+        }
+    }
+
+    private void maybeCompact(long ii)
+    {
+        if (ii % FLUSH_FREQ == 0)
+        {
+            System.out.print("F");
+            flush();
+            if (ii % (FLUSH_FREQ * 10) == 0)
+            {
+                System.out.println("C");
+                long startTime = System.nanoTime();
+                getCurrentColumnFamilyStore().enableAutoCompaction(!CONCURRENT_COMPACTIONS);
+                long endTime = System.nanoTime();
+                compactionTimeNanos += endTime - startTime;
+                getCurrentColumnFamilyStore().disableAutoCompaction();
+            }
+        }
+    }
+
+    public void testSetup(String compactionClass, String compressorClass, DiskAccessMode mode, boolean cacheEnabled) throws Throwable
+    {
+        id.set(0);
+        compactionTimeNanos = 0;
+        ChunkCache.instance.enable(cacheEnabled);
+        DatabaseDescriptor.setDiskAccessMode(mode);
+        alterTable("ALTER TABLE %s WITH compaction = { 'class' :  '" + compactionClass + "'  };");
+        alterTable("ALTER TABLE %s WITH compression = { 'sstable_compression' : '" + compressorClass + "' };");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.disableAutoCompaction();
+
+        long onStartTime = System.currentTimeMillis();
+        ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        List<Future<?>> tasks = new ArrayList<>();
+        for (int ti = 0; ti < 1; ++ti)
+        {
+            Random rand = new Random(ti);
+            tasks.add(es.submit(() ->
+            {
+                for (int i = 0; i < ITERS; ++i)
+                    try
+                    {
+                        pushData(rand, COUNT);
+                        readAndDelete(rand, COUNT / 3);
+                    }
+                    catch (Throwable e)
+                    {
+                        throw new AssertionError(e);
+                    }
+            }));
+        }
+        for (Future<?> task : tasks)
+            task.get();
+
+        flush();
+        long onEndTime = System.currentTimeMillis();
+        int startRowCount = countRows(cfs);
+        int startTombCount = countTombstoneMarkers(cfs);
+        int startRowDeletions = countRowDeletions(cfs);
+        int startTableCount = cfs.getLiveSSTables().size();
+        long startSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
+        System.out.println("\nCompression: " + cfs.getCompressionParameters().toString());
+        System.out.println("Reader " + cfs.getLiveSSTables().iterator().next().getFileDataInput(0).toString());
+        if (cacheEnabled)
+            System.out.format("Cache size %s requests %,d hit ratio %f\n",
+                FileUtils.stringifyFileSize(ChunkCache.instance.metrics.size.getValue()),
+                ChunkCache.instance.metrics.requests.getCount(),
+                ChunkCache.instance.metrics.hitRate.getValue());
+        else
+        {
+            Assert.assertTrue("Chunk cache had requests: " + ChunkCache.instance.metrics.requests.getCount(), ChunkCache.instance.metrics.requests.getCount() < COUNT);
+            System.out.println("Cache disabled");
+        }
+        System.out.print(String.format("Operations completed in %.3fs", (onEndTime - onStartTime) * 1e-3));
+        if (!CONCURRENT_COMPACTIONS)
+            System.out.println(String.format(", out of which %.3f for non-concurrent compaction", compactionTimeNanos * 1e-9));
+        else
+            System.out.println();
+
+        String hashesBefore = getHashes();
+        long startTime = System.currentTimeMillis();
+        CompactionManager.instance.performMaximal(cfs, true);
+        long endTime = System.currentTimeMillis();
+
+        int endRowCount = countRows(cfs);
+        int endTombCount = countTombstoneMarkers(cfs);
+        int endRowDeletions = countRowDeletions(cfs);
+        int endTableCount = cfs.getLiveSSTables().size();
+        long endSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
+
+        System.out.println(String.format("Major compaction completed in %.3fs",
+                (endTime - startTime) * 1e-3));
+        System.out.println(String.format("At start: %,12d tables %12s %,12d rows %,12d deleted rows %,12d tombstone markers",
+                startTableCount, FileUtils.stringifyFileSize(startSize), startRowCount, startRowDeletions, startTombCount));
+        System.out.println(String.format("At end:   %,12d tables %12s %,12d rows %,12d deleted rows %,12d tombstone markers",
+                endTableCount, FileUtils.stringifyFileSize(endSize), endRowCount, endRowDeletions, endTombCount));
+        String hashesAfter = getHashes();
+
+        Assert.assertEquals(hashesBefore, hashesAfter);
+    }
+
+    private String getHashes() throws Throwable
+    {
+        long startTime = System.currentTimeMillis();
+        String hashes = Arrays.toString(getRows(execute(hashQuery))[0]);
+        long endTime = System.currentTimeMillis();
+        System.out.println(String.format("Hashes: %s, retrieved in %.3fs", hashes, (endTime - startTime) * 1e-3));
+        return hashes;
+    }
+
+    @Test
+    public void testWarmup() throws Throwable
+    {
+        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.mmap, false);
+    }
+
+    @Test
+    public void testLZ4CachedMmap() throws Throwable
+    {
+        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.mmap, true);
+    }
+
+    @Test
+    public void testLZ4CachedStandard() throws Throwable
+    {
+        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.standard, true);
+    }
+
+    @Test
+    public void testLZ4UncachedMmap() throws Throwable
+    {
+        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.mmap, false);
+    }
+
+    @Test
+    public void testLZ4UncachedStandard() throws Throwable
+    {
+        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.standard, false);
+    }
+
+    @Test
+    public void testCachedStandard() throws Throwable
+    {
+        testSetup(STRATEGY, "", DiskAccessMode.standard, true);
+    }
+
+    @Test
+    public void testUncachedStandard() throws Throwable
+    {
+        testSetup(STRATEGY, "", DiskAccessMode.standard, false);
+    }
+
+    @Test
+    public void testMmapped() throws Throwable
+    {
+        testSetup(STRATEGY, "", DiskAccessMode.mmap, false /* doesn't matter */);
+    }
+
+    int countTombstoneMarkers(ColumnFamilyStore cfs)
+    {
+        return count(cfs, x -> x.isRangeTombstoneMarker());
+    }
+
+    int countRowDeletions(ColumnFamilyStore cfs)
+    {
+        return count(cfs, x -> x.isRow() && !((Row) x).deletion().isLive());
+    }
+
+    int countRows(ColumnFamilyStore cfs)
+    {
+        int nowInSec = FBUtilities.nowInSeconds();
+        return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec));
+    }
+
+    private int count(ColumnFamilyStore cfs, Predicate<Unfiltered> predicate)
+    {
+        int count = 0;
+        for (SSTableReader reader : cfs.getLiveSSTables())
+            count += count(reader, predicate);
+        return count;
+    }
+
+    int count(SSTableReader reader, Predicate<Unfiltered> predicate)
+    {
+        int instances = 0;
+        try (ISSTableScanner partitions = reader.getScanner())
+        {
+            while (partitions.hasNext())
+            {
+                try (UnfilteredRowIterator iter = partitions.next())
+                {
+                    while (iter.hasNext())
+                    {
+                        Unfiltered atom = iter.next();
+                        if (predicate.test(atom))
+                            ++instances;
+                    }
+                }
+            }
+        }
+        return instances;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 1b36c03..1e7d05f 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -108,9 +108,6 @@ public abstract class CQLTester
         }
         PROTOCOL_VERSIONS = builder.build();
 
-        // Once per-JVM is enough
-        prepareServer();
-
         nativeAddr = InetAddress.getLoopbackAddress();
 
         try
@@ -230,6 +227,9 @@ public abstract class CQLTester
         DatabaseDescriptor.setRowCacheSizeInMB(ROW_CACHE_SIZE_IN_MB);
         StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
+
+        // Once per-JVM is enough
+        prepareServer();
     }
 
     @AfterClass

http://git-wip-us.apache.org/repos/asf/cassandra/blob/30bb255e/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
index 3c83da7..b6f8ada 100644
--- a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
+++ b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
@@ -34,6 +34,8 @@ public class SelectionColumnMappingTest extends CQLTester
     public static void setUpClass()
     {
         DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+
+        prepareServer();
     }
 
     @Test
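For illustration: the ChecksumType change above replaces CompressedInputStream's stateful update()/getValue()/reset() sequence with a single stateless of() call backed by a thread-local Checksum. A minimal standalone sketch of the same pattern, using java.util.zip.CRC32 directly (the ChecksumOfSketch class and its main() are illustrative only, not part of this patch):

    import java.util.zip.CRC32;
    import java.util.zip.Checksum;

    public class ChecksumOfSketch
    {
        // Mirrors ChecksumType.of(byte[], int, int): reset a thread-local
        // Checksum, feed it the requested range, and return the value, so
        // callers never hold mutable checksum state they could forget to reset.
        private static final ThreadLocal<Checksum> CRC = ThreadLocal.withInitial(CRC32::new);

        static long of(byte[] data, int off, int len)
        {
            Checksum checksum = CRC.get();
            checksum.reset();
            checksum.update(data, off, len);
            return checksum.getValue();
        }

        public static void main(String[] args)
        {
            // Same shape as the reworked CRC validation in CompressedInputStream:
            // checksum the chunk, excluding the 4 trailing stored-checksum bytes.
            byte[] chunk = "compressed chunk payload + CRC".getBytes();
            int computed = (int) of(chunk, 0, chunk.length - 4);
            System.out.printf("crc32 = %08x%n", computed);
        }
    }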

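Similarly, the CacheMissMetrics gauges derive hit rate purely from the requests and misses meters: hitRate = (requests - misses) / requests, so 1,000 requests with 100 misses report 0.9. A self-contained sketch against the Codahale metrics API that CassandraMetricsRegistry wraps (HitRateSketch is a hypothetical name; only the requests/misses/Ratio.of arithmetic mirrors the patch):

    import com.codahale.metrics.Meter;
    import com.codahale.metrics.RatioGauge;

    public class HitRateSketch
    {
        public static void main(String[] args)
        {
            Meter requests = new Meter();
            Meter misses = new Meter();

            // All-time hit rate, computed as in CacheMissMetrics: hits are not
            // metered directly, so hit rate = (requests - misses) / requests.
            RatioGauge hitRate = new RatioGauge()
            {
                @Override
                protected Ratio getRatio()
                {
                    long req = requests.getCount();
                    long mis = misses.getCount();
                    return Ratio.of(req - mis, req);
                }
            };

            requests.mark(1000);
            misses.mark(100);
            System.out.println(hitRate.getValue()); // prints 0.9

            // reset() in the patch zeroes the counts by marking the negated
            // totals; Ratio.of yields NaN while the denominator is zero.
            requests.mark(-requests.getCount());
            misses.mark(-misses.getCount());
        }
    }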