Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4364075 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4364075 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4364075 Branch: refs/heads/cassandra-3.0 Commit: d4364075dcc82c81990d98b39ed96d91ad40bf98 Parents: 89c558a 7d7ff7f Author: Marcus Eriksson <[email protected]> Authored: Thu Feb 11 08:28:26 2016 +0100 Committer: Marcus Eriksson <[email protected]> Committed: Thu Feb 11 08:28:26 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/compaction/CompactionManager.java | 12 +++--------- .../org/apache/cassandra/db/compaction/Scrubber.java | 12 ++++-------- .../org/apache/cassandra/tools/StandaloneScrubber.java | 2 +- test/unit/org/apache/cassandra/db/ScrubTest.java | 12 ++++++------ 5 files changed, 15 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4364075/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4364075/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4364075/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java index 272c2f8,e9137e2..3dea9d9 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@@ -57,8 -56,9 +57,6 @@@ public class Scrubber implements Closea private final ScrubInfo scrubInfo; private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; - private final boolean isOffline; - private SSTableReader newSstable; - private SSTableReader newInOrderSstable; -- private int goodRows; private int badRows; private int emptyRows; @@@ -70,37 -70,29 +68,35 @@@ private final OutputHandler outputHandler; - private static final Comparator<Row> rowComparator = new Comparator<Row>() + private static final Comparator<Partition> partitionComparator = new Comparator<Partition>() { - public int compare(Row r1, Row r2) + public int compare(Partition r1, Partition r2) { - return r1.key.compareTo(r2.key); + return r1.partitionKey().compareTo(r2.partitionKey()); } }; - private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator); + private final SortedSet<Partition> outOfOrder = new TreeSet<>(partitionComparator); - public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException + public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean checkData) throws IOException { - this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData); + this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), checkData); } @SuppressWarnings("resource") - public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean checkData) throws IOException + public Scrubber(ColumnFamilyStore cfs, + LifecycleTransaction transaction, + boolean skipCorrupted, + OutputHandler outputHandler, - boolean isOffline, + boolean checkData) throws IOException { this.cfs = cfs; this.transaction = transaction; this.sstable = transaction.onlyOne(); this.outputHandler = outputHandler; this.skipCorrupted = skipCorrupted; - this.isOffline = isOffline; - this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata); + this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata, + sstable.descriptor.version, + sstable.header); List<SSTableReader> toScrub = Collections.singletonList(sstable); @@@ -141,17 -137,10 +137,17 @@@ this.nextRowPositionFromIndex = 0; } + private UnfilteredRowIterator withValidation(UnfilteredRowIterator iter, String filename) + { + return checkData ? UnfilteredRowIterators.withValidation(iter, filename) : iter; + } + public void scrub() { + List<SSTableReader> finished = new ArrayList<>(); + boolean completed = false; outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length())); - try (SSTableRewriter writer = new SSTableRewriter(transaction, sstable.maxDataAge, isOffline)) - try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, transaction.isOffline())) ++ try (SSTableRewriter writer = new SSTableRewriter(transaction, sstable.maxDataAge, transaction.isOffline())) { nextIndexKey = indexAvailable() ? ByteBufferUtil.readWithShortLength(indexFile) : null; if (indexAvailable()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4364075/src/java/org/apache/cassandra/tools/StandaloneScrubber.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4364075/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java index d5baec8,c0cde40..6dbbb1b --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@@ -145,7 -167,7 +145,7 @@@ public class ScrubTes // with skipCorrupted == false, the scrub is expected to fail try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB); - Scrubber scrubber = new Scrubber(cfs, txn, false, false, true)) - Scrubber scrubber = new Scrubber(cfs, txn, false, true);) ++ Scrubber scrubber = new Scrubber(cfs, txn, false, true)) { scrubber.scrub(); fail("Expected a CorruptSSTableException to be thrown"); @@@ -155,7 -177,7 +155,7 @@@ // with skipCorrupted == true, the corrupt rows will be skipped Scrubber.ScrubResult scrubResult; try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB); - Scrubber scrubber = new Scrubber(cfs, txn, true, false, true)) - Scrubber scrubber = new Scrubber(cfs, txn, true, true);) ++ Scrubber scrubber = new Scrubber(cfs, txn, true, true)) { scrubResult = scrubber.scrubWithResult(); } @@@ -289,96 -340,146 +289,96 @@@ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); cfs.clearUnsafe(); - List<Row> rows; - // insert data and verify we get it back w/ range query fillCF(cfs, 10); - rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); - assertEquals(10, rows.size()); + assertOrderedAll(cfs, 10); - for (SSTableReader sstable : cfs.getSSTables()) + for (SSTableReader sstable : cfs.getLiveSSTables()) new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)).delete(); - CompactionManager.instance.performScrub(cfs, false, true, true); + CompactionManager.instance.performScrub(cfs, false, true); // check data is still there - rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); - assertEquals(10, rows.size()); + assertOrderedAll(cfs, 10); } @Test public void testScrubOutOfOrder() throws Exception { - CompactionManager.instance.disableAutoCompaction(); - Keyspace keyspace = Keyspace.open(KEYSPACE); - String columnFamily = CF3; - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily); - cfs.clearUnsafe(); - - /* - * Code used to generate an outOfOrder sstable. The test for out-of-order key in SSTableWriter must also be commented out. - * The test also assumes an ordered partitioner. - * - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfs.metadata); - cf.addColumn(new BufferCell(ByteBufferUtil.bytes("someName"), ByteBufferUtil.bytes("someValue"), 0L)); - - SSTableWriter writer = new SSTableWriter(cfs.getTempSSTablePath(new File(System.getProperty("corrupt-sstable-root"))), - cfs.metadata.getIndexInterval(), - cfs.metadata, - cfs.partitioner, - SSTableMetadata.createCollector(BytesType.instance)); - writer.append(Util.dk("a"), cf); - writer.append(Util.dk("b"), cf); - writer.append(Util.dk("z"), cf); - writer.append(Util.dk("c"), cf); - writer.append(Util.dk("y"), cf); - writer.append(Util.dk("d"), cf); - writer.finish(); - */ - - String root = System.getProperty("corrupt-sstable-root"); - assert root != null; - - File rootDir = new File(root); - assert rootDir.isDirectory(); - Descriptor desc = new Descriptor("jb", rootDir, KEYSPACE, columnFamily, 1, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY); - CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname); - + // This test assumes ByteOrderPartitioner to create out-of-order SSTable + IPartitioner oldPartitioner = DatabaseDescriptor.getPartitioner(); + DatabaseDescriptor.setPartitionerUnsafe(new ByteOrderedPartitioner()); + + // Create out-of-order SSTable + File tempDir = File.createTempFile("ScrubTest.testScrubOutOfOrder", "").getParentFile(); + // create ks/cf directory + File tempDataDir = new File(tempDir, String.join(File.separator, KEYSPACE, CF3)); + tempDataDir.mkdirs(); try { - SSTableReader.open(desc, metadata); - fail("SSTR validation should have caught the out-of-order rows"); - } - catch (IllegalStateException ise) { /* this is expected */ } - - // open without validation for scrubbing - Set<Component> components = new HashSet<>(); - components.add(Component.COMPRESSION_INFO); - components.add(Component.DATA); - components.add(Component.PRIMARY_INDEX); - components.add(Component.FILTER); - components.add(Component.STATS); - components.add(Component.SUMMARY); - components.add(Component.TOC); - - SSTableReader sstable = SSTableReader.openNoValidation(desc, components, cfs); - if (sstable.last.compareTo(sstable.first) < 0) - sstable.last = sstable.first; - - try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable); - Scrubber scrubber = new Scrubber(cfs, txn, false, true);) - { - scrubber.scrub(); - } - cfs.loadNewSSTables(); - List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); - assert isRowOrdered(rows) : "Scrub failed: " + rows; - assert rows.size() == 6 : "Got " + rows.size(); - } - - @Test - public void testScrub10791() throws Exception - { - // Table is created by StreamingTransferTest.testTransferRangeTombstones with CASSANDRA-10791 fix disabled. - CompactionManager.instance.disableAutoCompaction(); - Keyspace keyspace = Keyspace.open(KEYSPACE); - String columnFamily = CFI1; - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily); - cfs.clearUnsafe(); + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + String columnFamily = CF3; + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily); + cfs.clearUnsafe(); + + List<String> keys = Arrays.asList("t", "a", "b", "z", "c", "y", "d"); + String filename = cfs.getSSTablePath(tempDataDir); + Descriptor desc = Descriptor.fromFilename(filename); + + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE); + try (SSTableTxnWriter writer = new SSTableTxnWriter(txn, createTestWriter(desc, (long) keys.size(), cfs.metadata, txn))) + { - String root = System.getProperty("corrupt-sstable-root"); - assert root != null; - File rootDir = new File(root); - assert rootDir.isDirectory(); - Descriptor desc = new Descriptor("ka", rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY); - - // open without validation for scrubbing - Set<Component> components = new HashSet<>(); - components.add(Component.DATA); - components.add(Component.PRIMARY_INDEX); - components.add(Component.FILTER); - components.add(Component.STATS); - components.add(Component.SUMMARY); - components.add(Component.TOC); - SSTableReader sstable = SSTableReader.openNoValidation(desc, components, cfs); - - try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable); - Scrubber scrubber = new Scrubber(cfs, txn, false, true);) - { - scrubber.scrub(); - } + for (String k : keys) + { + PartitionUpdate update = UpdateBuilder.create(cfs.metadata, Util.dk(k)) + .newRow("someName").add("val", "someValue") + .build(); - cfs.loadNewSSTables(); - assertEquals(7, countCells(cfs)); - } + writer.append(update.unfilteredIterator()); + } + writer.finish(false); + } - private int countCells(ColumnFamilyStore cfs) - { - int cellCount = 0; - for (SSTableReader sstable : cfs.getSSTables()) - { - Iterator<OnDiskAtomIterator> it = sstable.getScanner(); - while (it.hasNext()) + try { - Iterator<OnDiskAtom> itr = it.next(); - while (itr.hasNext()) - { - ++cellCount; - itr.next(); - } + SSTableReader.open(desc, cfs.metadata); + fail("SSTR validation should have caught the out-of-order rows"); + } + catch (IllegalStateException ise) + { /* this is expected */ } + + // open without validation for scrubbing + Set<Component> components = new HashSet<>(); + if (new File(desc.filenameFor(Component.COMPRESSION_INFO)).exists()) + components.add(Component.COMPRESSION_INFO); + components.add(Component.DATA); + components.add(Component.PRIMARY_INDEX); + components.add(Component.FILTER); + components.add(Component.STATS); + components.add(Component.SUMMARY); + components.add(Component.TOC); + + SSTableReader sstable = SSTableReader.openNoValidation(desc, components, cfs); + if (sstable.last.compareTo(sstable.first) < 0) + sstable.last = sstable.first; + + try (LifecycleTransaction scrubTxn = LifecycleTransaction.offline(OperationType.SCRUB, sstable); - Scrubber scrubber = new Scrubber(cfs, scrubTxn, false, true, true)) ++ Scrubber scrubber = new Scrubber(cfs, scrubTxn, false, true)) + { + scrubber.scrub(); } + LifecycleTransaction.waitForDeletions(); + cfs.loadNewSSTables(); + assertOrderedAll(cfs, 7); + } + finally + { + FileUtils.deleteRecursive(tempDataDir); + // reset partitioner + DatabaseDescriptor.setPartitionerUnsafe(oldPartitioner); } - return cellCount; } private void overrideWithGarbage(SSTableReader sstable, ByteBuffer key1, ByteBuffer key2) throws IOException
