Fix CASSANDRA-8750's treatment of compressed files

patch by benedict; reviewed by marcus for CASSANDRA-8750
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a1e2978f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a1e2978f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a1e2978f

Branch: refs/heads/trunk
Commit: a1e2978f9f6d2e9a318f44a5b6c625659b86efe8
Parents: ec7fba4
Author: Benedict Elliott Smith <[email protected]>
Authored: Wed Mar 4 16:27:30 2015 +0000
Committer: Benedict Elliott Smith <[email protected]>
Committed: Wed Mar 4 16:27:30 2015 +0000

----------------------------------------------------------------------
 .../compress/CompressedRandomAccessReader.java |   4 +
 .../io/compress/CompressionMetadata.java       |  38 +++--
 .../cassandra/io/util/RandomAccessReader.java  |   2 +
 .../io/sstable/SSTableRewriterTest.java        | 149 ++++++++++++-------
 4 files changed, 129 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1e2978f/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index e29ad33..184db9c 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -137,6 +137,10 @@ public class CompressedRandomAccessReader extends RandomAccessReader
 
         // buffer offset is always aligned
         bufferOffset = current & ~(buffer.length - 1);
+        // the length() can be provided at construction time, to override the true (uncompressed) length of the file;
+        // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length
+        if (bufferOffset + validBufferBytes > length())
+            validBufferBytes = (int)(length() - bufferOffset);
     }
 
     private int checksum(CompressionMetadata.Chunk chunk) throws IOException
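The hunk above is the heart of the fix: an early-opened compressed reader may be given an imposed (uncompressed) length that ends in the middle of a chunk, so after decompressing a chunk it must clamp how many buffered bytes callers are allowed to consume. A minimal compilable sketch of that clamping, using illustrative names (ClampingReader, imposedLength, onChunkDecompressed) rather than Cassandra's actual fields:

// Sketch of the clamp, assuming a reader that decompresses whole chunks into
// `buffer` and tracks an imposed logical length. All names are illustrative.
final class ClampingReader
{
    private final long imposedLength;   // may be shorter than the true uncompressed length
    private final byte[] buffer = new byte[65536];
    private long bufferOffset;          // uncompressed position of buffer[0]
    private int validBufferBytes;       // bytes of buffer that readers may consume

    ClampingReader(long imposedLength)
    {
        this.imposedLength = imposedLength;
    }

    void onChunkDecompressed(long chunkStart, int decompressedBytes)
    {
        bufferOffset = chunkStart;      // chunk starts are always aligned
        validBufferBytes = decompressedBytes;
        // the imposed length may end mid-chunk; never expose bytes past it
        if (bufferOffset + validBufferBytes > imposedLength)
            validBufferBytes = (int) (imposedLength - bufferOffset);
    }
}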
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1e2978f/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 59c5da5..9ac2f89 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -57,6 +57,9 @@ import org.apache.cassandra.utils.Pair;
  */
 public class CompressionMetadata
 {
+    // dataLength can represent either the true length of the file
+    // or some shorter value, in the case we want to impose a shorter limit on readers
+    // (when early opening, we want to ensure readers cannot read past fully written sections)
     public final long dataLength;
     public final long compressedFileLength;
     public final boolean hasPostCompressionAdlerChecksums;
@@ -331,33 +334,39 @@ public class CompressionMetadata
 
         public CompressionMetadata open(long dataLength, long compressedLength, OpenType type)
         {
-            SafeMemory offsets = this.offsets;
+            SafeMemory offsets;
             int count = this.count;
             switch (type)
             {
                 case FINAL: case SHARED_FINAL:
-                    // maybe resize the data
                     if (this.offsets.size() != count * 8L)
                     {
-                        offsets = this.offsets.copy(count * 8L);
-                        // release our reference to the original shared data;
-                        // we don't do this if not resizing since we must pass our existing
-                        // reference onto our caller
+                        // finalize the size of memory used if it won't now change;
+                        // unnecessary if already correct size
+                        SafeMemory tmp = this.offsets.copy(count * 8L);
                         this.offsets.free();
+                        this.offsets = tmp;
                     }
-                    // null out our reference to the original shared data to catch accidental reuse
-                    // note that since no one is writing to this Writer while we open it, nulling out this.offsets is safe
-                    this.offsets = null;
+
                     if (type == OpenType.SHARED_FINAL)
-                        // we will use the data again, so stash our resized data back, and take an extra reference to it
-                        this.offsets = offsets.sharedCopy();
+                    {
+                        offsets = this.offsets.sharedCopy();
+                    }
+                    else
+                    {
+                        offsets = this.offsets;
+                        // null out our reference to the original shared data to catch accidental reuse
+                        // note that since no one is writing to this Writer while we open it, nulling out this.offsets is safe
+                        this.offsets = null;
+                    }
                     break;
 
                 case SHARED:
-
+                    offsets = this.offsets.sharedCopy();
                     // we should only be opened on a compression data boundary; truncate our size to this boundary
-                    assert dataLength % parameters.chunkLength() == 0;
                     count = (int) (dataLength / parameters.chunkLength());
+                    if (dataLength % parameters.chunkLength() != 0)
+                        count++;
                     // grab our actual compressed length from the next offset after the position we're opened to
                     if (count < this.count)
                         compressedLength = offsets.getLong(count * 8L);
@@ -409,7 +418,8 @@ public class CompressionMetadata
 
         public void abort()
         {
-            offsets.close();
+            if (offsets != null)
+                offsets.close();
         }
     }
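The SHARED path above no longer asserts chunk alignment: it rounds the chunk count up so a trailing partial chunk stays addressable, then reads the compressed length from the next entry in the offsets table. A small worked sketch of that arithmetic, with a plain long[] standing in for the off-heap SafeMemory table (all names and numbers here are illustrative):

// Sketch of the chunk-count rounding in the SHARED open path.
final class ChunkMath
{
    // number of chunks needed to cover dataLength bytes, rounding up for a partial chunk
    static int chunkCount(long dataLength, int chunkLength)
    {
        int count = (int) (dataLength / chunkLength);
        if (dataLength % chunkLength != 0)
            count++;               // a trailing partial chunk still needs an entry
        return count;
    }

    public static void main(String[] args)
    {
        long[] offsets = {0L, 40_000L, 81_000L, 123_500L}; // compressed offset of each chunk
        int chunkLength = 65_536;
        long dataLength = 150_000;                         // opened mid-way, not chunk-aligned

        int count = chunkCount(dataLength, chunkLength);   // == 3: two full chunks plus a partial
        // the compressed length up to our open point is the next chunk's start offset
        long compressedLength = count < offsets.length ? offsets[count] : Long.MAX_VALUE;
        System.out.println(count + " chunks, compressed length " + compressedLength);
    }
}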
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1e2978f/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index df68ca3..95877a2 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -49,6 +49,8 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInput
     // channel linked with the file, used to retrieve data and force updates.
     protected final FileChannel channel;
 
+    // this can be overridden at construction to a value shorter than the true length of the file;
+    // if so, it acts as an imposed limit on reads, rather than a convenience property
    private final long fileLength;
 
     protected final PoolingSegmentedFile owner;
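As the new comment says, fileLength doubles as an imposed limit, letting an early-opened reader treat a partially written file as if it ended at the last safely readable boundary. A toy, self-contained illustration of the idea over a raw FileChannel (BoundedReader and its members are our names, not Cassandra's API):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Toy illustration of an imposed length: reads succeed only below the limit,
// even though the underlying file may already be longer. Not Cassandra's API.
final class BoundedReader implements AutoCloseable
{
    private final FileChannel channel;
    private final long imposedLength; // may be shorter than channel.size()

    BoundedReader(Path path, long imposedLength) throws IOException
    {
        this.channel = FileChannel.open(path, StandardOpenOption.READ);
        this.imposedLength = imposedLength;
    }

    long length() { return imposedLength; } // the limit, not the true file size

    int read(ByteBuffer dst, long position) throws IOException
    {
        if (position >= imposedLength)
            return -1; // behave as if the file ends at the imposed length
        long remaining = imposedLength - position;
        if (dst.remaining() > remaining)
            dst.limit(dst.position() + (int) remaining);
        return channel.read(dst, position);
    }

    public void close() throws IOException { channel.close(); }
}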
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1e2978f/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 6c96905..76b89e5 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
 
 import javax.annotation.Nullable;
 
@@ -187,26 +188,33 @@ public class SSTableRewriterTest extends SchemaLoader
             cf.addColumn(Util.column(String.valueOf(i), "a", 1));
         File dir = cfs.directories.getDirectoryForNewSSTables();
         SSTableWriter writer = getWriter(cfs, dir);
-
-        for (int i = 0; i < 500; i++)
-            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
-        SSTableReader s = writer.openEarly(1000);
-        assertFileCounts(dir.list(), 2, 3);
-        for (int i = 500; i < 1000; i++)
-            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
-        SSTableReader s2 = writer.openEarly(1000);
-        assertTrue(s != s2);
-        assertFileCounts(dir.list(), 2, 3);
-        s.markObsolete();
-        s.selfRef().release();
-        s2.selfRef().release();
-        Thread.sleep(1000);
-        assertFileCounts(dir.list(), 0, 3);
-        writer.abort();
-        Thread.sleep(1000);
-        int datafiles = assertFileCounts(dir.list(), 0, 0);
-        assertEquals(datafiles, 0);
-        validateCFS(cfs);
+        try
+        {
+            for (int i = 0; i < 1000; i++)
+                writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
+            SSTableReader s = writer.openEarly(1000);
+            assertFileCounts(dir.list(), 2, 2);
+            for (int i = 1000; i < 2000; i++)
+                writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
+            SSTableReader s2 = writer.openEarly(1000);
+            assertTrue(s.last.compareTo(s2.last) < 0);
+            assertFileCounts(dir.list(), 2, 2);
+            s.markObsolete();
+            s.selfRef().release();
+            s2.selfRef().release();
+            Thread.sleep(1000);
+            assertFileCounts(dir.list(), 0, 2);
+            writer.abort();
+            Thread.sleep(1000);
+            int datafiles = assertFileCounts(dir.list(), 0, 0);
+            assertEquals(datafiles, 0);
+            validateCFS(cfs);
+        }
+        catch (Throwable t)
+        {
+            writer.abort();
+            throw t;
+        }
     }
 
     @Test
@@ -287,17 +295,23 @@ public class SSTableRewriterTest extends SchemaLoader
                     assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
                 }
             }
+
+            List<SSTableReader> sstables = rewriter.finish();
+            assertEquals(files, sstables.size());
+            assertEquals(files, cfs.getSSTables().size());
+            assertEquals(1, cfs.getDataTracker().getView().shadowed.size());
+            cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+            assertEquals(files, cfs.getSSTables().size());
+            assertEquals(0, cfs.getDataTracker().getView().shadowed.size());
+            Thread.sleep(1000);
+            assertFileCounts(s.descriptor.directory.list(), 0, 0);
+            validateCFS(cfs);
+        }
+        catch (Throwable t)
+        {
+            rewriter.abort();
+            throw t;
         }
-        List<SSTableReader> sstables = rewriter.finish();
-        assertEquals(files, sstables.size());
-        assertEquals(files, cfs.getSSTables().size());
-        assertEquals(1, cfs.getDataTracker().getView().shadowed.size());
-        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
-        assertEquals(files, cfs.getSSTables().size());
-        assertEquals(0, cfs.getDataTracker().getView().shadowed.size());
-        Thread.sleep(1000);
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
-        validateCFS(cfs);
     }
 
@@ -402,6 +416,11 @@ public class SSTableRewriterTest extends SchemaLoader
         {
             test.run(scanner, controller, s, cfs, rewriter);
         }
+        catch (Throwable t)
+        {
+            rewriter.abort();
+            throw t;
+        }
 
         Thread.sleep(1000);
         assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
@@ -434,7 +453,7 @@ public class SSTableRewriterTest extends SchemaLoader
         while(scanner.hasNext())
         {
             rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            if (rewriter.currentWriter().getFilePointer() > 25000000)
             {
                 rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
                 files++;
@@ -448,11 +467,17 @@ public class SSTableRewriterTest extends SchemaLoader
                 break;
             }
         }
+
+            Thread.sleep(1000);
+            assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file
+            assertFileCounts(s.descriptor.directory.list(), 0, 0);
+            validateCFS(cfs);
+        }
+        catch (Throwable t)
+        {
+            rewriter.abort();
+            throw t;
         }
-        Thread.sleep(1000);
-        assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
-        validateCFS(cfs);
     }
 
     @Test
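Every rewritten test above follows the same abort-on-failure discipline: do the writes inside a try, and on any Throwable abort the writer or rewriter before rethrowing, so temporary files never outlive a failed test. The generic shape of that pattern, with an illustrative Abortable stand-in rather than a real Cassandra type:

// Generic form of the abort-on-failure pattern the tests adopt: on any error,
// release the resource's partial state before propagating. `Abortable` and
// `ThrowingConsumer` are illustrative stand-ins, not Cassandra interfaces.
interface Abortable
{
    void abort();
}

final class AbortOnFailure
{
    interface ThrowingConsumer<T>
    {
        void accept(T t) throws Exception;
    }

    static <T extends Abortable> void run(T resource, ThrowingConsumer<T> body) throws Exception
    {
        try
        {
            body.accept(resource);
        }
        catch (Throwable t)
        {
            resource.abort(); // clean up -tmp-/-tmplink- files before rethrowing
            throw t;          // Java 7 precise rethrow keeps `throws Exception` valid
        }
    }
}

In the tests this collapses to the literal try/catch blocks in the diff, since JUnit test methods already declare throws Exception.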
@@ -484,14 +509,20 @@ public class SSTableRewriterTest extends SchemaLoader
                     assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
                 }
             }
+
+            List<SSTableReader> sstables = rewriter.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+            Thread.sleep(1000);
+            assertFileCounts(s.descriptor.directory.list(), 0, 0);
+            cfs.truncateBlocking();
+            Thread.sleep(1000); // make sure the deletion tasks have run etc
+            validateCFS(cfs);
+        }
+        catch (Throwable t)
+        {
+            rewriter.abort();
+            throw t;
         }
-        List<SSTableReader> sstables = rewriter.finish();
-        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
-        Thread.sleep(1000);
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
-        cfs.truncateBlocking();
-        Thread.sleep(1000); // make sure the deletion tasks have run etc
-        validateCFS(cfs);
     }
 
@@ -523,14 +554,20 @@ public class SSTableRewriterTest extends SchemaLoader
                 files++;
             }
         }
+
+            List<SSTableReader> sstables = rewriter.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+            assertEquals(files, sstables.size());
+            assertEquals(files, cfs.getSSTables().size());
+            Thread.sleep(1000);
+            assertFileCounts(s.descriptor.directory.list(), 0, 0);
+            validateCFS(cfs);
+        }
+        catch (Throwable t)
+        {
+            rewriter.abort();
+            throw t;
         }
-        List<SSTableReader> sstables = rewriter.finish();
-        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
-        assertEquals(files, sstables.size());
-        assertEquals(files, cfs.getSSTables().size());
-        Thread.sleep(1000);
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
-        validateCFS(cfs);
     }
 
     @Test
     public void testSSTableSplit() throws InterruptedException
@@ -710,7 +747,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         for (int i = 0; i < count / 100; i++)
-            cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
+            cf.addColumn(Util.cellname(i), random(0, 1000), 1);
 
         File dir = cfs.directories.getDirectoryForNewSSTables();
         String filename = cfs.getTempSSTablePath(dir);
@@ -757,6 +794,8 @@ public class SSTableRewriterTest extends SchemaLoader
         int datacount = 0;
         for (String f : files)
         {
+            if (f.endsWith("-CRC.db"))
+                continue;
             if (f.contains("-tmplink-"))
                 tmplinkcount++;
            else if (f.contains("-tmp-"))
@@ -779,4 +818,14 @@ public class SSTableRewriterTest extends SchemaLoader
                                  StorageService.getPartitioner(),
                                  new MetadataCollector(cfs.metadata.comparator));
     }
+
+    private ByteBuffer random(int i, int size)
+    {
+        byte[] bytes = new byte[size + 4];
+        ThreadLocalRandom.current().nextBytes(bytes);
+        ByteBuffer r = ByteBuffer.wrap(bytes);
+        r.putInt(0, i);
+        return r;
+    }
+
 }
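The new random(i, size) helper replaces constant zero-filled buffers with random payloads while stamping i into the first four bytes (big-endian), so keys stay unique and reproducible per index, and for distinct non-negative i raw lexicographic comparison still follows numeric order. A standalone demonstration of why the prefix dominates (class and helper names here are ours, not the test's):

import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;

// Standalone sketch of the test's random(i, size) helper: `size` random bytes
// plus a 4-byte big-endian prefix overwritten with i.
public final class RandomKeyDemo
{
    static ByteBuffer random(int i, int size)
    {
        byte[] bytes = new byte[size + 4];
        ThreadLocalRandom.current().nextBytes(bytes);
        ByteBuffer r = ByteBuffer.wrap(bytes);
        r.putInt(0, i); // big-endian int stamped over the first four bytes
        return r;
    }

    static int compareUnsigned(ByteBuffer a, ByteBuffer b)
    {
        for (int k = 0; k < Math.min(a.remaining(), b.remaining()); k++)
        {
            int cmp = Integer.compare(a.get(k) & 0xff, b.get(k) & 0xff);
            if (cmp != 0)
                return cmp;
        }
        return Integer.compare(a.remaining(), b.remaining());
    }

    public static void main(String[] args)
    {
        ByteBuffer low = random(42, 10), high = random(1000, 10);
        // the int prefix dominates the random tail, so ordering tracks i
        System.out.println(compareUnsigned(low, high) < 0); // true
    }
}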
