Simplify CompressedRandomAccessReader to work around JDK FD bug. Patch by jbellis; reviewed by Aleksey Yeschenko and tested by Cathy Daw for CASSANDRA-5088
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/55f936f1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/55f936f1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/55f936f1 Branch: refs/heads/cassandra-1.2 Commit: 55f936f1da13dc732da52735f1d1becc18546b0b Parents: bf1ed40 Author: Jonathan Ellis <[email protected]> Authored: Mon Jan 7 16:46:06 2013 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Mon Jan 7 16:46:06 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../io/compress/CompressedRandomAccessReader.java | 55 ++++++--------- 2 files changed, 24 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/55f936f1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c1c0930..c5c3863 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.1.9 + * Simplify CompressedRandomAccessReader to work around JDK FD bug (CASSANDRA-5088) * Improve handling a changing target throttle rate mid-compaction (CASSANDRA-5087) * fix multithreaded compaction deadlock (CASSANDRA-4492) * fix specifying and altering crc_check_chance (CASSANDRA-5053) http://git-wip-us.apache.org/repos/asf/cassandra/blob/55f936f1/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index 3d3b95b..a5faff1 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@ -19,19 +19,20 @@ package 
org.apache.cassandra.io.compress; import java.io.*; -import java.nio.channels.FileChannel; +import java.nio.ByteBuffer; import java.util.zip.CRC32; import java.util.zip.Checksum; -import com.google.common.primitives.Ints; - import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.FBUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// TODO refactor this to separate concept of "buffer to avoid lots of read() syscalls" and "compression buffer" +/** + * CRAR extends RAR to transparently uncompress blocks from the file into RAR.buffer. Most of the RAR + * "read bytes from the buffer, rebuffering when necessary" machinery works unchanged after that. + */ public class CompressedRandomAccessReader extends RandomAccessReader { private static final Logger logger = LoggerFactory.getLogger(CompressedRandomAccessReader.class); @@ -47,28 +48,21 @@ public class CompressedRandomAccessReader extends RandomAccessReader } private final CompressionMetadata metadata; - // used by reBuffer() to escape creating lots of temporary buffers - private byte[] compressed; + + // we read the raw compressed bytes into this buffer, then move the uncompressed ones into super.buffer. + private ByteBuffer compressed; // re-use single crc object private final Checksum checksum = new CRC32(); // raw checksum bytes - private final byte[] checksumBytes = new byte[4]; - - private final FileInputStream source; - private final FileChannel channel; + private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]); public CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) throws IOException { super(new File(dataFilePath), metadata.chunkLength(), skipIOCache); this.metadata = metadata; - compressed = new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]; - // can't use super.read(...) 
methods - // that is why we are allocating special InputStream to read data from disk - // from already open file descriptor - source = new FileInputStream(getFD()); - channel = source.getChannel(); // for position manipulation + compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]); } @Override @@ -82,13 +76,18 @@ public class CompressedRandomAccessReader extends RandomAccessReader if (channel.position() != chunk.offset) channel.position(chunk.offset); - if (compressed.length < chunk.length) - compressed = new byte[chunk.length]; + if (compressed.capacity() < chunk.length) + compressed = ByteBuffer.wrap(new byte[chunk.length]); + else + compressed.clear(); + compressed.limit(chunk.length); - if (source.read(compressed, 0, chunk.length) != chunk.length) + if (channel.read(compressed) != chunk.length) throw new IOException(String.format("(%s) failed to read %d bytes from offset %d.", getPath(), chunk.length, chunk.offset)); - - validBufferBytes = metadata.compressor().uncompress(compressed, 0, chunk.length, buffer, 0); + // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes + // in the future this will save a lot of hair-pulling + compressed.flip(); + validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0); if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble()) { @@ -108,14 +107,13 @@ public class CompressedRandomAccessReader extends RandomAccessReader private int checksum(CompressionMetadata.Chunk chunk) throws IOException { assert channel.position() == chunk.offset + chunk.length; - - if (source.read(checksumBytes, 0, checksumBytes.length) != checksumBytes.length) + checksumBytes.clear(); + if (channel.read(checksumBytes) != checksumBytes.capacity()) throw new IOException(String.format("(%s) failed to read checksum of the chunk at %d of length %d.", getPath(), 
chunk.offset, chunk.length)); - - return Ints.fromByteArray(checksumBytes); + return checksumBytes.getInt(0); } @Override @@ -125,13 +123,6 @@ public class CompressedRandomAccessReader extends RandomAccessReader } @Override - public void close() throws IOException - { - super.close(); - source.close(); - } - - @Override public String toString() { return String.format("%s - chunk length %d, data length %d.", getPath(), metadata.chunkLength(), metadata.dataLength);
