This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 602a5ee  Fix and optimise partial compressed sstable streaming
602a5ee is described below

commit 602a5eef177ac65020470cb0fcf8d88d820ab888
Author: Aleksey Yeshchenko <[email protected]>
AuthorDate: Mon Dec 16 11:22:38 2019 +0000

    Fix and optimise partial compressed sstable streaming
    
    patch by Aleksey Yeschenko; reviewed by Dinesh Joshi for CASSANDRA-13938
---
 CHANGES.txt                                        |   1 +
 .../streaming/CassandraCompressedStreamReader.java |   6 +-
 .../streaming/CassandraCompressedStreamWriter.java |  67 +++--
 .../db/streaming/CompressedInputStream.java        | 317 +++++++++------------
 .../cassandra/io/compress/CompressionMetadata.java |  20 +-
 .../cassandra/io/util/RebufferingInputStream.java  |  36 ++-
 .../cassandra/distributed/test/RepairTest.java     |  45 ++-
 7 files changed, 259 insertions(+), 233 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f7ecf79..1860e63 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha3
+ * Fix and optimise partial compressed sstable streaming (CASSANDRA-13938)
  * Improve error when JVM 11 can't access required modules (CASSANDRA-15468)
  * Better handling of file deletion failures by DiskFailurePolicy 
(CASSANDRA-15143)
  * Prevent read repair mutations from increasing read timeout (CASSANDRA-15442)
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
index c362d11..37b1a01 100644
--- 
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
+++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -85,7 +85,7 @@ public class CassandraCompressedStreamReader extends 
CassandraStreamReader
             int sectionIdx = 0;
             for (SSTableReader.PartitionPositionBounds section : sections)
             {
-                assert cis.getTotalCompressedBytesRead() <= totalSize;
+                assert cis.chunkBytesRead() <= totalSize;
                 long sectionLength = section.upperPosition - 
section.lowerPosition;
 
                 logger.trace("[Stream #{}] Reading section {} with length {} 
from stream.", session.planId(), sectionIdx++, sectionLength);
@@ -97,12 +97,12 @@ public class CassandraCompressedStreamReader extends 
CassandraStreamReader
                 {
                     writePartition(deserializer, writer);
                     // when compressed, report total bytes of compressed 
chunks read since remoteFile.size is the sum of chunks transferred
-                    session.progress(filename, ProgressInfo.Direction.IN, 
cis.getTotalCompressedBytesRead(), totalSize);
+                    session.progress(filename, ProgressInfo.Direction.IN, 
cis.chunkBytesRead(), totalSize);
                 }
                 assert in.getBytesRead() == sectionLength;
             }
             logger.trace("[Stream #{}] Finished receiving file #{} from {} 
readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
-                         session.peer, 
FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()), 
FBUtilities.prettyPrintMemory(totalSize));
+                         session.peer, 
FBUtilities.prettyPrintMemory(cis.chunkBytesRead()), 
FBUtilities.prettyPrintMemory(totalSize));
             return writer;
         }
         catch (Throwable e)
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
index efbccdc..21406b2 100644
--- 
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
+++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -42,6 +43,7 @@ import org.apache.cassandra.utils.FBUtilities;
 public class CassandraCompressedStreamWriter extends CassandraStreamWriter
 {
     private static final int CHUNK_SIZE = 1 << 16;
+    private static final int CRC_LENGTH = 4;
 
     private static final Logger logger = 
LoggerFactory.getLogger(CassandraCompressedStreamWriter.class);
 
@@ -63,16 +65,17 @@ public class CassandraCompressedStreamWriter extends 
CassandraStreamWriter
         try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
         {
             long progress = 0L;
-            // calculate chunks to transfer. we want to send continuous chunks 
altogether.
-            List<SSTableReader.PartitionPositionBounds> sections = 
getTransferSections(compressionInfo.chunks);
+
+            // we want to send contiguous chunks together to minimise reads 
from disk and network writes
+            List<Section> sections = 
fuseAdjacentChunks(compressionInfo.chunks);
 
             int sectionIdx = 0;
 
             // stream each of the required sections of the file
-            for (final SSTableReader.PartitionPositionBounds section : 
sections)
+            for (Section section : sections)
             {
                 // length of the section to stream
-                long length = section.upperPosition - section.lowerPosition;
+                long length = section.end - section.start;
 
                 logger.debug("[Stream #{}] Writing section {} with length {} 
to stream.", session.planId(), sectionIdx++, length);
 
@@ -81,7 +84,7 @@ public class CassandraCompressedStreamWriter extends 
CassandraStreamWriter
                 while (bytesTransferred < length)
                 {
                     int toTransfer = (int) Math.min(CHUNK_SIZE, length - 
bytesTransferred);
-                    long position = section.lowerPosition + bytesTransferred;
+                    long position = section.start + bytesTransferred;
 
                     out.writeToChannel(bufferSupplier -> {
                         ByteBuffer outBuffer = bufferSupplier.get(toTransfer);
@@ -111,32 +114,48 @@ public class CassandraCompressedStreamWriter extends 
CassandraStreamWriter
     }
 
     // chunks are assumed to be sorted by offset
-    private List<SSTableReader.PartitionPositionBounds> 
getTransferSections(CompressionMetadata.Chunk[] chunks)
+    private List<Section> fuseAdjacentChunks(CompressionMetadata.Chunk[] 
chunks)
     {
-        List<SSTableReader.PartitionPositionBounds> transferSections = new 
ArrayList<>();
-        SSTableReader.PartitionPositionBounds lastSection = null;
-        for (CompressionMetadata.Chunk chunk : chunks)
+        if (chunks.length == 0)
+            return Collections.emptyList();
+
+        long start = chunks[0].offset;
+        long end = start + chunks[0].length + CRC_LENGTH;
+
+        List<Section> sections = new ArrayList<>();
+
+        for (int i = 1; i < chunks.length; i++)
         {
-            if (lastSection != null)
+            CompressionMetadata.Chunk chunk = chunks[i];
+
+            if (chunk.offset == end)
             {
-                if (chunk.offset == lastSection.upperPosition)
-                {
-                    // extend previous section to end of this chunk
-                    lastSection = new 
SSTableReader.PartitionPositionBounds(lastSection.lowerPosition, chunk.offset + 
chunk.length + 4); // 4 bytes for CRC
-                }
-                else
-                {
-                    transferSections.add(lastSection);
-                    lastSection = new 
SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length 
+ 4);
-                }
+                end += (chunk.length + CRC_LENGTH);
             }
             else
             {
-                lastSection = new 
SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length 
+ 4);
+                sections.add(new Section(start, end));
+
+                start = chunk.offset;
+                end = start + chunk.length + CRC_LENGTH;
             }
         }
-        if (lastSection != null)
-            transferSections.add(lastSection);
-        return transferSections;
+        sections.add(new Section(start, end));
+
+        return sections;
+    }
+
+    // [start, end) positions in the compressed sstable file that we want to 
stream;
+    // each section contains 1..n adjacent compressed chunks in it.
+    private static class Section
+    {
+        private final long start;
+        private final long end;
+
+        private Section(long start, long end)
+        {
+            this.start = start;
+            this.end = end;
+        }
     }
 }
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java 
b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
index c0278e8..b8626ff 100644
--- a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
@@ -20,266 +20,215 @@ package org.apache.cassandra.db.streaming;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
 import java.util.Iterator;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.DoubleSupplier;
 
 import com.google.common.collect.Iterators;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.primitives.Ints;
 
-import io.netty.util.concurrent.FastThreadLocalThread;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RebufferingInputStream;
-import 
org.apache.cassandra.db.streaming.CassandraStreamReader.StreamDeserializer;
+import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.utils.ChecksumType;
-import org.apache.cassandra.utils.WrappedRunnable;
+
+import static java.lang.Math.max;
+import static java.lang.String.format;
 
 /**
- * InputStream which reads data from underlining source with given {@link 
CompressionInfo}. Uses {@link #buffer} as a buffer
- * for uncompressed data (which is read by stream consumers - {@link 
StreamDeserializer} in this case).
+ * InputStream which reads compressed chunks from the underlying input stream 
and deals with decompression
+ * and position tracking.
+ *
+ * The underlying input will be an instance of {@link RebufferingInputStream} 
except in some unit tests.
+ *
+ * Compressed chunks transferred will be a subset of all chunks in the source 
streamed sstable - just enough to
+ * deserialize the requested partition position ranges. Correctness of the 
entire operation depends on provided
+ * partition position ranges and compressed chunks properly matching, and 
there is no way on the receiving side to
+ * verify if that's the case, which arguably makes this a little brittle.
  */
 public class CompressedInputStream extends RebufferingInputStream implements 
AutoCloseable
 {
+    private static final double GROWTH_FACTOR = 1.5;
 
-    private static final Logger logger = 
LoggerFactory.getLogger(CompressedInputStream.class);
-
-    private final CompressionInfo info;
-    // chunk buffer
-    private final BlockingQueue<ByteBuffer> dataBuffer;
-    private final DoubleSupplier crcCheckChanceSupplier;
+    private final DataInputPlus input;
 
-    /**
-     * The base offset of the current {@link #buffer} from the beginning of 
the stream.
-     */
-    private long bufferOffset = 0;
-
-    /**
-     * The current {@link CassandraCompressedStreamReader#sections} offset in 
the stream.
-     */
-    private long current = 0;
+    private final Iterator<CompressionMetadata.Chunk> compressedChunks;
+    private final CompressionParams compressionParams;
 
     private final ChecksumType checksumType;
-
-    private static final int CHECKSUM_LENGTH = 4;
+    private final DoubleSupplier validateChecksumChance;
 
     /**
-     * Indicates there was a problem when reading from source stream.
-     * When this is added to the <code>dataBuffer</code> by the stream Reader,
-     * it is expected that the <code>readException</code> variable is populated
-     * with the cause of the error when reading from source stream, so it is
-     * thrown to the consumer on subsequent read operation.
+     * The base offset of the current {@link #buffer} into the original 
sstable as if it were uncompressed.
      */
-    private static final ByteBuffer POISON_PILL = ByteBuffer.wrap(new byte[0]);
-
-    private volatile IOException readException = null;
-
-    private long totalCompressedBytesRead;
+    private long uncompressedChunkPosition = Long.MIN_VALUE;
 
     /**
-     * @param source Input source to read compressed data from
-     * @param info Compression info
+     * @param input Input to read compressed data from
+     * @param compressionInfo Compression info
      */
-    public CompressedInputStream(DataInputPlus source, CompressionInfo info, 
ChecksumType checksumType, DoubleSupplier crcCheckChanceSupplier)
+    public CompressedInputStream(DataInputPlus input,
+                                 CompressionInfo compressionInfo,
+                                 ChecksumType checksumType,
+                                 DoubleSupplier validateChecksumChance)
     {
-        super(ByteBuffer.allocateDirect(info.parameters.chunkLength()));
-        buffer.limit(buffer.position()); // force the buffer to appear 
"consumed" so that it triggers reBuffer on the first read
-        this.info = info;
-        this.dataBuffer = new 
ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
-        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
+        
super(ByteBuffer.allocateDirect(compressionInfo.parameters.chunkLength()));
+        buffer.limit(0);
+
+        this.input = input;
         this.checksumType = checksumType;
+        this.validateChecksumChance = validateChecksumChance;
 
-        new FastThreadLocalThread(new Reader(source, info, 
dataBuffer)).start();
+        compressionParams = compressionInfo.parameters;
+        compressedChunks = Iterators.forArray(compressionInfo.chunks);
+        compressedChunk = 
ByteBuffer.allocateDirect(compressionParams.chunkLength());
     }
 
     /**
-     * Invoked when crossing into the next stream boundary in {@link 
CassandraCompressedStreamReader#sections}.
+     * Invoked when crossing into the next {@link 
SSTableReader.PartitionPositionBounds} section
+     * in {@link CassandraCompressedStreamReader#read(DataInputPlus)}.
+     * Will skip 1..n compressed chunks of the original sstable.
      */
     public void position(long position) throws IOException
     {
-        if (readException != null)
-            throw readException;
-
-        assert position >= current : "stream can only read forward.";
-        current = position;
+        if (position < uncompressedChunkPosition + buffer.position())
+            throw new IllegalStateException("stream can only move forward");
 
-        if (current > bufferOffset + buffer.limit())
-            reBuffer(false);
+        if (position >= uncompressedChunkPosition + buffer.limit())
+        {
+            loadNextChunk();
+            // uncompressedChunkPosition = position - (position % 
compressionParams.chunkLength())
+            uncompressedChunkPosition = position & 
-compressionParams.chunkLength();
+        }
 
-        buffer.position((int)(current - bufferOffset));
+        buffer.position(Ints.checkedCast(position - 
uncompressedChunkPosition));
     }
 
+    @Override
     protected void reBuffer() throws IOException
     {
-        reBuffer(true);
+        if (uncompressedChunkPosition < 0)
+            throw new IllegalStateException("position(long position) wasn't 
called first");
+
+        /*
+         * reBuffer() will only be called if a partition range spanning 
multiple (adjacent) compressed chunks
+         * has consumed the current uncompressed buffer, and needs to move to 
the next adjacent chunk;
+         * uncompressedChunkPosition in this scenario *always* increases by 
the fixed chunk length.
+         */
+        loadNextChunk();
+        uncompressedChunkPosition += compressionParams.chunkLength();
     }
 
-    private void reBuffer(boolean updateCurrent) throws IOException
+    /**
+     * Reads the next chunk, decompresses if necessary, and probabilistically 
verifies the checksum/CRC.
+     *
+     * Doesn't adjust uncompressedChunkPosition - it's up to the caller to do 
so.
+     */
+    private void loadNextChunk() throws IOException
     {
-        if (readException != null)
-        {
-            FileUtils.clean(buffer);
-            buffer = null;
-            throw readException;
-        }
+        if (!compressedChunks.hasNext())
+            throw new EOFException();
 
-        // increment the offset into the stream based on the current buffer's 
read count
-        if (updateCurrent)
-            current += buffer.position();
+        int chunkLength = compressedChunks.next().length;
+        chunkBytesRead += (chunkLength + 4); // chunk length + checksum or CRC 
length
 
-        try
+        /*
+         * uncompress if the chunk length is less than maxCompressedLength; 
else, if the chunk length is greater than
+         * or equal to maxCompressedLength, we assume the chunk is not 
compressed (see CASSANDRA-10520)
+         */
+        if (chunkLength < compressionParams.maxCompressedLength())
         {
-            ByteBuffer compressedWithCRC = dataBuffer.take();
-            if (compressedWithCRC == POISON_PILL)
+            if (compressedChunk.capacity() < chunkLength)
             {
-                assert readException != null;
-                throw readException;
+                // with poorly compressible data, it's possible for a 
compressed chunk to be larger than
+                // configured uncompressed chunk size - depending on data, 
min_compress_ratio, and compressor;
+                // we may need to resize the compressed buffer.
+                FileUtils.clean(compressedChunk);
+                compressedChunk = ByteBuffer.allocateDirect(max((int) 
(compressedChunk.capacity() * GROWTH_FACTOR), chunkLength));
             }
 
-            decompress(compressedWithCRC);
-        }
-        catch (InterruptedException e)
-        {
-            throw new EOFException("No chunk available");
-        }
-    }
+            compressedChunk.position(0).limit(chunkLength);
+            readChunk(compressedChunk);
+            compressedChunk.position(0);
 
-    private void decompress(ByteBuffer compressed) throws IOException
-    {
-        int length = compressed.remaining();
+            maybeValidateChecksum(compressedChunk, input.readInt());
 
-        // uncompress if the buffer size is less than the max chunk size. 
else, if the buffer size is greater than or equal to the maxCompressedLength,
-        // we assume the buffer is not compressed. see CASSANDRA-10520
-        final boolean releaseCompressedBuffer;
-        if (length - CHECKSUM_LENGTH < info.parameters.maxCompressedLength())
-        {
             buffer.clear();
-            compressed.limit(length - CHECKSUM_LENGTH);
-            info.parameters.getSstableCompressor().uncompress(compressed, 
buffer);
+            
compressionParams.getSstableCompressor().uncompress(compressedChunk, buffer);
             buffer.flip();
-            releaseCompressedBuffer = true;
         }
         else
         {
-            FileUtils.clean(buffer);
-            buffer = compressed;
-            buffer.limit(length - CHECKSUM_LENGTH);
-            releaseCompressedBuffer = false;
-        }
-        totalCompressedBytesRead += length;
-
-        // validate crc randomly
-        double crcCheckChance = this.crcCheckChanceSupplier.getAsDouble();
-        if (crcCheckChance >= 1d ||
-            (crcCheckChance > 0d && crcCheckChance > 
ThreadLocalRandom.current().nextDouble()))
-        {
-            ByteBuffer crcBuf = compressed.duplicate();
-            crcBuf.limit(length - CHECKSUM_LENGTH).position(0);
-            int checksum = (int) checksumType.of(crcBuf);
+            buffer.position(0).limit(chunkLength);
+            readChunk(buffer);
+            buffer.position(0);
 
-            crcBuf.limit(length);
-            if (crcBuf.getInt() != checksum)
-                throw new IOException("CRC unmatched");
+            maybeValidateChecksum(buffer, input.readInt());
         }
+    }
+    private ByteBuffer compressedChunk;
 
-        if (releaseCompressedBuffer)
-            FileUtils.clean(compressed);
+    private void readChunk(ByteBuffer dst) throws IOException
+    {
+        if (input instanceof RebufferingInputStream)
+            ((RebufferingInputStream) input).readFully(dst);
+        else
+            readChunkSlow(dst);
+    }
+
+    // slow path that involves an intermediate copy into a byte array; only 
used by some of the unit tests
+    private void readChunkSlow(ByteBuffer dst) throws IOException
+    {
+        if (copyArray == null)
+            copyArray = new byte[dst.remaining()];
+        else if (copyArray.length < dst.remaining())
+            copyArray = new byte[max((int)(copyArray.length * GROWTH_FACTOR), 
dst.remaining())];
 
-        // buffer offset is always aligned
-        final int compressedChunkLength = info.parameters.chunkLength();
-        bufferOffset = current & ~(compressedChunkLength - 1);
+        input.readFully(copyArray, 0, dst.remaining());
+        dst.put(copyArray, 0, dst.remaining());
     }
+    private byte[] copyArray;
 
-    public long getTotalCompressedBytesRead()
+    private void maybeValidateChecksum(ByteBuffer buffer, int 
expectedChecksum) throws IOException
     {
-        return totalCompressedBytesRead;
+        double validateChance = validateChecksumChance.getAsDouble();
+
+        if (validateChance >= 1.0d || (validateChance > 0.0d && validateChance 
> ThreadLocalRandom.current().nextDouble()))
+        {
+            int position = buffer.position();
+            int actualChecksum = (int) checksumType.of(buffer);
+            buffer.position(position); // checksum calculation consumes the 
buffer, so we must reset its position afterwards
+
+            if (expectedChecksum != actualChecksum)
+                throw new IOException(format("Checksum didn't match (expected: 
%d, actual: %d)", expectedChecksum, actualChecksum));
+        }
     }
 
-    /**
-     * {@inheritDoc}
-     *
-     * Releases the resources specific to this instance, but not the {@link 
DataInputPlus} that is used by the {@link Reader}.
-     */
     @Override
     public void close()
     {
-        if (buffer != null)
+        if (null != buffer)
         {
             FileUtils.clean(buffer);
             buffer = null;
         }
-    }
 
-    class Reader extends WrappedRunnable
-    {
-        private final DataInputPlus source;
-        private final Iterator<CompressionMetadata.Chunk> chunks;
-        private final BlockingQueue<ByteBuffer> dataBuffer;
-
-        Reader(DataInputPlus source, CompressionInfo info, 
BlockingQueue<ByteBuffer> dataBuffer)
+        if (null != compressedChunk)
         {
-            this.source = source;
-            this.chunks = Iterators.forArray(info.chunks);
-            this.dataBuffer = dataBuffer;
+            FileUtils.clean(compressedChunk);
+            compressedChunk = null;
         }
+    }
 
-        protected void runMayThrow() throws Exception
-        {
-            byte[] tmp = null;
-            while (chunks.hasNext())
-            {
-                CompressionMetadata.Chunk chunk = chunks.next();
-
-                int readLength = chunk.length + 4; // read with CRC
-                ByteBuffer compressedWithCRC = null;
-                try
-                {
-                    final int r;
-                    if (source instanceof ReadableByteChannel)
-                    {
-                        compressedWithCRC = 
ByteBuffer.allocateDirect(readLength);
-                        r = 
((ReadableByteChannel)source).read(compressedWithCRC);
-                        compressedWithCRC.flip();
-                    }
-                    else
-                    {
-                        // read into an on-heap araay, then copy over to an 
off-heap buffer. at a minumum snappy requires
-                        // off-heap buffers for decompression, else we could 
have just wrapped the plain byte array in a ByteBuffer
-                        if (tmp == null || tmp.length < 
info.parameters.chunkLength() + CHECKSUM_LENGTH)
-                            tmp = new byte[info.parameters.chunkLength() + 
CHECKSUM_LENGTH];
-                        source.readFully(tmp, 0, readLength);
-                        compressedWithCRC = 
ByteBuffer.allocateDirect(readLength);
-                        compressedWithCRC.put(tmp, 0, readLength);
-                        compressedWithCRC.position(0);
-                        r = readLength;
-                    }
-
-                    if (r < 0)
-                    {
-                        FileUtils.clean(compressedWithCRC);
-                        readException = new EOFException("No chunk available");
-                        dataBuffer.put(POISON_PILL);
-                        return; // throw exception where we consume dataBuffer
-                    }
-                }
-                catch (IOException e)
-                {
-                    if (!(e instanceof EOFException))
-                        logger.warn("Error while reading compressed input 
stream.", e);
-                    if (compressedWithCRC != null)
-                        FileUtils.clean(compressedWithCRC);
-
-                    readException = e;
-                    dataBuffer.put(POISON_PILL);
-                    return; // throw exception where we consume dataBuffer
-                }
-                dataBuffer.put(compressedWithCRC);
-            }
-        }
+    /**
+     * @return accumulated size of all chunks read so far - including checksums
+     */
+    long chunkBytesRead()
+    {
+        return chunkBytesRead;
     }
+    private long chunkBytesRead = 0;
 }
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java 
b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index e6ef5c3..d783ba4 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -269,8 +269,11 @@ public class CompressionMetadata
         for (SSTableReader.PartitionPositionBounds section : sections)
         {
             int startIndex = (int) (section.lowerPosition / 
parameters.chunkLength());
+
             int endIndex = (int) (section.upperPosition / 
parameters.chunkLength());
-            endIndex = section.upperPosition % parameters.chunkLength() == 0 ? 
endIndex - 1 : endIndex;
+            if (section.upperPosition % parameters.chunkLength() == 0)
+                endIndex--;
+
             for (int i = startIndex; i <= endIndex; i++)
             {
                 long offset = i * 8L;
@@ -295,18 +298,16 @@ public class CompressionMetadata
     public Chunk[] 
getChunksForSections(Collection<SSTableReader.PartitionPositionBounds> sections)
     {
         // use SortedSet to eliminate duplicates and sort by chunk offset
-        SortedSet<Chunk> offsets = new TreeSet<Chunk>(new Comparator<Chunk>()
-        {
-            public int compare(Chunk o1, Chunk o2)
-            {
-                return Longs.compare(o1.offset, o2.offset);
-            }
-        });
+        SortedSet<Chunk> offsets = new TreeSet<>((o1, o2) -> 
Longs.compare(o1.offset, o2.offset));
+
         for (SSTableReader.PartitionPositionBounds section : sections)
         {
             int startIndex = (int) (section.lowerPosition / 
parameters.chunkLength());
+
             int endIndex = (int) (section.upperPosition / 
parameters.chunkLength());
-            endIndex = section.upperPosition % parameters.chunkLength() == 0 ? 
endIndex - 1 : endIndex;
+            if (section.upperPosition % parameters.chunkLength() == 0)
+                endIndex--;
+
             for (int i = startIndex; i <= endIndex; i++)
             {
                 long offset = i * 8L;
@@ -317,6 +318,7 @@ public class CompressionMetadata
                 offsets.add(new Chunk(chunkOffset, (int) (nextChunkOffset - 
chunkOffset - 4))); // "4" bytes reserved for checksum
             }
         }
+
         return offsets.toArray(new Chunk[offsets.size()]);
     }
 
diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java 
b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
index ef51888..18cabd3 100644
--- a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
@@ -31,6 +31,8 @@ import net.nicoulaj.compilecommand.annotations.DontInline;
 import org.apache.cassandra.utils.FastByteOperations;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
+import static java.lang.Math.min;
+
 /**
  * Rough equivalent of BufferedInputStream and DataInputStream wrapping a 
ByteBuffer that can be refilled
  * via rebuffer. Implementations provide this buffer from various channels 
(socket, file, memory, etc).
@@ -91,7 +93,7 @@ public abstract class RebufferingInputStream extends 
InputStream implements Data
                 if (remaining == 0)
                     return copied == 0 ? -1 : copied;
             }
-            int toCopy = Math.min(len - copied, remaining);
+            int toCopy = min(len - copied, remaining);
             FastByteOperations.copy(buffer, position, b, off + copied, toCopy);
             buffer.position(position + toCopy);
             copied += toCopy;
@@ -100,6 +102,38 @@ public abstract class RebufferingInputStream extends 
InputStream implements Data
         return copied;
     }
 
+    /**
+     * Equivalent to {@link #readFully(byte[], int, int)}, where offset is {@code 
dst.position()} and length is {@code dst.remaining()}
+     */
+    public void readFully(ByteBuffer dst) throws IOException
+    {
+        int offset = dst.position();
+        int len = dst.limit() - offset;
+
+        int copied = 0;
+        while (copied < len)
+        {
+            int position = buffer.position();
+            int remaining = buffer.limit() - position;
+
+            if (remaining == 0)
+            {
+                reBuffer();
+
+                position = buffer.position();
+                remaining = buffer.limit() - position;
+
+                if (remaining == 0)
+                    throw new EOFException("EOF after " + copied + " bytes out 
of " + len);
+            }
+
+            int toCopy = min(len - copied, remaining);
+            FastByteOperations.copy(buffer, position, dst, offset + copied, 
toCopy);
+            buffer.position(position + toCopy);
+            copied += toCopy;
+        }
+    }
+
     @DontInline
     protected long readPrimitiveSlowly(int bytes) throws IOException
     {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
index a4312a6..0460dec 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
@@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.cassandra.distributed.Cluster;
@@ -84,8 +83,7 @@ public class RepairTest extends DistributedTestBase
                         .with(GOSSIP)
         );
 
-        Cluster cluster = 
init(Cluster.build(3).withConfig(configModifier).start());
-        return cluster;
+        return init(Cluster.build(3).withConfig(configModifier).start());
     }
 
     private void repair(Cluster cluster, Map<String, String> options)
@@ -100,13 +98,12 @@ public class RepairTest extends DistributedTestBase
         }));
     }
 
-    void populate(Cluster cluster, boolean compression)
+    void populate(Cluster cluster, String compression)
     {
         try
         {
             cluster.schemaChange(withKeyspace("DROP TABLE IF EXISTS 
%s.test;"));
-            cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (k text, 
c1 text, c2 text, PRIMARY KEY (k))") +
-                                 (compression == false ? " WITH compression = 
{'enabled' : false};" : ";"));
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (k text, 
c1 text, c2 text, PRIMARY KEY (k)) WITH compression = " + compression));
 
             insert(cluster,    0, 1000, 1, 2, 3);
             flush(cluster, 1);
@@ -126,7 +123,7 @@ public class RepairTest extends DistributedTestBase
 
     }
 
-    void simpleRepair(Cluster cluster, boolean sequential, boolean 
compression) throws IOException
+    void repair(Cluster cluster, boolean sequential, String compression)
     {
         populate(cluster, compression);
         repair(cluster, ImmutableMap.of("parallelism", sequential ? 
"sequential" : "parallel"));
@@ -146,15 +143,39 @@ public class RepairTest extends DistributedTestBase
             cluster.close();
     }
 
-    @Ignore("Test requires CASSANDRA-13938 to be merged")
-    public void testSimpleSequentialRepairDefaultCompression() throws 
IOException
+    @Test
+    public void testSequentialRepairWithDefaultCompression()
+    {
+        repair(cluster, true, "{'class': 
'org.apache.cassandra.io.compress.LZ4Compressor'}");
+    }
+
+    @Test
+    public void testParallelRepairWithDefaultCompression()
+    {
+        repair(cluster, false, "{'class': 
'org.apache.cassandra.io.compress.LZ4Compressor'}");
+    }
+
+    @Test
+    public void testSequentialRepairWithMinCompressRatio()
+    {
+        repair(cluster, true, "{'class': 
'org.apache.cassandra.io.compress.LZ4Compressor', 'min_compress_ratio': 4.0}");
+    }
+
+    @Test
+    public void testParallelRepairWithMinCompressRatio()
+    {
+        repair(cluster, false, "{'class': 
'org.apache.cassandra.io.compress.LZ4Compressor', 'min_compress_ratio': 4.0}");
+    }
+
+    @Test
+    public void testSequentialRepairWithoutCompression()
     {
-        simpleRepair(cluster, true, true);
+        repair(cluster, true, "{'enabled': false}");
     }
 
     @Test
-    public void testSimpleSequentialRepairCompressionOff() throws IOException
+    public void testParallelRepairWithoutCompression()
     {
-        simpleRepair(cluster, true, false);
+        repair(cluster, false, "{'enabled': false}");
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to