This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 602a5ee Fix and optimise partial compressed sstable streaming
602a5ee is described below
commit 602a5eef177ac65020470cb0fcf8d88d820ab888
Author: Aleksey Yeshchenko <[email protected]>
AuthorDate: Mon Dec 16 11:22:38 2019 +0000
Fix and optimise partial compressed sstable streaming
patch by Aleksey Yeschenko; reviewed by Dinesh Joshi for CASSANDRA-13938
---
CHANGES.txt | 1 +
.../streaming/CassandraCompressedStreamReader.java | 6 +-
.../streaming/CassandraCompressedStreamWriter.java | 67 +++--
.../db/streaming/CompressedInputStream.java | 317 +++++++++------------
.../cassandra/io/compress/CompressionMetadata.java | 20 +-
.../cassandra/io/util/RebufferingInputStream.java | 36 ++-
.../cassandra/distributed/test/RepairTest.java | 45 ++-
7 files changed, 259 insertions(+), 233 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index f7ecf79..1860e63 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha3
+ * Fix and optimise partial compressed sstable streaming (CASSANDRA-13938)
* Improve error when JVM 11 can't access required modules (CASSANDRA-15468)
* Better handling of file deletion failures by DiskFailurePolicy
(CASSANDRA-15143)
* Prevent read repair mutations from increasing read timeout (CASSANDRA-15442)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
index c362d11..37b1a01 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -85,7 +85,7 @@ public class CassandraCompressedStreamReader extends CassandraStreamReader
int sectionIdx = 0;
for (SSTableReader.PartitionPositionBounds section : sections)
{
- assert cis.getTotalCompressedBytesRead() <= totalSize;
+ assert cis.chunkBytesRead() <= totalSize;
long sectionLength = section.upperPosition - section.lowerPosition;
logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength);
@@ -97,12 +97,12 @@ public class CassandraCompressedStreamReader extends CassandraStreamReader
{
writePartition(deserializer, writer);
// when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
- session.progress(filename, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
+ session.progress(filename, ProgressInfo.Direction.IN, cis.chunkBytesRead(), totalSize);
}
assert in.getBytesRead() == sectionLength;
}
logger.trace("[Stream #{}] Finished receiving file #{} from {}
readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
- session.peer,
FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()),
FBUtilities.prettyPrintMemory(totalSize));
+ session.peer,
FBUtilities.prettyPrintMemory(cis.chunkBytesRead()),
FBUtilities.prettyPrintMemory(totalSize));
return writer;
}
catch (Throwable e)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
index efbccdc..21406b2 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
@@ -42,6 +43,7 @@ import org.apache.cassandra.utils.FBUtilities;
public class CassandraCompressedStreamWriter extends CassandraStreamWriter
{
private static final int CHUNK_SIZE = 1 << 16;
+ private static final int CRC_LENGTH = 4;
private static final Logger logger = LoggerFactory.getLogger(CassandraCompressedStreamWriter.class);
@@ -63,16 +65,17 @@ public class CassandraCompressedStreamWriter extends CassandraStreamWriter
try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
{
long progress = 0L;
- // calculate chunks to transfer. we want to send continuous chunks altogether.
- List<SSTableReader.PartitionPositionBounds> sections = getTransferSections(compressionInfo.chunks);
+
+ // we want to send continuous chunks together to minimise reads from disk and network writes
+ List<Section> sections = fuseAdjacentChunks(compressionInfo.chunks);
int sectionIdx = 0;
// stream each of the required sections of the file
- for (final SSTableReader.PartitionPositionBounds section : sections)
+ for (Section section : sections)
{
// length of the section to stream
- long length = section.upperPosition - section.lowerPosition;
+ long length = section.end - section.start;
logger.debug("[Stream #{}] Writing section {} with length {}
to stream.", session.planId(), sectionIdx++, length);
@@ -81,7 +84,7 @@ public class CassandraCompressedStreamWriter extends
CassandraStreamWriter
while (bytesTransferred < length)
{
int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
- long position = section.lowerPosition + bytesTransferred;
+ long position = section.start + bytesTransferred;
out.writeToChannel(bufferSupplier -> {
ByteBuffer outBuffer = bufferSupplier.get(toTransfer);
@@ -111,32 +114,48 @@ public class CassandraCompressedStreamWriter extends CassandraStreamWriter
}
// chunks are assumed to be sorted by offset
- private List<SSTableReader.PartitionPositionBounds> getTransferSections(CompressionMetadata.Chunk[] chunks)
+ private List<Section> fuseAdjacentChunks(CompressionMetadata.Chunk[] chunks)
{
- List<SSTableReader.PartitionPositionBounds> transferSections = new ArrayList<>();
- SSTableReader.PartitionPositionBounds lastSection = null;
- for (CompressionMetadata.Chunk chunk : chunks)
+ if (chunks.length == 0)
+ return Collections.emptyList();
+
+ long start = chunks[0].offset;
+ long end = start + chunks[0].length + CRC_LENGTH;
+
+ List<Section> sections = new ArrayList<>();
+
+ for (int i = 1; i < chunks.length; i++)
{
- if (lastSection != null)
+ CompressionMetadata.Chunk chunk = chunks[i];
+
+ if (chunk.offset == end)
{
- if (chunk.offset == lastSection.upperPosition)
- {
- // extend previous section to end of this chunk
- lastSection = new SSTableReader.PartitionPositionBounds(lastSection.lowerPosition, chunk.offset + chunk.length + 4); // 4 bytes for CRC
- }
- else
- {
- transferSections.add(lastSection);
- lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
- }
+ end += (chunk.length + CRC_LENGTH);
}
else
{
- lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
+ sections.add(new Section(start, end));
+
+ start = chunk.offset;
+ end = start + chunk.length + CRC_LENGTH;
}
}
- if (lastSection != null)
- transferSections.add(lastSection);
- return transferSections;
+ sections.add(new Section(start, end));
+
+ return sections;
+ }
+
+ // [start, end) positions in the compressed sstable file that we want to stream;
+ // each section contains 1..n adjacent compressed chunks in it.
+ private static class Section
+ {
+ private final long start;
+ private final long end;
+
+ private Section(long start, long end)
+ {
+ this.start = start;
+ this.end = end;
+ }
}
}
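
The core of the writer change above is fusing physically adjacent compressed chunks into contiguous [start, end) sections, so that each section can be read from disk and written to the network in one sequential pass. Below is a minimal, self-contained sketch of that fusing step; Chunk and Section here are simplified stand-ins for CompressionMetadata.Chunk and the private Section class, and the offsets in main() are invented for illustration.

import java.util.ArrayList;
import java.util.List;

public class ChunkFusingSketch
{
    static final int CRC_LENGTH = 4; // every compressed chunk is followed by a 4-byte CRC

    static class Chunk
    {
        final long offset;
        final int length;
        Chunk(long offset, int length) { this.offset = offset; this.length = length; }
    }

    static class Section
    {
        final long start; // inclusive
        final long end;   // exclusive
        Section(long start, long end) { this.start = start; this.end = end; }
    }

    // fuse offset-sorted chunks that are physically adjacent in the compressed file
    // into [start, end) sections, each streamable with a single sequential read
    static List<Section> fuseAdjacentChunks(Chunk[] chunks)
    {
        List<Section> sections = new ArrayList<>();
        if (chunks.length == 0)
            return sections;

        long start = chunks[0].offset;
        long end = start + chunks[0].length + CRC_LENGTH;

        for (int i = 1; i < chunks.length; i++)
        {
            Chunk chunk = chunks[i];
            if (chunk.offset == end)
            {
                end += chunk.length + CRC_LENGTH; // contiguous - extend the current section
            }
            else
            {
                sections.add(new Section(start, end)); // gap - close this section, start a new one
                start = chunk.offset;
                end = start + chunk.length + CRC_LENGTH;
            }
        }
        sections.add(new Section(start, end));
        return sections;
    }

    public static void main(String[] args)
    {
        // chunks at offsets 0 and 104 are adjacent (100 bytes of data + 4-byte CRC); 500 is not
        Chunk[] chunks = { new Chunk(0, 100), new Chunk(104, 100), new Chunk(500, 80) };
        for (Section s : fuseAdjacentChunks(chunks))
            System.out.println("[" + s.start + ", " + s.end + ")"); // prints [0, 208) and [500, 584)
    }
}
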
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
index c0278e8..b8626ff 100644
--- a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
@@ -20,266 +20,215 @@ package org.apache.cassandra.db.streaming;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
import java.util.Iterator;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.DoubleSupplier;
import com.google.common.collect.Iterators;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.primitives.Ints;
-import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RebufferingInputStream;
-import org.apache.cassandra.db.streaming.CassandraStreamReader.StreamDeserializer;
+import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.utils.ChecksumType;
-import org.apache.cassandra.utils.WrappedRunnable;
+
+import static java.lang.Math.max;
+import static java.lang.String.format;
/**
- * InputStream which reads data from underlining source with given {@link CompressionInfo}. Uses {@link #buffer} as a buffer
- * for uncompressed data (which is read by stream consumers - {@link StreamDeserializer} in this case).
+ * InputStream which reads compressed chunks from the underlying input stream and deals with decompression
+ * and position tracking.
+ *
+ * The underlying input will be an instance of {@link RebufferingInputStream} except in some unit tests.
+ *
+ * Compressed chunks transferred will be a subset of all chunks in the source streamed sstable - just enough to
+ * deserialize the requested partition position ranges. Correctness of the entire operation depends on provided
+ * partition position ranges and compressed chunks properly matching, and there is no way on the receiving side to
+ * verify if that's the case, which arguably makes this a little brittle.
*/
public class CompressedInputStream extends RebufferingInputStream implements AutoCloseable
{
+ private static final double GROWTH_FACTOR = 1.5;
- private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class);
-
- private final CompressionInfo info;
- // chunk buffer
- private final BlockingQueue<ByteBuffer> dataBuffer;
- private final DoubleSupplier crcCheckChanceSupplier;
+ private final DataInputPlus input;
- /**
- * The base offset of the current {@link #buffer} from the beginning of the stream.
- */
- private long bufferOffset = 0;
-
- /**
- * The current {@link CassandraCompressedStreamReader#sections} offset in the stream.
- */
- private long current = 0;
+ private final Iterator<CompressionMetadata.Chunk> compressedChunks;
+ private final CompressionParams compressionParams;
private final ChecksumType checksumType;
-
- private static final int CHECKSUM_LENGTH = 4;
+ private final DoubleSupplier validateChecksumChance;
/**
- * Indicates there was a problem when reading from source stream.
- * When this is added to the <code>dataBuffer</code> by the stream Reader,
- * it is expected that the <code>readException</code> variable is populated
- * with the cause of the error when reading from source stream, so it is
- * thrown to the consumer on subsequent read operation.
+ * The base offset of the current {@link #buffer} into the original sstable as if it were uncompressed.
*/
- private static final ByteBuffer POISON_PILL = ByteBuffer.wrap(new byte[0]);
-
- private volatile IOException readException = null;
-
- private long totalCompressedBytesRead;
+ private long uncompressedChunkPosition = Long.MIN_VALUE;
/**
- * @param source Input source to read compressed data from
- * @param info Compression info
+ * @param input Input source to read compressed data from
+ * @param compressionInfo Compression info
*/
- public CompressedInputStream(DataInputPlus source, CompressionInfo info, ChecksumType checksumType, DoubleSupplier crcCheckChanceSupplier)
+ public CompressedInputStream(DataInputPlus input,
+ CompressionInfo compressionInfo,
+ ChecksumType checksumType,
+ DoubleSupplier validateChecksumChance)
{
- super(ByteBuffer.allocateDirect(info.parameters.chunkLength()));
- buffer.limit(buffer.position()); // force the buffer to appear "consumed" so that it triggers reBuffer on the first read
- this.info = info;
- this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
- this.crcCheckChanceSupplier = crcCheckChanceSupplier;
+ super(ByteBuffer.allocateDirect(compressionInfo.parameters.chunkLength()));
+ buffer.limit(0);
+
+ this.input = input;
this.checksumType = checksumType;
+ this.validateChecksumChance = validateChecksumChance;
- new FastThreadLocalThread(new Reader(source, info, dataBuffer)).start();
+ compressionParams = compressionInfo.parameters;
+ compressedChunks = Iterators.forArray(compressionInfo.chunks);
+ compressedChunk = ByteBuffer.allocateDirect(compressionParams.chunkLength());
}
/**
- * Invoked when crossing into the next stream boundary in {@link CassandraCompressedStreamReader#sections}.
+ * Invoked when crossing into the next {@link SSTableReader.PartitionPositionBounds} section
+ * in {@link CassandraCompressedStreamReader#read(DataInputPlus)}.
+ * Will skip 1..n compressed chunks of the original sstable.
*/
public void position(long position) throws IOException
{
- if (readException != null)
- throw readException;
-
- assert position >= current : "stream can only read forward.";
- current = position;
+ if (position < uncompressedChunkPosition + buffer.position())
+ throw new IllegalStateException("stream can only move forward");
- if (current > bufferOffset + buffer.limit())
- reBuffer(false);
+ if (position >= uncompressedChunkPosition + buffer.limit())
+ {
+ loadNextChunk();
+ // uncompressedChunkPosition = position - (position % compressionParams.chunkLength())
+ uncompressedChunkPosition = position & -compressionParams.chunkLength();
+ }
- buffer.position((int)(current - bufferOffset));
+ buffer.position(Ints.checkedCast(position - uncompressedChunkPosition));
}
+ @Override
protected void reBuffer() throws IOException
{
- reBuffer(true);
+ if (uncompressedChunkPosition < 0)
+ throw new IllegalStateException("position(long position) wasn't
called first");
+
+ /*
+ * reBuffer() will only be called if a partition range spanning multiple (adjacent) compressed chunks
+ * has consumed the current uncompressed buffer, and needs to move to the next adjacent chunk;
+ * uncompressedChunkPosition in this scenario *always* increases by the fixed chunk length.
+ */
+ loadNextChunk();
+ uncompressedChunkPosition += compressionParams.chunkLength();
}
- private void reBuffer(boolean updateCurrent) throws IOException
+ /**
+ * Reads the next chunk, decompresses if necessary, and probabilistically verifies the checksum/CRC.
+ *
+ * Doesn't adjust uncompressedChunkPosition - it's up to the caller to do so.
+ */
+ private void loadNextChunk() throws IOException
{
- if (readException != null)
- {
- FileUtils.clean(buffer);
- buffer = null;
- throw readException;
- }
+ if (!compressedChunks.hasNext())
+ throw new EOFException();
- // increment the offset into the stream based on the current buffer's read count
- if (updateCurrent)
- current += buffer.position();
+ int chunkLength = compressedChunks.next().length;
+ chunkBytesRead += (chunkLength + 4); // chunk length + checksum or CRC length
- try
+ /*
+ * uncompress if the buffer size is less than the max chunk size; else, if the buffer size is greater than
+ * or equal to the maxCompressedLength, we assume the buffer is not compressed (see CASSANDRA-10520)
+ */
+ if (chunkLength < compressionParams.maxCompressedLength())
{
- ByteBuffer compressedWithCRC = dataBuffer.take();
- if (compressedWithCRC == POISON_PILL)
+ if (compressedChunk.capacity() < chunkLength)
{
- assert readException != null;
- throw readException;
+ // with poorly compressible data, it's possible for a compressed chunk to be larger than
+ // configured uncompressed chunk size - depending on data, min_compress_ratio, and compressor;
+ // we may need to resize the compressed buffer.
+ FileUtils.clean(compressedChunk);
+ compressedChunk = ByteBuffer.allocateDirect(max((int) (compressedChunk.capacity() * GROWTH_FACTOR), chunkLength));
}
- decompress(compressedWithCRC);
- }
- catch (InterruptedException e)
- {
- throw new EOFException("No chunk available");
- }
- }
+ compressedChunk.position(0).limit(chunkLength);
+ readChunk(compressedChunk);
+ compressedChunk.position(0);
- private void decompress(ByteBuffer compressed) throws IOException
- {
- int length = compressed.remaining();
+ maybeValidateChecksum(compressedChunk, input.readInt());
- // uncompress if the buffer size is less than the max chunk size. else, if the buffer size is greater than or equal to the maxCompressedLength,
- // we assume the buffer is not compressed. see CASSANDRA-10520
- final boolean releaseCompressedBuffer;
- if (length - CHECKSUM_LENGTH < info.parameters.maxCompressedLength())
- {
buffer.clear();
- compressed.limit(length - CHECKSUM_LENGTH);
- info.parameters.getSstableCompressor().uncompress(compressed, buffer);
+ compressionParams.getSstableCompressor().uncompress(compressedChunk, buffer);
buffer.flip();
- releaseCompressedBuffer = true;
}
else
{
- FileUtils.clean(buffer);
- buffer = compressed;
- buffer.limit(length - CHECKSUM_LENGTH);
- releaseCompressedBuffer = false;
- }
- totalCompressedBytesRead += length;
-
- // validate crc randomly
- double crcCheckChance = this.crcCheckChanceSupplier.getAsDouble();
- if (crcCheckChance >= 1d ||
- (crcCheckChance > 0d && crcCheckChance > ThreadLocalRandom.current().nextDouble()))
- {
- ByteBuffer crcBuf = compressed.duplicate();
- crcBuf.limit(length - CHECKSUM_LENGTH).position(0);
- int checksum = (int) checksumType.of(crcBuf);
+ buffer.position(0).limit(chunkLength);
+ readChunk(buffer);
+ buffer.position(0);
- crcBuf.limit(length);
- if (crcBuf.getInt() != checksum)
- throw new IOException("CRC unmatched");
+ maybeValidateChecksum(buffer, input.readInt());
}
+ }
+ private ByteBuffer compressedChunk;
- if (releaseCompressedBuffer)
- FileUtils.clean(compressed);
+ private void readChunk(ByteBuffer dst) throws IOException
+ {
+ if (input instanceof RebufferingInputStream)
+ ((RebufferingInputStream) input).readFully(dst);
+ else
+ readChunkSlow(dst);
+ }
+
+ // slow path that involves an intermediate copy into a byte array; only used by some of the unit tests
+ private void readChunkSlow(ByteBuffer dst) throws IOException
+ {
+ if (copyArray == null)
+ copyArray = new byte[dst.remaining()];
+ else if (copyArray.length < dst.remaining())
+ copyArray = new byte[max((int)(copyArray.length * GROWTH_FACTOR), dst.remaining())];
- // buffer offset is always aligned
- final int compressedChunkLength = info.parameters.chunkLength();
- bufferOffset = current & ~(compressedChunkLength - 1);
+ input.readFully(copyArray, 0, dst.remaining());
+ dst.put(copyArray, 0, dst.remaining());
}
+ private byte[] copyArray;
- public long getTotalCompressedBytesRead()
+ private void maybeValidateChecksum(ByteBuffer buffer, int expectedChecksum) throws IOException
{
- return totalCompressedBytesRead;
+ double validateChance = validateChecksumChance.getAsDouble();
+
+ if (validateChance >= 1.0d || (validateChance > 0.0d && validateChance > ThreadLocalRandom.current().nextDouble()))
+ {
+ int position = buffer.position();
+ int actualChecksum = (int) checksumType.of(buffer);
+ buffer.position(position); // checksum calculation consumes the buffer, so we must reset its position afterwards
+
+ if (expectedChecksum != actualChecksum)
+ throw new IOException(format("Checksum didn't match (expected: %d, actual: %d)", expectedChecksum, actualChecksum));
+ }
}
- /**
- * {@inheritDoc}
- *
- * Releases the resources specific to this instance, but not the {@link DataInputPlus} that is used by the {@link Reader}.
- */
@Override
public void close()
{
- if (buffer != null)
+ if (null != buffer)
{
FileUtils.clean(buffer);
buffer = null;
}
- }
- class Reader extends WrappedRunnable
- {
- private final DataInputPlus source;
- private final Iterator<CompressionMetadata.Chunk> chunks;
- private final BlockingQueue<ByteBuffer> dataBuffer;
-
- Reader(DataInputPlus source, CompressionInfo info, BlockingQueue<ByteBuffer> dataBuffer)
+ if (null != compressedChunk)
{
- this.source = source;
- this.chunks = Iterators.forArray(info.chunks);
- this.dataBuffer = dataBuffer;
+ FileUtils.clean(compressedChunk);
+ compressedChunk = null;
}
+ }
- protected void runMayThrow() throws Exception
- {
- byte[] tmp = null;
- while (chunks.hasNext())
- {
- CompressionMetadata.Chunk chunk = chunks.next();
-
- int readLength = chunk.length + 4; // read with CRC
- ByteBuffer compressedWithCRC = null;
- try
- {
- final int r;
- if (source instanceof ReadableByteChannel)
- {
- compressedWithCRC = ByteBuffer.allocateDirect(readLength);
- r = ((ReadableByteChannel)source).read(compressedWithCRC);
- compressedWithCRC.flip();
- }
- else
- {
- // read into an on-heap araay, then copy over to an off-heap buffer. at a minumum snappy requires
- // off-heap buffers for decompression, else we could have just wrapped the plain byte array in a ByteBuffer
- if (tmp == null || tmp.length < info.parameters.chunkLength() + CHECKSUM_LENGTH)
- tmp = new byte[info.parameters.chunkLength() + CHECKSUM_LENGTH];
- source.readFully(tmp, 0, readLength);
- compressedWithCRC = ByteBuffer.allocateDirect(readLength);
- compressedWithCRC.put(tmp, 0, readLength);
- compressedWithCRC.position(0);
- r = readLength;
- }
-
- if (r < 0)
- {
- FileUtils.clean(compressedWithCRC);
- readException = new EOFException("No chunk available");
- dataBuffer.put(POISON_PILL);
- return; // throw exception where we consume dataBuffer
- }
- }
- catch (IOException e)
- {
- if (!(e instanceof EOFException))
- logger.warn("Error while reading compressed input
stream.", e);
- if (compressedWithCRC != null)
- FileUtils.clean(compressedWithCRC);
-
- readException = e;
- dataBuffer.put(POISON_PILL);
- return; // throw exception where we consume dataBuffer
- }
- dataBuffer.put(compressedWithCRC);
- }
- }
+ /**
+ * @return accumulated size of all chunks read so far - including checksums
+ */
+ long chunkBytesRead()
+ {
+ return chunkBytesRead;
}
+ private long chunkBytesRead = 0;
}
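
Two details of the rewritten CompressedInputStream above are easy to miss: chunk alignment relies on the uncompressed chunk length being a power of two, so that masking a position with -chunkLength rounds it down to the start of its chunk, and checksum validation is probabilistic, driven by the table's crc_check_chance. The sketch below illustrates just those two pieces with local stand-in methods; it is not the class's actual API.

import java.util.concurrent.ThreadLocalRandom;

public class CompressedStreamSketch
{
    // For a power-of-two chunk length, position & -chunkLength == position - (position % chunkLength),
    // i.e. the uncompressed position rounded down to the start of the chunk that contains it.
    static long chunkStart(long position, int chunkLength)
    {
        assert Integer.bitCount(chunkLength) == 1 : "chunk length must be a power of two";
        return position & -chunkLength;
    }

    // Probabilistic validation: always validate when chance >= 1.0, never when <= 0.0,
    // otherwise validate roughly a 'chance' fraction of chunks at random.
    static boolean shouldValidateChecksum(double chance)
    {
        return chance >= 1.0d || (chance > 0.0d && chance > ThreadLocalRandom.current().nextDouble());
    }

    public static void main(String[] args)
    {
        int chunkLength = 1 << 16; // 64 KiB, the default uncompressed chunk size
        System.out.println(chunkStart(200_000, chunkLength)); // 196608, i.e. 3 * 65536
        System.out.println(shouldValidateChecksum(1.0));      // always true
    }
}
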
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index e6ef5c3..d783ba4 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -269,8 +269,11 @@ public class CompressionMetadata
for (SSTableReader.PartitionPositionBounds section : sections)
{
int startIndex = (int) (section.lowerPosition / parameters.chunkLength());
+
int endIndex = (int) (section.upperPosition / parameters.chunkLength());
- endIndex = section.upperPosition % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex;
+ if (section.upperPosition % parameters.chunkLength() == 0)
+ endIndex--;
+
for (int i = startIndex; i <= endIndex; i++)
{
long offset = i * 8L;
@@ -295,18 +298,16 @@ public class CompressionMetadata
public Chunk[] getChunksForSections(Collection<SSTableReader.PartitionPositionBounds> sections)
{
// use SortedSet to eliminate duplicates and sort by chunk offset
- SortedSet<Chunk> offsets = new TreeSet<Chunk>(new Comparator<Chunk>()
- {
- public int compare(Chunk o1, Chunk o2)
- {
- return Longs.compare(o1.offset, o2.offset);
- }
- });
+ SortedSet<Chunk> offsets = new TreeSet<>((o1, o2) -> Longs.compare(o1.offset, o2.offset));
+
for (SSTableReader.PartitionPositionBounds section : sections)
{
int startIndex = (int) (section.lowerPosition / parameters.chunkLength());
+
int endIndex = (int) (section.upperPosition / parameters.chunkLength());
- endIndex = section.upperPosition % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex;
+ if (section.upperPosition % parameters.chunkLength() == 0)
+ endIndex--;
+
for (int i = startIndex; i <= endIndex; i++)
{
long offset = i * 8L;
@@ -317,6 +318,7 @@ public class CompressionMetadata
offsets.add(new Chunk(chunkOffset, (int) (nextChunkOffset - chunkOffset - 4))); // "4" bytes reserved for checksum
}
}
+
return offsets.toArray(new Chunk[offsets.size()]);
}
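
The CompressionMetadata change above swaps a ternary for an explicit decrement, but the index arithmetic is unchanged: a section whose upperPosition falls exactly on a chunk boundary must not pull in the chunk that starts at that boundary. A small standalone illustration of that math, with hypothetical values:

public class ChunkIndexSketch
{
    // which chunk indices cover the uncompressed section [lowerPosition, upperPosition)?
    static int[] chunkIndexRange(long lowerPosition, long upperPosition, int chunkLength)
    {
        int startIndex = (int) (lowerPosition / chunkLength);
        int endIndex = (int) (upperPosition / chunkLength);
        // a section ending exactly on a chunk boundary contains no bytes of the next chunk
        if (upperPosition % chunkLength == 0)
            endIndex--;
        return new int[] { startIndex, endIndex }; // inclusive on both ends
    }

    public static void main(String[] args)
    {
        int chunkLength = 1 << 16;
        int[] range = chunkIndexRange(0, 2L * chunkLength, chunkLength);
        System.out.println(range[0] + ".." + range[1]); // 0..1, i.e. two chunks rather than three
    }
}
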
diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
index ef51888..18cabd3 100644
--- a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
@@ -31,6 +31,8 @@ import net.nicoulaj.compilecommand.annotations.DontInline;
import org.apache.cassandra.utils.FastByteOperations;
import org.apache.cassandra.utils.vint.VIntCoding;
+import static java.lang.Math.min;
+
/**
* Rough equivalent of BufferedInputStream and DataInputStream wrapping a ByteBuffer that can be refilled
* via rebuffer. Implementations provide this buffer from various channels (socket, file, memory, etc).
@@ -91,7 +93,7 @@ public abstract class RebufferingInputStream extends InputStream implements Data
if (remaining == 0)
return copied == 0 ? -1 : copied;
}
- int toCopy = Math.min(len - copied, remaining);
+ int toCopy = min(len - copied, remaining);
FastByteOperations.copy(buffer, position, b, off + copied, toCopy);
buffer.position(position + toCopy);
copied += toCopy;
@@ -100,6 +102,38 @@ public abstract class RebufferingInputStream extends InputStream implements Data
return copied;
}
+ /**
+ * Equivalent to {@link #read(byte[], int, int)}, where offset is {@code dst.position()} and length is {@code dst.remaining()}
+ */
+ public void readFully(ByteBuffer dst) throws IOException
+ {
+ int offset = dst.position();
+ int len = dst.limit() - offset;
+
+ int copied = 0;
+ while (copied < len)
+ {
+ int position = buffer.position();
+ int remaining = buffer.limit() - position;
+
+ if (remaining == 0)
+ {
+ reBuffer();
+
+ position = buffer.position();
+ remaining = buffer.limit() - position;
+
+ if (remaining == 0)
+ throw new EOFException("EOF after " + copied + " bytes out
of " + len);
+ }
+
+ int toCopy = min(len - copied, remaining);
+ FastByteOperations.copy(buffer, position, dst, offset + copied, toCopy);
+ buffer.position(position + toCopy);
+ copied += toCopy;
+ }
+ }
+
@DontInline
protected long readPrimitiveSlowly(int bytes) throws IOException
{
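
The new readFully(ByteBuffer) above fills the destination from its current position up to its limit, advancing the buffer as it copies, and throws EOFException if the stream is exhausted first. The following simplified stand-in reads from a plain InputStream rather than the internal rebuffering machinery, purely to illustrate that contract.

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class ReadFullySketch
{
    // fill dst completely (position -> limit) from the given stream, or fail with EOFException
    static void readFully(InputStream in, ByteBuffer dst) throws IOException
    {
        byte[] tmp = new byte[8192];
        while (dst.hasRemaining())
        {
            int read = in.read(tmp, 0, Math.min(tmp.length, dst.remaining()));
            if (read < 0)
                throw new EOFException("EOF with " + dst.remaining() + " bytes still required");
            dst.put(tmp, 0, read);
        }
    }

    public static void main(String[] args) throws IOException
    {
        ByteBuffer dst = ByteBuffer.allocate(512);
        readFully(new ByteArrayInputStream(new byte[1024]), dst);
        System.out.println(dst.position()); // 512, the buffer was filled completely
    }
}
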
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
index a4312a6..0460dec 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
@@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
@@ -84,8 +83,7 @@ public class RepairTest extends DistributedTestBase
.with(GOSSIP)
);
- Cluster cluster = init(Cluster.build(3).withConfig(configModifier).start());
- return cluster;
+ return init(Cluster.build(3).withConfig(configModifier).start());
}
private void repair(Cluster cluster, Map<String, String> options)
@@ -100,13 +98,12 @@ public class RepairTest extends DistributedTestBase
}));
}
- void populate(Cluster cluster, boolean compression)
+ void populate(Cluster cluster, String compression)
{
try
{
cluster.schemaChange(withKeyspace("DROP TABLE IF EXISTS
%s.test;"));
- cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (k text,
c1 text, c2 text, PRIMARY KEY (k))") +
- (compression == false ? " WITH compression =
{'enabled' : false};" : ";"));
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (k text,
c1 text, c2 text, PRIMARY KEY (k)) WITH compression = " + compression));
insert(cluster, 0, 1000, 1, 2, 3);
flush(cluster, 1);
@@ -126,7 +123,7 @@ public class RepairTest extends DistributedTestBase
}
- void simpleRepair(Cluster cluster, boolean sequential, boolean compression) throws IOException
+ void repair(Cluster cluster, boolean sequential, String compression)
{
populate(cluster, compression);
repair(cluster, ImmutableMap.of("parallelism", sequential ? "sequential" : "parallel"));
@@ -146,15 +143,39 @@ public class RepairTest extends DistributedTestBase
cluster.close();
}
- @Ignore("Test requires CASSANDRA-13938 to be merged")
- public void testSimpleSequentialRepairDefaultCompression() throws IOException
+ @Test
+ public void testSequentialRepairWithDefaultCompression()
+ {
+ repair(cluster, true, "{'class':
'org.apache.cassandra.io.compress.LZ4Compressor'}");
+ }
+
+ @Test
+ public void testParallelRepairWithDefaultCompression()
+ {
+ repair(cluster, false, "{'class':
'org.apache.cassandra.io.compress.LZ4Compressor'}");
+ }
+
+ @Test
+ public void testSequentialRepairWithMinCompressRatio()
+ {
+ repair(cluster, true, "{'class':
'org.apache.cassandra.io.compress.LZ4Compressor', 'min_compress_ratio': 4.0}");
+ }
+
+ @Test
+ public void testParallelRepairWithMinCompressRatio()
+ {
+ repair(cluster, false, "{'class':
'org.apache.cassandra.io.compress.LZ4Compressor', 'min_compress_ratio': 4.0}");
+ }
+
+ @Test
+ public void testSequentialRepairWithoutCompression()
{
- simpleRepair(cluster, true, true);
+ repair(cluster, true, "{'enabled': false}");
}
@Test
- public void testSimpleSequentialRepairCompressionOff() throws IOException
+ public void testParallelRepairWithoutCompression()
{
- simpleRepair(cluster, true, false);
+ repair(cluster, false, "{'enabled': false}");
}
}