http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/RandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java index b13d154..e9b0ee4 100644 --- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java @@ -19,119 +19,129 @@ package org.apache.cassandra.io.util; import java.io.*; import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.compress.BufferType; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.memory.BufferPool; -public class RandomAccessReader extends AbstractDataInput implements FileDataInput +public class RandomAccessReader extends RebufferingInputStream implements FileDataInput { + // The default buffer size when the client doesn't specify it public static final int DEFAULT_BUFFER_SIZE = 4096; + // The maximum buffer size when the limiter is not null, i.e. when throttling + // is enabled. This is required to avoid acquiring permits that are too large.
+ public static final int MAX_THROTTLED_BUFFER_SIZE = 1 << 16; // 64k + // the IO channel to the file, we do not own a reference to this due to // performance reasons (CASSANDRA-9379) so it's up to the owner of the RAR to // ensure that the channel stays open and that it is closed afterwards protected final ChannelProxy channel; - // buffer which will cache file blocks - protected ByteBuffer buffer; + // optional memory mapped regions for the channel + protected final MmappedRegions regions; - // `bufferOffset` is the offset of the beginning of the buffer - // `markedPointer` folds the offset of the last file mark - protected long bufferOffset, markedPointer; + // An optional limiter that will throttle the amount of data we read + protected final RateLimiter limiter; - // this can be overridden at construction to a value shorter than the true length of the file; - // if so, it acts as an imposed limit on reads, rather than a convenience property + // the file length, this can be overridden at construction to a value shorter + // than the true length of the file; if so, it acts as an imposed limit on reads, + // required when opening sstables early not to read past the mark private final long fileLength; - protected RandomAccessReader(ChannelProxy channel, int bufferSize, long overrideLength, BufferType bufferType) - { - this.channel = channel; + // the buffer size for buffered readers + protected final int bufferSize; - if (bufferSize <= 0) - throw new IllegalArgumentException("bufferSize must be positive"); + // the buffer type for buffered readers + private final BufferType bufferType; - // we can cache file length in read-only mode - fileLength = overrideLength <= 0 ? 
channel.size() : overrideLength; + // offset from the beginning of the file + protected long bufferOffset; - buffer = allocateBuffer(getBufferSize(bufferSize), bufferType); - buffer.limit(0); - } + // offset of the last file mark + protected long markedPointer; - /** The buffer size is typically already page aligned but if that is not the case - * make sure that it is a multiple of the page size, 4096. - * */ - protected int getBufferSize(int size) + protected RandomAccessReader(Builder builder) { - if ((size & ~4095) != size) - { // should already be a page size multiple but if that's not case round it up - size = (size + 4095) & ~4095; - } - return size; - } + super(null); - protected ByteBuffer allocateBuffer(int size, BufferType bufferType) - { - return BufferPool.get(size, bufferType); - } + this.channel = builder.channel; + this.regions = builder.regions; + this.limiter = builder.limiter; + this.fileLength = builder.overrideLength <= 0 ? builder.channel.size() : builder.overrideLength; + this.bufferSize = getBufferSize(builder); + this.bufferType = builder.bufferType; - // A wrapper of the RandomAccessReader that closes the channel when done. - // For performance reasons RAR does not increase the reference count of - // a channel but assumes the owner will keep it open and close it, - // see CASSANDRA-9379, this thin class is just for those cases where we do - // not have a shared channel. 
- private static class RandomAccessReaderWithChannel extends RandomAccessReader - { - @SuppressWarnings("resource") - RandomAccessReaderWithChannel(File file) - { - super(new ChannelProxy(file), DEFAULT_BUFFER_SIZE, -1L, BufferType.OFF_HEAP); - } + if (builder.bufferSize <= 0) + throw new IllegalArgumentException("bufferSize must be positive"); - @Override - public void close() - { - try - { - super.close(); - } - finally - { - channel.close(); - } - } + if (builder.initializeBuffers) + initializeBuffer(); } - public static RandomAccessReader open(File file) + protected int getBufferSize(Builder builder) { - return new RandomAccessReaderWithChannel(file); + if (builder.limiter == null) + return builder.bufferSize; + + // limit to ensure more accurate throttling + return Math.min(MAX_THROTTLED_BUFFER_SIZE, builder.bufferSize); } - public static RandomAccessReader open(ChannelProxy channel) + protected void initializeBuffer() { - return open(channel, DEFAULT_BUFFER_SIZE, -1L); + if (regions == null) + buffer = allocateBuffer(bufferSize); + else + buffer = regions.floor(0).buffer.duplicate(); + + buffer.limit(0); } - public static RandomAccessReader open(ChannelProxy channel, int bufferSize, long overrideSize) + protected ByteBuffer allocateBuffer(int size) { - return new RandomAccessReader(channel, bufferSize, overrideSize, BufferType.OFF_HEAP); + return BufferPool.get(size, bufferType).order(ByteOrder.BIG_ENDIAN); } - public ChannelProxy getChannel() + protected void releaseBuffer() { - return channel; + if (buffer != null) + { + if (regions == null) + BufferPool.put(buffer); + buffer = null; + } } /** * Read data from file starting from current currentOffset to populate buffer. 
*/ - protected void reBuffer() + public void reBuffer() + { + if (isEOF()) + return; + + if (regions == null) + reBufferStandard(); + else + reBufferMmap(); + + if (limiter != null) + limiter.acquire(buffer.remaining()); + + assert buffer.order() == ByteOrder.BIG_ENDIAN : "Buffer must have BIG ENDIAN byte ordering"; + } + + protected void reBufferStandard() { bufferOffset += buffer.position(); - buffer.clear(); assert bufferOffset < fileLength; + buffer.clear(); long position = bufferOffset; long limit = bufferOffset; @@ -145,15 +155,31 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp { int n = channel.read(buffer, position); if (n < 0) - break; + throw new FSReadError(new IOException("Unexpected end of file"), channel.filePath()); + position += n; limit = bufferOffset + buffer.position(); } - if (limit > fileLength) - buffer.position((int)(fileLength - bufferOffset)); + buffer.flip(); } + protected void reBufferMmap() + { + long position = bufferOffset + buffer.position(); + assert position < fileLength; + + MmappedRegions.Region region = regions.floor(position); + bufferOffset = region.bottom(); + buffer = region.buffer.duplicate(); + buffer.position(Ints.checkedCast(position - bufferOffset)); + + if (limiter != null && bufferSize < buffer.remaining()) + { // ensure accurate throttling + buffer.limit(buffer.position() + bufferSize); + } + } + @Override public long getFilePointer() { @@ -170,19 +196,23 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp return channel.filePath(); } - public int getTotalBufferSize() + public ChannelProxy getChannel() { - //This may NPE so we make a ref - //https://issues.apache.org/jira/browse/CASSANDRA-7756 - ByteBuffer ref = buffer; - return ref != null ? 
ref.capacity() : 0; + return channel; } - public void reset() + @Override + public void reset() throws IOException { seek(markedPointer); } + @Override + public boolean markSupported() + { + return true; + } + public long bytesPastMark() { long bytes = current() - markedPointer; @@ -215,7 +245,7 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp */ public boolean isEOF() { - return getFilePointer() == length(); + return current() == length(); } public long bytesRemaining() @@ -224,22 +254,29 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp } @Override + public int available() throws IOException + { + return Ints.saturatedCast(bytesRemaining()); + } + + @Override public void close() { //make idempotent if (buffer == null) return; - bufferOffset += buffer.position(); - BufferPool.put(buffer); - buffer = null; + releaseBuffer(); + + //For performance reasons we don't keep a reference to the file + //channel so we don't close it } @Override public String toString() { - return getClass().getSimpleName() + "(" + "filePath='" + channel + "')"; + return getClass().getSimpleName() + "(filePath='" + channel + "')"; } /** @@ -286,93 +323,197 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp assert current() == newPosition; } - // -1 will be returned if there is nothing to read; higher-level methods like readInt - // or readFully (from RandomAccessFile) will throw EOFException but this should not - public int read() + /** + * Reads a line of text from the current position in this file. A line is + * represented by zero or more characters followed by {@code '\n'}, {@code + * '\r'}, {@code "\r\n"} or the end of file marker. The string does not + * include the line terminating sequence. + * <p/> + * Blocks until a line terminating sequence has been read, the end of the + * file is reached or an exception is thrown.
+ * + * @return the contents of the line or {@code null} if no characters have + * been read before the end of the file has been reached. + * @throws IOException if this file is closed or another I/O error occurs. + */ + public final String readLine() throws IOException { - if (buffer == null) - throw new AssertionError("Attempted to read from closed RAR"); - - if (isEOF()) - return -1; // required by RandomAccessFile - - if (!buffer.hasRemaining()) - reBuffer(); + StringBuilder line = new StringBuilder(80); // Typical line length + boolean foundTerminator = false; + long unreadPosition = -1; + while (true) + { + int nextByte = read(); + switch (nextByte) + { + case -1: + return line.length() != 0 ? line.toString() : null; + case (byte) '\r': + if (foundTerminator) + { + seek(unreadPosition); + return line.toString(); + } + foundTerminator = true; + /* Have to be able to peek ahead one byte */ + unreadPosition = getPosition(); + break; + case (byte) '\n': + return line.toString(); + default: + if (foundTerminator) + { + seek(unreadPosition); + return line.toString(); + } + line.append((char) nextByte); + } + } + } - return (int)buffer.get() & 0xff; + public long length() + { + return fileLength; } - @Override - public int read(byte[] buffer) + public long getPosition() { - return read(buffer, 0, buffer.length); + return current(); } - @Override - // -1 will be returned if there is nothing to read; higher-level methods like readInt - // or readFully (from RandomAccessFile) will throw EOFException but this should not - public int read(byte[] buff, int offset, int length) + public static class Builder { - if (buffer == null) - throw new IllegalStateException("Attempted to read from closed RAR"); + // The NIO file channel or an empty channel + public final ChannelProxy channel; - if (length == 0) - return 0; + // We override the file length when we open sstables early, so that we do not + // read past the early mark + public long overrideLength; - if (isEOF()) - return 
-1; + // The size of the buffer for buffered readers + public int bufferSize; - if (!buffer.hasRemaining()) - reBuffer(); + // The type of the buffer for buffered readers + public BufferType bufferType; - int toCopy = Math.min(length, buffer.remaining()); - buffer.get(buff, offset, toCopy); - return toCopy; - } + // The mmap segments for mmap readers + public MmappedRegions regions; - public ByteBuffer readBytes(int length) throws EOFException - { - assert length >= 0 : "buffer length should not be negative: " + length; + // An optional limiter that will throttle the amount of data we read + public RateLimiter limiter; - if (buffer == null) - throw new IllegalStateException("Attempted to read from closed RAR"); + public boolean initializeBuffers; - try + public Builder(ChannelProxy channel) { - ByteBuffer result = ByteBuffer.allocate(length); - while (result.hasRemaining()) - { - if (isEOF()) - throw new EOFException(); - if (!buffer.hasRemaining()) - reBuffer(); - ByteBufferUtil.put(buffer, result); + this.channel = channel; + this.overrideLength = -1L; + this.bufferSize = getBufferSize(DEFAULT_BUFFER_SIZE); + this.bufferType = BufferType.OFF_HEAP; + this.regions = null; + this.limiter = null; + this.initializeBuffers = true; + } + + /** The buffer size is typically already page aligned but if that is not the case + * make sure that it is a multiple of the page size, 4096. 
+ * */ + private static int getBufferSize(int size) + { + if ((size & ~4095) != size) + { // should already be a page size multiple but if that's not case round it up + size = (size + 4095) & ~4095; } - result.flip(); - return result; + return size; + } + + public Builder overrideLength(long overrideLength) + { + if (overrideLength > channel.size()) + throw new IllegalArgumentException("overrideLength cannot be more than the file size"); + + this.overrideLength = overrideLength; + return this; } - catch (EOFException e) + + public Builder bufferSize(int bufferSize) { - throw e; + if (bufferSize <= 0) + throw new IllegalArgumentException("bufferSize must be positive"); + + this.bufferSize = getBufferSize(bufferSize); + return this; } - catch (Exception e) + + public Builder bufferType(BufferType bufferType) { - throw new FSReadError(e, channel.toString()); + this.bufferType = bufferType; + return this; + } + + public Builder regions(MmappedRegions regions) + { + this.regions = regions; + return this; + } + + public Builder limiter(RateLimiter limiter) + { + this.limiter = limiter; + return this; + } + + public Builder initializeBuffers(boolean initializeBuffers) + { + this.initializeBuffers = initializeBuffers; + return this; + } + + public RandomAccessReader build() + { + return new RandomAccessReader(this); + } + + public RandomAccessReader buildWithChannel() + { + return new RandomAccessReaderWithOwnChannel(this); } } - public long length() + // A wrapper of the RandomAccessReader that closes the channel when done. + // For performance reasons RAR does not increase the reference count of + // a channel but assumes the owner will keep it open and close it, + // see CASSANDRA-9379, this thin class is just for those cases where we do + // not have a shared channel. 
+ public static class RandomAccessReaderWithOwnChannel extends RandomAccessReader { - return fileLength; + protected RandomAccessReaderWithOwnChannel(Builder builder) + { + super(builder); + } + + @Override + public void close() + { + try + { + super.close(); + } + finally + { + channel.close(); + } + } } - public long getPosition() + @SuppressWarnings("resource") + public static RandomAccessReader open(File file) { - return bufferOffset + (buffer == null ? 0 : buffer.position()); + return new Builder(new ChannelProxy(file)).buildWithChannel(); } - public long getPositionLimit() + public static RandomAccessReader open(ChannelProxy channel) { - return length(); + return new Builder(channel).build(); } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java new file mode 100644 index 0000000..7d64f3d --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.util; + +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import net.nicoulaj.compilecommand.annotations.DontInline; +import net.nicoulaj.compilecommand.annotations.Print; +import org.apache.cassandra.utils.FastByteOperations; +import org.apache.cassandra.utils.vint.VIntCoding; + +import com.google.common.base.Preconditions; + +/** + * Rough equivalent of BufferedInputStream and DataInputStream wrapping a ByteBuffer that can be refilled + * via rebuffer. 
Implementations provide this buffer from various channels (socket, file, memory, etc). + * + * RebufferingInputStream is not thread safe. + */ +public abstract class RebufferingInputStream extends InputStream implements DataInputPlus, Closeable +{ + protected ByteBuffer buffer; + + protected RebufferingInputStream(ByteBuffer buffer) + { + Preconditions.checkArgument(buffer == null || buffer.order() == ByteOrder.BIG_ENDIAN, "Buffer must have BIG ENDIAN byte ordering"); + this.buffer = buffer; + } + + /** + * Implementations must implement this method to refill the buffer. + * They can expect the buffer to be empty when this method is invoked. + * @throws IOException + */ + protected abstract void reBuffer() throws IOException; + + @Override + public void readFully(byte[] b) throws IOException + { + readFully(b, 0, b.length); + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException + { + int read = read(b, off, len); + if (read < len) + throw new EOFException(); + } + + @Print + @Override + public int read(byte[] b, int off, int len) throws IOException { + + // avoid int overflow + if (off < 0 || off > b.length || len < 0 || len > b.length - off) + throw new IndexOutOfBoundsException(); + + if (len == 0) + return 0; + + int copied = 0; + while (copied < len) + { + int position = buffer.position(); + int remaining = buffer.limit() - position; + if (remaining == 0) + { + reBuffer(); + position = buffer.position(); + remaining = buffer.limit() - position; + if (remaining == 0) + return copied == 0 ? 
-1 : copied; + } + int toCopy = Math.min(len - copied, remaining); + FastByteOperations.copy(buffer, position, b, off + copied, toCopy); + buffer.position(position + toCopy); + copied += toCopy; + } + + return copied; + } + + @DontInline + protected long readPrimitiveSlowly(int bytes) throws IOException + { + long result = 0; + for (int i = 0; i < bytes; i++) + result = (result << 8) | (readByte() & 0xFFL); + return result; + } + + @Override + public int skipBytes(int n) throws IOException + { + int skipped = 0; + + while (skipped < n) + { + int skippedThisTime = (int)skip(n - skipped); + if (skippedThisTime <= 0) break; + skipped += skippedThisTime; + } + + return skipped; + } + + @Override + public boolean readBoolean() throws IOException + { + return readByte() != 0; + } + + @Override + public byte readByte() throws IOException + { + if (!buffer.hasRemaining()) + { + reBuffer(); + if (!buffer.hasRemaining()) + throw new EOFException(); + } + + return buffer.get(); + } + + @Override + public int readUnsignedByte() throws IOException + { + return readByte() & 0xff; + } + + @Override + public short readShort() throws IOException + { + if (buffer.remaining() >= 2) + return buffer.getShort(); + else + return (short) readPrimitiveSlowly(2); + } + + @Override + public int readUnsignedShort() throws IOException + { + return readShort() & 0xFFFF; + } + + @Override + public char readChar() throws IOException + { + if (buffer.remaining() >= 2) + return buffer.getChar(); + else + return (char) readPrimitiveSlowly(2); + } + + @Override + public int readInt() throws IOException + { + if (buffer.remaining() >= 4) + return buffer.getInt(); + else + return (int) readPrimitiveSlowly(4); + } + + @Override + public long readLong() throws IOException + { + if (buffer.remaining() >= 8) + return buffer.getLong(); + else + return readPrimitiveSlowly(8); + } + + public long readVInt() throws IOException + { + return VIntCoding.decodeZigZag64(readUnsignedVInt()); + } + + public long 
readUnsignedVInt() throws IOException + { + //If 9 bytes aren't available use the slow path in VIntCoding + if (buffer.remaining() < 9) + return VIntCoding.readUnsignedVInt(this); + + byte firstByte = buffer.get(); + + //Bail out early if this is one byte, necessary or it fails later + if (firstByte >= 0) + return firstByte; + + int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte); + + int position = buffer.position(); + int extraBits = extraBytes * 8; + + long retval = buffer.getLong(position); + if (buffer.order() == ByteOrder.LITTLE_ENDIAN) + retval = Long.reverseBytes(retval); + buffer.position(position + extraBytes); + + // truncate the bytes we read in excess of those we needed + retval >>>= 64 - extraBits; + // remove the non-value bits from the first byte + firstByte &= VIntCoding.firstByteValueMask(extraBytes); + // shift the first byte up to its correct position + retval |= (long) firstByte << extraBits; + return retval; + } + + @Override + public float readFloat() throws IOException + { + if (buffer.remaining() >= 4) + return buffer.getFloat(); + else + return Float.intBitsToFloat((int)readPrimitiveSlowly(4)); + } + + @Override + public double readDouble() throws IOException + { + if (buffer.remaining() >= 8) + return buffer.getDouble(); + else + return Double.longBitsToDouble(readPrimitiveSlowly(8)); + } + + @Override + public String readLine() throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public String readUTF() throws IOException + { + return DataInputStream.readUTF(this); + } + + @Override + public int read() throws IOException + { + try + { + return readUnsignedByte(); + } + catch (EOFException ex) + { + return -1; + } + } + + @Override + public void reset() throws IOException + { + throw new IOException("mark/reset not supported"); + } + + @Override + public boolean markSupported() + { + return false; + } +} 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/SegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java index e586682..c2a2374 100644 --- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java @@ -21,23 +21,19 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.File; import java.io.IOException; -import java.nio.MappedByteBuffer; -import java.util.Iterator; -import java.util.NoSuchElementException; import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.compress.CompressedSequentialWriter; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.IndexSummary; import org.apache.cassandra.io.sstable.IndexSummaryBuilder; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.utils.CLibrary; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.RefCounted; import org.apache.cassandra.utils.concurrent.SharedCloseableImpl; @@ -79,7 +75,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl this.onDiskLength = onDiskLength; } - public SegmentedFile(SegmentedFile copy) + protected SegmentedFile(SegmentedFile copy) { super(copy); channel = copy.channel; @@ -93,7 +89,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl return channel.filePath(); } - protected static abstract class Cleanup implements RefCounted.Tidy + protected static class Cleanup implements RefCounted.Tidy { 
final ChannelProxy channel; protected Cleanup(ChannelProxy channel) @@ -116,16 +112,22 @@ public abstract class SegmentedFile extends SharedCloseableImpl public RandomAccessReader createReader() { - return RandomAccessReader.open(channel, bufferSize, length); + return new RandomAccessReader.Builder(channel) + .overrideLength(length) + .bufferSize(bufferSize) + .build(); } - public RandomAccessReader createThrottledReader(RateLimiter limiter) + public RandomAccessReader createReader(RateLimiter limiter) { - assert limiter != null; - return ThrottledReader.open(channel, bufferSize, length, limiter); + return new RandomAccessReader.Builder(channel) + .overrideLength(length) + .bufferSize(bufferSize) + .limiter(limiter) + .build(); } - public FileDataInput getSegment(long position) + public FileDataInput createReader(long position) { RandomAccessReader reader = createReader(); reader.seek(position); @@ -153,14 +155,6 @@ public abstract class SegmentedFile extends SharedCloseableImpl } /** - * @return An Iterator over segments, beginning with the segment containing the given position: each segment must be closed after use. - */ - public Iterator<FileDataInput> iterator(long position) - { - return new SegmentIterator(position); - } - - /** * Collects potential segmentation points in an underlying file, and builds a SegmentedFile to represent it. */ public static abstract class Builder implements AutoCloseable @@ -168,13 +162,6 @@ public abstract class SegmentedFile extends SharedCloseableImpl private ChannelProxy channel; /** - * Adds a position that would be a safe place for a segment boundary in the file. For a block/row based file - * format, safe boundaries are block/row edges. - * @param boundary The absolute position of the potential boundary in the file. - */ - public abstract void addPotentialBoundary(long boundary); - - /** * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk. 
* @param channel The channel to the file on disk. */ @@ -214,12 +201,12 @@ public abstract class SegmentedFile extends SharedCloseableImpl return complete(desc.filenameFor(Component.PRIMARY_INDEX), bufferSize(desc, indexSummary), -1L); } - private int bufferSize(StatsMetadata stats) + private static int bufferSize(StatsMetadata stats) { return bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile())); } - private int bufferSize(Descriptor desc, IndexSummary indexSummary) + private static int bufferSize(Descriptor desc, IndexSummary indexSummary) { File file = new File(desc.filenameFor(Component.PRIMARY_INDEX)); return bufferSize(file.length() / indexSummary.size()); @@ -267,13 +254,19 @@ public abstract class SegmentedFile extends SharedCloseableImpl return (int)Math.min(size, 1 << 16); } - public void serializeBounds(DataOutput out) throws IOException + public void serializeBounds(DataOutput out, Version version) throws IOException { + if (!version.hasBoundaries()) + return; + out.writeUTF(DatabaseDescriptor.getDiskAccessMode().name()); } - public void deserializeBounds(DataInput in) throws IOException + public void deserializeBounds(DataInput in, Version version) throws IOException { + if (!version.hasBoundaries()) + return; + if (!in.readUTF().equals(DatabaseDescriptor.getDiskAccessMode().name())) throw new IOException("Cannot deserialize SSTable Summary component because the DiskAccessMode was changed!"); } @@ -282,6 +275,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl { if (channel != null) return channel.close(accumulate); + return accumulate; } @@ -294,6 +288,10 @@ public abstract class SegmentedFile extends SharedCloseableImpl { if (channel != null) { + // This is really fragile, both path and channel.filePath() + // must agree, i.e. 
they both must be absolute or both relative + // eventually we should really pass the filePath to the builder + // constructor and remove this if (channel.filePath().equals(path)) return channel.sharedCopy(); else @@ -305,61 +303,10 @@ public abstract class SegmentedFile extends SharedCloseableImpl } } - static final class Segment extends Pair<Long, MappedByteBuffer> implements Comparable<Segment> - { - public Segment(long offset, MappedByteBuffer segment) - { - super(offset, segment); - } - - public final int compareTo(Segment that) - { - return (int)Math.signum(this.left - that.left); - } - } - - /** - * A lazy Iterator over segments in forward order from the given position. It is caller's responsibility - * to close the FileDataIntputs when finished. - */ - final class SegmentIterator implements Iterator<FileDataInput> - { - private long nextpos; - public SegmentIterator(long position) - { - this.nextpos = position; - } - - public boolean hasNext() - { - return nextpos < length; - } - - public FileDataInput next() - { - long position = nextpos; - if (position >= length) - throw new NoSuchElementException(); - - FileDataInput segment = getSegment(nextpos); - try - { - nextpos = nextpos + segment.bytesRemaining(); - } - catch (IOException e) - { - throw new FSReadError(e, path()); - } - return segment; - } - - public void remove() { throw new UnsupportedOperationException(); } - } - @Override public String toString() { - return getClass().getSimpleName() + "(path='" + path() + "'" + + return getClass().getSimpleName() + "(path='" + path() + '\'' + ", length=" + length + - ")"; + ')'; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/ThrottledReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java deleted file mode 100644 index 024d38f..0000000 --- 
a/src/java/org/apache/cassandra/io/util/ThrottledReader.java +++ /dev/null @@ -1,48 +0,0 @@ -package org.apache.cassandra.io.util; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import com.google.common.util.concurrent.RateLimiter; - -import org.apache.cassandra.io.compress.BufferType; - -public class ThrottledReader extends RandomAccessReader -{ - private final RateLimiter limiter; - - protected ThrottledReader(ChannelProxy channel, int bufferSize, long overrideLength, RateLimiter limiter) - { - super(channel, bufferSize, overrideLength, BufferType.OFF_HEAP); - this.limiter = limiter; - } - - protected void reBuffer() - { - limiter.acquire(buffer.capacity()); - super.reBuffer(); - } - - public static ThrottledReader open(ChannelProxy channel, int bufferSize, long overrideLength, RateLimiter limiter) - { - return new ThrottledReader(channel, bufferSize, overrideLength, limiter); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java 
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java index 6b24707..adbd091 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java @@ -55,7 +55,7 @@ public class CompressedStreamWriter extends StreamWriter public void write(DataOutputStreamPlus out) throws IOException { long totalSize = totalSize(); - try (RandomAccessReader file = sstable.openDataReader(); final ChannelProxy fc = file.getChannel().sharedCopy()) + try (ChannelProxy fc = sstable.getDataChannel().sharedCopy()) { long progress = 0L; // calculate chunks to transfer. we want to send continuous chunks altogether. @@ -72,13 +72,7 @@ public class CompressedStreamWriter extends StreamWriter final long bytesTransferredFinal = bytesTransferred; final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred); limiter.acquire(toTransfer); - long lastWrite = out.applyToChannel(new Function<WritableByteChannel, Long>() - { - public Long apply(WritableByteChannel wbc) - { - return fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc); - } - }); + long lastWrite = out.applyToChannel((wbc) -> fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc)); bytesTransferred += lastWrite; progress += lastWrite; session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/utils/ByteBufferUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java index abc2a37..a05c3c8 100644 --- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java +++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java @@ -398,9 +398,6 @@ public class ByteBufferUtil if (length == 0) return 
EMPTY_BYTE_BUFFER; - if (in instanceof FileDataInput) - return ((FileDataInput) in).readBytes(length); - byte[] buff = new byte[length]; in.readFully(buff); return ByteBuffer.wrap(buff); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/utils/Throwables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java index d6ce7b4..a5170ff 100644 --- a/src/java/org/apache/cassandra/utils/Throwables.java +++ b/src/java/org/apache/cassandra/utils/Throwables.java @@ -82,29 +82,54 @@ public final class Throwables @SuppressWarnings("unchecked") public static <E extends Exception> void perform(Stream<DiscreteAction<? extends E>> actions) throws E { - Throwable fail = null; - Iterator<DiscreteAction<? extends E>> iter = actions.iterator(); - while (iter.hasNext()) + Throwable fail = perform(null, actions); + if (failIfCanCast(fail, null)) + throw (E) fail; + } + + public static Throwable perform(Throwable accumulate, Stream<? extends DiscreteAction<?>> actions) + { + return perform(accumulate, actions.iterator()); + } + + public static Throwable perform(Throwable accumulate, Iterator<? extends DiscreteAction<?>> actions) + { + while (actions.hasNext()) { - DiscreteAction<? extends E> action = iter.next(); + DiscreteAction<?> action = actions.next(); try { action.perform(); } catch (Throwable t) { - fail = merge(fail, t); + accumulate = merge(accumulate, t); } } - - if (failIfCanCast(fail, null)) - throw (E) fail; + return accumulate; } @SafeVarargs public static void perform(File against, FileOpType opType, DiscreteAction<? extends IOException> ... actions) { - perform(Arrays.stream(actions).map((action) -> () -> + perform(against.getPath(), opType, actions); + } + + @SafeVarargs + public static void perform(String filePath, FileOpType opType, DiscreteAction<? extends IOException> ... 
actions) + { + maybeFail(perform(null, filePath, opType, actions)); + } + + @SafeVarargs + public static Throwable perform(Throwable accumulate, String filePath, FileOpType opType, DiscreteAction<? extends IOException> ... actions) + { + return perform(accumulate, filePath, opType, Arrays.stream(actions)); + } + + public static Throwable perform(Throwable accumulate, String filePath, FileOpType opType, Stream<DiscreteAction<? extends IOException>> actions) + { + return perform(accumulate, actions.map((action) -> () -> { try { @@ -112,7 +137,7 @@ public final class Throwables } catch (IOException e) { - throw (opType == FileOpType.WRITE) ? new FSWriteError(e, against) : new FSReadError(e, against); + throw (opType == FileOpType.WRITE) ? new FSWriteError(e, filePath) : new FSReadError(e, filePath); } })); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/utils/vint/VIntCoding.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java index 0ac4124..daf5006 100644 --- a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java +++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java @@ -67,7 +67,7 @@ public class VIntCoding return firstByte; int size = numberOfExtraBytesToRead(firstByte); - long retval = firstByte & firstByteValueMask(size);; + long retval = firstByte & firstByteValueMask(size); for (int ii = 0; ii < size; ii++) { byte b = input.readByte(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index fcdab62..555cdda 100644 --- 
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@ -60,8 +60,7 @@ import org.apache.cassandra.db.commitlog.CommitLogSegment; import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.util.ByteBufferDataInput; -import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.ByteBufferUtil; @@ -535,13 +534,12 @@ public class CommitLogTest { ByteBuffer buf = ByteBuffer.allocate(1024); CommitLogDescriptor.writeHeader(buf, desc); - long length = buf.position(); // Put some extra data in the stream. buf.putDouble(0.1); buf.flip(); - FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0); + + DataInputBuffer input = new DataInputBuffer(buf, false); CommitLogDescriptor read = CommitLogDescriptor.readHeader(input); - Assert.assertEquals("Descriptor length", length, input.getFilePointer()); Assert.assertEquals("Descriptors", desc, read); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java index 1ab9ca7..fea83c1 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java @@ -29,6 +29,7 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.rows.SerializationHelper; import 
org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.NIODataInputStream; +import org.apache.cassandra.io.util.RebufferingInputStream; /** * Utility class for tests needing to examine the commitlog contents. @@ -60,7 +61,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer @Override void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc) { - NIODataInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size); + RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size); Mutation mutation; try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java index e431924..323a12d 100644 --- a/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java +++ b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java @@ -17,96 +17,217 @@ */ package org.apache.cassandra.hints; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.zip.CRC32; import org.junit.Test; -import org.apache.cassandra.hints.ChecksummedDataInput; -import org.apache.cassandra.io.util.AbstractDataInput; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertFalse; public class ChecksummedDataInputTest { @Test - public void testThatItWorks() throws 
IOException + public void testReadMethods() throws IOException { + // Make sure this array is bigger than the reader buffer size + // so we test updating the crc across buffer boundaries + byte[] b = new byte[RandomAccessReader.DEFAULT_BUFFER_SIZE * 2]; + for (int i = 0; i < b.length; i++) + b[i] = (byte)i; + + ByteBuffer buffer; + // fill a bytebuffer with some input - DataOutputBuffer out = new DataOutputBuffer(); - out.write(127); - out.write(new byte[]{ 0, 1, 2, 3, 4, 5, 6 }); - out.writeBoolean(false); - out.writeByte(10); - out.writeChar('t'); - out.writeDouble(3.3); - out.writeFloat(2.2f); - out.writeInt(42); - out.writeLong(Long.MAX_VALUE); - out.writeShort(Short.MIN_VALUE); - out.writeUTF("utf"); - ByteBuffer buffer = out.buffer(); - - // calculate resulting CRC + try (DataOutputBuffer out = new DataOutputBuffer()) + { + out.write(127); + out.write(b); + out.writeBoolean(false); + out.writeByte(10); + out.writeChar('t'); + out.writeDouble(3.3); + out.writeFloat(2.2f); + out.writeInt(42); + out.writeLong(Long.MAX_VALUE); + out.writeShort(Short.MIN_VALUE); + out.writeUTF("utf"); + out.writeVInt(67L); + out.writeUnsignedVInt(88L); + out.writeBytes("abcdefghi"); + + buffer = out.buffer(); + } + + // calculate expected CRC CRC32 crc = new CRC32(); FBUtilities.updateChecksum(crc, buffer); - int expectedCRC = (int) crc.getValue(); - - ChecksummedDataInput crcInput = ChecksummedDataInput.wrap(new DummyByteBufferDataInput(buffer.duplicate())); - crcInput.limit(buffer.remaining()); - - // assert that we read all the right values back - assertEquals(127, crcInput.read()); - byte[] bytes = new byte[7]; - crcInput.readFully(bytes); - assertTrue(Arrays.equals(new byte[]{ 0, 1, 2, 3, 4, 5, 6 }, bytes)); - assertEquals(false, crcInput.readBoolean()); - assertEquals(10, crcInput.readByte()); - assertEquals('t', crcInput.readChar()); - assertEquals(3.3, crcInput.readDouble()); - assertEquals(2.2f, crcInput.readFloat()); - assertEquals(42, crcInput.readInt()); - 
assertEquals(Long.MAX_VALUE, crcInput.readLong()); - assertEquals(Short.MIN_VALUE, crcInput.readShort()); - assertEquals("utf", crcInput.readUTF()); - - // assert that the crc matches, and that we've read exactly as many bytes as expected - assertEquals(0, crcInput.bytesRemaining()); - assertEquals(expectedCRC, crcInput.getCrc()); + + // save the buffer to file to create a RAR + File file = File.createTempFile("testReadMethods", "1"); + file.deleteOnExit(); + try (SequentialWriter writer = SequentialWriter.open(file)) + { + writer.write(buffer); + writer.writeInt((int) crc.getValue()); + writer.finish(); + } + + assertTrue(file.exists()); + assertEquals(buffer.remaining() + 4, file.length()); + + try (ChecksummedDataInput reader = ChecksummedDataInput.open(file)) + { + reader.limit(buffer.remaining() + 4); + + // assert that we read all the right values back + assertEquals(127, reader.read()); + byte[] bytes = new byte[b.length]; + reader.readFully(bytes); + assertTrue(Arrays.equals(bytes, b)); + assertEquals(false, reader.readBoolean()); + assertEquals(10, reader.readByte()); + assertEquals('t', reader.readChar()); + assertEquals(3.3, reader.readDouble()); + assertEquals(2.2f, reader.readFloat()); + assertEquals(42, reader.readInt()); + assertEquals(Long.MAX_VALUE, reader.readLong()); + assertEquals(Short.MIN_VALUE, reader.readShort()); + assertEquals("utf", reader.readUTF()); + assertEquals(67L, reader.readVInt()); + assertEquals(88L, reader.readUnsignedVInt()); + assertEquals("abcdefghi", new String(ByteBufferUtil.read(reader, 9).array(), StandardCharsets.UTF_8)); + + // assert that the crc matches, and that we've read exactly as many bytes as expected + assertTrue(reader.checkCrc()); + assertEquals(0, reader.bytesRemaining()); + + reader.checkLimit(0); + } } - private static final class DummyByteBufferDataInput extends AbstractDataInput + @Test + public void testResetCrc() throws IOException { - private final ByteBuffer buffer; + CRC32 crc = new CRC32(); + 
ByteBuffer buffer; + + // fill a bytebuffer with some input + try (DataOutputBuffer out = new DataOutputBuffer()) + { + out.write(127); + out.writeBoolean(false); + out.writeByte(10); + out.writeChar('t'); + + buffer = out.buffer(); + FBUtilities.updateChecksum(crc, buffer); + out.writeInt((int) crc.getValue()); + + int bufferPos = out.getLength(); + out.writeDouble(3.3); + out.writeFloat(2.2f); + out.writeInt(42); + + buffer = out.buffer(); + buffer.position(bufferPos); + crc.reset(); + FBUtilities.updateChecksum(crc, buffer); + + out.writeInt((int) crc.getValue()); + buffer = out.buffer(); + } - DummyByteBufferDataInput(ByteBuffer buffer) + // save the buffer to file to create a RAR + File file = File.createTempFile("testResetCrc", "1"); + file.deleteOnExit(); + try (SequentialWriter writer = SequentialWriter.open(file)) { - this.buffer = buffer; + writer.write(buffer); + writer.finish(); } - public void seek(long position) + assertTrue(file.exists()); + assertEquals(buffer.remaining(), file.length()); + + try (ChecksummedDataInput reader = ChecksummedDataInput.open(file)) { - throw new UnsupportedOperationException(); + reader.limit(buffer.remaining()); + + // assert that we read all the right values back + assertEquals(127, reader.read()); + assertEquals(false, reader.readBoolean()); + assertEquals(10, reader.readByte()); + assertEquals('t', reader.readChar()); + assertTrue(reader.checkCrc()); + + reader.resetCrc(); + assertEquals(3.3, reader.readDouble()); + assertEquals(2.2f, reader.readFloat()); + assertEquals(42, reader.readInt()); + assertTrue(reader.checkCrc()); + assertEquals(0, reader.bytesRemaining()); } + } - public long getPosition() + @Test + public void testFailedCrc() throws IOException + { + CRC32 crc = new CRC32(); + ByteBuffer buffer; + + // fill a bytebuffer with some input + try (DataOutputBuffer out = new DataOutputBuffer()) { - throw new UnsupportedOperationException(); + out.write(127); + out.writeBoolean(false); + out.writeByte(10); + 
out.writeChar('t'); + + buffer = out.buffer(); + FBUtilities.updateChecksum(crc, buffer); + + // update twice so it won't match + FBUtilities.updateChecksum(crc, buffer); + out.writeInt((int) crc.getValue()); + + buffer = out.buffer(); } - public long getPositionLimit() + // save the buffer to file to create a RAR + File file = File.createTempFile("testFailedCrc", "1"); + file.deleteOnExit(); + try (SequentialWriter writer = SequentialWriter.open(file)) { - throw new UnsupportedOperationException(); + writer.write(buffer); + writer.finish(); } - public int read() + assertTrue(file.exists()); + assertEquals(buffer.remaining(), file.length()); + + try (ChecksummedDataInput reader = ChecksummedDataInput.open(file)) { - return buffer.get() & 0xFF; + reader.limit(buffer.remaining()); + + // assert that we read all the right values back + assertEquals(127, reader.read()); + assertEquals(false, reader.readBoolean()); + assertEquals(10, reader.readByte()); + assertEquals('t', reader.readChar()); + assertFalse(reader.checkCrc()); + assertEquals(0, reader.bytesRemaining()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java deleted file mode 100644 index c1e43c9..0000000 --- a/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.io; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.Arrays; -import java.util.concurrent.ThreadLocalRandom; - -import org.junit.Test; - -import static org.junit.Assert.*; -import org.apache.cassandra.io.util.ChecksummedRandomAccessReader; -import org.apache.cassandra.io.util.ChecksummedSequentialWriter; -import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.io.util.SequentialWriter; - -public class ChecksummedRandomAccessReaderTest -{ - @Test - public void readFully() throws IOException - { - final File data = File.createTempFile("testReadFully", "data"); - final File crc = File.createTempFile("testReadFully", "crc"); - - final byte[] expected = new byte[70 * 1024]; // bit more than crc chunk size, so we can test rebuffering. 
- ThreadLocalRandom.current().nextBytes(expected); - - SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc); - writer.write(expected); - writer.finish(); - - assert data.exists(); - - RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc); - byte[] b = new byte[expected.length]; - reader.readFully(b); - - assertArrayEquals(expected, b); - - assertTrue(reader.isEOF()); - - reader.close(); - } - - @Test - public void seek() throws IOException - { - final File data = File.createTempFile("testSeek", "data"); - final File crc = File.createTempFile("testSeek", "crc"); - - final byte[] dataBytes = new byte[70 * 1024]; // bit more than crc chunk size - ThreadLocalRandom.current().nextBytes(dataBytes); - - SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc); - writer.write(dataBytes); - writer.finish(); - - assert data.exists(); - - RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc); - - final int seekPosition = 66000; - reader.seek(seekPosition); - - byte[] b = new byte[dataBytes.length - seekPosition]; - reader.readFully(b); - - byte[] expected = Arrays.copyOfRange(dataBytes, seekPosition, dataBytes.length); - - assertArrayEquals(expected, b); - - assertTrue(reader.isEOF()); - - reader.close(); - } - - @Test(expected = ChecksummedRandomAccessReader.CorruptFileException.class) - public void corruptionDetection() throws IOException - { - final File data = File.createTempFile("corruptionDetection", "data"); - final File crc = File.createTempFile("corruptionDetection", "crc"); - - final byte[] expected = new byte[5 * 1024]; - Arrays.fill(expected, (byte) 0); - - SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc); - writer.write(expected); - writer.finish(); - - assert data.exists(); - - // simulate corruption of file - try (RandomAccessFile dataFile = new RandomAccessFile(data, "rw")) - { - dataFile.seek(1024); - dataFile.write((byte) 5); - } - - RandomAccessReader reader = 
ChecksummedRandomAccessReader.open(data, crc); - byte[] b = new byte[expected.length]; - reader.readFully(b); - - assertArrayEquals(expected, b); - - assertTrue(reader.isEOF()); - - reader.close(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java deleted file mode 100644 index edbd603..0000000 --- a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java +++ /dev/null @@ -1,269 +0,0 @@ -package org.apache.cassandra.io; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.junit.Test; - -import static org.junit.Assert.*; -import org.apache.cassandra.io.util.ChannelProxy; -import org.apache.cassandra.io.util.FileMark; -import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.io.util.SequentialWriter; - -public class RandomAccessReaderTest -{ - @Test - public void testReadFully() throws IOException - { - testReadImpl(1, 0); - } - - @Test - public void testReadLarge() throws IOException - { - testReadImpl(1000, 0); - } - - @Test - public void testReadLargeWithSkip() throws IOException - { - testReadImpl(1000, 322); - } - - @Test - public void testReadBufferSizeNotAligned() throws IOException - { - testReadImpl(1000, 0, 5122); - } - - private void testReadImpl(int numIterations, int skipIterations) throws IOException - { - testReadImpl(numIterations, skipIterations, RandomAccessReader.DEFAULT_BUFFER_SIZE); - } - - private void testReadImpl(int numIterations, int skipIterations, int bufferSize) throws IOException - { - final File 
f = File.createTempFile("testReadFully", "1"); - final String expected = "The quick brown fox jumps over the lazy dog"; - - SequentialWriter writer = SequentialWriter.open(f); - for (int i = 0; i < numIterations; i++) - writer.write(expected.getBytes()); - writer.finish(); - - assert f.exists(); - - ChannelProxy channel = new ChannelProxy(f); - RandomAccessReader reader = RandomAccessReader.open(channel, bufferSize, -1L); - assertEquals(f.getAbsolutePath(), reader.getPath()); - assertEquals(expected.length() * numIterations, reader.length()); - - if (skipIterations > 0) - { - reader.seek(skipIterations * expected.length()); - } - - byte[] b = new byte[expected.length()]; - int n = numIterations - skipIterations; - for (int i = 0; i < n; i++) - { - reader.readFully(b); - assertEquals(expected, new String(b)); - } - - assertTrue(reader.isEOF()); - assertEquals(0, reader.bytesRemaining()); - - reader.close(); - channel.close(); - } - - @Test - public void testReadBytes() throws IOException - { - File f = File.createTempFile("testReadBytes", "1"); - final String expected = "The quick brown fox jumps over the lazy dog"; - - SequentialWriter writer = SequentialWriter.open(f); - writer.write(expected.getBytes()); - writer.finish(); - - assert f.exists(); - - ChannelProxy channel = new ChannelProxy(f); - RandomAccessReader reader = RandomAccessReader.open(channel); - assertEquals(f.getAbsolutePath(), reader.getPath()); - assertEquals(expected.length(), reader.length()); - - ByteBuffer b = reader.readBytes(expected.length()); - assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); - - assertTrue(reader.isEOF()); - assertEquals(0, reader.bytesRemaining()); - - reader.close(); - channel.close(); - } - - @Test - public void testReset() throws IOException - { - File f = File.createTempFile("testMark", "1"); - final String expected = "The quick brown fox jumps over the lazy dog"; - final int numIterations = 10; - - SequentialWriter writer = 
SequentialWriter.open(f); - for (int i = 0; i < numIterations; i++) - writer.write(expected.getBytes()); - writer.finish(); - - assert f.exists(); - - ChannelProxy channel = new ChannelProxy(f); - RandomAccessReader reader = RandomAccessReader.open(channel); - assertEquals(expected.length() * numIterations, reader.length()); - - ByteBuffer b = reader.readBytes(expected.length()); - assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); - - assertFalse(reader.isEOF()); - assertEquals((numIterations - 1) * expected.length(), reader.bytesRemaining()); - - FileMark mark = reader.mark(); - assertEquals(0, reader.bytesPastMark()); - assertEquals(0, reader.bytesPastMark(mark)); - - for (int i = 0; i < (numIterations - 1); i++) - { - b = reader.readBytes(expected.length()); - assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); - } - assertTrue(reader.isEOF()); - assertEquals(expected.length() * (numIterations -1), reader.bytesPastMark()); - assertEquals(expected.length() * (numIterations - 1), reader.bytesPastMark(mark)); - - reader.reset(mark); - assertEquals(0, reader.bytesPastMark()); - assertEquals(0, reader.bytesPastMark(mark)); - assertFalse(reader.isEOF()); - for (int i = 0; i < (numIterations - 1); i++) - { - b = reader.readBytes(expected.length()); - assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); - } - - reader.reset(); - assertEquals(0, reader.bytesPastMark()); - assertEquals(0, reader.bytesPastMark(mark)); - assertFalse(reader.isEOF()); - for (int i = 0; i < (numIterations - 1); i++) - { - b = reader.readBytes(expected.length()); - assertEquals(expected, new String(b.array(), Charset.forName("UTF-8"))); - } - - assertTrue(reader.isEOF()); - reader.close(); - channel.close(); - } - - @Test - public void testSeekSingleThread() throws IOException, InterruptedException - { - testSeek(1); - } - - @Test - public void testSeekMultipleThreads() throws IOException, InterruptedException - { - testSeek(10); - 
} - - private void testSeek(int numThreads) throws IOException, InterruptedException - { - final File f = File.createTempFile("testMark", "1"); - final String[] expected = new String[10]; - int len = 0; - for (int i = 0; i < expected.length; i++) - { - expected[i] = UUID.randomUUID().toString(); - len += expected[i].length(); - } - final int totalLength = len; - - SequentialWriter writer = SequentialWriter.open(f); - for (int i = 0; i < expected.length; i++) - writer.write(expected[i].getBytes()); - writer.finish(); - - assert f.exists(); - - final ChannelProxy channel = new ChannelProxy(f); - - final Runnable worker = new Runnable() { - - @Override - public void run() - { - try - { - RandomAccessReader reader = RandomAccessReader.open(channel); - assertEquals(totalLength, reader.length()); - - ByteBuffer b = reader.readBytes(expected[0].length()); - assertEquals(expected[0], new String(b.array(), Charset.forName("UTF-8"))); - - assertFalse(reader.isEOF()); - assertEquals(totalLength - expected[0].length(), reader.bytesRemaining()); - - long filePointer = reader.getFilePointer(); - - for (int i = 1; i < expected.length; i++) - { - b = reader.readBytes(expected[i].length()); - assertEquals(expected[i], new String(b.array(), Charset.forName("UTF-8"))); - } - assertTrue(reader.isEOF()); - - reader.seek(filePointer); - assertFalse(reader.isEOF()); - for (int i = 1; i < expected.length; i++) - { - b = reader.readBytes(expected[i].length()); - assertEquals(expected[i], new String(b.array(), Charset.forName("UTF-8"))); - } - - assertTrue(reader.isEOF()); - reader.close(); - } - catch (Exception ex) - { - ex.printStackTrace(); - fail(ex.getMessage()); - } - } - }; - - if(numThreads == 1) - { - worker.run(); - return; - } - - ExecutorService executor = Executors.newFixedThreadPool(numThreads); - for (int i = 0; i < numThreads; i++) - executor.submit(worker); - - executor.shutdown(); - executor.awaitTermination(1, TimeUnit.MINUTES); - - channel.close(); - } -} 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java index 8f94cf2..9154d79 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java @@ -31,6 +31,7 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.io.util.FileMark; +import org.apache.cassandra.io.util.MmappedRegions; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.io.util.SequentialWriter; import org.apache.cassandra.schema.CompressionParams; @@ -39,6 +40,8 @@ import org.apache.cassandra.utils.SyncUtil; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; public class CompressedRandomAccessReaderTest { @@ -46,16 +49,24 @@ public class CompressedRandomAccessReaderTest public void testResetAndTruncate() throws IOException { // test reset in current buffer or previous one - testResetAndTruncate(File.createTempFile("normal", "1"), false, 10); - testResetAndTruncate(File.createTempFile("normal", "2"), false, CompressionParams.DEFAULT_CHUNK_LENGTH); + testResetAndTruncate(File.createTempFile("normal", "1"), false, false, 10); + testResetAndTruncate(File.createTempFile("normal", "2"), false, false, CompressionParams.DEFAULT_CHUNK_LENGTH); } @Test public void testResetAndTruncateCompressed() throws IOException { // test reset in current buffer or 
previous one - testResetAndTruncate(File.createTempFile("compressed", "1"), true, 10); - testResetAndTruncate(File.createTempFile("compressed", "2"), true, CompressionParams.DEFAULT_CHUNK_LENGTH); + testResetAndTruncate(File.createTempFile("compressed", "1"), true, false, 10); + testResetAndTruncate(File.createTempFile("compressed", "2"), true, false, CompressionParams.DEFAULT_CHUNK_LENGTH); + } + + @Test + public void testResetAndTruncateCompressedMmap() throws IOException + { + // test reset in current buffer or previous one + testResetAndTruncate(File.createTempFile("compressed_mmap", "1"), true, true, 10); + testResetAndTruncate(File.createTempFile("compressed_mmap", "2"), true, true, CompressionParams.DEFAULT_CHUNK_LENGTH); } @Test @@ -63,87 +74,102 @@ public class CompressedRandomAccessReaderTest { File f = File.createTempFile("compressed6791_", "3"); String filename = f.getAbsolutePath(); - ChannelProxy channel = new ChannelProxy(f); - try + try(ChannelProxy channel = new ChannelProxy(f)) { MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)); - CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", CompressionParams.snappy(32), sstableMetadataCollector); + try(CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", CompressionParams.snappy(32), sstableMetadataCollector)) + { - for (int i = 0; i < 20; i++) - writer.write("x".getBytes()); + for (int i = 0; i < 20; i++) + writer.write("x".getBytes()); - FileMark mark = writer.mark(); - // write enough garbage to create new chunks: - for (int i = 0; i < 40; ++i) - writer.write("y".getBytes()); + FileMark mark = writer.mark(); + // write enough garbage to create new chunks: + for (int i = 0; i < 40; ++i) + writer.write("y".getBytes()); - writer.resetAndTruncate(mark); + writer.resetAndTruncate(mark); - for (int i = 0; i < 20; i++) - writer.write("x".getBytes()); - 
writer.finish(); + for (int i = 0; i < 20; i++) + writer.write("x".getBytes()); + writer.finish(); + } - CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32)); - String res = reader.readLine(); - assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"); - assertEquals(40, res.length()); + try(RandomAccessReader reader = new CompressedRandomAccessReader.Builder(channel, + new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32)) + .build()) + { + String res = reader.readLine(); + assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"); + assertEquals(40, res.length()); + } } finally { - // cleanup - channel.close(); - if (f.exists()) - f.delete(); + assertTrue(f.delete()); File metadata = new File(filename+ ".metadata"); if (metadata.exists()) metadata.delete(); } } - private void testResetAndTruncate(File f, boolean compressed, int junkSize) throws IOException + private static void testResetAndTruncate(File f, boolean compressed, boolean usemmap, int junkSize) throws IOException { final String filename = f.getAbsolutePath(); - ChannelProxy channel = new ChannelProxy(f); - try + try(ChannelProxy channel = new ChannelProxy(f)) { MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)).replayPosition(null); - SequentialWriter writer = compressed + try(SequentialWriter writer = compressed ? 
new CompressedSequentialWriter(f, filename + ".metadata", CompressionParams.snappy(), sstableMetadataCollector) - : SequentialWriter.open(f); + : SequentialWriter.open(f)) + { + writer.write("The quick ".getBytes()); + FileMark mark = writer.mark(); + writer.write("blue fox jumps over the lazy dog".getBytes()); - writer.write("The quick ".getBytes()); - FileMark mark = writer.mark(); - writer.write("blue fox jumps over the lazy dog".getBytes()); + // write enough to be sure to change chunk + for (int i = 0; i < junkSize; ++i) + { + writer.write((byte) 1); + } - // write enough to be sure to change chunk - for (int i = 0; i < junkSize; ++i) + writer.resetAndTruncate(mark); + writer.write("brown fox jumps over the lazy dog".getBytes()); + writer.finish(); + } + assert f.exists(); + + CompressionMetadata compressionMetadata = compressed ? new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32) : null; + RandomAccessReader.Builder builder = compressed + ? new CompressedRandomAccessReader.Builder(channel, compressionMetadata) + : new RandomAccessReader.Builder(channel); + + if (usemmap) { - writer.write((byte)1); + if (compressed) + builder.regions(MmappedRegions.map(channel, compressionMetadata)); + else + builder.regions(MmappedRegions.map(channel, f.length())); } - writer.resetAndTruncate(mark); - writer.write("brown fox jumps over the lazy dog".getBytes()); - writer.finish(); + try(RandomAccessReader reader = builder.build()) + { + String expected = "The quick brown fox jumps over the lazy dog"; + assertEquals(expected.length(), reader.length()); + byte[] b = new byte[expected.length()]; + reader.readFully(b); + assert new String(b).equals(expected) : "Expecting '" + expected + "', got '" + new String(b) + '\''; + } - assert f.exists(); - RandomAccessReader reader = compressed - ? 
CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32)) - : RandomAccessReader.open(f); - String expected = "The quick brown fox jumps over the lazy dog"; - assertEquals(expected.length(), reader.length()); - byte[] b = new byte[expected.length()]; - reader.readFully(b); - assert new String(b).equals(expected) : "Expecting '" + expected + "', got '" + new String(b) + "'"; + if (usemmap) + builder.regions.close(); } finally { - // cleanup - channel.close(); - if (f.exists()) - f.delete(); + assertTrue(f.delete()); File metadata = new File(filename + ".metadata"); if (compressed && metadata.exists()) metadata.delete(); @@ -161,6 +187,9 @@ public class CompressedRandomAccessReaderTest File metadata = new File(file.getPath() + ".meta"); metadata.deleteOnExit(); + assertTrue(file.createNewFile()); + assertTrue(metadata.createNewFile()); + MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)).replayPosition(null); try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), CompressionParams.snappy(), sstableMetadataCollector)) { @@ -168,74 +197,74 @@ public class CompressedRandomAccessReaderTest writer.finish(); } - ChannelProxy channel = new ChannelProxy(file); - - // open compression metadata and get chunk information - CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), ChecksumType.CRC32); - CompressionMetadata.Chunk chunk = meta.chunkFor(0); - - RandomAccessReader reader = CompressedRandomAccessReader.open(channel, meta); - // read and verify compressed data - assertEquals(CONTENT, reader.readLine()); - - Random random = new Random(); - RandomAccessFile checksumModifier = null; - - try + try(ChannelProxy channel = new ChannelProxy(file)) { - checksumModifier = new RandomAccessFile(file, "rw"); - byte[] checksum = new byte[4]; - - // seek to the end of the compressed chunk - 
checksumModifier.seek(chunk.length); - // read checksum bytes - checksumModifier.read(checksum); - // seek back to the chunk end - checksumModifier.seek(chunk.length); - - // lets modify one byte of the checksum on each iteration - for (int i = 0; i < checksum.length; i++) - { - checksumModifier.write(random.nextInt()); - SyncUtil.sync(checksumModifier); // making sure that change was synced with disk + // open compression metadata and get chunk information + CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), ChecksumType.CRC32); + CompressionMetadata.Chunk chunk = meta.chunkFor(0); + + try(RandomAccessReader reader = new CompressedRandomAccessReader.Builder(channel, meta).build()) + {// read and verify compressed data + assertEquals(CONTENT, reader.readLine()); - final RandomAccessReader r = CompressedRandomAccessReader.open(channel, meta); + Random random = new Random(); + RandomAccessFile checksumModifier = null; - Throwable exception = null; try { - r.readLine(); + checksumModifier = new RandomAccessFile(file, "rw"); + byte[] checksum = new byte[4]; + + // seek to the end of the compressed chunk + checksumModifier.seek(chunk.length); + // read checksum bytes + checksumModifier.read(checksum); + // seek back to the chunk end + checksumModifier.seek(chunk.length); + + // lets modify one byte of the checksum on each iteration + for (int i = 0; i < checksum.length; i++) + { + checksumModifier.write(random.nextInt()); + SyncUtil.sync(checksumModifier); // making sure that change was synced with disk + + try (final RandomAccessReader r = new CompressedRandomAccessReader.Builder(channel, meta).build()) + { + Throwable exception = null; + try + { + r.readLine(); + } + catch (Throwable t) + { + exception = t; + } + assertNotNull(exception); + assertSame(exception.getClass(), CorruptSSTableException.class); + assertSame(exception.getCause().getClass(), CorruptBlockException.class); + } + } + + // lets write original checksum and check 
if we can read data + updateChecksum(checksumModifier, chunk.length, checksum); + + try (RandomAccessReader cr = new CompressedRandomAccessReader.Builder(channel, meta).build()) + { + // read and verify compressed data + assertEquals(CONTENT, cr.readLine()); + // close reader + } } - catch (Throwable t) + finally { - exception = t; + if (checksumModifier != null) + checksumModifier.close(); } - assertNotNull(exception); - assertEquals(exception.getClass(), CorruptSSTableException.class); - assertEquals(exception.getCause().getClass(), CorruptBlockException.class); - - r.close(); } - - // lets write original checksum and check if we can read data - updateChecksum(checksumModifier, chunk.length, checksum); - - reader = CompressedRandomAccessReader.open(channel, meta); - // read and verify compressed data - assertEquals(CONTENT, reader.readLine()); - // close reader - reader.close(); - } - finally - { - channel.close(); - - if (checksumModifier != null) - checksumModifier.close(); } } - private void updateChecksum(RandomAccessFile file, long checksumOffset, byte[] checksum) throws IOException + private static void updateChecksum(RandomAccessFile file, long checksumOffset, byte[] checksum) throws IOException { file.seek(checksumOffset); file.write(checksum);