Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/db127a10
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/db127a10
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/db127a10

Branch: refs/heads/cassandra-2.1
Commit: db127a1073cfec426019c06d8f33e6c44292949f
Parents: 80c5191 593a725
Author: Tyler Hobbs <[email protected]>
Authored: Thu Apr 30 13:09:11 2015 -0500
Committer: Tyler Hobbs <[email protected]>
Committed: Thu Apr 30 13:09:11 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/compaction/Scrubber.java       | 127 +++++++++++----
 .../unit/org/apache/cassandra/SchemaLoader.java |  17 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java | 156 +++++++++++++++++--
 4 files changed, 256 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/db127a10/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index cb235a4,4e7a5d0..c063368
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,98 -1,11 +1,99 @@@
-2.0.15:
+2.1.6
+ * Fix PITR commitlog replay (CASSANDRA-9195)
+ * GCInspector logs very different times (CASSANDRA-9124)
+ * Fix deleting from an empty list (CASSANDRA-9198)
+ * Update tuple and collection types that use a user-defined type when that UDT
+   is modified (CASSANDRA-9148, CASSANDRA-9192)
+Merged from 2.0:
+ * Allow scrub to handle corrupted compressed chunks (CASSANDRA-9140)
  * Fix assertion error when resetlocalschema is run during repair (CASSANDRA-9249)
  * Disable single sstable tombstone compactions for DTCS by default (CASSANDRA-9234)
- * Do more agressive ttl expiration checks to be able to
-   drop more sstables (CASSANDRA-8243)
  * IncomingTcpConnection thread is not named (CASSANDRA-9262)
  * Close incoming connections when MessagingService is stopped (CASSANDRA-9238)
+
+2.1.5
+ * Re-add deprecated cold_reads_to_omit param for backwards compat (CASSANDRA-9203)
+ * Make anticompaction visible in compactionstats (CASSANDRA-9098)
+ * Improve nodetool getendpoints documentation about the partition
+   key parameter (CASSANDRA-6458)
+ * Don't check other keyspaces for schema changes when an user-defined
+   type is altered (CASSANDRA-9187)
+ * Allow takeColumnFamilySnapshot to take a list of tables (CASSANDRA-8348)
+ * Limit major sstable operations to their canonical representation (CASSANDRA-8669)
+ * cqlsh: Add tests for INSERT and UPDATE tab completion (CASSANDRA-9125)
+ * cqlsh: quote column names when needed in COPY FROM inserts (CASSANDRA-9080)
+ * Add generate-idea-files target to build.xml (CASSANDRA-9123)
+ * Do not load read meter for offline operations (CASSANDRA-9082)
+ * cqlsh: Make CompositeType data readable (CASSANDRA-8919)
+ * cqlsh: Fix display of triggers (CASSANDRA-9081)
+ * Fix NullPointerException when deleting or setting an element by index on
+   a null list collection (CASSANDRA-9077)
+ * Buffer bloom filter serialization (CASSANDRA-9066)
+ * Fix anti-compaction target bloom filter size (CASSANDRA-9060)
+ * Make FROZEN and TUPLE unreserved keywords in CQL (CASSANDRA-9047)
+ * Prevent AssertionError from SizeEstimatesRecorder (CASSANDRA-9034)
+ * Avoid overwriting index summaries for sstables with an older format that
+   does not support downsampling; rebuild summaries on startup when this
+   is detected (CASSANDRA-8993)
+ * Fix potential data loss in CompressedSequentialWriter (CASSANDRA-8949)
+ * Make PasswordAuthenticator number of hashing rounds configurable (CASSANDRA-8085)
+ * Fix AssertionError when binding nested collections in DELETE (CASSANDRA-8900)
+ * Check for overlap with non-early sstables in LCS (CASSANDRA-8739)
+ * Only calculate max purgable timestamp if we have to (CASSANDRA-8914)
+ * (cqlsh) Greatly improve performance of COPY FROM (CASSANDRA-8225)
+ * IndexSummary effectiveIndexInterval is now a guideline, not a rule (CASSANDRA-8993)
+ * Use correct bounds for page cache eviction of compressed files (CASSANDRA-8746)
+ * SSTableScanner enforces its bounds (CASSANDRA-8946)
+ * Cleanup cell equality (CASSANDRA-8947)
+ * Introduce intra-cluster message coalescing (CASSANDRA-8692)
+ * DatabaseDescriptor throws NPE when rpc_interface is used (CASSANDRA-8839)
+ * Don't check if an sstable is live for offline compactions (CASSANDRA-8841)
+ * Don't set clientMode in SSTableLoader (CASSANDRA-8238)
+ * Fix SSTableRewriter with disabled early open (CASSANDRA-8535)
+ * Allow invalidating permissions and cache time (CASSANDRA-8722)
+ * Log warning when queries that will require ALLOW FILTERING in Cassandra 3.0
+   are executed (CASSANDRA-8418)
+ * Fix cassandra-stress so it respects the CL passed in user mode (CASSANDRA-8948)
+ * Fix rare NPE in ColumnDefinition#hasIndexOption() (CASSANDRA-8786)
+ * cassandra-stress reports per-operation statistics, plus misc (CASSANDRA-8769)
+ * Use long for key count in cfstats (CASSANDRA-8913)
+ * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
+ * Remove cold_reads_to_omit from STCS (CASSANDRA-8860)
+ * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883)
+ * Fix top partitions reporting wrong cardinality (CASSANDRA-8834)
+ * Fix rare NPE in KeyCacheSerializer (CASSANDRA-8067)
+ * Pick sstables for validation as late as possible inc repairs (CASSANDRA-8366)
+ * Fix commitlog getPendingTasks to not increment (CASSANDRA-8856)
+ * Fix parallelism adjustment in range and secondary index queries
+   when the first fetch does not satisfy the limit (CASSANDRA-8856)
+ * Check if the filtered sstables is non-empty in STCS (CASSANDRA-8843)
+ * Upgrade java-driver used for cassandra-stress (CASSANDRA-8842)
+ * Fix CommitLog.forceRecycleAllSegments() memory access error (CASSANDRA-8812)
+ * Improve assertions in Memory (CASSANDRA-8792)
+ * Fix SSTableRewriter cleanup (CASSANDRA-8802)
+ * Introduce SafeMemory for CompressionMetadata.Writer (CASSANDRA-8758)
+ * 'nodetool info' prints exception against older node (CASSANDRA-8796)
+ * Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750)
+ * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
+ * Enforce SSTableReader.first/last (CASSANDRA-8744)
+ * Cleanup SegmentedFile API (CASSANDRA-8749)
+ * Avoid overlap with early compaction replacement (CASSANDRA-8683)
+ * Safer Resource Management++ (CASSANDRA-8707)
+ * Write partition size estimates into a system table (CASSANDRA-7688)
+ * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
+   (CASSANDRA-8154)
+ * Show progress of streaming in nodetool netstats (CASSANDRA-8886)
+ * IndexSummaryBuilder utilises offheap memory, and shares data between
+   each IndexSummary opened from it (CASSANDRA-8757)
+ * markCompacting only succeeds if the exact SSTableReader instances being
+   marked are in the live set (CASSANDRA-8689)
+ * cassandra-stress support for varint (CASSANDRA-8882)
+ * Fix Adler32 digest for compressed sstables (CASSANDRA-8778)
+ * Add nodetool statushandoff/statusbackup (CASSANDRA-8912)
+ * Use stdout for progress and stats in sstableloader (CASSANDRA-8982)
+ * Correctly identify 2i datadir from older versions (CASSANDRA-9116)
+Merged from 2.0:
+ * Ignore gossip SYNs after shutdown (CASSANDRA-9238)
  * Avoid overflow when calculating max sstable size in LCS (CASSANDRA-9235)
  * Make sstable blacklisting work with compression (CASSANDRA-9138)
  * Do not attempt to rebuild indexes if no index accepts any column (CASSANDRA-9196)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/db127a10/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 2f53ab9,1752b21..1f5c7de
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -21,8 -21,8 +21,9 @@@ import java.nio.ByteBuffer
  import java.io.*;
  import java.util.*;
  
+ import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Throwables;
 +import com.google.common.collect.Sets;
  
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.io.sstable.*;
@@@ -109,14 -113,12 +118,14 @@@ public class Scrubber implements Closea
      public void scrub()
      {
          outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
 +        Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
 +        SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);
          try
          {
-             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
+             nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
              {
                  // throw away variable so we don't have a side effect in the assert
 -                long firstRowPositionFromIndex = RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position;
 +                long firstRowPositionFromIndex = sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
                  assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
              }
@@@ -143,33 -152,32 +153,22 @@@
                      // check for null key below
                  }
  
-                 ByteBuffer currentIndexKey = nextIndexKey;
-                 long nextRowPositionFromIndex;
-                 try
-                 {
-                     nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
-                     nextRowPositionFromIndex = indexFile.isEOF()
-                                              ? dataFile.length()
-                                              : sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
-                 }
-                 catch (Throwable th)
-                 {
-                     JVMStabilityInspector.inspectThrowable(th);
-                     outputHandler.warn("Error reading index file", th);
-                     nextIndexKey = null;
-                     nextRowPositionFromIndex = dataFile.length();
-                 }
+                 updateIndexKey();
  
                  long dataStart = dataFile.getFilePointer();
-                 long dataStartFromIndex = currentIndexKey == null
-                                         ? -1
-                                         : rowStart + 2 + currentIndexKey.remaining();
-                 long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
++
+                 long dataStartFromIndex = -1;
+                 long dataSizeFromIndex = -1;
+                 if (currentIndexKey != null)
+                 {
+                     dataStartFromIndex = currentRowPositionFromIndex + 2 + currentIndexKey.remaining();
-                     if (sstable.descriptor.version.hasRowSizeAndColumnCount)
-                         dataStartFromIndex += 8;
-
+                     dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+                 }
-                 if (!sstable.descriptor.version.hasRowSizeAndColumnCount)
-                 {
-                     dataSize = dataSizeFromIndex;
-                     // avoid an NPE if key is null
-                     String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.key);
-                     outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize));
-                 }
-                 else
-                 {
-                     if (currentIndexKey != null)
-                         outputHandler.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex));
-                 }
+                 dataSize = dataSizeFromIndex;
+                 // avoid an NPE if key is null
+                 String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.getKey());
+                 outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize));
  
                  assert currentIndexKey != null || indexFile.isEOF();
@@@ -177,9 -186,22 +176,22 @@@
                  {
                      if (key == null)
                          throw new IOError(new IOException("Unable to read row key from data file"));
+
-                     if (!key.key.equals(currentIndexKey))
++                    if (!key.getKey().equals(currentIndexKey))
+                     {
+                         throw new IOError(new IOException(String.format("Key from data file (%s) does not match key from index file (%s)",
-                                                                         ByteBufferUtil.bytesToHex(key.key), ByteBufferUtil.bytesToHex(currentIndexKey))));
++                                                                        ByteBufferUtil.bytesToHex(key.getKey()), ByteBufferUtil.bytesToHex(currentIndexKey))));
+                     }
+
                      if (dataSize > dataFile.length())
-                         throw new IOError(new IOException("Impossible row size (greater than file length): " + dataSize));
+                         throw new IOError(new IOException("Impossible row size " + dataSize));
+
+                     if (dataStart != dataStartFromIndex)
-                         outputHandler.warn(String.format("Data file row position %d differs from index file row position %d", dataStart, dataSizeFromIndex));
++                        outputHandler.warn(String.format("Data file row position %d different from index file row position %d", dataStart, dataSizeFromIndex));
+
+                     if (dataSize != dataSizeFromIndex)
-                         outputHandler.warn(String.format("Data file row size %d differs from index file row size %d", dataSize, dataSizeFromIndex));
++                        outputHandler.warn(String.format("Data file row size %d different from index file row size %d", dataSize, dataSizeFromIndex));
+
                      SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
                      if (prevKey != null && prevKey.compareTo(key) > 0)
                      {
@@@ -229,8 -254,9 +243,8 @@@
                              throwIfCommutative(key, th2);
  
                              outputHandler.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
-                             dataFile.seek(nextRowPositionFromIndex);
-                             writer.resetAndTruncate();
                              badRows++;
+                             seekToNextRow();
                          }
                      }
                      else
@@@ -290,6 -309,45 +304,46 @@@
          }
      }
  
+     private void updateIndexKey()
+     {
+         currentIndexKey = nextIndexKey;
+         currentRowPositionFromIndex = nextRowPositionFromIndex;
+         try
+         {
+             nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+             nextRowPositionFromIndex = indexFile.isEOF()
+                                      ? dataFile.length()
-                                      : RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position;
++                                     : sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile, sstable.descriptor.version).position;
+         }
+         catch (Throwable th)
+         {
++            JVMStabilityInspector.inspectThrowable(th);
+             outputHandler.warn("Error reading index file", th);
+             nextIndexKey = null;
+             nextRowPositionFromIndex = dataFile.length();
+         }
+     }
+
+     private void seekToNextRow()
+     {
+         while(nextRowPositionFromIndex < dataFile.length())
+         {
+             try
+             {
+                 dataFile.seek(nextRowPositionFromIndex);
+                 return;
+             }
+             catch (Throwable th)
+             {
+                 throwIfFatal(th);
+                 outputHandler.warn(String.format("Failed to seek to next row position %d", nextRowPositionFromIndex), th);
+                 badRows++;
+             }
+
+             updateIndexKey();
+         }
+     }
+
      private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
      {
          // TODO bitch if the row is too large? if it is there's not much we can do ...
@@@ -380,9 -438,30 +434,30 @@@
          }
  
          @Override
-         public boolean shouldPurge(DecoratedKey key, long delTimestamp)
+         public long maxPurgeableTimestamp(DecoratedKey key)
          {
-             return false;
+             return Long.MIN_VALUE;
          }
      }
+
+     @VisibleForTesting
+     public ScrubResult scrubWithResult()
+     {
+         scrub();
+         return new ScrubResult(this);
+     }
+
+     public static final class ScrubResult
+     {
+         public final int goodRows;
+         public final int badRows;
+         public final int emptyRows;
+
+         public ScrubResult(Scrubber scrubber)
+         {
+             this.goodRows = scrubber.goodRows;
+             this.badRows = scrubber.badRows;
+             this.emptyRows = scrubber.emptyRows;
+         }
+     }
  }
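A note on the Scrubber change above: previously a corrupted row was handled with a single dataFile.seek(nextRowPositionFromIndex), which itself fails when that position falls inside a corrupted compressed chunk. The new seekToNextRow() keeps walking forward through index entries, counting each unseekable position as a bad row, until it reaches a position the data file can actually serve. A minimal stand-alone sketch of that recovery loop; SeekableFile and IndexReader here are hypothetical stand-ins, not the real Cassandra types:

    // Sketch only: SeekableFile/IndexReader are illustrative stand-ins.
    final class RecoveringScrubber
    {
        interface SeekableFile { long length(); void seek(long pos) throws Exception; }
        interface IndexReader  { boolean isEOF(); long nextRowPosition(); }

        private final SeekableFile dataFile;
        private final IndexReader indexFile;
        private long nextRowPositionFromIndex;
        private int badRows;

        RecoveringScrubber(SeekableFile dataFile, IndexReader indexFile)
        {
            this.dataFile = dataFile;
            this.indexFile = indexFile;
        }

        // Advance to the next row start recorded in the index, tolerating further corruption.
        void seekToNextRow()
        {
            while (nextRowPositionFromIndex < dataFile.length())
            {
                try
                {
                    dataFile.seek(nextRowPositionFromIndex);
                    return; // success: scrubbing resumes at this indexed row
                }
                catch (Exception e)
                {
                    badRows++; // this position was unreadable too; keep walking the index
                }
                // pull the following entry from the index and try again
                nextRowPositionFromIndex = indexFile.isEOF()
                                         ? dataFile.length()
                                         : indexFile.nextRowPosition();
            }
        }
    }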
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db127a10/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/SchemaLoader.java
index db1758f,6b34b9a..c6a3855
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@@ -51,30 -53,17 +51,35 @@@ public class SchemaLoade
      private static Logger logger = LoggerFactory.getLogger(SchemaLoader.class);
  
      @BeforeClass
-     public static void loadSchema() throws IOException, ConfigurationException
+     public static void loadSchema() throws ConfigurationException
      {
-         loadSchema(false);
++        loadSchema(null);
++    }
++
++    public static void loadSchema(Integer compressionChunkLength) throws ConfigurationException
++    {
+         prepareServer();
+
+         // Migrations aren't happy if gossiper is not started.  Even if we don't use migrations though,
+         // some tests now expect us to start gossip for them.
+         startGossiper();
+
+         // if you're messing with low-level sstable stuff, it can be useful to inject the schema directly
+         // Schema.instance.load(schemaDefinition());
-         for (KSMetaData ksm : schemaDefinition())
++        for (KSMetaData ksm : schemaDefinition(compressionChunkLength))
+             MigrationManager.announceNewKeyspace(ksm);
      }
  
-     public static void loadSchema(boolean withOldCfIds) throws IOException, ConfigurationException
+     @After
+     public void leakDetect() throws InterruptedException
      {
-         loadSchema(withOldCfIds, null);
+         System.gc();
+         System.gc();
+         System.gc();
+         Thread.sleep(10);
      }
  
-     public static void loadSchema(boolean withOldCfIds, Integer compressionChunkLength) throws IOException, ConfigurationException
+     public static void prepareServer()
      {
          // Cleanup first
          cleanupAndLeaveDirs();
@@@ -103,7 -98,7 +108,7 @@@
          Gossiper.instance.stop();
      }
  
 -    public static Collection<KSMetaData> schemaDefinition() throws ConfigurationException
-     public static Collection<KSMetaData> schemaDefinition(boolean withOldCfIds, Integer compressionChunkLength) throws ConfigurationException
++    public static Collection<KSMetaData> schemaDefinition(Integer compressionChunkLength) throws ConfigurationException
      {
          List<KSMetaData> schema = new ArrayList<KSMetaData>();
@@@ -355,15 -359,21 +360,15 @@@
          final Map<String, String> indexOptions = Collections.singletonMap(
                                                        SecondaryIndex.CUSTOM_INDEX_OPTION_NAME,
                                                        PerRowSecondaryIndexTest.TestIndex.class.getName());
 -        return standardCFMD(ksName, cfName)
 -                .keyValidator(AsciiType.instance)
 -                .columnMetadata(new HashMap<ByteBuffer, ColumnDefinition>()
 -                {{
 -                    ByteBuffer cName = ByteBuffer.wrap("indexed".getBytes(StandardCharsets.UTF_8));
 -                    put(cName, new ColumnDefinition(cName,
 -                                                    AsciiType.instance,
 -                                                    IndexType.CUSTOM,
 -                                                    indexOptions,
 -                                                    ByteBufferUtil.bytesToHex(cName),
 -                                                    null, ColumnDefinition.Type.REGULAR));
 -                }});
 +
 +        CFMetaData cfm = CFMetaData.sparseCFMetaData(ksName, cfName, AsciiType.instance).keyValidator(AsciiType.instance);
 +
 +        ByteBuffer cName = ByteBufferUtil.bytes("indexed");
 +        return cfm.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(cfm, cName, AsciiType.instance, null)
 +                                                                .setIndex("indexe1", IndexType.CUSTOM, indexOptions));
      }
  
-     private static void useCompression(List<KSMetaData> schema)
+     private static void useCompression(List<KSMetaData> schema, Integer chunkLength) throws ConfigurationException
      {
          for (KSMetaData ksm : schema)
          {
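A note on the SchemaLoader change above: loadSchema() now delegates to a new loadSchema(Integer compressionChunkLength) overload, and the chunk length is threaded through schemaDefinition(...) into useCompression(...), so a test class can pin the compression chunk size its assertions depend on (null keeps the default). A usage sketch mirroring what ScrubTest does below; MyCompressionSensitiveTest is a hypothetical example class:

    import org.junit.BeforeClass;

    import org.apache.cassandra.SchemaLoader;
    import org.apache.cassandra.exceptions.ConfigurationException;

    public class MyCompressionSensitiveTest extends SchemaLoader
    {
        private static final Integer CHUNK_LENGTH = 4096; // bytes per compressed chunk

        @BeforeClass
        public static void loadSchema() throws ConfigurationException
        {
            loadSchema(CHUNK_LENGTH); // pass null to keep the default chunk length
        }
    }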
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db127a10/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index 383b1d0,08237a4..a19c76d
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -31,9 -30,12 +30,12 @@@ import java.util.Se
  import java.util.concurrent.ExecutionException;
  
  import org.apache.cassandra.cql3.QueryProcessor;
 -import org.apache.cassandra.db.compaction.OperationType;
++import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.exceptions.RequestExecutionException;
+ import org.apache.cassandra.io.compress.CompressionMetadata;
  import org.apache.cassandra.utils.UUIDGen;
  import org.apache.commons.lang3.StringUtils;
+ import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  
@@@ -43,18 -45,17 +45,21 @@@ import org.apache.cassandra.SchemaLoade
  import org.apache.cassandra.Util;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.db.compaction.Scrubber;
 -import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
  import org.apache.cassandra.db.compaction.CompactionManager;
 -import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.db.compaction.Scrubber;
 +import org.apache.cassandra.exceptions.WriteTimeoutException;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTableReader;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
 -import static junit.framework.Assert.assertNotNull;
 +import static org.apache.cassandra.Util.cellname;
  import static org.apache.cassandra.Util.column;
++
++import static junit.framework.Assert.assertNotNull;
  import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
  import static org.junit.Assert.fail;
  import static org.junit.Assume.assumeTrue;
  
@@@ -65,9 -66,16 +70,16 @@@ public class ScrubTest extends SchemaLo
      public String CF = "Standard1";
      public String CF3 = "Standard2";
      public String COUNTER_CF = "Counter1";
+     private static Integer COMPRESSION_CHUNK_LENGTH = 4096;
+
+     @BeforeClass
-     public static void loadSchema() throws IOException, ConfigurationException
++    public static void loadSchema() throws ConfigurationException
+     {
-         loadSchema(false, COMPRESSION_CHUNK_LENGTH);
++        loadSchema(COMPRESSION_CHUNK_LENGTH);
+     }
  
      @Test
-     public void testScrubOneRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
+     public void testScrubOneRow() throws ExecutionException, InterruptedException
      {
          CompactionManager.instance.disableAutoCompaction();
          Keyspace keyspace = Keyspace.open(KEYSPACE);
@@@ -89,9 -97,71 +101,69 @@@
      }
  
      @Test
-     public void testScrubCorruptedCounterRow() throws IOException, InterruptedException, ExecutionException
+     public void testScrubCorruptedCounterRow() throws IOException, WriteTimeoutException
      {
-         // skip the test when compression is enabled until CASSANDRA-9140 is complete
+         // When compression is enabled, for testing corrupted chunks we need enough partitions to cover
+         // at least 3 chunks of size COMPRESSION_CHUNK_LENGTH
+         int numPartitions = 1000;
+
+         CompactionManager.instance.disableAutoCompaction();
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF);
+         cfs.clearUnsafe();
+
+         fillCounterCF(cfs, numPartitions);
+
+         List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), numPartitions*10);
+         assertEquals(numPartitions, rows.size());
+         assertEquals(1, cfs.getSSTables().size());
+
+         SSTableReader sstable = cfs.getSSTables().iterator().next();
+
+         //make sure to override at most 1 chunk when compression is enabled
+         overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1"));
+
+         // with skipCorrupted == false, the scrub is expected to fail
-         Scrubber scrubber = new Scrubber(cfs, sstable, false);
++        Scrubber scrubber = new Scrubber(cfs, sstable, false, false);
+         try
+         {
+             scrubber.scrub();
+             fail("Expected a CorruptSSTableException to be thrown");
+         }
+         catch (IOError err) {}
+
+         // with skipCorrupted == true, the corrupt row will be skipped
+         Scrubber.ScrubResult scrubResult;
-         scrubber = new Scrubber(cfs, sstable, true);
++        scrubber = new Scrubber(cfs, sstable, true, false);
+         scrubResult = scrubber.scrubWithResult();
+         scrubber.close();
-         cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB);
+
+         assertNotNull(scrubResult);
+
+         boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"));
+         if (compression)
+         {
+             assertEquals(0, scrubResult.emptyRows);
+             assertEquals(numPartitions, scrubResult.badRows + scrubResult.goodRows);
+             //because we only corrupted 1 chunk and we chose enough partitions to cover at least 3 chunks
+             assertTrue(scrubResult.goodRows >= scrubResult.badRows * 2);
+         }
+         else
+         {
+             assertEquals(0, scrubResult.emptyRows);
+             assertEquals(1, scrubResult.badRows);
+             assertEquals(numPartitions-1, scrubResult.goodRows);
+         }
+         assertEquals(1, cfs.getSSTables().size());
+
+         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
+         assertEquals(scrubResult.goodRows, rows.size());
+     }
+
      @Test
-     public void testScrubCorruptedRowInSmallFile() throws IOException, InterruptedException, ExecutionException
++    public void testScrubCorruptedRowInSmallFile() throws IOException, WriteTimeoutException
+     {
+         // cannot test this with compression
          assumeTrue(!Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")));
  
          CompactionManager.instance.disableAutoCompaction();
@@@ -105,20 -175,10 +177,12 @@@
          assertEquals(2, rows.size());
  
          SSTableReader sstable = cfs.getSSTables().iterator().next();
+
+         // overwrite one row with garbage
-         long row0Start = sstable.getPosition(RowPosition.ForKey.get(ByteBufferUtil.bytes("0"), sstable.partitioner), SSTableReader.Operator.EQ).position;
-         long row1Start = sstable.getPosition(RowPosition.ForKey.get(ByteBufferUtil.bytes("1"), sstable.partitioner), SSTableReader.Operator.EQ).position;
-         long startPosition = row0Start < row1Start ? row0Start : row1Start;
-         long endPosition = row0Start < row1Start ? row1Start : row0Start;
-
-         RandomAccessFile file = new RandomAccessFile(sstable.getFilename(), "rw");
-         file.seek(startPosition);
-         file.writeBytes(StringUtils.repeat('z', (int) (endPosition - startPosition)));
-         file.close();
+         overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1"));
  
          // with skipCorrupted == false, the scrub is expected to fail
-         Scrubber scrubber = new Scrubber(cfs, sstable, false);
+         Scrubber scrubber = new Scrubber(cfs, sstable, false, false);
          try
          {
              scrubber.scrub();
@@@ -138,7 -199,35 +202,35 @@@
      }
  
      @Test
+     public void testScrubOneRowWithCorruptedKey() throws IOException, ExecutionException, InterruptedException, ConfigurationException
+     {
+         // cannot test this with compression
+         assumeTrue(!Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false")));
+
+         CompactionManager.instance.disableAutoCompaction();
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.clearUnsafe();
+
+         List<Row> rows;
+
+         // insert data and verify we get it back w/ range query
+         fillCF(cfs, 4);
+         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
+         assertEquals(4, rows.size());
+
+         SSTableReader sstable = cfs.getSSTables().iterator().next();
+         overrideWithGarbage(sstable, 0, 2);
+
+         CompactionManager.instance.performScrub(cfs, false);
+
+         // check data is still there
+         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
+         assertEquals(4, rows.size());
+     }
+
+     @Test
-     public void testScrubDeletedRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
+     public void testScrubDeletedRow() throws ExecutionException, InterruptedException
      {
          CompactionManager.instance.disableAutoCompaction();
          Keyspace keyspace = Keyspace.open(KEYSPACE);
@@@ -241,6 -330,42 +333,42 @@@
          assert rows.size() == 6 : "Got " + rows.size();
      }
  
+     private void overrideWithGarbage(SSTableReader sstable, ByteBuffer key1, ByteBuffer key2) throws IOException
+     {
+         boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"));
+         long startPosition, endPosition;
+
+         if (compression)
+         { // overwrite with garbage the compression chunks from key1 to key2
+             CompressionMetadata compData = CompressionMetadata.create(sstable.getFilename());
+
+             CompressionMetadata.Chunk chunk1 = compData.chunkFor(
-                 sstable.getPosition(RowPosition.forKey(key1, sstable.partitioner), SSTableReader.Operator.EQ).position);
++                sstable.getPosition(RowPosition.ForKey.get(key1, sstable.partitioner), SSTableReader.Operator.EQ).position);
+             CompressionMetadata.Chunk chunk2 = compData.chunkFor(
-                 sstable.getPosition(RowPosition.forKey(key2, sstable.partitioner), SSTableReader.Operator.EQ).position);
++                sstable.getPosition(RowPosition.ForKey.get(key2, sstable.partitioner), SSTableReader.Operator.EQ).position);
+
+             startPosition = Math.min(chunk1.offset, chunk2.offset);
+             endPosition = Math.max(chunk1.offset + chunk1.length, chunk2.offset + chunk2.length);
+         }
+         else
+         { // overwrite with garbage from key1 to key2
-             long row0Start = sstable.getPosition(RowPosition.forKey(key1, sstable.partitioner), SSTableReader.Operator.EQ).position;
-             long row1Start = sstable.getPosition(RowPosition.forKey(key2, sstable.partitioner), SSTableReader.Operator.EQ).position;
++            long row0Start = sstable.getPosition(RowPosition.ForKey.get(key1, sstable.partitioner), SSTableReader.Operator.EQ).position;
++            long row1Start = sstable.getPosition(RowPosition.ForKey.get(key2, sstable.partitioner), SSTableReader.Operator.EQ).position;
+             startPosition = Math.min(row0Start, row1Start);
+             endPosition = Math.max(row0Start, row1Start);
+         }
+
+         overrideWithGarbage(sstable, startPosition, endPosition);
+     }
+
-     private void overrideWithGarbage(SSTable sstable, long startPosition, long endPosition) throws IOException
++    private void overrideWithGarbage(SSTableReader sstable, long startPosition, long endPosition) throws IOException
+     {
+         RandomAccessFile file = new RandomAccessFile(sstable.getFilename(), "rw");
+         file.seek(startPosition);
+         file.writeBytes(StringUtils.repeat('z', (int) (endPosition - startPosition)));
+         file.close();
+     }
+
      private static boolean isRowOrdered(List<Row> rows)
      {
          DecoratedKey prev = null;
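A note on the ScrubTest changes above: overrideWithGarbage() aligns the corrupted span to whole compression chunks, and testScrubCorruptedCounterRow() sizes its data set (1000 partitions against a 4096-byte COMPRESSION_CHUNK_LENGTH) so the sstable spans at least three chunks; corrupting the single chunk shared by two adjacent keys then leaves enough intact rows to satisfy goodRows >= badRows * 2. A plain-Java illustration of the chunk arithmetic, with made-up positions and fixed-size chunks assumed for simplicity (real CompressionMetadata maps positions to variable-size compressed chunks):

    public class ChunkMath
    {
        public static void main(String[] args)
        {
            final int chunkLength = 4096; // COMPRESSION_CHUNK_LENGTH in the test
            final long row0Start = 100;   // assumed position of key "0"
            final long row1Start = 900;   // assumed position of key "1" (same chunk)

            // chunkFor(pos): with fixed-size chunks, a position maps to chunk pos / chunkLength
            long chunk1Offset = (row0Start / chunkLength) * chunkLength;
            long chunk2Offset = (row1Start / chunkLength) * chunkLength;

            // overwrite from the start of the first chunk to the end of the second
            long startPosition = Math.min(chunk1Offset, chunk2Offset);
            long endPosition = Math.max(chunk1Offset + chunkLength, chunk2Offset + chunkLength);

            // Adjacent keys usually share a chunk, so only one chunk is clobbered;
            // with at least three chunks of data, surviving rows outnumber bad ones 2:1.
            System.out.printf("garbage span: [%d, %d) = %d chunk(s)%n",
                              startPosition, endPosition,
                              (endPosition - startPosition) / chunkLength);
        }
    }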
