Merge branch 'cassandra-2.1' into trunk

Conflicts:
        src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
        test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/36bd31d0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/36bd31d0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/36bd31d0

Branch: refs/heads/trunk
Commit: 36bd31d0578fedbcc1bfa2ca9b0dbccc4d9284d6
Parents: 5346ce7 a1e2978
Author: Benedict Elliott Smith <[email protected]>
Authored: Wed Mar 4 16:31:59 2015 +0000
Committer: Benedict Elliott Smith <[email protected]>
Committed: Wed Mar 4 16:31:59 2015 +0000

----------------------------------------------------------------------
 .../compress/CompressedRandomAccessReader.java  |   8 +
 .../io/compress/CompressionMetadata.java        |  38 +++--
 .../cassandra/io/util/RandomAccessReader.java   |   2 +
 .../io/sstable/SSTableRewriterTest.java         | 154 ++++++++++++-------
 4 files changed, 134 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/36bd31d0/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index e61b00e,184db9c..b1b4dd4
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@@ -80,214 -67,80 +80,222 @@@ public class CompressedRandomAccessRead
  
      protected CompressedRandomAccessReader(String dataFilePath, 
CompressionMetadata metadata, PoolingSegmentedFile owner) throws 
FileNotFoundException
      {
 -        super(new File(dataFilePath), metadata.chunkLength(), 
metadata.compressedFileLength, owner);
 +        super(new File(dataFilePath), metadata.chunkLength(), 
metadata.compressedFileLength, 
metadata.compressor().useDirectOutputByteBuffers(), owner);
          this.metadata = metadata;
 -        checksum = metadata.hasPostCompressionAdlerChecksums ? new Adler32() 
: new CRC32();
 -        compressed = ByteBuffer.wrap(new 
byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
 -    }
 +        checksum = new Adler32();
  
 -    @Override
 -    protected void reBuffer()
 -    {
 -        try
 +        if (!useMmap)
          {
 -            decompressChunk(metadata.chunkFor(current));
 +            compressed = ByteBuffer.wrap(new 
byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
 +            checksumBytes = ByteBuffer.wrap(new byte[4]);
          }
 -        catch (CorruptBlockException e)
 +        else
          {
 -            throw new CorruptSSTableException(e, getPath());
 +            try
 +            {
 +                createMappedSegments();
 +            }
 +            catch (IOException e)
 +            {
 +                throw new IOError(e);
 +            }
          }
 -        catch (IOException e)
 +    }
 +
 +    private void createMappedSegments() throws IOException
 +    {
 +        chunkSegments = new TreeMap<>();
 +        long offset = 0;
 +        long lastSegmentOffset = 0;
 +        long segmentSize = 0;
 +
 +        while (offset < metadata.dataLength)
          {
 -            throw new FSReadError(e, getPath());
 +            CompressionMetadata.Chunk chunk = metadata.chunkFor(offset);
 +
 +            //Reached a new mmap boundary
 +            if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE)
 +            {
 +                chunkSegments.put(lastSegmentOffset, 
channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
 +                lastSegmentOffset += segmentSize;
 +                segmentSize = 0;
 +            }
 +
 +            segmentSize += chunk.length + 4; //checksum
 +            offset += metadata.chunkLength();
          }
 +
 +        if (segmentSize > 0)
 +            chunkSegments.put(lastSegmentOffset, 
channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
      }
  
 -    private void decompressChunk(CompressionMetadata.Chunk chunk) throws 
IOException
 +    protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirect)
      {
 -        if (channel.position() != chunk.offset)
 -            channel.position(chunk.offset);
 +        assert Integer.bitCount(bufferSize) == 1;
 +        return useMmap && useDirect
 +                ? ByteBuffer.allocateDirect(bufferSize)
 +                : ByteBuffer.allocate(bufferSize);
 +    }
  
 -        if (compressed.capacity() < chunk.length)
 -            compressed = ByteBuffer.wrap(new byte[chunk.length]);
 -        else
 -            compressed.clear();
 -        compressed.limit(chunk.length);
 +    @Override
 +    public void deallocate()
 +    {
 +        super.deallocate();
  
 -        if (channel.read(compressed) != chunk.length)
 -            throw new CorruptBlockException(getPath(), chunk);
 +        if (chunkSegments != null)
 +        {
 +            for (Map.Entry<Long, MappedByteBuffer> entry : 
chunkSegments.entrySet())
 +            {
 +                FileUtils.clean(entry.getValue());
 +            }
 +        }
 +
 +        chunkSegments = null;
 +    }
  
 -        // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
 -        // in the future this will save a lot of hair-pulling
 -        compressed.flip();
 +    private void reBufferStandard()
 +    {
          try
          {
 -            validBufferBytes = 
metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 
0);
 +            long position = current();
 +            assert position < metadata.dataLength;
 +
 +            CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
 +
 +            if (channel.position() != chunk.offset)
 +                channel.position(chunk.offset);
 +
 +            if (compressed.capacity() < chunk.length)
 +                compressed = ByteBuffer.wrap(new byte[chunk.length]);
 +            else
 +                compressed.clear();
 +            compressed.limit(chunk.length);
 +
 +            if (channel.read(compressed) != chunk.length)
 +                throw new CorruptBlockException(getPath(), chunk);
 +
 +            // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
 +            // in the future this will save a lot of hair-pulling
 +            compressed.flip();
 +            buffer.clear();
 +            int decompressedBytes;
 +            try
 +            {
 +                decompressedBytes = 
metadata.compressor().uncompress(compressed.array(), 0, chunk.length, 
buffer.array(), 0);
 +                buffer.limit(decompressedBytes);
 +            }
 +            catch (IOException e)
 +            {
 +                throw new CorruptBlockException(getPath(), chunk);
 +            }
 +
 +            if (metadata.parameters.getCrcCheckChance() > 
ThreadLocalRandom.current().nextDouble())
 +            {
 +
 +                checksum.update(compressed.array(), 0, chunk.length);
 +
 +                if (checksum(chunk) != (int) checksum.getValue())
 +                    throw new CorruptBlockException(getPath(), chunk);
 +
 +                // reset checksum object back to the original (blank) state
 +                checksum.reset();
 +            }
 +
 +            // buffer offset is always aligned
 +            bufferOffset = position & ~(buffer.capacity() - 1);
 +            buffer.position((int) (position - bufferOffset));
++            // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
++            // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
++            if (bufferOffset + buffer.limit() > length())
++                buffer.limit((int)(length() - bufferOffset));
 +        }
 +        catch (CorruptBlockException e)
 +        {
 +            throw new CorruptSSTableException(e, getPath());
          }
          catch (IOException e)
          {
 -            throw new CorruptBlockException(getPath(), chunk, e);
 +            throw new FSReadError(e, getPath());
          }
 +    }
  
 -        if (metadata.parameters.getCrcCheckChance() > 
ThreadLocalRandom.current().nextDouble())
 +    private void reBufferMmap()
 +    {
 +        try
          {
 +            long position = current();
 +            assert position < metadata.dataLength;
  
 -            if (metadata.hasPostCompressionAdlerChecksums)
 +            CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
 +
 +            Map.Entry<Long, MappedByteBuffer> entry = 
chunkSegments.floorEntry(chunk.offset);
 +            long segmentOffset = entry.getKey();
 +            int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
 +            MappedByteBuffer compressedChunk = entry.getValue();
 +
 +            compressedChunk.position(chunkOffset);
 +            compressedChunk.limit(chunkOffset + chunk.length);
 +            compressedChunk.mark();
 +
 +            buffer.clear();
 +            int decompressedBytes;
 +            try
              {
 -                checksum.update(compressed.array(), 0, chunk.length);
 +                decompressedBytes = 
metadata.compressor().uncompress(compressedChunk, buffer);
 +                buffer.limit(decompressedBytes);
              }
 -            else
 +            catch (IOException e)
 +            {
 +                throw new CorruptBlockException(getPath(), chunk);
 +            }
 +            finally
              {
 -                checksum.update(buffer, 0, validBufferBytes);
 +                compressedChunk.limit(compressedChunk.capacity());
              }
  
 -            if (checksum(chunk) != (int) checksum.getValue())
 -                throw new CorruptBlockException(getPath(), chunk);
 +            if (metadata.parameters.getCrcCheckChance() > 
ThreadLocalRandom.current().nextDouble())
 +            {
 +                compressedChunk.reset();
 +                compressedChunk.limit(chunkOffset + chunk.length);
 +
 +                FBUtilities.directCheckSum(checksum, compressedChunk);
 +
 +                compressedChunk.limit(compressedChunk.capacity());
  
 -            // reset checksum object back to the original (blank) state
 -            checksum.reset();
 +
 +                if (compressedChunk.getInt() != (int) checksum.getValue())
 +                    throw new CorruptBlockException(getPath(), chunk);
 +
 +                // reset checksum object back to the original (blank) state
 +                checksum.reset();
 +            }
 +
 +            // buffer offset is always aligned
 +            bufferOffset = position & ~(buffer.capacity() - 1);
 +            buffer.position((int) (position - bufferOffset));
++            // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
++            // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
++            if (bufferOffset + buffer.limit() > length())
++                buffer.limit((int)(length() - bufferOffset));
 +        }
 +        catch (CorruptBlockException e)
 +        {
 +            throw new CorruptSSTableException(e, getPath());
          }
  
 -        // buffer offset is always aligned
 -        bufferOffset = current & ~(buffer.length - 1);
 -        // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
 -        // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
 -        if (bufferOffset + validBufferBytes > length())
 -            validBufferBytes = (int)(length() - bufferOffset);
 +    }
 +
 +    @Override
 +    protected void reBuffer()
 +    {
 +        if (useMmap)
 +        {
 +            reBufferMmap();
 +        }
 +        else
 +        {
 +            reBufferStandard();
 +        }
      }
  
      private int checksum(CompressionMetadata.Chunk chunk) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36bd31d0/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index eef9d0c,9ac2f89..7237cb9
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@@ -59,8 -57,12 +59,11 @@@ import org.apache.cassandra.utils.Pair
   */
  public class CompressionMetadata
  {
+     // dataLength can represent either the true length of the file
+     // or some shorter value, in the case we want to impose a shorter limit on readers
+     // (when early opening, we want to ensure readers cannot read past fully written sections)
      public final long dataLength;
      public final long compressedFileLength;
 -    public final boolean hasPostCompressionAdlerChecksums;
      private final Memory chunkOffsets;
      private final long chunkOffsetsSize;
      public final String indexFilePath;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36bd31d0/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 1f2d395,95877a2..0188259
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@@ -38,15 -36,21 +38,17 @@@ public class RandomAccessReader extend
      private final String filePath;
  
      // buffer which will cache file blocks
 -    protected byte[] buffer;
 +    protected ByteBuffer buffer;
  
 -    // `current` as current position in file
      // `bufferOffset` is the offset of the beginning of the buffer
      // `markedPointer` folds the offset of the last file mark
 -    protected long bufferOffset, current = 0, markedPointer;
 -    // `validBufferBytes` is the number of bytes in the buffer that are actually valid;
 -    //  this will be LESS than buffer capacity if buffer is not full!
 -    protected int validBufferBytes = 0;
 +    protected long bufferOffset, markedPointer;
  
 -    // channel liked with the file, used to retrieve data and force updates.
 +    // channel linked with the file, used to retrieve data and force updates.
      protected final FileChannel channel;
  
+     // this can be overridden at construction to a value shorter than the true length of the file;
+     // if so, it acts as an imposed limit on reads, rather than a convenience property
      private final long fileLength;
  
      protected final PoolingSegmentedFile owner;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36bd31d0/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 018b094,76b89e5..77c6df9
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -209,33 -184,37 +210,37 @@@ public class SSTableRewriterTest extend
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
          cfs.truncateBlocking();
          ArrayBackedSortedColumns cf = 
ArrayBackedSortedColumns.factory.create(cfs.metadata);
 -        for (int i = 0; i < 1000; i++)
 -            cf.addColumn(Util.column(String.valueOf(i), "a", 1));
 +        for (int i = 0; i < 100; i++)
 +            cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
          File dir = cfs.directories.getDirectoryForNewSSTables();
- 
          SSTableWriter writer = getWriter(cfs, dir);
-         for (int i = 0; i < 500; i++)
-             
writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)),
 cf);
-         SSTableReader s = writer.openEarly(1000);
-         assertFileCounts(dir.list(), 2, 3);
- 
-         for (int i = 500; i < 1000; i++)
-             
writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)),
 cf);
-         SSTableReader s2 = writer.openEarly(1000);
- 
-         assertTrue(s != s2);
-         assertFileCounts(dir.list(), 2, 3);
- 
-         s.markObsolete();
-         s.selfRef().release();
-         s2.selfRef().release();
-         Thread.sleep(1000);
-         assertFileCounts(dir.list(), 0, 3);
-         writer.abort();
-         Thread.sleep(1000);
-         int datafiles = assertFileCounts(dir.list(), 0, 0);
-         assertEquals(datafiles, 0);
-         validateCFS(cfs);
+         try
+         {
+             for (int i = 0; i < 1000; i++)
+                 
writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
+             SSTableReader s = writer.openEarly(1000);
+             assertFileCounts(dir.list(), 2, 2);
+             for (int i = 1000; i < 2000; i++)
+                 
writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
+             SSTableReader s2 = writer.openEarly(1000);
+             assertTrue(s.last.compareTo(s2.last) < 0);
+             assertFileCounts(dir.list(), 2, 2);
+             s.markObsolete();
+             s.selfRef().release();
+             s2.selfRef().release();
+             Thread.sleep(1000);
+             assertFileCounts(dir.list(), 0, 2);
+             writer.abort();
+             Thread.sleep(1000);
+             int datafiles = assertFileCounts(dir.list(), 0, 0);
+             assertEquals(datafiles, 0);
+             validateCFS(cfs);
+         }
+         catch (Throwable t)
+         {
+             writer.abort();
+             throw t;
+         }
      }
  
      @Test
@@@ -431,9 -416,14 +442,14 @@@
          {
              test.run(scanner, controller, s, cfs, rewriter);
          }
+         catch (Throwable t)
+         {
+             rewriter.abort();
+             throw t;
+         }
  
          Thread.sleep(1000);
 -        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
 +        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.getCount());
          assertEquals(1, cfs.getSSTables().size());
          assertFileCounts(s.descriptor.directory.list(), 0, 0);
          assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
@@@ -781,9 -794,11 +815,11 @@@
          int datacount = 0;
          for (String f : files)
          {
-             if (f.contains("tmplink-"))
+             if (f.endsWith("-CRC.db"))
+                 continue;
+             if (f.contains("-tmplink-"))
                  tmplinkcount++;
 -            else if (f.contains("-tmp-"))
 +            else if (f.contains("tmp-"))
                  tmpcount++;
              else if (f.contains("Data"))
                  datacount++;
@@@ -796,6 -811,21 +832,16 @@@
      private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)
      {
          String filename = cfs.getTempSSTablePath(directory);
 -        return new SSTableWriter(filename,
 -                                 0,
 -                                 0,
 -                                 cfs.metadata,
 -                                 StorageService.getPartitioner(),
 -                                 new 
MetadataCollector(cfs.metadata.comparator));
 +        return SSTableWriter.create(filename, 0, 0);
      }
+ 
+     private ByteBuffer random(int i, int size)
+     {
+         byte[] bytes = new byte[size + 4];
+         ThreadLocalRandom.current().nextBytes(bytes);
+         ByteBuffer r = ByteBuffer.wrap(bytes);
+         r.putInt(0, i);
+         return r;
+     }
+ 
  }

Reply via email to