Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/36bd31d0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/36bd31d0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/36bd31d0
Branch: refs/heads/trunk
Commit: 36bd31d0578fedbcc1bfa2ca9b0dbccc4d9284d6
Parents: 5346ce7 a1e2978
Author: Benedict Elliott Smith <[email protected]>
Authored: Wed Mar 4 16:31:59 2015 +0000
Committer: Benedict Elliott Smith <[email protected]>
Committed: Wed Mar 4 16:31:59 2015 +0000
----------------------------------------------------------------------
.../compress/CompressedRandomAccessReader.java | 8 +
.../io/compress/CompressionMetadata.java | 38 +++--
.../cassandra/io/util/RandomAccessReader.java | 2 +
.../io/sstable/SSTableRewriterTest.java | 154 ++++++++++++-------
4 files changed, 134 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36bd31d0/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index e61b00e,184db9c..b1b4dd4
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@@ -80,214 -67,80 +80,222 @@@ public class CompressedRandomAccessRead
protected CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException
{
- super(new File(dataFilePath), metadata.chunkLength(), metadata.compressedFileLength, owner);
+ super(new File(dataFilePath), metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().useDirectOutputByteBuffers(), owner);
this.metadata = metadata;
- checksum = metadata.hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
- compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
- }
+ checksum = new Adler32();
- @Override
- protected void reBuffer()
- {
- try
+ if (!useMmap)
{
- decompressChunk(metadata.chunkFor(current));
+             compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
+ checksumBytes = ByteBuffer.wrap(new byte[4]);
}
- catch (CorruptBlockException e)
+ else
{
- throw new CorruptSSTableException(e, getPath());
+ try
+ {
+ createMappedSegments();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
}
- catch (IOException e)
+ }
+
+ private void createMappedSegments() throws IOException
+ {
+ chunkSegments = new TreeMap<>();
+ long offset = 0;
+ long lastSegmentOffset = 0;
+ long segmentSize = 0;
+
+ while (offset < metadata.dataLength)
{
- throw new FSReadError(e, getPath());
+ CompressionMetadata.Chunk chunk = metadata.chunkFor(offset);
+
+ //Reached a new mmap boundary
+ if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE)
+ {
+                 chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
+ lastSegmentOffset += segmentSize;
+ segmentSize = 0;
+ }
+
+ segmentSize += chunk.length + 4; //checksum
+ offset += metadata.chunkLength();
}
+
+ if (segmentSize > 0)
+             chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
}
-     private void decompressChunk(CompressionMetadata.Chunk chunk) throws IOException
+ protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirect)
{
- if (channel.position() != chunk.offset)
- channel.position(chunk.offset);
+ assert Integer.bitCount(bufferSize) == 1;
+ return useMmap && useDirect
+ ? ByteBuffer.allocateDirect(bufferSize)
+ : ByteBuffer.allocate(bufferSize);
+ }
- if (compressed.capacity() < chunk.length)
- compressed = ByteBuffer.wrap(new byte[chunk.length]);
- else
- compressed.clear();
- compressed.limit(chunk.length);
+ @Override
+ public void deallocate()
+ {
+ super.deallocate();
- if (channel.read(compressed) != chunk.length)
- throw new CorruptBlockException(getPath(), chunk);
+ if (chunkSegments != null)
+ {
+             for (Map.Entry<Long, MappedByteBuffer> entry : chunkSegments.entrySet())
+ {
+ FileUtils.clean(entry.getValue());
+ }
+ }
+
+ chunkSegments = null;
+ }
-         // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
- // in the future this will save a lot of hair-pulling
- compressed.flip();
+ private void reBufferStandard()
+ {
try
{
-             validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0);
+ long position = current();
+ assert position < metadata.dataLength;
+
+ CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
+
+ if (channel.position() != chunk.offset)
+ channel.position(chunk.offset);
+
+ if (compressed.capacity() < chunk.length)
+ compressed = ByteBuffer.wrap(new byte[chunk.length]);
+ else
+ compressed.clear();
+ compressed.limit(chunk.length);
+
+ if (channel.read(compressed) != chunk.length)
+ throw new CorruptBlockException(getPath(), chunk);
+
+             // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
+ // in the future this will save a lot of hair-pulling
+ compressed.flip();
+ buffer.clear();
+ int decompressedBytes;
+ try
+ {
+                 decompressedBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer.array(), 0);
+ buffer.limit(decompressedBytes);
+ }
+ catch (IOException e)
+ {
+ throw new CorruptBlockException(getPath(), chunk);
+ }
+
+             if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+ {
+
+ checksum.update(compressed.array(), 0, chunk.length);
+
+ if (checksum(chunk) != (int) checksum.getValue())
+ throw new CorruptBlockException(getPath(), chunk);
+
+ // reset checksum object back to the original (blank) state
+ checksum.reset();
+ }
+
+ // buffer offset is always aligned
+ bufferOffset = position & ~(buffer.capacity() - 1);
+ buffer.position((int) (position - bufferOffset));
++            // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
++            // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
++ if (bufferOffset + buffer.limit() > length())
++ buffer.limit((int)(length() - bufferOffset));
+ }
+ catch (CorruptBlockException e)
+ {
+ throw new CorruptSSTableException(e, getPath());
}
catch (IOException e)
{
- throw new CorruptBlockException(getPath(), chunk, e);
+ throw new FSReadError(e, getPath());
}
+ }
-         if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+ private void reBufferMmap()
+ {
+ try
{
+ long position = current();
+ assert position < metadata.dataLength;
- if (metadata.hasPostCompressionAdlerChecksums)
+ CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
+
+             Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset);
+ long segmentOffset = entry.getKey();
+ int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
+ MappedByteBuffer compressedChunk = entry.getValue();
+
+ compressedChunk.position(chunkOffset);
+ compressedChunk.limit(chunkOffset + chunk.length);
+ compressedChunk.mark();
+
+ buffer.clear();
+ int decompressedBytes;
+ try
{
- checksum.update(compressed.array(), 0, chunk.length);
+                 decompressedBytes = metadata.compressor().uncompress(compressedChunk, buffer);
+ buffer.limit(decompressedBytes);
}
- else
+ catch (IOException e)
+ {
+ throw new CorruptBlockException(getPath(), chunk);
+ }
+ finally
{
- checksum.update(buffer, 0, validBufferBytes);
+ compressedChunk.limit(compressedChunk.capacity());
}
- if (checksum(chunk) != (int) checksum.getValue())
- throw new CorruptBlockException(getPath(), chunk);
+             if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
+ {
+ compressedChunk.reset();
+ compressedChunk.limit(chunkOffset + chunk.length);
+
+ FBUtilities.directCheckSum(checksum, compressedChunk);
+
+ compressedChunk.limit(compressedChunk.capacity());
- // reset checksum object back to the original (blank) state
- checksum.reset();
+
+ if (compressedChunk.getInt() != (int) checksum.getValue())
+ throw new CorruptBlockException(getPath(), chunk);
+
+ // reset checksum object back to the original (blank) state
+ checksum.reset();
+ }
+
+ // buffer offset is always aligned
+ bufferOffset = position & ~(buffer.capacity() - 1);
+ buffer.position((int) (position - bufferOffset));
++            // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
++            // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
++ if (bufferOffset + buffer.limit() > length())
++ buffer.limit((int)(length() - bufferOffset));
+ }
+ catch (CorruptBlockException e)
+ {
+ throw new CorruptSSTableException(e, getPath());
}
- // buffer offset is always aligned
- bufferOffset = current & ~(buffer.length - 1);
-         // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
-         // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
- if (bufferOffset + validBufferBytes > length())
- validBufferBytes = (int)(length() - bufferOffset);
+ }
+
+ @Override
+ protected void reBuffer()
+ {
+ if (useMmap)
+ {
+ reBufferMmap();
+ }
+ else
+ {
+ reBufferStandard();
+ }
}
private int checksum(CompressionMetadata.Chunk chunk) throws IOException
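
The new createMappedSegments() above packs consecutive compressed chunks (each chunk.length bytes plus a 4-byte checksum) into memory-mapped segments no larger than MAX_SEGMENT_SIZE, keyed by their start offset so reBufferMmap() can find a chunk's segment with floorEntry(). Below is a minimal standalone sketch of that grouping only; Chunk, ChunkIndex, planSegments and maxSegmentSize are illustrative stand-ins, not the Cassandra types.

    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class SegmentGroupingSketch
    {
        record Chunk(long offset, int length) {}

        interface ChunkIndex
        {
            Chunk chunkFor(long uncompressedPosition);
        }

        // Returns segmentStartOffset -> segmentSize in the compressed file; a real reader
        // would mmap each range with FileChannel.map(READ_ONLY, start, size) and later
        // locate a chunk's segment with chunkSegments.floorEntry(chunk.offset).
        static NavigableMap<Long, Long> planSegments(ChunkIndex index,
                                                     long uncompressedDataLength,
                                                     int chunkLength,
                                                     long maxSegmentSize)
        {
            NavigableMap<Long, Long> segments = new TreeMap<>();
            long lastSegmentOffset = 0;
            long segmentSize = 0;

            for (long position = 0; position < uncompressedDataLength; position += chunkLength)
            {
                Chunk chunk = index.chunkFor(position);
                // each chunk is stored as <compressed bytes><4-byte checksum>
                long chunkOnDisk = chunk.length() + 4;

                // adding this chunk would cross the mmap boundary: close the current segment
                if (segmentSize + chunkOnDisk > maxSegmentSize)
                {
                    segments.put(lastSegmentOffset, segmentSize);
                    lastSegmentOffset += segmentSize;
                    segmentSize = 0;
                }
                segmentSize += chunkOnDisk;
            }

            if (segmentSize > 0)
                segments.put(lastSegmentOffset, segmentSize);
            return segments;
        }

        public static void main(String[] args)
        {
            // toy index: every 64KiB uncompressed chunk compresses to a fixed 20000 bytes
            ChunkIndex index = pos -> new Chunk((pos / 65536) * 20004L, 20000);
            // 16 chunks, two per segment under a 50000-byte cap -> 8 segments
            System.out.println(planSegments(index, 1 << 20, 65536, 50000));
        }
    }
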
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36bd31d0/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index eef9d0c,9ac2f89..7237cb9
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@@ -59,8 -57,12 +59,11 @@@ import org.apache.cassandra.utils.Pair
*/
public class CompressionMetadata
{
+ // dataLength can represent either the true length of the file
+     // or some shorter value, in the case we want to impose a shorter limit on readers
+     // (when early opening, we want to ensure readers cannot read past fully written sections)
public final long dataLength;
public final long compressedFileLength;
- public final boolean hasPostCompressionAdlerChecksums;
private final Memory chunkOffsets;
private final long chunkOffsetsSize;
public final String indexFilePath;
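
Both reBuffer paths in the reader gate checksum verification on metadata.parameters.getCrcCheckChance() compared against a fresh ThreadLocalRandom draw, so on average only that fraction of chunks pays the Adler32 cost. A minimal sketch of the gating logic, written as a hypothetical helper rather than anything in CompressionMetadata:

    import java.util.concurrent.ThreadLocalRandom;
    import java.util.zip.Adler32;

    final class ChecksumGate
    {
        private final double crcCheckChance; // 0.0 = never verify, 1.0 = verify every chunk
        private final Adler32 checksum = new Adler32();

        ChecksumGate(double crcCheckChance)
        {
            this.crcCheckChance = crcCheckChance;
        }

        /** @return false only if the chunk was verified and found corrupt */
        boolean verify(byte[] compressedChunk, int length, int storedChecksum)
        {
            // one draw per chunk: skip verification with probability (1 - crcCheckChance)
            if (crcCheckChance <= ThreadLocalRandom.current().nextDouble())
                return true;

            checksum.update(compressedChunk, 0, length);
            boolean ok = storedChecksum == (int) checksum.getValue();
            checksum.reset(); // back to the blank state for the next chunk
            return ok;
        }
    }
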
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36bd31d0/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 1f2d395,95877a2..0188259
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@@ -38,15 -36,21 +38,17 @@@ public class RandomAccessReader extend
private final String filePath;
// buffer which will cache file blocks
- protected byte[] buffer;
+ protected ByteBuffer buffer;
- // `current` as current position in file
// `bufferOffset` is the offset of the beginning of the buffer
// `markedPointer` folds the offset of the last file mark
- protected long bufferOffset, current = 0, markedPointer;
-     // `validBufferBytes` is the number of bytes in the buffer that are actually valid;
- // this will be LESS than buffer capacity if buffer is not full!
- protected int validBufferBytes = 0;
+ protected long bufferOffset, markedPointer;
- // channel liked with the file, used to retrieve data and force updates.
+ // channel linked with the file, used to retrieve data and force updates.
protected final FileChannel channel;
+     // this can be overridden at construction to a value shorter than the true length of the file;
+     // if so, it acts as an imposed limit on reads, rather than a convenience property
private final long fileLength;
protected final PoolingSegmentedFile owner;
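
reBuffer() keeps the buffer aligned on multiples of its power-of-two capacity (hence the assert Integer.bitCount(bufferSize) == 1) and, when a shorter length() was imposed at construction, clamps the buffer limit so reads cannot cross it. A small arithmetic sketch of just those two steps; the class and its local variables are illustrative, not RandomAccessReader fields:

    public final class AlignedWindow
    {
        public static void main(String[] args)
        {
            int bufferSize = 65536;           // must be a power of two
            assert Integer.bitCount(bufferSize) == 1;
            long imposedLength = 180_000;     // length() override, shorter than the file
            long position = 150_123;          // where the next read starts

            // align the buffer start down to a multiple of bufferSize
            long bufferOffset = position & ~(bufferSize - 1);        // -> 131072
            int positionInBuffer = (int) (position - bufferOffset);  // -> 19051

            // a full refill would cover bufferSize bytes; clamp it to the imposed length
            int limit = bufferSize;
            if (bufferOffset + limit > imposedLength)
                limit = (int) (imposedLength - bufferOffset);        // -> 48928

            System.out.printf("bufferOffset=%d position-in-buffer=%d limit=%d%n",
                              bufferOffset, positionInBuffer, limit);
        }
    }
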
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36bd31d0/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 018b094,76b89e5..77c6df9
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -209,33 -184,37 +210,37 @@@ public class SSTableRewriterTest extend
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
cfs.truncateBlocking();
ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
- for (int i = 0; i < 1000; i++)
- cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+ for (int i = 0; i < 100; i++)
+ cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
File dir = cfs.directories.getDirectoryForNewSSTables();
-
SSTableWriter writer = getWriter(cfs, dir);
- for (int i = 0; i < 500; i++)
-             writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
- SSTableReader s = writer.openEarly(1000);
- assertFileCounts(dir.list(), 2, 3);
-
- for (int i = 500; i < 1000; i++)
-             writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
- SSTableReader s2 = writer.openEarly(1000);
-
- assertTrue(s != s2);
- assertFileCounts(dir.list(), 2, 3);
-
- s.markObsolete();
- s.selfRef().release();
- s2.selfRef().release();
- Thread.sleep(1000);
- assertFileCounts(dir.list(), 0, 3);
- writer.abort();
- Thread.sleep(1000);
- int datafiles = assertFileCounts(dir.list(), 0, 0);
- assertEquals(datafiles, 0);
- validateCFS(cfs);
+ try
+ {
+ for (int i = 0; i < 1000; i++)
+                 writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
+ SSTableReader s = writer.openEarly(1000);
+ assertFileCounts(dir.list(), 2, 2);
+ for (int i = 1000; i < 2000; i++)
+                 writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
+ SSTableReader s2 = writer.openEarly(1000);
+ assertTrue(s.last.compareTo(s2.last) < 0);
+ assertFileCounts(dir.list(), 2, 2);
+ s.markObsolete();
+ s.selfRef().release();
+ s2.selfRef().release();
+ Thread.sleep(1000);
+ assertFileCounts(dir.list(), 0, 2);
+ writer.abort();
+ Thread.sleep(1000);
+ int datafiles = assertFileCounts(dir.list(), 0, 0);
+ assertEquals(datafiles, 0);
+ validateCFS(cfs);
+ }
+ catch (Throwable t)
+ {
+ writer.abort();
+ throw t;
+ }
}
@Test
@@@ -431,9 -416,14 +442,14 @@@
{
test.run(scanner, controller, s, cfs, rewriter);
}
+ catch (Throwable t)
+ {
+ rewriter.abort();
+ throw t;
+ }
Thread.sleep(1000);
- assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.getCount());
assertEquals(1, cfs.getSSTables().size());
assertFileCounts(s.descriptor.directory.list(), 0, 0);
assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
@@@ -781,9 -794,11 +815,11 @@@
int datacount = 0;
for (String f : files)
{
- if (f.contains("tmplink-"))
+ if (f.endsWith("-CRC.db"))
+ continue;
+ if (f.contains("-tmplink-"))
tmplinkcount++;
- else if (f.contains("-tmp-"))
+ else if (f.contains("tmp-"))
tmpcount++;
else if (f.contains("Data"))
datacount++;
@@@ -796,6 -811,21 +832,16 @@@
private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)
{
String filename = cfs.getTempSSTablePath(directory);
- return new SSTableWriter(filename,
- 0,
- 0,
- cfs.metadata,
- StorageService.getPartitioner(),
-                                  new MetadataCollector(cfs.metadata.comparator));
+ return SSTableWriter.create(filename, 0, 0);
}
+
+ private ByteBuffer random(int i, int size)
+ {
+ byte[] bytes = new byte[size + 4];
+ ThreadLocalRandom.current().nextBytes(bytes);
+ ByteBuffer r = ByteBuffer.wrap(bytes);
+ r.putInt(0, i);
+ return r;
+ }
+
}
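
The random(i, size) helper added to the test builds keys that are random but guaranteed unique, because the first four bytes are overwritten with the loop index. A standalone illustration of that trick (IllustrativeKeys is not part of the test):

    import java.nio.ByteBuffer;
    import java.util.concurrent.ThreadLocalRandom;

    public class IllustrativeKeys
    {
        static ByteBuffer random(int i, int size)
        {
            byte[] bytes = new byte[size + 4];
            ThreadLocalRandom.current().nextBytes(bytes);
            ByteBuffer r = ByteBuffer.wrap(bytes);
            r.putInt(0, i); // overwrite the first 4 bytes with the index
            return r;
        }

        public static void main(String[] args)
        {
            ByteBuffer k1 = random(1, 10);
            ByteBuffer k2 = random(2, 10);
            // the embedded index is recoverable and guarantees the two keys differ
            System.out.println(k1.getInt(0) + " != " + k2.getInt(0));
        }
    }
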