Repository: cassandra Updated Branches: refs/heads/trunk 137016566 -> 2d7909dc1
New tool added to verify all data in sstables. Patch by Jeff Jirsa; reviewed by Branimir Lambov for CASSANDRA-5791 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2d7909dc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2d7909dc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2d7909dc Branch: refs/heads/trunk Commit: 2d7909dc12dc038a65742ca03d375140c5257158 Parents: 1370165 Author: T Jake Luciani <j...@apache.org> Authored: Tue Mar 31 09:53:33 2015 -0400 Committer: T Jake Luciani <j...@apache.org> Committed: Tue Mar 31 09:53:33 2015 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 2 + bin/sstableverify | 55 +++ bin/sstableverify.bat | 41 ++ .../apache/cassandra/db/ColumnFamilyStore.java | 5 + .../db/compaction/CompactionManager.java | 36 ++ .../cassandra/db/compaction/OperationType.java | 3 +- .../cassandra/db/compaction/Verifier.java | 280 ++++++++++++ .../io/compress/CompressedSequentialWriter.java | 2 +- .../apache/cassandra/io/sstable/Component.java | 4 +- .../io/util/ChecksummedSequentialWriter.java | 2 +- .../io/util/DataIntegrityMetadata.java | 86 ++-- .../cassandra/service/StorageService.java | 12 + .../cassandra/service/StorageServiceMBean.java | 8 + .../org/apache/cassandra/tools/NodeProbe.java | 19 +- .../org/apache/cassandra/tools/NodeTool.java | 33 +- .../cassandra/tools/StandaloneVerifier.java | 222 ++++++++++ .../org/apache/cassandra/db/VerifyTest.java | 428 +++++++++++++++++++ 18 files changed, 1206 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b1d7da5..beb05ab 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * New tool added to validate all sstables in a node (CASSANDRA-5791) * Push notification when tracing completes for an operation (CASSANDRA-7807) * Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236) * Compressed Commit Log (CASSANDRA-6809) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 641be77..3af2f92 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -18,6 +18,8 @@ using the provided 'sstableupgrade' tool. New features ------------ + - A new tool has been added bin/sstableverify that checks for errors/bitrot + in all sstables. Unlike scrub, this is a non-invasive tool. - Authentication & Authorization APIs have been updated to introduce roles. Roles and Permissions granted to them are inherited, supporting role based access control. The role concept supercedes that of users http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/bin/sstableverify ---------------------------------------------------------------------- diff --git a/bin/sstableverify b/bin/sstableverify new file mode 100755 index 0000000..c3e40c7 --- /dev/null +++ b/bin/sstableverify @@ -0,0 +1,55 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [ "x$CASSANDRA_INCLUDE" = "x" ]; then + for include in /usr/share/cassandra/cassandra.in.sh \ + /usr/local/share/cassandra/cassandra.in.sh \ + /opt/cassandra/cassandra.in.sh \ + ~/.cassandra.in.sh \ + "`dirname "$0"`/cassandra.in.sh"; do + if [ -r "$include" ]; then + . "$include" + break + fi + done +elif [ -r "$CASSANDRA_INCLUDE" ]; then + . "$CASSANDRA_INCLUDE" +fi + +# Use JAVA_HOME if set, otherwise look for java in PATH +if [ -x "$JAVA_HOME/bin/java" ]; then + JAVA="$JAVA_HOME/bin/java" +else + JAVA="`which java`" +fi + +if [ -z "$CLASSPATH" ]; then + echo "You must set the CLASSPATH var" >&2 + exit 1 +fi + +if [ "x$MAX_HEAP_SIZE" = "x" ]; then + MAX_HEAP_SIZE="256M" +fi + +"$JAVA" $JAVA_AGENT -ea -cp "$CLASSPATH" -Xmx$MAX_HEAP_SIZE \ + -Dcassandra.storagedir="$cassandra_storagedir" \ + -Dlogback.configurationFile=logback-tools.xml \ + org.apache.cassandra.tools.StandaloneVerifier "$@" + +# vi:ai sw=4 ts=4 tw=0 et http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/bin/sstableverify.bat ---------------------------------------------------------------------- diff --git a/bin/sstableverify.bat b/bin/sstableverify.bat new file mode 100644 index 0000000..aa08826 --- /dev/null +++ b/bin/sstableverify.bat @@ -0,0 +1,41 @@ +@REM +@REM Licensed to the Apache Software Foundation (ASF) under one or more +@REM contributor license agreements. See the NOTICE file distributed with +@REM this work for additional information regarding copyright ownership. +@REM The ASF licenses this file to You under the Apache License, Version 2.0 +@REM (the "License"); you may not use this file except in compliance with +@REM the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, software +@REM distributed under the License is distributed on an "AS IS" BASIS, +@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@REM See the License for the specific language governing permissions and +@REM limitations under the License. + +@echo off +if "%OS%" == "Windows_NT" setlocal + +pushd "%~dp0" +call cassandra.in.bat + +if NOT DEFINED CASSANDRA_MAIN set CASSANDRA_MAIN=org.apache.cassandra.tools.StandaloneVerifier +if NOT DEFINED JAVA_HOME goto :err + +REM ***** JAVA options ***** +set JAVA_OPTS=^ + -Dlogback.configurationFile=logback-tools.xml + +set TOOLS_PARAMS= + +"%JAVA_HOME%\bin\java" %JAVA_OPTS% %CASSANDRA_PARAMS% -cp %CASSANDRA_CLASSPATH% "%CASSANDRA_MAIN%" %* +goto finally + +:err +echo JAVA_HOME environment variable must be set! +pause + +:finally + +ENDLOCAL http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index dbd55a0..ca77954 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1442,6 +1442,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return true; } + public CompactionManager.AllSSTableOpStatus verify(boolean extendedVerify) throws ExecutionException, InterruptedException + { + return CompactionManager.instance.performVerify(ColumnFamilyStore.this, extendedVerify); + } + public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion) throws ExecutionException, InterruptedException { return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 4c86469..977c46e 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -323,6 +323,25 @@ public class CompactionManager implements CompactionManagerMBean }); } + public AllSSTableOpStatus performVerify(final ColumnFamilyStore cfs, final boolean extendedVerify) throws InterruptedException, ExecutionException + { + assert !cfs.isIndex(); + return parallelAllSSTableOperation(cfs, new OneSSTableOperation() + { + @Override + public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input) + { + return input; + } + + @Override + public void execute(SSTableReader input) throws IOException + { + verifyOne(cfs, input, extendedVerify); + } + }); + } + public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion) throws InterruptedException, ExecutionException { return parallelAllSSTableOperation(cfs, new OneSSTableOperation() @@ -651,6 +670,23 @@ public class CompactionManager implements CompactionManagerMBean } } + private void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean extendedVerify) throws IOException + { + Verifier verifier = new Verifier(cfs, sstable, false); + + CompactionInfo.Holder verifyInfo = verifier.getVerifyInfo(); + metrics.beginCompaction(verifyInfo); + try + { + verifier.verify(extendedVerify); + } + finally + { + verifier.close(); + metrics.finishCompaction(verifyInfo); + } + } + /** * Determines if a cleanup would actually remove any data in this SSTable based * on a set of owned ranges. http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/db/compaction/OperationType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java index 15d18f6..a14f13f 100644 --- a/src/java/org/apache/cassandra/db/compaction/OperationType.java +++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java @@ -31,7 +31,8 @@ public enum OperationType /** Compaction for tombstone removal */ TOMBSTONE_COMPACTION("Tombstone Compaction"), UNKNOWN("Unknown compaction type"), - ANTICOMPACTION("Anticompaction after repair"); + ANTICOMPACTION("Anticompaction after repair"), + VERIFY("Verify"); private final String type; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java new file mode 100644 index 0000000..7afea0f --- /dev/null +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + +import com.google.common.base.Throwables; +import com.google.common.collect.Sets; +import org.apache.cassandra.db.*; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.SSTableIdentityIterator; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataIntegrityMetadata; +import org.apache.cassandra.io.util.DataIntegrityMetadata.FileDigestValidator; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.OutputHandler; + +import java.io.Closeable; +import java.io.File; +import java.io.IOError; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +public class Verifier implements Closeable +{ + private final ColumnFamilyStore cfs; + private final SSTableReader sstable; + + private final CompactionController controller; + + + private final RandomAccessReader dataFile; + private final RandomAccessReader indexFile; + private final VerifyInfo verifyInfo; + private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; + + private int goodRows; + private int badRows; + + private final OutputHandler outputHandler; + private FileDigestValidator validator; + + public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, boolean isOffline) throws IOException + { + this(cfs, sstable, new OutputHandler.LogOutput(), isOffline); + } + + public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, OutputHandler outputHandler, boolean isOffline) throws IOException + { + this.cfs = cfs; + this.sstable = sstable; + this.outputHandler = outputHandler; + this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata); + + this.controller = new VerifyController(cfs); + + this.dataFile = isOffline + ? sstable.openDataReader() + : sstable.openDataReader(CompactionManager.instance.getRateLimiter()); + this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))); + this.verifyInfo = new VerifyInfo(dataFile, sstable); + } + + public void verify(boolean extended) throws IOException + { + long rowStart = 0; + + outputHandler.output(String.format("Verifying %s (%s bytes)", sstable, dataFile.length())); + outputHandler.output(String.format("Checking computed hash of %s ", sstable)); + + + // Verify will use the adler32 Digest files, which works for both compressed and uncompressed sstables + try + { + validator = null; + + if (new File(sstable.descriptor.filenameFor(Component.DIGEST)).exists()) + { + validator = DataIntegrityMetadata.fileDigestValidator(sstable.descriptor); + validator.validate(); + } + else + { + outputHandler.output("Data digest missing, assuming extended verification of disk atoms"); + extended = true; + } + } + catch (IOException e) + { + outputHandler.debug(e.getMessage()); + markAndThrow(); + } + finally + { + FileUtils.closeQuietly(validator); + } + + if ( !extended ) + return; + + outputHandler.output("Extended Verify requested, proceeding to inspect atoms"); + + + try + { + ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile); + { + long firstRowPositionFromIndex = rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position; + if (firstRowPositionFromIndex != 0) + markAndThrow(); + } + + DecoratedKey prevKey = null; + + while (!dataFile.isEOF()) + { + + if (verifyInfo.isStopRequested()) + throw new CompactionInterruptedException(verifyInfo.getCompactionInfo()); + + rowStart = dataFile.getFilePointer(); + outputHandler.debug("Reading row at " + rowStart); + + DecoratedKey key = null; + long dataSize = -1; + try + { + key = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(dataFile)); + } + catch (Throwable th) + { + throwIfFatal(th); + // check for null key below + } + + ByteBuffer currentIndexKey = nextIndexKey; + long nextRowPositionFromIndex = 0; + try + { + nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile); + nextRowPositionFromIndex = indexFile.isEOF() + ? dataFile.length() + : rowIndexEntrySerializer.deserialize(indexFile, sstable.descriptor.version).position; + } + catch (Throwable th) + { + markAndThrow(); + } + + long dataStart = dataFile.getFilePointer(); + long dataStartFromIndex = currentIndexKey == null + ? -1 + : rowStart + 2 + currentIndexKey.remaining(); + + dataSize = nextRowPositionFromIndex - dataStartFromIndex; + // avoid an NPE if key is null + String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.getKey()); + outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize)); + + assert currentIndexKey != null || indexFile.isEOF(); + + try + { + if (key == null || dataSize > dataFile.length()) + markAndThrow(); + + SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, true); + if ( (prevKey != null && prevKey.compareTo(key) > 0) || !key.getKey().equals(currentIndexKey) || dataStart != dataStartFromIndex ) + markAndThrow(); + + goodRows++; + prevKey = key; + + + outputHandler.debug(String.format("Row %s at %s valid, moving to next row at %s ", goodRows, rowStart, nextRowPositionFromIndex)); + dataFile.seek(nextRowPositionFromIndex); + } + catch (Throwable th) + { + badRows++; + markAndThrow(); + } + } + } + catch (Throwable t) + { + throw Throwables.propagate(t); + } + finally + { + controller.close(); + } + + outputHandler.output("Verify of " + sstable + " succeeded. All " + goodRows + " rows read successfully"); + } + + public void close() + { + FileUtils.closeQuietly(dataFile); + FileUtils.closeQuietly(indexFile); + } + + private void throwIfFatal(Throwable th) + { + if (th instanceof Error && !(th instanceof AssertionError || th instanceof IOError)) + throw (Error) th; + } + + private void markAndThrow() throws IOException + { + sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE); + throw new CorruptSSTableException(new Exception(String.format("Invalid SSTable %s, please force repair", sstable.getFilename())), sstable.getFilename()); + } + + public CompactionInfo.Holder getVerifyInfo() + { + return verifyInfo; + } + + private static class VerifyInfo extends CompactionInfo.Holder + { + private final RandomAccessReader dataFile; + private final SSTableReader sstable; + + public VerifyInfo(RandomAccessReader dataFile, SSTableReader sstable) + { + this.dataFile = dataFile; + this.sstable = sstable; + } + + public CompactionInfo getCompactionInfo() + { + try + { + return new CompactionInfo(sstable.metadata, + OperationType.VERIFY, + dataFile.getFilePointer(), + dataFile.length()); + } + catch (Exception e) + { + throw new RuntimeException(); + } + } + } + + private static class VerifyController extends CompactionController + { + public VerifyController(ColumnFamilyStore cfs) + { + super(cfs, Integer.MAX_VALUE); + } + + @Override + public long maxPurgeableTimestamp(DecoratedKey key) + { + return Long.MIN_VALUE; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index b4d9dcc..fc679d5 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -145,7 +145,7 @@ public class CompressedSequentialWriter extends SequentialWriter // write corresponding checksum compressed.buffer.rewind(); - crcMetadata.appendDirect(compressed.buffer); + crcMetadata.appendDirect(compressed.buffer, true); lastFlushOffset += compressedLength + 4; // adjust our bufferOffset to account for the new uncompressed data we've now written out http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/io/sstable/Component.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java index 7f6cc79..a431f29 100644 --- a/src/java/org/apache/cassandra/io/sstable/Component.java +++ b/src/java/org/apache/cassandra/io/sstable/Component.java @@ -47,8 +47,8 @@ public class Component COMPRESSION_INFO("CompressionInfo.db"), // statistical metadata about the content of the sstable STATS("Statistics.db"), - // holds sha1 sum of the data file (to be checked by sha1sum) - DIGEST("Digest.sha1"), + // holds adler32 checksum of the data file + DIGEST("Digest.adler32"), // holds the CRC32 for chunks in an a uncompressed file. CRC("CRC.db"), // holds SSTable Index Summary (sampling of Index component) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java index 5c5637a..d28a14d 100644 --- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java @@ -41,7 +41,7 @@ public class ChecksummedSequentialWriter extends SequentialWriter ByteBuffer toAppend = buffer.duplicate(); toAppend.position(0); toAppend.limit(buffer.position()); - crcMetadata.appendDirect(toAppend); + crcMetadata.appendDirect(toAppend, false); } public void writeFullChecksum(Descriptor descriptor) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java index 464f9d2..2b59545 100644 --- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java +++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; import java.util.zip.Adler32; +import java.util.zip.CheckedInputStream; import java.util.zip.Checksum; import com.google.common.base.Charsets; @@ -86,6 +87,58 @@ public class DataIntegrityMetadata } } + public static FileDigestValidator fileDigestValidator(Descriptor desc) throws IOException + { + return new FileDigestValidator(desc); + } + + public static class FileDigestValidator implements Closeable + { + private final Checksum checksum; + private final RandomAccessReader digestReader; + private final RandomAccessReader dataReader; + private final Descriptor descriptor; + private long storedDigestValue; + private long calculatedDigestValue; + + public FileDigestValidator(Descriptor descriptor) throws IOException + { + this.descriptor = descriptor; + checksum = descriptor.version.hasAllAdlerChecksums() ? new Adler32() : CRC32Factory.instance.create(); + digestReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DIGEST))); + dataReader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.DATA))); + try + { + storedDigestValue = Long.parseLong(digestReader.readLine()); + } + catch (Exception e) + { + // Attempting to create a FileDigestValidator without a DIGEST file will fail + throw new IOException("Corrupted SSTable : " + descriptor.filenameFor(Component.DATA)); + } + + } + + // Validate the entire file + public void validate() throws IOException + { + CheckedInputStream checkedInputStream = new CheckedInputStream(dataReader, checksum); + byte[] chunk = new byte[64 * 1024]; + + while( checkedInputStream.read(chunk) > 0 ) { } + calculatedDigestValue = checkedInputStream.getChecksum().getValue(); + if (storedDigestValue != calculatedDigestValue) { + throw new IOException("Corrupted SSTable : " + descriptor.filenameFor(Component.DATA)); + } + } + + public void close() + { + this.digestReader.close(); + } + } + + public static class ChecksumWriter { private final Adler32 incrementalChecksum = new Adler32(); @@ -116,45 +169,28 @@ public class DataIntegrityMetadata // CompressedSequentialWriters serialize the partial checksums inline with the compressed data chunks they // corroborate, whereas ChecksummedSequentialWriters serialize them to a different file. - public void append(byte[] buffer, int start, int end, boolean checksumIncrementalResult) + public void appendDirect(ByteBuffer bb, boolean checksumIncrementalResult) { try { - int incrementalChecksumValue; - incrementalChecksum.update(buffer, start, end); - incrementalChecksumValue = (int) incrementalChecksum.getValue(); - incrementalOut.writeInt(incrementalChecksumValue); - incrementalChecksum.reset(); + ByteBuffer toAppend = bb.duplicate(); + toAppend.mark(); + FBUtilities.directCheckSum(incrementalChecksum, toAppend); + toAppend.reset(); - fullChecksum.update(buffer, start, end); + int incrementalChecksumValue = (int) incrementalChecksum.getValue(); + incrementalOut.writeInt(incrementalChecksumValue); + FBUtilities.directCheckSum(fullChecksum, toAppend); if (checksumIncrementalResult) { ByteBuffer byteBuffer = ByteBuffer.allocate(4); byteBuffer.putInt(incrementalChecksumValue); fullChecksum.update(byteBuffer.array(), 0, byteBuffer.array().length); } - } - catch (IOException e) - { - throw new IOError(e); - } - } - - public void appendDirect(ByteBuffer bb) - { - try - { - ByteBuffer toAppend = bb.duplicate(); - toAppend.mark(); - FBUtilities.directCheckSum(incrementalChecksum, toAppend); - toAppend.reset(); - - incrementalOut.writeInt((int) incrementalChecksum.getValue()); incrementalChecksum.reset(); - FBUtilities.directCheckSum(fullChecksum, toAppend); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 2c365cb..7e76b2a 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2358,6 +2358,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return status.statusCode; } + public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; + for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) + { + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.verify(extendedVerify); + if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) + status = oneStatus; + } + return status.statusCode; + } + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 3d04058..4406ec6 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -240,6 +240,14 @@ public interface StorageServiceMBean extends NotificationEmitter public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; /** + * Verify (checksums of) the given keyspace. + * If columnFamilies array is empty, all CFs are verified. + * + * The entire sstable will be read to ensure each cell validates if extendedVerify is true + */ + public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + + /** * Rewrite all sstables to the latest version. * Unlike scrub, it doesn't skip bad rows and do not snapshot sstables first. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index edb2478..5a1d6b4 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -207,6 +207,11 @@ public class NodeProbe implements AutoCloseable return ssProxy.scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies); } + public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + return ssProxy.verify(extendedVerify, keyspaceName, columnFamilies); + } + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies); @@ -217,7 +222,7 @@ public class NodeProbe implements AutoCloseable if (forceKeyspaceCleanup(keyspaceName, columnFamilies) != 0) { failed = true; - out.println("Aborted cleaning up atleast one table in keyspace "+keyspaceName+", check server logs for more information."); + out.println("Aborted cleaning up at least one table in keyspace "+keyspaceName+", check server logs for more information."); } } @@ -226,10 +231,20 @@ public class NodeProbe implements AutoCloseable if (scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies) != 0) { failed = true; - out.println("Aborted scrubbing atleast one table in keyspace "+keyspaceName+", check server logs for more information."); + out.println("Aborted scrubbing at least one table in keyspace "+keyspaceName+", check server logs for more information."); + } + } + + public void verify(PrintStream out, boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + if (verify(extendedVerify, keyspaceName, columnFamilies) != 0) + { + failed = true; + out.println("Aborted verifying at least one table in keyspace "+keyspaceName+", check server logs for more information."); } } + public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { if (upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies) != 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 4ef2469..e6d4df6 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -87,6 +87,7 @@ public class NodeTool ClearSnapshot.class, Compact.class, Scrub.class, + Verify.class, Flush.class, UpgradeSSTable.class, DisableAutoCompaction.class, @@ -1298,6 +1299,36 @@ public class NodeTool } } + @Command(name = "verify", description = "Verify (check data checksum for) one or more tables") + public static class Verify extends NodeToolCmd + { + @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables") + private List<String> args = new ArrayList<>(); + + @Option(title = "extended_verify", + name = {"-e", "--extended-verify"}, + description = "Verify each cell data, beyond simply checking sstable checksums") + private boolean extendedVerify = false; + + @Override + public void execute(NodeProbe probe) + { + List<String> keyspaces = parseOptionalKeyspace(args, probe); + String[] cfnames = parseOptionalColumnFamilies(args); + + for (String keyspace : keyspaces) + { + try + { + probe.verify(System.out, extendedVerify, keyspace, cfnames); + } catch (Exception e) + { + throw new RuntimeException("Error occurred during verifying", e); + } + } + } + } + @Command(name = "disableautocompaction", description = "Disable autocompaction for the given keyspace and table") public static class DisableAutoCompaction extends NodeToolCmd { @@ -2435,7 +2466,7 @@ public class NodeTool @Command(name = "stop", description = "Stop compaction") public static class Stop extends NodeToolCmd { - @Arguments(title = "compaction_type", usage = "<compaction type>", description = "Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, INDEX_BUILD", required = true) + @Arguments(title = "compaction_type", usage = "<compaction type>", description = "Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, VERIFY, INDEX_BUILD", required = true) private OperationType compactionType = OperationType.UNKNOWN; @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/tools/StandaloneVerifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java new file mode 100644 index 0000000..a4f3e80 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.tools; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.*; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.OutputHandler; +import org.apache.commons.cli.*; + +import java.io.File; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions; + +public class StandaloneVerifier +{ + private static final String TOOL_NAME = "sstableverify"; + private static final String VERBOSE_OPTION = "verbose"; + private static final String EXTENDED_OPTION = "extended"; + private static final String DEBUG_OPTION = "debug"; + private static final String HELP_OPTION = "help"; + + public static void main(String args[]) + { + Options options = Options.parseArgs(args); + try + { + // load keyspace descriptions. + Schema.instance.loadFromDisk(false); + + boolean hasFailed = false; + + if (Schema.instance.getCFMetaData(options.keyspaceName, options.cfName) == null) + throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s", + options.keyspaceName, + options.cfName)); + + // Do not load sstables since they might be broken + Keyspace keyspace = Keyspace.openWithoutSSTables(options.keyspaceName); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(options.cfName); + + OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug); + Directories.SSTableLister lister = cfs.directories.sstableLister().skipTemporary(true); + + boolean extended = options.extended; + + List<SSTableReader> sstables = new ArrayList<>(); + + // Verify sstables + for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet()) + { + Set<Component> components = entry.getValue(); + if (!components.contains(Component.DATA) || !components.contains(Component.PRIMARY_INDEX)) + continue; + + try + { + SSTableReader sstable = SSTableReader.openNoValidation(entry.getKey(), components, cfs); + sstables.add(sstable); + } + catch (Exception e) + { + JVMStabilityInspector.inspectThrowable(e); + System.err.println(String.format("Error Loading %s: %s", entry.getKey(), e.getMessage())); + if (options.debug) + e.printStackTrace(System.err); + } + } + + for (SSTableReader sstable : sstables) + { + try + { + Verifier verifier = new Verifier(cfs, sstable, handler, true); + try + { + verifier.verify(extended); + } + catch (CorruptSSTableException cs) + { + System.err.println(String.format("Error verifying %s: %s", sstable, cs.getMessage())); + hasFailed = true; + } + finally + { + verifier.close(); + } + } + catch (Exception e) + { + System.err.println(String.format("Error verifying %s: %s", sstable, e.getMessage())); + e.printStackTrace(System.err); + } + } + + CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES); + + System.exit( hasFailed ? 1 : 0 ); // We need that to stop non daemonized threads + } + catch (Exception e) + { + System.err.println(e.getMessage()); + if (options.debug) + e.printStackTrace(System.err); + System.exit(1); + } + } + + private static class Options + { + public final String keyspaceName; + public final String cfName; + + public boolean debug; + public boolean verbose; + public boolean extended; + + private Options(String keyspaceName, String cfName) + { + this.keyspaceName = keyspaceName; + this.cfName = cfName; + } + + public static Options parseArgs(String cmdArgs[]) + { + CommandLineParser parser = new GnuParser(); + CmdLineOptions options = getCmdLineOptions(); + try + { + CommandLine cmd = parser.parse(options, cmdArgs, false); + + if (cmd.hasOption(HELP_OPTION)) + { + printUsage(options); + System.exit(0); + } + + String[] args = cmd.getArgs(); + if (args.length != 2) + { + String msg = args.length < 2 ? "Missing arguments" : "Too many arguments"; + System.err.println(msg); + printUsage(options); + System.exit(1); + } + + String keyspaceName = args[0]; + String cfName = args[1]; + + Options opts = new Options(keyspaceName, cfName); + + opts.debug = cmd.hasOption(DEBUG_OPTION); + opts.verbose = cmd.hasOption(VERBOSE_OPTION); + opts.extended = cmd.hasOption(EXTENDED_OPTION); + + return opts; + } + catch (ParseException e) + { + errorMsg(e.getMessage(), options); + return null; + } + } + + private static void errorMsg(String msg, CmdLineOptions options) + { + System.err.println(msg); + printUsage(options); + System.exit(1); + } + + private static CmdLineOptions getCmdLineOptions() + { + CmdLineOptions options = new CmdLineOptions(); + options.addOption(null, DEBUG_OPTION, "display stack traces"); + options.addOption("e", EXTENDED_OPTION, "extended verification"); + options.addOption("v", VERBOSE_OPTION, "verbose output"); + options.addOption("h", HELP_OPTION, "display this help message"); + return options; + } + + public static void printUsage(CmdLineOptions options) + { + String usage = String.format("%s [options] <keyspace> <column_family>", TOOL_NAME); + StringBuilder header = new StringBuilder(); + header.append("--\n"); + header.append("Verify the sstable for the provided table." ); + header.append("\n--\n"); + header.append("Options are:"); + new HelpFormatter().printHelp(usage, header.toString(), options, ""); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/test/unit/org/apache/cassandra/db/VerifyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java new file mode 100644 index 0000000..848978b --- /dev/null +++ b/test/unit/org/apache/cassandra/db/VerifyTest.java @@ -0,0 +1,428 @@ +package org.apache.cassandra.db; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import com.google.common.base.Charsets; +import org.apache.cassandra.OrderedJUnit4ClassRunner; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.db.columniterator.IdentityQueryFilter; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.Verifier; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.CounterColumnType; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.*; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.commons.lang3.StringUtils; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.*; +import java.nio.file.Files; +import java.util.HashMap; +import java.util.List; +import java.util.zip.Adler32; +import java.util.zip.CheckedInputStream; + +import static org.apache.cassandra.Util.cellname; +import static org.apache.cassandra.Util.column; +import static org.junit.Assert.fail; + +@RunWith(OrderedJUnit4ClassRunner.class) +public class VerifyTest +{ + public static final String KEYSPACE = "Keyspace1"; + public static final String CF = "Standard1"; + public static final String CF2 = "Standard2"; + public static final String CF3 = "Standard3"; + public static final String CF4 = "Standard4"; + public static final String COUNTER_CF = "Counter1"; + public static final String COUNTER_CF2 = "Counter2"; + public static final String COUNTER_CF3 = "Counter3"; + public static final String COUNTER_CF4 = "Counter4"; + public static final String CORRUPT_CF = "Corrupt1"; + public static final String CORRUPT_CF2 = "Corrupt2"; + public static final String CORRUPTCOUNTER_CF = "CounterCorrupt1"; + public static final String CORRUPTCOUNTER_CF2 = "CounterCorrupt2"; + + public static final String CF_UUID = "UUIDKeys"; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + CompressionParameters compressionParameters = new CompressionParameters(SnappyCompressor.instance, 32768, new HashMap<String, String>()); + + SchemaLoader.loadSchema(); + SchemaLoader.createKeyspace(KEYSPACE, + SimpleStrategy.class, + KSMetaData.optsWithRF(1), + SchemaLoader.standardCFMD(KEYSPACE, CF).compressionParameters(compressionParameters), + SchemaLoader.standardCFMD(KEYSPACE, CF2).compressionParameters(compressionParameters), + SchemaLoader.standardCFMD(KEYSPACE, CF3), + SchemaLoader.standardCFMD(KEYSPACE, CF4), + SchemaLoader.standardCFMD(KEYSPACE, CORRUPT_CF), + SchemaLoader.standardCFMD(KEYSPACE, CORRUPT_CF2), + CFMetaData.denseCFMetaData(KEYSPACE, COUNTER_CF, BytesType.instance).defaultValidator(CounterColumnType.instance).compressionParameters(compressionParameters), + CFMetaData.denseCFMetaData(KEYSPACE, COUNTER_CF2, BytesType.instance).defaultValidator(CounterColumnType.instance).compressionParameters(compressionParameters), + CFMetaData.denseCFMetaData(KEYSPACE, COUNTER_CF3, BytesType.instance).defaultValidator(CounterColumnType.instance), + CFMetaData.denseCFMetaData(KEYSPACE, COUNTER_CF4, BytesType.instance).defaultValidator(CounterColumnType.instance), + CFMetaData.denseCFMetaData(KEYSPACE, CORRUPTCOUNTER_CF, BytesType.instance).defaultValidator(CounterColumnType.instance), + CFMetaData.denseCFMetaData(KEYSPACE, CORRUPTCOUNTER_CF2, BytesType.instance).defaultValidator(CounterColumnType.instance), + SchemaLoader.standardCFMD(KEYSPACE, CF_UUID).keyValidator(UUIDType.instance)); + } + + + @Test + public void testVerifyCorrect() throws IOException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + + fillCF(cfs, KEYSPACE, CF, 2); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + + Verifier verifier = new Verifier(cfs, sstable, false); + try + { + verifier.verify(false); + } + catch (CorruptSSTableException err) + { + fail("Unexpected CorruptSSTableException"); + } + } + + @Test + public void testVerifyCounterCorrect() throws IOException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF); + + fillCounterCF(cfs, KEYSPACE, COUNTER_CF, 2); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + + Verifier verifier = new Verifier(cfs, sstable, false); + try + { + verifier.verify(false); + } + catch (CorruptSSTableException err) + { + fail("Unexpected CorruptSSTableException"); + } + } + + @Test + public void testExtendedVerifyCorrect() throws IOException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2); + + fillCF(cfs, KEYSPACE, CF2, 2); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + + Verifier verifier = new Verifier(cfs, sstable, false); + try + { + verifier.verify(true); + } + catch (CorruptSSTableException err) + { + fail("Unexpected CorruptSSTableException"); + } + } + + @Test + public void testExtendedVerifyCounterCorrect() throws IOException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF2); + + fillCounterCF(cfs, KEYSPACE, COUNTER_CF2, 2); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + + Verifier verifier = new Verifier(cfs, sstable, false); + try + { + verifier.verify(true); + } + catch (CorruptSSTableException err) + { + fail("Unexpected CorruptSSTableException"); + } + } + + @Test + public void testVerifyCorrectUncompressed() throws IOException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF3); + + fillCF(cfs, KEYSPACE, CF3, 2); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + + Verifier verifier = new Verifier(cfs, sstable, false); + try + { + verifier.verify(false); + } + catch (CorruptSSTableException err) + { + fail("Unexpected CorruptSSTableException"); + } + } + + @Test + public void testVerifyCounterCorrectUncompressed() throws IOException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF3); + + fillCounterCF(cfs, KEYSPACE, COUNTER_CF3, 2); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + + Verifier verifier = new Verifier(cfs, sstable, false); + try + { + verifier.verify(false); + } + catch (CorruptSSTableException err) + { + fail("Unexpected CorruptSSTableException"); + } + } + + @Test + public void testExtendedVerifyCorrectUncompressed() throws IOException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF4); + + fillCF(cfs, KEYSPACE, CF4, 2); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + + Verifier verifier = new Verifier(cfs, sstable, false); + try + { + verifier.verify(true); + } + catch (CorruptSSTableException err) + { + fail("Unexpected CorruptSSTableException"); + } + } + + @Test + public void testExtendedVerifyCounterCorrectUncompressed() throws IOException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF4); + + fillCounterCF(cfs, KEYSPACE, COUNTER_CF4, 2); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + + Verifier verifier = new Verifier(cfs, sstable, false); + try + { + verifier.verify(true); + } + catch (CorruptSSTableException err) + { + fail("Unexpected CorruptSSTableException"); + } + } + + + @Test + public void testVerifyIncorrectDigest() throws IOException, WriteTimeoutException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF); + + fillCF(cfs, KEYSPACE, CORRUPT_CF, 2); + + List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + + + RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(Component.DIGEST), "rw"); + Long correctChecksum = Long.parseLong(file.readLine()); + file.close(); + + writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(Component.DIGEST)); + + Verifier verifier = new Verifier(cfs, sstable, false); + try + { + verifier.verify(false); + fail("Expected a CorruptSSTableException to be thrown"); + } + catch (CorruptSSTableException err) {} + } + + + @Test + public void testVerifyCorruptRowCorrectDigest() throws IOException, WriteTimeoutException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF2); + + fillCF(cfs, KEYSPACE, CORRUPT_CF2, 2); + + List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); + + SSTableReader sstable = cfs.getSSTables().iterator().next(); + + // overwrite one row with garbage + long row0Start = sstable.getPosition(RowPosition.ForKey.get(ByteBufferUtil.bytes("0"), sstable.partitioner), SSTableReader.Operator.EQ).position; + long row1Start = sstable.getPosition(RowPosition.ForKey.get(ByteBufferUtil.bytes("1"), sstable.partitioner), SSTableReader.Operator.EQ).position; + long startPosition = row0Start < row1Start ? row0Start : row1Start; + long endPosition = row0Start < row1Start ? row1Start : row0Start; + + RandomAccessFile file = new RandomAccessFile(sstable.getFilename(), "rw"); + file.seek(startPosition); + file.writeBytes(StringUtils.repeat('z', (int) 2)); + file.close(); + + // Update the Digest to have the right Checksum + writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(Component.DIGEST)); + + Verifier verifier = new Verifier(cfs, sstable, false); + + // First a simple verify checking digest, which should succeed + try + { + verifier.verify(false); + } + catch (CorruptSSTableException err) + { + fail("Simple verify should have succeeded as digest matched"); + } + + // Now try extended verify + try + { + verifier.verify(true); + + } + catch (CorruptSSTableException err) + { + return; + } + fail("Expected a CorruptSSTableException to be thrown"); + + } + + protected void fillCF(ColumnFamilyStore cfs, String keyspace, String columnFamily, int rowsPerSSTable) + { + for (int i = 0; i < rowsPerSSTable; i++) + { + String key = String.valueOf(i); + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(keyspace, columnFamily); + cf.addColumn(column("c1", "1", 1L)); + cf.addColumn(column("c2", "2", 1L)); + Mutation rm = new Mutation(keyspace, ByteBufferUtil.bytes(key), cf); + rm.apply(); + } + + cfs.forceBlockingFlush(); + } + + protected void fillCounterCF(ColumnFamilyStore cfs, String keyspace, String columnFamily, int rowsPerSSTable) throws WriteTimeoutException + { + for (int i = 0; i < rowsPerSSTable; i++) + { + String key = String.valueOf(i); + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(keyspace, columnFamily); + Mutation rm = new Mutation(keyspace, ByteBufferUtil.bytes(key), cf); + rm.addCounter(columnFamily, cellname("Column1"), 100); + CounterMutation cm = new CounterMutation(rm, ConsistencyLevel.ONE); + cm.apply(); + } + + cfs.forceBlockingFlush(); + } + + protected long simpleFullChecksum(String filename) throws IOException + { + FileInputStream inputStream = new FileInputStream(filename); + Adler32 adlerChecksum = new Adler32(); + CheckedInputStream cinStream = new CheckedInputStream(inputStream, adlerChecksum); + byte[] b = new byte[128]; + while (cinStream.read(b) >= 0) { + } + return cinStream.getChecksum().getValue(); + } + + protected void writeChecksum(long checksum, String filePath) + { + File outFile = new File(filePath); + BufferedWriter out = null; + try + { + out = Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8); + out.write(String.valueOf(checksum)); + out.flush(); + out.close(); + } + catch (IOException e) + { + throw new FSWriteError(e, outFile); + } + finally + { + FileUtils.closeQuietly(out); + } + + } + +}