Merge branch cassandra-2.1 into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b3ac7937
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b3ac7937
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b3ac7937

Branch: refs/heads/cassandra-3.11
Commit: b3ac7937edce41a341d1d01c7f3201592e1caa8f
Parents: 2e5e11d 34a1d5d
Author: Benjamin Lerer <b.le...@gmail.com>
Authored: Tue Apr 10 09:51:02 2018 +0200
Committer: Benjamin Lerer <b.le...@gmail.com>
Committed: Tue Apr 10 09:52:18 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedRandomAccessReader.java  | 52 ++++++++++----------
 2 files changed, 27 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3ac7937/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 527975c,aeb3009..5221b1e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,8 +1,17 @@@
 -2.1.21
 +2.2.13
 + * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)
 + * Fix query pager DEBUG log leak causing hit in paged reads throughput (CASSANDRA-14318)
 + * Backport circleci yaml (CASSANDRA-14240)
 +Merged from 2.1:
+  * Check checksum before decompressing data (CASSANDRA-14284)
  * CVE-2017-5929 Security vulnerability in Logback warning in NEWS.txt (CASSANDRA-14183)
  
 -2.1.20
 +2.2.12
 + * Fix the inspectJvmOptions startup check (CASSANDRA-14112)
 + * Fix race that prevents submitting compaction for a table when executor is full (CASSANDRA-13801)
 + * Rely on the JVM to handle OutOfMemoryErrors (CASSANDRA-13006)
 + * Grab refs during scrub/index redistribution/cleanup (CASSANDRA-13873)
 +Merged from 2.1:
   * Protect against overflow of local expiration time (CASSANDRA-14092)
   * More PEP8 compliance for cqlsh (CASSANDRA-14021)
  * RPM package spec: fix permissions for installed jars and config files (CASSANDRA-14181)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3ac7937/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index ccfa5e7,fe90cc9..0fc96ed
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@@ -99,54 -77,7 +99,54 @@@ public class CompressedRandomAccessReader
      {
          try
          {
 -            decompressChunk(metadata.chunkFor(current));
 +            long position = current();
 +            assert position < metadata.dataLength;
 +
 +            CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
 +
 +            if (compressed.capacity() < chunk.length)
 +                compressed = allocateBuffer(chunk.length, metadata.compressor().preferredBufferType());
 +            else
 +                compressed.clear();
 +            compressed.limit(chunk.length);
 +
 +            if (channel.read(compressed, chunk.offset) != chunk.length)
 +                throw new CorruptBlockException(getPath(), chunk);
 +            compressed.flip();
 +            buffer.clear();
 +
++            if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
++            {
++                FBUtilities.directCheckSum(checksum, compressed);
++
++                if (checksum(chunk) != (int) checksum.getValue())
++                    throw new CorruptBlockException(getPath(), chunk);
++
++                // reset checksum object back to the original (blank) state
++                checksum.reset();
++                compressed.rewind();
++            }
++
 +            try
 +            {
 +                metadata.compressor().uncompress(compressed, buffer);
 +            }
 +            catch (IOException e)
 +            {
 +                throw new CorruptBlockException(getPath(), chunk);
 +            }
 +            finally
 +            {
 +                buffer.flip();
 +            }
 +
-             if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
-             {
-                 compressed.rewind();
-                 FBUtilities.directCheckSum(checksum, compressed);
- 
-                 if (checksum(chunk) != (int) checksum.getValue())
-                     throw new CorruptBlockException(getPath(), chunk);
- 
-                 // reset checksum object back to the original (blank) state
-                 checksum.reset();
-             }
- 
 +            // buffer offset is always aligned
 +            bufferOffset = position & ~(buffer.capacity() - 1);
 +            buffer.position((int) (position - bufferOffset));
 +            // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
 +            // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
 +            if (bufferOffset + buffer.limit() > length())
 +                buffer.limit((int)(length() - bufferOffset));
          }
          catch (CorruptBlockException e)
          {
@@@ -158,76 -89,58 +158,76 @@@
          }
      }
  
 -    private void decompressChunk(CompressionMetadata.Chunk chunk) throws IOException
 +    private void reBufferMmap()
      {
 -        if (channel.position() != chunk.offset)
 -            channel.position(chunk.offset);
 +        try
 +        {
 +            long position = current();
 +            assert position < metadata.dataLength;
  
 -        if (compressed.capacity() < chunk.length)
 -            compressed = ByteBuffer.wrap(new byte[chunk.length]);
 -        else
 -            compressed.clear();
 -        compressed.limit(chunk.length);
 +            CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
  
 -        if (channel.read(compressed) != chunk.length)
 -            throw new CorruptBlockException(getPath(), chunk);
 +            Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset);
 +            long segmentOffset = entry.getKey();
 +            int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
 +            ByteBuffer compressedChunk = entry.getValue().duplicate(); // TODO: change to slice(chunkOffset) when we upgrade LZ4-java
  
 -        // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
 -        // in the future this will save a lot of hair-pulling
 -        compressed.flip();
 +            compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
  
 -        // If the checksum is on compressed data we want to check it before uncompressing the data
 -        if (metadata.hasPostCompressionAdlerChecksums)
 -            checkChecksumIfNeeded(chunk, compressed.array(), chunk.length);
 +            buffer.clear();
  
 -        try
 -        {
 -            validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0);
++            if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
++            {
++                FBUtilities.directCheckSum(checksum, compressedChunk);
++
++                compressedChunk.limit(compressedChunk.capacity());
++                if (compressedChunk.getInt() != (int) checksum.getValue())
++                    throw new CorruptBlockException(getPath(), chunk);
++
++                // reset checksum object back to the original (blank) state
++                checksum.reset();
++
++                compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
++            }
++
 +            try
 +            {
 +                metadata.compressor().uncompress(compressedChunk, buffer);
 +            }
 +            catch (IOException e)
 +            {
 +                throw new CorruptBlockException(getPath(), chunk);
 +            }
 +            finally
 +            {
 +                buffer.flip();
 +            }
 +
-             if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
-             {
-                 compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
- 
-                 FBUtilities.directCheckSum(checksum, compressedChunk);
- 
-                 compressedChunk.limit(compressedChunk.capacity());
-                 if (compressedChunk.getInt() != (int) checksum.getValue())
-                     throw new CorruptBlockException(getPath(), chunk);
- 
-                 // reset checksum object back to the original (blank) state
-                 checksum.reset();
-             }
- 
 +            // buffer offset is always aligned
 +            bufferOffset = position & ~(buffer.capacity() - 1);
 +            buffer.position((int) (position - bufferOffset));
 +            // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
 +            // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
 +            if (bufferOffset + buffer.limit() > length())
 +                buffer.limit((int)(length() - bufferOffset));
          }
 -        catch (IOException e)
 +        catch (CorruptBlockException e)
          {
 -            throw new CorruptBlockException(getPath(), chunk, e);
 +            throw new CorruptSSTableException(e, getPath());
          }
  
 -        if (!metadata.hasPostCompressionAdlerChecksums)
 -            checkChecksumIfNeeded(chunk, buffer, validBufferBytes);
 -
 -
 -        // buffer offset is always aligned
 -        bufferOffset = current & ~(buffer.length - 1);
 -        // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
 -        // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
 -        if (bufferOffset + validBufferBytes > length())
 -            validBufferBytes = (int)(length() - bufferOffset);
      }
  
 -    private void checkChecksumIfNeeded(CompressionMetadata.Chunk chunk, byte[] bytes, int length) throws IOException
 +    @Override
 +    protected void reBuffer()
      {
 -        if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
 +        if (chunkSegments != null)
          {
 -            checksum.update(bytes, 0, length);
 -            if (checksum(chunk) != (int) checksum.getValue())
 -                throw new CorruptBlockException(getPath(), chunk);
 -            // reset checksum object back to the original (blank) state
 -            checksum.reset();
 +            reBufferMmap();
 +        }
 +        else
 +        {
 +            reBufferStandard();
          }
      }
  

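For readers tracing the change: both reBufferStandard() and reBufferMmap() now run the CRC check on the compressed bytes *before* handing them to the codec, so a corrupt chunk surfaces as a CorruptBlockException instead of feeding garbage into the decompressor (CASSANDRA-14284). Below is a minimal, self-contained sketch of that pattern in plain Java; ChecksumBeforeDecompress, readChunk, storedChecksum and the identity decompress() stand-in are illustrative assumptions, not Cassandra's actual classes:

    import java.nio.ByteBuffer;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.zip.Adler32;

    final class ChecksumBeforeDecompress
    {
        // Stand-in for Cassandra's CorruptBlockException.
        static final class CorruptBlockException extends RuntimeException
        {
            CorruptBlockException(String message) { super(message); }
        }

        private final Adler32 checksum = new Adler32();

        // Validates the compressed chunk against its stored checksum with
        // probability crcCheckChance, then decompresses. The point of the
        // patch: the checksum runs first, so a corrupt chunk never reaches
        // the decompressor.
        ByteBuffer readChunk(ByteBuffer compressed, int storedChecksum, double crcCheckChance)
        {
            if (crcCheckChance > ThreadLocalRandom.current().nextDouble())
            {
                checksum.update(compressed.duplicate()); // checksum a view; leaves position untouched
                if (storedChecksum != (int) checksum.getValue())
                    throw new CorruptBlockException("checksum mismatch before decompression");
                checksum.reset(); // back to the blank state for the next chunk
            }
            return decompress(compressed);
        }

        // Hypothetical codec call; the real code uses metadata.compressor().uncompress(...).
        private ByteBuffer decompress(ByteBuffer compressed)
        {
            return compressed.duplicate(); // identity "codec" keeps the sketch self-contained
        }
    }

One note on the unchanged arithmetic: bufferOffset = position & ~(buffer.capacity() - 1) rounds a file position down to the start of its chunk and relies on the chunk length being a power of two; for 64 KiB chunks, 200000 & ~(65536 - 1) = 196608, leaving 200000 - 196608 = 3392 as the position inside the buffer.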
