Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/49b08989
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/49b08989
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/49b08989

Branch: refs/heads/trunk
Commit: 49b089893b5407ba42dad389804ff21d535a8537
Parents: 3bfe4b6 db127a1
Author: Tyler Hobbs <[email protected]>
Authored: Thu Apr 30 13:15:57 2015 -0500
Committer: Tyler Hobbs <[email protected]>
Committed: Thu Apr 30 13:15:57 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                      |   1 +
 .../cassandra/db/compaction/Scrubber.java        | 128 ++++++++++++---
 .../compress/CompressedRandomAccessReader.java   |   2 +
 .../unit/org/apache/cassandra/SchemaLoader.java  |   7 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java  | 157 ++++++++++++++++---
 5 files changed, 242 insertions(+), 53 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49b08989/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49b08989/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 675b604,1f5c7de..1e50da6
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -121,10 -122,10 +130,10 @@@ public class Scrubber implements Closea
          SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);
          try
          {
-             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
+             nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
              {
                  // throw away variable so we don't have a side effect in the assert
-                 long firstRowPositionFromIndex = sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
+                 long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position;
                  assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
              }
@@@ -150,30 -153,19 +160,19 @@@
                      // check for null key below
                  }
-                 ByteBuffer currentIndexKey = nextIndexKey;
-                 long nextRowPositionFromIndex;
-                 try
-                 {
-                     nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
-                     nextRowPositionFromIndex = indexFile.isEOF()
-                                              ? dataFile.length()
-                                              : rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position;
-                 }
-                 catch (Throwable th)
-                 {
-                     JVMStabilityInspector.inspectThrowable(th);
-                     outputHandler.warn("Error reading index file", th);
-                     nextIndexKey = null;
-                     nextRowPositionFromIndex = dataFile.length();
-                 }
+                 updateIndexKey();
                  long dataStart = dataFile.getFilePointer();
-                 long dataStartFromIndex = currentIndexKey == null
-                                         ? -1
-                                         : rowStart + 2 + currentIndexKey.remaining();
-                 long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+
+                 long dataStartFromIndex = -1;
+                 long dataSizeFromIndex = -1;
+                 if (currentIndexKey != null)
+                 {
+                     dataStartFromIndex = currentRowPositionFromIndex + 2 + currentIndexKey.remaining();
+                     dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+                 }
-                 dataSize = dataSizeFromIndex;
+                 long dataSize = dataSizeFromIndex;
                  // avoid an NPE if key is null
                  String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.getKey());
                  outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize));
@@@ -187,7 -186,13 +193,14 @@@
                      if (dataSize > dataFile.length())
                          throw new IOError(new IOException("Impossible row size " + dataSize));
+                     if (dataStart != dataStartFromIndex)
+                         outputHandler.warn(String.format("Data file row position %d different from index file row position %d", dataStart, dataSizeFromIndex));
+
+                     if (dataSize != dataSizeFromIndex)
+                         outputHandler.warn(String.format("Data file row size %d different from index file row size %d", dataSize, dataSizeFromIndex));
+
-                     SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+                     SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, checkData);
++
                      if (prevKey != null && prevKey.compareTo(key) > 0)
                      {
                          saveOutOfOrderRow(prevKey, key, atoms);
@@@ -216,7 -220,9 +228,9 @@@
                          key = sstable.partitioner.decorateKey(currentIndexKey);
                          try
                          {
+                             dataFile.seek(dataStartFromIndex);
+
-                             SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+                             SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, checkData);
                              if (prevKey != null && prevKey.compareTo(key) > 0)
                              {
                                  saveOutOfOrderRow(prevKey, key, atoms);
@@@ -233,21 -240,21 +248,21 @@@
                          catch (Throwable th2)
                          {
                              throwIfFatal(th2);
-                             throwIfCommutative(key, th2);
+                             throwIfCannotContinue(key, th2);
                              outputHandler.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
-                             dataFile.seek(nextRowPositionFromIndex);
                              badRows++;
+                             seekToNextRow();
                          }
                      }
                      else
                      {
-                         throwIfCommutative(key, th);
+                         throwIfCannotContinue(key, th);
                          outputHandler.warn("Row starting at position " + dataStart + " is unreadable; skipping to next");
-                         if (currentIndexKey != null)
-                             dataFile.seek(nextRowPositionFromIndex);
                          badRows++;
+                         if (currentIndexKey != null)
+                             seekToNextRow();
                      }
                  }
              }
@@@ -297,6 -304,46 +312,46 @@@
          }
      }
+     private void updateIndexKey()
+     {
+         currentIndexKey = nextIndexKey;
+         currentRowPositionFromIndex = nextRowPositionFromIndex;
+         try
+         {
+             nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+             nextRowPositionFromIndex = indexFile.isEOF()
+                                      ? dataFile.length()
-                                      : sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
++                                     : rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position;
+         }
+         catch (Throwable th)
+         {
+             JVMStabilityInspector.inspectThrowable(th);
+             outputHandler.warn("Error reading index file", th);
+             nextIndexKey = null;
+             nextRowPositionFromIndex = dataFile.length();
+         }
+     }
+
+     private void seekToNextRow()
+     {
+         while(nextRowPositionFromIndex < dataFile.length())
+         {
+             try
+             {
+                 dataFile.seek(nextRowPositionFromIndex);
+                 return;
+             }
+             catch (Throwable th)
+             {
+                 throwIfFatal(th);
+                 outputHandler.warn(String.format("Failed to seek to next row position %d", nextRowPositionFromIndex), th);
+                 badRows++;
+             }
+
+             updateIndexKey();
+         }
+     }
+
      private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
      {
          // TODO bitch if the row is too large? if it is there's not much we can do ...
http://git-wip-us.apache.org/repos/asf/cassandra/blob/49b08989/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index 1b3cd06,184db9c..edf8c68 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@@ -70,226 -60,87 +70,228 @@@ public class CompressedRandomAccessRead private ByteBuffer compressed; // re-use single crc object - private final Checksum checksum; + private final Adler32 checksum; // raw checksum bytes - private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]); + private ByteBuffer checksumBytes; - protected CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException + protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException { - super(new File(dataFilePath), metadata.chunkLength(), metadata.compressedFileLength, owner); + super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().useDirectOutputByteBuffers(), owner); this.metadata = metadata; - checksum = metadata.hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32(); - compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]); - } + checksum = new Adler32(); - @Override - protected void reBuffer() - { - try + if (!useMmap) { - decompressChunk(metadata.chunkFor(current)); + compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]); + checksumBytes = ByteBuffer.wrap(new byte[4]); } - catch (CorruptBlockException e) + else { - throw new CorruptSSTableException(e, getPath()); + try + { + createMappedSegments(); + } + catch (IOException e) + { + throw new IOError(e); + } } - catch (IOException e) + } + + private void createMappedSegments() throws IOException + { + chunkSegments = new TreeMap<>(); + long offset = 0; + long lastSegmentOffset = 0; + long segmentSize = 0; + + while (offset < metadata.dataLength) { - throw new FSReadError(e, getPath()); + CompressionMetadata.Chunk chunk = metadata.chunkFor(offset); + + //Reached a new mmap boundary + if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE) + { + chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize)); + lastSegmentOffset += segmentSize; + segmentSize = 0; + } + + segmentSize += chunk.length + 4; //checksum + offset += metadata.chunkLength(); } + + if (segmentSize > 0) + chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize)); } - private void decompressChunk(CompressionMetadata.Chunk chunk) throws IOException + protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirect) { - if (channel.position() != chunk.offset) - channel.position(chunk.offset); + assert Integer.bitCount(bufferSize) == 1; + return useMmap && useDirect + ? 
ByteBuffer.allocateDirect(bufferSize) + : ByteBuffer.allocate(bufferSize); + } - if (compressed.capacity() < chunk.length) - compressed = ByteBuffer.wrap(new byte[chunk.length]); - else - compressed.clear(); - compressed.limit(chunk.length); + @Override + public void deallocate() + { + super.deallocate(); - if (channel.read(compressed) != chunk.length) - throw new CorruptBlockException(getPath(), chunk); + if (chunkSegments != null) + { + for (Map.Entry<Long, MappedByteBuffer> entry : chunkSegments.entrySet()) + { + FileUtils.clean(entry.getValue()); + } + } - // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes - // in the future this will save a lot of hair-pulling - compressed.flip(); + chunkSegments = null; + } + + private void reBufferStandard() + { try { - validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0); + long position = current(); + assert position < metadata.dataLength; + + CompressionMetadata.Chunk chunk = metadata.chunkFor(position); + + if (compressed.capacity() < chunk.length) + compressed = ByteBuffer.wrap(new byte[chunk.length]); + else + compressed.clear(); + compressed.limit(chunk.length); + + if (channel.read(compressed, chunk.offset) != chunk.length) + throw new CorruptBlockException(getPath(), chunk); + + // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes + // in the future this will save a lot of hair-pulling + compressed.flip(); + buffer.clear(); + int decompressedBytes; + try + { + decompressedBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer.array(), 0); + buffer.limit(decompressedBytes); + } + catch (IOException e) + { ++ buffer.limit(0); + throw new CorruptBlockException(getPath(), chunk); + } + + if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) + { + + checksum.update(compressed.array(), 0, chunk.length); + + if (checksum(chunk) != (int) checksum.getValue()) + throw new CorruptBlockException(getPath(), chunk); + + // reset checksum object back to the original (blank) state + checksum.reset(); + } + + // buffer offset is always aligned + bufferOffset = position & ~(buffer.capacity() - 1); + buffer.position((int) (position - bufferOffset)); + // the length() can be provided at construction time, to override the true (uncompressed) length of the file; + // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length + if (bufferOffset + buffer.limit() > length()) + buffer.limit((int)(length() - bufferOffset)); + } + catch (CorruptBlockException e) + { + throw new CorruptSSTableException(e, getPath()); } catch (IOException e) { - throw new CorruptBlockException(getPath(), chunk, e); + throw new FSReadError(e, getPath()); } + } - if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) + private void reBufferMmap() + { + try { + long position = current(); + assert position < metadata.dataLength; - if (metadata.hasPostCompressionAdlerChecksums) + CompressionMetadata.Chunk chunk = metadata.chunkFor(position); + + Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset); + long segmentOffset = entry.getKey(); + int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset); + MappedByteBuffer compressedChunk = entry.getValue(); + + compressedChunk.position(chunkOffset); + compressedChunk.limit(chunkOffset + chunk.length); + 
compressedChunk.mark(); + + buffer.clear(); + int decompressedBytes; + try { - checksum.update(compressed.array(), 0, chunk.length); + decompressedBytes = metadata.compressor().uncompress(compressedChunk, buffer); + buffer.limit(decompressedBytes); } - else + catch (IOException e) + { ++ buffer.limit(0); + throw new CorruptBlockException(getPath(), chunk); + } + finally { - checksum.update(buffer, 0, validBufferBytes); + compressedChunk.limit(compressedChunk.capacity()); } - if (checksum(chunk) != (int) checksum.getValue()) - throw new CorruptBlockException(getPath(), chunk); + if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble()) + { + compressedChunk.reset(); + compressedChunk.limit(chunkOffset + chunk.length); + + FBUtilities.directCheckSum(checksum, compressedChunk); + + compressedChunk.limit(compressedChunk.capacity()); - // reset checksum object back to the original (blank) state - checksum.reset(); + + if (compressedChunk.getInt() != (int) checksum.getValue()) + throw new CorruptBlockException(getPath(), chunk); + + // reset checksum object back to the original (blank) state + checksum.reset(); + } + + // buffer offset is always aligned + bufferOffset = position & ~(buffer.capacity() - 1); + buffer.position((int) (position - bufferOffset)); + // the length() can be provided at construction time, to override the true (uncompressed) length of the file; + // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length + if (bufferOffset + buffer.limit() > length()) + buffer.limit((int)(length() - bufferOffset)); + } + catch (CorruptBlockException e) + { + throw new CorruptSSTableException(e, getPath()); } - // buffer offset is always aligned - bufferOffset = current & ~(buffer.length - 1); - // the length() can be provided at construction time, to override the true (uncompressed) length of the file; - // this is permitted to occur within a compressed segment, so we truncate validBufferBytes if we cross the imposed length - if (bufferOffset + validBufferBytes > length()) - validBufferBytes = (int)(length() - bufferOffset); + } + + @Override + protected void reBuffer() + { + if (useMmap) + { + reBufferMmap(); + } + else + { + reBufferStandard(); + } } private int checksum(CompressionMetadata.Chunk chunk) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/49b08989/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/SchemaLoader.java index 8f9df8f,c6a3855..46f4a9a --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@@ -435,26 -415,16 +435,31 @@@ public class SchemaLoade private static CFMetaData jdbcCFMD(String ksName, String cfName, AbstractType comp) { - return CFMetaData.denseCFMetaData(ksName, cfName, comp).defaultValidator(comp); + return CFMetaData.denseCFMetaData(ksName, cfName, comp).defaultValidator(comp).compressionParameters(getCompressionParameters()); } - private static CFMetaData jdbcSparseCFMD(String ksName, String cfName, AbstractType comp) + public static CFMetaData jdbcSparseCFMD(String ksName, String cfName, AbstractType comp) { - return CFMetaData.sparseCFMetaData(ksName, cfName, comp).defaultValidator(comp); + return CFMetaData.sparseCFMetaData(ksName, cfName, comp).defaultValidator(comp).compressionParameters(getCompressionParameters()); } - public static void cleanupAndLeaveDirs() + public 
static CompressionParameters getCompressionParameters() { ++ return getCompressionParameters(null); ++ } ++ ++ public static CompressionParameters getCompressionParameters(Integer chunkSize) ++ { + if (Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"))) - return new CompressionParameters(SnappyCompressor.instance); ++ return new CompressionParameters(SnappyCompressor.instance, chunkSize, Collections.<String, String>emptyMap()); + else + return new CompressionParameters(null); + } + + public static void cleanupAndLeaveDirs() throws IOException + { + // We need to stop and unmap all CLS instances prior to cleanup() or we'll get failures on Windows. + CommitLog.instance.stopUnsafe(true); mkdirs(); cleanup(); mkdirs(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/49b08989/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java index dce8d14,a19c76d..128d1b0 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@@ -21,18 -21,19 +21,13 @@@ package org.apache.cassandra.db */ import java.io.*; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.nio.ByteBuffer; - import java.util.Arrays; -import java.util.Collections; --import java.util.HashSet; --import java.util.Iterator; --import java.util.List; --import java.util.Set; ++import java.util.*; import java.util.concurrent.ExecutionException; - import org.apache.cassandra.io.compress.CompressionParameters; - import org.apache.cassandra.io.compress.SnappyCompressor; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.exceptions.RequestExecutionException; + import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.utils.UUIDGen; import org.apache.commons.lang3.StringUtils; import org.junit.BeforeClass; import org.junit.Test; @@@ -54,57 -51,31 +49,57 @@@ import org.apache.cassandra.exceptions. 
import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.SSTableRewriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.OrderedJUnit4ClassRunner; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.Util; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDGen; + +import static org.junit.Assert.*; ++import static org.junit.Assume.assumeTrue; import static org.apache.cassandra.Util.cellname; import static org.apache.cassandra.Util.column; - import static org.junit.Assert.assertEquals; - import static org.junit.Assert.fail; - import static org.junit.Assume.assumeTrue; -import static junit.framework.Assert.assertNotNull; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.junit.Assume.assumeTrue; - @RunWith(OrderedJUnit4ClassRunner.class) -public class ScrubTest extends SchemaLoader +public class ScrubTest { - public String KEYSPACE = "Keyspace1"; - public String CF = "Standard1"; - public String CF3 = "Standard2"; - public String COUNTER_CF = "Counter1"; - private static Integer COMPRESSION_CHUNK_LENGTH = 4096; + public static final String KEYSPACE = "Keyspace1"; + public static final String CF = "Standard1"; + public static final String CF2 = "Standard2"; + public static final String CF3 = "Standard3"; + public static final String COUNTER_CF = "Counter1"; + public static final String CF_UUID = "UUIDKeys"; + public static final String CF_INDEX1 = "Indexed1"; + public static final String CF_INDEX2 = "Indexed2"; + + public static final String COL_KEYS_INDEX = "birthdate"; + public static final String COL_COMPOSITES_INDEX = "col1"; + public static final String COL_NON_INDEX = "notanindexcol"; + ++ public static final Integer COMPRESSION_CHUNK_LENGTH = 4096; + @BeforeClass - public static void loadSchema() throws ConfigurationException + public static void defineSchema() throws ConfigurationException { - loadSchema(COMPRESSION_CHUNK_LENGTH); + SchemaLoader.loadSchema(); + SchemaLoader.createKeyspace(KEYSPACE, + SimpleStrategy.class, + KSMetaData.optsWithRF(1), + SchemaLoader.standardCFMD(KEYSPACE, CF), + SchemaLoader.standardCFMD(KEYSPACE, CF2), + SchemaLoader.standardCFMD(KEYSPACE, CF3), + SchemaLoader.standardCFMD(KEYSPACE, COUNTER_CF) + .defaultValidator(CounterColumnType.instance) - .compressionParameters(SchemaLoader.getCompressionParameters()), ++ .compressionParameters(SchemaLoader.getCompressionParameters(COMPRESSION_CHUNK_LENGTH)), + SchemaLoader.standardCFMD(KEYSPACE, CF_UUID).keyValidator(UUIDType.instance), + SchemaLoader.indexCFMD(KEYSPACE, CF_INDEX1, true), + SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2, true)); } @Test @@@ -140,28 -112,88 +136,89 @@@ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF); cfs.clearUnsafe(); - fillCounterCF(cfs, 2); + fillCounterCF(cfs, numPartitions); - List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); - assertEquals(2, rows.size()); + List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 
numPartitions*10); + assertEquals(numPartitions, rows.size()); + - overrdeWithGarbage(cfs, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1")); + assertEquals(1, cfs.getSSTables().size()); SSTableReader sstable = cfs.getSSTables().iterator().next(); + //make sure to override at most 1 chunk when compression is enabled + overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1")); + // with skipCorrupted == false, the scrub is expected to fail - Scrubber scrubber = new Scrubber(cfs, sstable, false, false); - try + try(Scrubber scrubber = new Scrubber(cfs, sstable, false, false)) { scrubber.scrub(); fail("Expected a CorruptSSTableException to be thrown"); } catch (IOError err) {} -- // with skipCorrupted == true, the corrupt row will be skipped ++ // with skipCorrupted == true, the corrupt rows will be skipped + Scrubber.ScrubResult scrubResult; - scrubber = new Scrubber(cfs, sstable, true, false); - scrubResult = scrubber.scrubWithResult(); - scrubber.close(); + try(Scrubber scrubber = new Scrubber(cfs, sstable, true, false)) + { ++ scrubResult = scrubber.scrubWithResult(); ++ } + + assertNotNull(scrubResult); + + boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")); + if (compression) + { + assertEquals(0, scrubResult.emptyRows); + assertEquals(numPartitions, scrubResult.badRows + scrubResult.goodRows); + //because we only corrupted 1 chunk and we chose enough partitions to cover at least 3 chunks + assertTrue(scrubResult.goodRows >= scrubResult.badRows * 2); + } + else + { + assertEquals(0, scrubResult.emptyRows); + assertEquals(1, scrubResult.badRows); + assertEquals(numPartitions-1, scrubResult.goodRows); + } + assertEquals(1, cfs.getSSTables().size()); + + rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); + assertEquals(scrubResult.goodRows, rows.size()); + } + + @Test + public void testScrubCorruptedRowInSmallFile() throws IOException, WriteTimeoutException + { + // cannot test this with compression + assumeTrue(!Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"))); + + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF); + cfs.clearUnsafe(); + + fillCounterCF(cfs, 2); + + List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); + assertEquals(2, rows.size()); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + + // overwrite one row with garbage + overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1")); + + // with skipCorrupted == false, the scrub is expected to fail + Scrubber scrubber = new Scrubber(cfs, sstable, false, false); + try + { scrubber.scrub(); + fail("Expected a CorruptSSTableException to be thrown"); } + catch (IOError err) {} + + // with skipCorrupted == true, the corrupt row will be skipped + scrubber = new Scrubber(cfs, sstable, true, false); + scrubber.scrub(); + scrubber.close(); assertEquals(1, cfs.getSSTables().size()); // verify that we can read all of the rows, and there is now one less row @@@ -170,21 -202,34 +227,49 @@@ } @Test + public void testScrubOneRowWithCorruptedKey() throws IOException, ExecutionException, InterruptedException, ConfigurationException + { + // cannot test this with compression + assumeTrue(!Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"))); + + 
CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + cfs.clearUnsafe(); + + List<Row> rows; + + // insert data and verify we get it back w/ range query + fillCF(cfs, 4); + rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); + assertEquals(4, rows.size()); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + overrideWithGarbage(sstable, 0, 2); + + CompactionManager.instance.performScrub(cfs, false); + + // check data is still there + rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); + assertEquals(4, rows.size()); + } + + @Test + public void testScrubCorruptedCounterRowNoEarlyOpen() throws IOException, WriteTimeoutException + { + long oldOpenVal = SSTableRewriter.getOpenInterval(); + try + { + SSTableRewriter.overrideOpenInterval(Long.MAX_VALUE); + testScrubCorruptedCounterRow(); + } + finally + { + SSTableRewriter.overrideOpenInterval(oldOpenVal); + } + } + + @Test public void testScrubDeletedRow() throws ExecutionException, InterruptedException { CompactionManager.instance.disableAutoCompaction(); @@@ -432,129 -471,4 +537,129 @@@ assertEquals("bar", iter.next().getString("c")); assertEquals("boo", iter.next().getString("c")); } + + @Test /* CASSANDRA-5174 */ + public void testScrubKeysIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException + { + //If the partitioner preserves the order then SecondaryIndex uses BytesType comparator, + // otherwise it uses LocalByPartitionerType + setKeyComparator(BytesType.instance); + testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true); + } + + @Test /* CASSANDRA-5174 */ + public void testScrubCompositeIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException + { + setKeyComparator(BytesType.instance); + testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true); + } + + @Test /* CASSANDRA-5174 */ + public void testScrubKeysIndex() throws IOException, ExecutionException, InterruptedException + { + setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner())); + testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true); + } + + @Test /* CASSANDRA-5174 */ + public void testScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException + { + setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner())); + testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true); + } + + @Test /* CASSANDRA-5174 */ + public void testFailScrubKeysIndex() throws IOException, ExecutionException, InterruptedException + { + testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, false); + } + + @Test /* CASSANDRA-5174 */ + public void testFailScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException + { + testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, false); + } + + @Test /* CASSANDRA-5174 */ + public void testScrubTwice() throws IOException, ExecutionException, InterruptedException + { + testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true, true); + } + + /** The SecondaryIndex class is used for custom indexes so to avoid + * making a public final field into a private field with getters + * and setters, we resort to this hack in order to test it properly + * since it can have two values which influence the scrubbing behavior. 
+ * @param comparator - the key comparator we want to test + */ + private void setKeyComparator(AbstractType<?> comparator) + { + try + { + Field keyComparator = SecondaryIndex.class.getDeclaredField("keyComparator"); + keyComparator.setAccessible(true); + int modifiers = keyComparator.getModifiers(); + Field modifierField = keyComparator.getClass().getDeclaredField("modifiers"); + modifiers = modifiers & ~Modifier.FINAL; + modifierField.setAccessible(true); + modifierField.setInt(keyComparator, modifiers); + + keyComparator.set(null, comparator); + } + catch (Exception ex) + { + fail("Failed to change key comparator in secondary index : " + ex.getMessage()); + ex.printStackTrace(); + } + } + + private void testScrubIndex(String cfName, String colName, boolean composite, boolean ... scrubs) + throws IOException, ExecutionException, InterruptedException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName); + cfs.clearUnsafe(); + + int numRows = 1000; + long[] colValues = new long [numRows * 2]; // each row has two columns + for (int i = 0; i < colValues.length; i+=2) + { + colValues[i] = (i % 4 == 0 ? 1L : 2L); // index column + colValues[i+1] = 3L; //other column + } + fillIndexCF(cfs, composite, colValues); + + // check index + IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes(colName), Operator.EQ, ByteBufferUtil.bytes(1L)); + List<Row> rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows); + assertNotNull(rows); + assertEquals(numRows / 2, rows.size()); + + // scrub index + Set<ColumnFamilyStore> indexCfss = cfs.indexManager.getIndexesBackedByCfs(); + assertTrue(indexCfss.size() == 1); + for(ColumnFamilyStore indexCfs : indexCfss) + { + for (int i = 0; i < scrubs.length; i++) + { + boolean failure = !scrubs[i]; + if (failure) + { //make sure the next scrub fails - overrdeWithGarbage(indexCfs, ByteBufferUtil.bytes(1L), ByteBufferUtil.bytes(2L)); ++ overrideWithGarbage(indexCfs.getSSTables().iterator().next(), ByteBufferUtil.bytes(1L), ByteBufferUtil.bytes(2L)); + } + CompactionManager.AllSSTableOpStatus result = indexCfs.scrub(false, false, true); + assertEquals(failure ? + CompactionManager.AllSSTableOpStatus.ABORTED : + CompactionManager.AllSSTableOpStatus.SUCCESSFUL, + result); + } + } + + + // check index is still working + rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows); + assertNotNull(rows); + assertEquals(numRows / 2, rows.size()); + } }
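
For reference, the setKeyComparator() helper above relies on the long-standing reflection trick for rewriting a private static final field: clear the FINAL bit on the Field's own modifiers field, then set the new value. A self-contained illustration against a dummy Holder class (not the real SecondaryIndex) is below; it works on the Java 7/8 era JVMs this branch targets, while newer JDKs filter out the modifiers field and block the trick.

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class StaticFinalOverrideDemo
{
    // Dummy stand-in for the SecondaryIndex.keyComparator field used by the test.
    static class Holder
    {
        private static final Object keyComparator = "original";
    }

    public static void main(String[] args) throws Exception
    {
        Field target = Holder.class.getDeclaredField("keyComparator");
        target.setAccessible(true);

        // Clear the FINAL modifier on the Field object itself so set() is permitted.
        Field modifiers = Field.class.getDeclaredField("modifiers");
        modifiers.setAccessible(true);
        modifiers.setInt(target, target.getModifiers() & ~Modifier.FINAL);

        target.set(null, "replaced");
        System.out.println(target.get(null)); // prints "replaced"
    }
}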
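
Similarly, the mmap path added to CompressedRandomAccessReader earlier in this diff maps the compressed file as a series of MappedByteBuffer segments, starting a new mapping whenever the next chunk plus its 4-byte checksum would overflow the current segment. A rough sketch of that segmentation loop, under the assumption that the compressed chunk lengths are already known and that MAX_SEGMENT_SIZE is the 2 GB MappedByteBuffer limit:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.NavigableMap;
import java.util.TreeMap;

public class CompressedMmapSegmentsSketch
{
    // 2 GB is the hard limit for a single MappedByteBuffer; the patch uses a similar constant.
    private static final long MAX_SEGMENT_SIZE = Integer.MAX_VALUE;

    // Map a compressed file into read-only segments keyed by their starting offset.
    // Each on-disk chunk occupies compressedLengths[i] bytes plus a 4-byte checksum.
    public static NavigableMap<Long, MappedByteBuffer> createMappedSegments(RandomAccessFile file,
                                                                            int[] compressedLengths) throws IOException
    {
        NavigableMap<Long, MappedByteBuffer> chunkSegments = new TreeMap<>();
        FileChannel channel = file.getChannel();

        long lastSegmentOffset = 0;
        long segmentSize = 0;

        for (int chunkLength : compressedLengths)
        {
            // Start a new mapping if this chunk (+ checksum) would overflow the current segment.
            if (segmentSize + chunkLength + 4 > MAX_SEGMENT_SIZE)
            {
                chunkSegments.put(lastSegmentOffset,
                                  channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
                lastSegmentOffset += segmentSize;
                segmentSize = 0;
            }
            segmentSize += chunkLength + 4; // chunk data + checksum
        }

        if (segmentSize > 0)
            chunkSegments.put(lastSegmentOffset,
                              channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
        return chunkSegments;
    }

    // To locate a chunk later, the reader takes the floor entry for the chunk's file offset:
    //   Map.Entry<Long, MappedByteBuffer> e = chunkSegments.floorEntry(chunkOffset);
    //   int offsetInSegment = (int) (chunkOffset - e.getKey());
}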
