Repository: cassandra Updated Branches: refs/heads/trunk 3bfe4b6ae -> 49b089893
Handle corrupted compression chunks in scrub Patch by Stefania Alborghetti; reviewed by Tyler Hobbs for CASSANDRA-9140 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/593a7257 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/593a7257 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/593a7257 Branch: refs/heads/trunk Commit: 593a7257b5d5243c4ceb75b5417fb05e8a98cc90 Parents: a85e731 Author: Stefania Alborghetti <[email protected]> Authored: Thu Apr 30 13:02:23 2015 -0500 Committer: Tyler Hobbs <[email protected]> Committed: Thu Apr 30 13:02:23 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/compaction/Scrubber.java | 131 ++++++++++++---- .../unit/org/apache/cassandra/SchemaLoader.java | 17 ++- .../unit/org/apache/cassandra/db/ScrubTest.java | 153 +++++++++++++++++-- 4 files changed, 256 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/593a7257/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9f90cc2..4e7a5d0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.15: + * Allow scrub to handle corrupted compressed chunks (CASSANDRA-9140) * Fix assertion error when resetlocalschema is run during repair (CASSANDRA-9249) * Disable single sstable tombstone compactions for DTCS by default (CASSANDRA-9234) * Do more agressive ttl expiration checks to be able to http://git-wip-us.apache.org/repos/asf/cassandra/blob/593a7257/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 6a61e56..1752b21 100644 --- 
a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.io.*; import java.util.*; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import org.apache.cassandra.db.*; @@ -53,6 +54,11 @@ public class Scrubber implements Closeable private int badRows; private int emptyRows; + private ByteBuffer currentIndexKey; + private ByteBuffer nextIndexKey; + long currentRowPositionFromIndex; + long nextRowPositionFromIndex; + private final OutputHandler outputHandler; private static final Comparator<Row> rowComparator = new Comparator<Row>() @@ -99,6 +105,9 @@ public class Scrubber implements Closeable : sstable.openDataReader(CompactionManager.instance.getRateLimiter()); this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))); this.scrubInfo = new ScrubInfo(dataFile, sstable); + + this.currentRowPositionFromIndex = 0; + this.nextRowPositionFromIndex = 0; } public void scrub() @@ -106,7 +115,7 @@ public class Scrubber implements Closeable outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length())); try { - ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile); + nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile); { // throw away variable so we don't have a side effect in the assert long firstRowPositionFromIndex = RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position; @@ -122,6 +131,7 @@ public class Scrubber implements Closeable { if (scrubInfo.isStopRequested()) throw new CompactionInterruptedException(scrubInfo.getCompactionInfo()); + long rowStart = dataFile.getFilePointer(); outputHandler.debug("Reading row at " + rowStart); @@ -142,29 +152,19 @@ public class Scrubber implements Closeable // check for null key below } - ByteBuffer currentIndexKey = nextIndexKey; 
- long nextRowPositionFromIndex; - try - { - nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile); - nextRowPositionFromIndex = indexFile.isEOF() - ? dataFile.length() - : RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position; - } - catch (Throwable th) - { - outputHandler.warn("Error reading index file", th); - nextIndexKey = null; - nextRowPositionFromIndex = dataFile.length(); - } + updateIndexKey(); long dataStart = dataFile.getFilePointer(); - long dataStartFromIndex = currentIndexKey == null - ? -1 - : rowStart + 2 + currentIndexKey.remaining(); - if (sstable.descriptor.version.hasRowSizeAndColumnCount) - dataStartFromIndex += 8; - long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex; + long dataStartFromIndex = -1; + long dataSizeFromIndex = -1; + if (currentIndexKey != null) + { + dataStartFromIndex = currentRowPositionFromIndex + 2 + currentIndexKey.remaining(); + if (sstable.descriptor.version.hasRowSizeAndColumnCount) + dataStartFromIndex += 8; + + dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex; + } if (!sstable.descriptor.version.hasRowSizeAndColumnCount) { @@ -186,8 +186,21 @@ public class Scrubber implements Closeable { if (key == null) throw new IOError(new IOException("Unable to read row key from data file")); + + if (!key.key.equals(currentIndexKey)) + { + throw new IOError(new IOException(String.format("Key from data file (%s) does not match key from index file (%s)", + ByteBufferUtil.bytesToHex(key.key), ByteBufferUtil.bytesToHex(currentIndexKey)))); + } + if (dataSize > dataFile.length()) - throw new IOError(new IOException("Impossible row size " + dataSize)); + throw new IOError(new IOException("Impossible row size (greater than file length): " + dataSize)); + + if (dataStart != dataStartFromIndex) + outputHandler.warn(String.format("Data file row position %d differs from index file row position %d", dataStart, dataStartFromIndex)); + + if 
(dataSize != dataSizeFromIndex) + outputHandler.warn(String.format("Data file row size %d differs from index file row size %d", dataSize, dataSizeFromIndex)); SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true); if (prevKey != null && prevKey.compareTo(key) > 0) @@ -201,9 +214,8 @@ public class Scrubber implements Closeable emptyRows++; else goodRows++; + prevKey = key; - if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex) - outputHandler.warn("Index file contained a different key or row size; using key from data file"); } catch (Throwable th) { @@ -219,6 +231,8 @@ public class Scrubber implements Closeable key = sstable.partitioner.decorateKey(currentIndexKey); try { + dataFile.seek(dataStartFromIndex); + SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true); if (prevKey != null && prevKey.compareTo(key) > 0) { @@ -231,6 +245,7 @@ public class Scrubber implements Closeable emptyRows++; else goodRows++; + prevKey = key; } catch (Throwable th2) @@ -240,8 +255,8 @@ public class Scrubber implements Closeable outputHandler.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2); writer.resetAndTruncate(); - dataFile.seek(nextRowPositionFromIndex); badRows++; + seekToNextRow(); } } else @@ -249,9 +264,9 @@ public class Scrubber implements Closeable throwIfCommutative(key, th); outputHandler.warn("Row starting at position " + dataStart + " is unreadable; skipping to next"); - if (currentIndexKey != null) - dataFile.seek(nextRowPositionFromIndex); badRows++; + if (currentIndexKey != null) + seekToNextRow(); } } } @@ -294,6 +309,45 @@ public class Scrubber implements Closeable } } + private void updateIndexKey() + { + currentIndexKey = nextIndexKey; + currentRowPositionFromIndex = nextRowPositionFromIndex; + try + { + nextIndexKey = indexFile.isEOF() ? 
null : ByteBufferUtil.readWithShortLength(indexFile); + nextRowPositionFromIndex = indexFile.isEOF() + ? dataFile.length() + : RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position; + } + catch (Throwable th) + { + outputHandler.warn("Error reading index file", th); + nextIndexKey = null; + nextRowPositionFromIndex = dataFile.length(); + } + } + + private void seekToNextRow() + { + while(nextRowPositionFromIndex < dataFile.length()) + { + try + { + dataFile.seek(nextRowPositionFromIndex); + return; + } + catch (Throwable th) + { + throwIfFatal(th); + outputHandler.warn(String.format("Failed to seek to next row position %d", nextRowPositionFromIndex), th); + badRows++; + } + + updateIndexKey(); + } + } + private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms) { // TODO bitch if the row is too large? if it is there's not much we can do ... @@ -389,4 +443,25 @@ public class Scrubber implements Closeable return false; } } + + @VisibleForTesting + public ScrubResult scrubWithResult() + { + scrub(); + return new ScrubResult(this); + } + + public static final class ScrubResult + { + public final int goodRows; + public final int badRows; + public final int emptyRows; + + public ScrubResult(Scrubber scrubber) + { + this.goodRows = scrubber.goodRows; + this.badRows = scrubber.badRows; + this.emptyRows = scrubber.emptyRows; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/593a7257/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java index 7dea52c..6b34b9a 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -60,6 +60,11 @@ public class SchemaLoader public static void loadSchema(boolean withOldCfIds) throws IOException, ConfigurationException { 
+ loadSchema(withOldCfIds, null); + } + + public static void loadSchema(boolean withOldCfIds, Integer compressionChunkLength) throws IOException, ConfigurationException + { // Cleanup first cleanupAndLeaveDirs(); @@ -78,7 +83,7 @@ public class SchemaLoader startGossiper(); // if you're messing with low-level sstable stuff, it can be useful to inject the schema directly // Schema.instance.load(schemaDefinition(withOldCfIds)); - for (KSMetaData ksm : schemaDefinition(withOldCfIds)) + for (KSMetaData ksm : schemaDefinition(withOldCfIds, compressionChunkLength)) MigrationManager.announceNewKeyspace(ksm); } @@ -93,7 +98,7 @@ public class SchemaLoader Gossiper.instance.stop(); } - public static Collection<KSMetaData> schemaDefinition(boolean withOldCfIds) throws ConfigurationException + public static Collection<KSMetaData> schemaDefinition(boolean withOldCfIds, Integer compressionChunkLength) throws ConfigurationException { List<KSMetaData> schema = new ArrayList<KSMetaData>(); @@ -344,7 +349,7 @@ public class SchemaLoader if (Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"))) - useCompression(schema); + useCompression(schema, compressionChunkLength); return schema; } @@ -368,13 +373,15 @@ public class SchemaLoader }}); } - private static void useCompression(List<KSMetaData> schema) + private static void useCompression(List<KSMetaData> schema, Integer chunkLength) throws ConfigurationException { for (KSMetaData ksm : schema) { for (CFMetaData cfm : ksm.cfMetaData().values()) { - cfm.compressionParameters(new CompressionParameters(SnappyCompressor.instance)); + cfm.compressionParameters(new CompressionParameters(SnappyCompressor.instance, + chunkLength, + Collections.<String, String>emptyMap())); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/593a7257/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java 
b/test/unit/org/apache/cassandra/db/ScrubTest.java index 632ce1c..08237a4 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.db; */ import java.io.*; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -31,13 +32,14 @@ import java.util.concurrent.ExecutionException; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.utils.UUIDGen; import org.apache.commons.lang3.StringUtils; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.apache.cassandra.OrderedJUnit4ClassRunner; -import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; @@ -50,8 +52,10 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.utils.ByteBufferUtil; +import static junit.framework.Assert.assertNotNull; import static org.apache.cassandra.Util.column; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; @@ -62,6 +66,13 @@ public class ScrubTest extends SchemaLoader public String CF = "Standard1"; public String CF3 = "Standard2"; public String COUNTER_CF = "Counter1"; + private static Integer COMPRESSION_CHUNK_LENGTH = 4096; + + @BeforeClass + public static void loadSchema() throws IOException, ConfigurationException + { + loadSchema(false, COMPRESSION_CHUNK_LENGTH); + } @Test public void testScrubOneRow() throws IOException, ExecutionException, InterruptedException, 
ConfigurationException @@ -88,7 +99,69 @@ public class ScrubTest extends SchemaLoader @Test public void testScrubCorruptedCounterRow() throws IOException, InterruptedException, ExecutionException { - // skip the test when compression is enabled until CASSANDRA-9140 is complete + // When compression is enabled, for testing corrupted chunks we need enough partitions to cover + // at least 3 chunks of size COMPRESSION_CHUNK_LENGTH + int numPartitions = 1000; + + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF); + cfs.clearUnsafe(); + + fillCounterCF(cfs, numPartitions); + + List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), numPartitions*10); + assertEquals(numPartitions, rows.size()); + assertEquals(1, cfs.getSSTables().size()); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + + //make sure to override at most 1 chunk when compression is enabled + overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1")); + + // with skipCorrupted == false, the scrub is expected to fail + Scrubber scrubber = new Scrubber(cfs, sstable, false); + try + { + scrubber.scrub(); + fail("Expected a CorruptSSTableException to be thrown"); + } + catch (IOError err) {} + + // with skipCorrupted == true, the corrupt row will be skipped + Scrubber.ScrubResult scrubResult; + scrubber = new Scrubber(cfs, sstable, true); + scrubResult = scrubber.scrubWithResult(); + scrubber.close(); + cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB); + + assertNotNull(scrubResult); + + boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")); + if (compression) + { + assertEquals(0, scrubResult.emptyRows); + assertEquals(numPartitions, scrubResult.badRows + scrubResult.goodRows); + 
//because we only corrupted 1 chunk and we chose enough partitions to cover at least 3 chunks + assertTrue(scrubResult.goodRows >= scrubResult.badRows * 2); + } + else + { + assertEquals(0, scrubResult.emptyRows); + assertEquals(1, scrubResult.badRows); + assertEquals(numPartitions-1, scrubResult.goodRows); + } + assertEquals(1, cfs.getSSTables().size()); + + rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); + assertEquals(scrubResult.goodRows, rows.size()); + } + + + @Test + public void testScrubCorruptedRowInSmallFile() throws IOException, InterruptedException, ExecutionException + { + // cannot test this with compression assumeTrue(!Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"))); CompactionManager.instance.disableAutoCompaction(); @@ -102,17 +175,7 @@ public class ScrubTest extends SchemaLoader assertEquals(2, rows.size()); SSTableReader sstable = cfs.getSSTables().iterator().next(); - - // overwrite one row with garbage - long row0Start = sstable.getPosition(RowPosition.forKey(ByteBufferUtil.bytes("0"), sstable.partitioner), SSTableReader.Operator.EQ).position; - long row1Start = sstable.getPosition(RowPosition.forKey(ByteBufferUtil.bytes("1"), sstable.partitioner), SSTableReader.Operator.EQ).position; - long startPosition = row0Start < row1Start ? row0Start : row1Start; - long endPosition = row0Start < row1Start ? 
row1Start : row0Start; - - RandomAccessFile file = new RandomAccessFile(sstable.getFilename(), "rw"); - file.seek(startPosition); - file.writeBytes(StringUtils.repeat('z', (int) (endPosition - startPosition))); - file.close(); + overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1")); // with skipCorrupted == false, the scrub is expected to fail Scrubber scrubber = new Scrubber(cfs, sstable, false); @@ -136,6 +199,34 @@ public class ScrubTest extends SchemaLoader } @Test + public void testScrubOneRowWithCorruptedKey() throws IOException, ExecutionException, InterruptedException, ConfigurationException + { + // cannot test this with compression + assumeTrue(!Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"))); + + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + cfs.clearUnsafe(); + + List<Row> rows; + + // insert data and verify we get it back w/ range query + fillCF(cfs, 4); + rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); + assertEquals(4, rows.size()); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + overrideWithGarbage(sstable, 0, 2); + + CompactionManager.instance.performScrub(cfs, false); + + // check data is still there + rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); + assertEquals(4, rows.size()); + } + + @Test public void testScrubDeletedRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException { CompactionManager.instance.disableAutoCompaction(); @@ -239,6 +330,42 @@ public class ScrubTest extends SchemaLoader assert rows.size() == 6 : "Got " + rows.size(); } + private void overrideWithGarbage(SSTableReader sstable, ByteBuffer key1, ByteBuffer key2) throws IOException + { + boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", 
"false")); + long startPosition, endPosition; + + if (compression) + { // overwrite with garbage the compression chunks from key1 to key2 + CompressionMetadata compData = CompressionMetadata.create(sstable.getFilename()); + + CompressionMetadata.Chunk chunk1 = compData.chunkFor( + sstable.getPosition(RowPosition.forKey(key1, sstable.partitioner), SSTableReader.Operator.EQ).position); + CompressionMetadata.Chunk chunk2 = compData.chunkFor( + sstable.getPosition(RowPosition.forKey(key2, sstable.partitioner), SSTableReader.Operator.EQ).position); + + startPosition = Math.min(chunk1.offset, chunk2.offset); + endPosition = Math.max(chunk1.offset + chunk1.length, chunk2.offset + chunk2.length); + } + else + { // overwrite with garbage from key1 to key2 + long row0Start = sstable.getPosition(RowPosition.forKey(key1, sstable.partitioner), SSTableReader.Operator.EQ).position; + long row1Start = sstable.getPosition(RowPosition.forKey(key2, sstable.partitioner), SSTableReader.Operator.EQ).position; + startPosition = Math.min(row0Start, row1Start); + endPosition = Math.max(row0Start, row1Start); + } + + overrideWithGarbage(sstable, startPosition, endPosition); + } + + private void overrideWithGarbage(SSTable sstable, long startPosition, long endPosition) throws IOException + { + RandomAccessFile file = new RandomAccessFile(sstable.getFilename(), "rw"); + file.seek(startPosition); + file.writeBytes(StringUtils.repeat('z', (int) (endPosition - startPosition))); + file.close(); + } + private static boolean isRowOrdered(List<Row> rows) { DecoratedKey prev = null;
