Repository: cassandra
Updated Branches:
  refs/heads/trunk 137016566 -> 2d7909dc1


New tool added to verify all data in sstables.

Patch by Jeff Jirsa; reviewed by Branimir Lambov for CASSANDRA-5791


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2d7909dc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2d7909dc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2d7909dc

Branch: refs/heads/trunk
Commit: 2d7909dc12dc038a65742ca03d375140c5257158
Parents: 1370165
Author: T Jake Luciani <j...@apache.org>
Authored: Tue Mar 31 09:53:33 2015 -0400
Committer: T Jake Luciani <j...@apache.org>
Committed: Tue Mar 31 09:53:33 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   2 +
 bin/sstableverify                               |  55 +++
 bin/sstableverify.bat                           |  41 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |   5 +
 .../db/compaction/CompactionManager.java        |  36 ++
 .../cassandra/db/compaction/OperationType.java  |   3 +-
 .../cassandra/db/compaction/Verifier.java       | 280 ++++++++++++
 .../io/compress/CompressedSequentialWriter.java |   2 +-
 .../apache/cassandra/io/sstable/Component.java  |   4 +-
 .../io/util/ChecksummedSequentialWriter.java    |   2 +-
 .../io/util/DataIntegrityMetadata.java          |  86 ++--
 .../cassandra/service/StorageService.java       |  12 +
 .../cassandra/service/StorageServiceMBean.java  |   8 +
 .../org/apache/cassandra/tools/NodeProbe.java   |  19 +-
 .../org/apache/cassandra/tools/NodeTool.java    |  33 +-
 .../cassandra/tools/StandaloneVerifier.java     | 222 ++++++++++
 .../org/apache/cassandra/db/VerifyTest.java     | 428 +++++++++++++++++++
 18 files changed, 1206 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1d7da5..beb05ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * New tool added to verify all sstables in a node (CASSANDRA-5791)
  * Push notification when tracing completes for an operation (CASSANDRA-7807)
  * Delay "node up" and "node added" notifications until native protocol server 
is started (CASSANDRA-8236)
  * Compressed Commit Log (CASSANDRA-6809)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 641be77..3af2f92 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,8 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - A new tool, bin/sstableverify, has been added that checks all sstables
+     for errors/bitrot. Unlike scrub, it does not rewrite sstables; when it
+     detects corruption it only marks the affected sstable as unrepaired.
    - Authentication & Authorization APIs have been updated to introduce
      roles. Roles and Permissions granted to them are inherited, supporting
      role based access control. The role concept supercedes that of users

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/bin/sstableverify
----------------------------------------------------------------------
diff --git a/bin/sstableverify b/bin/sstableverify
new file mode 100755
index 0000000..c3e40c7
--- /dev/null
+++ b/bin/sstableverify
@@ -0,0 +1,55 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    for include in /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh \
+                   ~/.cassandra.in.sh \
+                   "`dirname "$0"`/cassandra.in.sh"; do
+        if [ -r "$include" ]; then
+            . "$include"
+            break
+        fi
+    done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+    . "$CASSANDRA_INCLUDE"
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -x "$JAVA_HOME/bin/java" ]; then
+    JAVA="$JAVA_HOME/bin/java"
+else
+    JAVA="`which java`"
+fi
+
+if [ -z "$CLASSPATH" ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+if [ "x$MAX_HEAP_SIZE" = "x" ]; then
+    MAX_HEAP_SIZE="256M"
+fi
+
+"$JAVA" $JAVA_AGENT -ea -cp "$CLASSPATH" -Xmx$MAX_HEAP_SIZE \
+        -Dcassandra.storagedir="$cassandra_storagedir" \
+        -Dlogback.configurationFile=logback-tools.xml \
+        org.apache.cassandra.tools.StandaloneVerifier "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/bin/sstableverify.bat
----------------------------------------------------------------------
diff --git a/bin/sstableverify.bat b/bin/sstableverify.bat
new file mode 100644
index 0000000..aa08826
--- /dev/null
+++ b/bin/sstableverify.bat
@@ -0,0 +1,41 @@
+@REM
+@REM  Licensed to the Apache Software Foundation (ASF) under one or more
+@REM  contributor license agreements.  See the NOTICE file distributed with
+@REM  this work for additional information regarding copyright ownership.
+@REM  The ASF licenses this file to You under the Apache License, Version 2.0
+@REM  (the "License"); you may not use this file except in compliance with
+@REM  the License.  You may obtain a copy of the License at
+@REM
+@REM      http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM  Unless required by applicable law or agreed to in writing, software
+@REM  distributed under the License is distributed on an "AS IS" BASIS,
+@REM  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@REM  See the License for the specific language governing permissions and
+@REM  limitations under the License.
+
+@echo off
+if "%OS%" == "Windows_NT" setlocal
+
+REM Launcher for the standalone sstable verifier; all arguments (%*) are passed through.
+REM Work from this script's directory so cassandra.in.bat can be found.
+REM NOTE(review): cassandra.in.bat presumably defines CASSANDRA_CLASSPATH and
+REM CASSANDRA_PARAMS, which are used below - confirm.
+pushd "%~dp0"
+call cassandra.in.bat
+
+REM The main class may be overridden by the caller; it defaults to the verifier.
+if NOT DEFINED CASSANDRA_MAIN set 
CASSANDRA_MAIN=org.apache.cassandra.tools.StandaloneVerifier
+if NOT DEFINED JAVA_HOME goto :err
+
+REM ***** JAVA options *****
+set JAVA_OPTS=^
+ -Dlogback.configurationFile=logback-tools.xml
+
+REM NOTE(review): TOOLS_PARAMS is cleared here but not referenced anywhere below.
+set TOOLS_PARAMS=
+
+"%JAVA_HOME%\bin\java" %JAVA_OPTS% %CASSANDRA_PARAMS% -cp 
%CASSANDRA_CLASSPATH% "%CASSANDRA_MAIN%" %*
+goto finally
+
+REM Reached only when JAVA_HOME is not defined.
+:err
+echo JAVA_HOME environment variable must be set!
+pause
+
+:finally
+
+ENDLOCAL

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index dbd55a0..ca77954 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1442,6 +1442,11 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         return true;
     }
 
+    /**
+     * Verifies the sstables of this store by delegating to {@code CompactionManager#performVerify}.
+     *
+     * @param extendedVerify when true, each sstable is also scanned row by row instead of only
+     *                       having its data digest checked
+     * @return the aggregate status reported by the compaction manager
+     */
+    public CompactionManager.AllSSTableOpStatus verify(boolean extendedVerify) throws ExecutionException, InterruptedException
+    {
+        CompactionManager manager = CompactionManager.instance;
+        return manager.performVerify(ColumnFamilyStore.this, extendedVerify);
+    }
+
     public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean 
excludeCurrentVersion) throws ExecutionException, InterruptedException
     {
         return 
CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, 
excludeCurrentVersion);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 4c86469..977c46e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -323,6 +323,25 @@ public class CompactionManager implements 
CompactionManagerMBean
         });
     }
 
+    /**
+     * Runs {@code verifyOne} on the sstables of {@code cfs} through {@link #parallelAllSSTableOperation}.
+     * No sstable is filtered out beforehand.
+     *
+     * @param extendedVerify forwarded to each {@code Verifier#verify(boolean)} call
+     */
+    public AllSSTableOpStatus performVerify(final ColumnFamilyStore cfs, final boolean extendedVerify) throws InterruptedException, ExecutionException
+    {
+        // secondary-index stores are not expected to reach this entry point
+        assert !cfs.isIndex();
+
+        OneSSTableOperation verifyEach = new OneSSTableOperation()
+        {
+            @Override
+            public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> sstables)
+            {
+                // every sstable is a verification candidate
+                return sstables;
+            }
+
+            @Override
+            public void execute(SSTableReader sstable) throws IOException
+            {
+                verifyOne(cfs, sstable, extendedVerify);
+            }
+        };
+        return parallelAllSSTableOperation(cfs, verifyEach);
+    }
+
     public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore 
cfs, final boolean excludeCurrentVersion) throws InterruptedException, 
ExecutionException
     {
         return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
@@ -651,6 +670,23 @@ public class CompactionManager implements 
CompactionManagerMBean
         }
     }
 
+    /**
+     * Verifies a single sstable, registering the operation with the compaction metrics while it runs.
+     *
+     * @param extendedVerify forwarded to {@link Verifier#verify(boolean)}
+     * @throws IOException propagated from the verifier
+     */
+    private void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean extendedVerify) throws IOException
+    {
+        // try-with-resources releases the verifier's open readers on every path, including a failure
+        // in beginCompaction; the inner finally still unregisters the operation from the metrics.
+        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        {
+            CompactionInfo.Holder verifyInfo = verifier.getVerifyInfo();
+            metrics.beginCompaction(verifyInfo);
+            try
+            {
+                verifier.verify(extendedVerify);
+            }
+            finally
+            {
+                metrics.finishCompaction(verifyInfo);
+            }
+        }
+    }
+
     /**
      * Determines if a cleanup would actually remove any data in this SSTable 
based
      * on a set of owned ranges.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java 
b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 15d18f6..a14f13f 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -31,7 +31,8 @@ public enum OperationType
     /** Compaction for tombstone removal */
     TOMBSTONE_COMPACTION("Tombstone Compaction"),
     UNKNOWN("Unknown compaction type"),
-    ANTICOMPACTION("Anticompaction after repair");
+    ANTICOMPACTION("Anticompaction after repair"),
+    VERIFY("Verify");
 
     private final String type;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java 
b/src/java/org/apache/cassandra/db/compaction/Verifier.java
new file mode 100644
index 0000000..7afea0f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
+import org.apache.cassandra.db.*;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataIntegrityMetadata;
+import org.apache.cassandra.io.util.DataIntegrityMetadata.FileDigestValidator;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.OutputHandler;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class Verifier implements Closeable
+{
+    private final ColumnFamilyStore cfs;
+    private final SSTableReader sstable;
+
+    private final CompactionController controller;
+
+
+    private final RandomAccessReader dataFile;
+    private final RandomAccessReader indexFile;
+    private final VerifyInfo verifyInfo;
+    private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+
+    private int goodRows;
+    private int badRows;
+
+    private final OutputHandler outputHandler;
+    private FileDigestValidator validator;
+
+    public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, boolean 
isOffline) throws IOException
+    {
+        this(cfs, sstable, new OutputHandler.LogOutput(), isOffline);
+    }
+
+    public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, 
OutputHandler outputHandler, boolean isOffline) throws IOException
+    {
+        this.cfs = cfs;
+        this.sstable = sstable;
+        this.outputHandler = outputHandler;
+        this.rowIndexEntrySerializer = 
sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+
+        this.controller = new VerifyController(cfs);
+
+        this.dataFile = isOffline
+                        ? sstable.openDataReader()
+                        : 
sstable.openDataReader(CompactionManager.instance.getRateLimiter());
+        this.indexFile = RandomAccessReader.open(new 
File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)));
+        this.verifyInfo = new VerifyInfo(dataFile, sstable);
+    }
+
+    public void verify(boolean extended) throws IOException
+    {
+        long rowStart = 0;
+
+        outputHandler.output(String.format("Verifying %s (%s bytes)", sstable, 
dataFile.length()));
+        outputHandler.output(String.format("Checking computed hash of %s ", 
sstable));
+
+
+        // Verify will use the adler32 Digest files, which works for both 
compressed and uncompressed sstables
+        try
+        {
+            validator = null;
+
+            if (new 
File(sstable.descriptor.filenameFor(Component.DIGEST)).exists())
+            {
+                validator = 
DataIntegrityMetadata.fileDigestValidator(sstable.descriptor);
+                validator.validate();
+            }
+            else
+            {
+                outputHandler.output("Data digest missing, assuming extended 
verification of disk atoms");
+                extended = true;
+            }
+        }
+        catch (IOException e)
+        {
+            outputHandler.debug(e.getMessage());
+            markAndThrow();
+        }
+        finally
+        {
+            FileUtils.closeQuietly(validator);
+        }
+
+        if ( !extended )
+            return;
+
+        outputHandler.output("Extended Verify requested, proceeding to inspect 
atoms");
+
+
+        try
+        {
+            ByteBuffer nextIndexKey = 
ByteBufferUtil.readWithShortLength(indexFile);
+            {
+                long firstRowPositionFromIndex = 
rowIndexEntrySerializer.deserialize(indexFile, 
sstable.descriptor.version).position;
+                if (firstRowPositionFromIndex != 0)
+                    markAndThrow();
+            }
+
+            DecoratedKey prevKey = null;
+
+            while (!dataFile.isEOF())
+            {
+
+                if (verifyInfo.isStopRequested())
+                    throw new 
CompactionInterruptedException(verifyInfo.getCompactionInfo());
+
+                rowStart = dataFile.getFilePointer();
+                outputHandler.debug("Reading row at " + rowStart);
+
+                DecoratedKey key = null;
+                long dataSize = -1;
+                try
+                {
+                    key = 
sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(dataFile));
+                }
+                catch (Throwable th)
+                {
+                    throwIfFatal(th);
+                    // check for null key below
+                }
+
+                ByteBuffer currentIndexKey = nextIndexKey;
+                long nextRowPositionFromIndex = 0;
+                try
+                {
+                    nextIndexKey = indexFile.isEOF() ? null : 
ByteBufferUtil.readWithShortLength(indexFile);
+                    nextRowPositionFromIndex = indexFile.isEOF()
+                                             ? dataFile.length()
+                                             : 
rowIndexEntrySerializer.deserialize(indexFile, 
sstable.descriptor.version).position;
+                }
+                catch (Throwable th)
+                {
+                    markAndThrow();
+                }
+
+                long dataStart = dataFile.getFilePointer();
+                long dataStartFromIndex = currentIndexKey == null
+                                        ? -1
+                                        : rowStart + 2 + 
currentIndexKey.remaining();
+
+                dataSize = nextRowPositionFromIndex - dataStartFromIndex;
+                // avoid an NPE if key is null
+                String keyName = key == null ? "(unreadable key)" : 
ByteBufferUtil.bytesToHex(key.getKey());
+                outputHandler.debug(String.format("row %s is %s bytes", 
keyName, dataSize));
+
+                assert currentIndexKey != null || indexFile.isEOF();
+
+                try
+                {
+                    if (key == null || dataSize > dataFile.length())
+                        markAndThrow();
+
+                    SSTableIdentityIterator atoms = new 
SSTableIdentityIterator(sstable, dataFile, key, true);
+                    if ( (prevKey != null && prevKey.compareTo(key) > 0) || 
!key.getKey().equals(currentIndexKey) || dataStart != dataStartFromIndex )
+                        markAndThrow();
+                    
+                    goodRows++;
+                    prevKey = key;
+
+
+                    outputHandler.debug(String.format("Row %s at %s valid, 
moving to next row at %s ", goodRows, rowStart, nextRowPositionFromIndex));
+                    dataFile.seek(nextRowPositionFromIndex);
+                }
+                catch (Throwable th)
+                {
+                    badRows++;
+                    markAndThrow();
+                }
+            }
+        }
+        catch (Throwable t)
+        {
+            throw Throwables.propagate(t);
+        }
+        finally
+        {
+            controller.close();
+        }
+
+        outputHandler.output("Verify of " + sstable + " succeeded. All " + 
goodRows + " rows read successfully");
+    }
+
+    public void close()
+    {
+        FileUtils.closeQuietly(dataFile);
+        FileUtils.closeQuietly(indexFile);
+    }
+
+    private void throwIfFatal(Throwable th)
+    {
+        if (th instanceof Error && !(th instanceof AssertionError || th 
instanceof IOError))
+            throw (Error) th;
+    }
+
+    private void markAndThrow() throws IOException
+    {
+        
sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, 
ActiveRepairService.UNREPAIRED_SSTABLE);
+        throw new CorruptSSTableException(new Exception(String.format("Invalid 
SSTable %s, please force repair", sstable.getFilename())), 
sstable.getFilename());
+    }
+
+    public CompactionInfo.Holder getVerifyInfo()
+    {
+        return verifyInfo;
+    }
+
+    private static class VerifyInfo extends CompactionInfo.Holder
+    {
+        private final RandomAccessReader dataFile;
+        private final SSTableReader sstable;
+
+        public VerifyInfo(RandomAccessReader dataFile, SSTableReader sstable)
+        {
+            this.dataFile = dataFile;
+            this.sstable = sstable;
+        }
+
+        public CompactionInfo getCompactionInfo()
+        {
+            try
+            {
+                return new CompactionInfo(sstable.metadata,
+                                          OperationType.VERIFY,
+                                          dataFile.getFilePointer(),
+                                          dataFile.length());
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException();
+            }
+        }
+    }
+
+    private static class VerifyController extends CompactionController
+    {
+        public VerifyController(ColumnFamilyStore cfs)
+        {
+            super(cfs, Integer.MAX_VALUE);
+        }
+
+        @Override
+        public long maxPurgeableTimestamp(DecoratedKey key)
+        {
+            return Long.MIN_VALUE;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java 
b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index b4d9dcc..fc679d5 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -145,7 +145,7 @@ public class CompressedSequentialWriter extends 
SequentialWriter
 
             // write corresponding checksum
             compressed.buffer.rewind();
-            crcMetadata.appendDirect(compressed.buffer);
+            crcMetadata.appendDirect(compressed.buffer, true);
             lastFlushOffset += compressedLength + 4;
 
             // adjust our bufferOffset to account for the new uncompressed 
data we've now written out

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java 
b/src/java/org/apache/cassandra/io/sstable/Component.java
index 7f6cc79..a431f29 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -47,8 +47,8 @@ public class Component
         COMPRESSION_INFO("CompressionInfo.db"),
         // statistical metadata about the content of the sstable
         STATS("Statistics.db"),
-        // holds sha1 sum of the data file (to be checked by sha1sum)
-        DIGEST("Digest.sha1"),
+        // holds adler32 checksum of the data file
+        DIGEST("Digest.adler32"),
         // holds the CRC32 for chunks in an a uncompressed file.
         CRC("CRC.db"),
         // holds SSTable Index Summary (sampling of Index component)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java 
b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index 5c5637a..d28a14d 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -41,7 +41,7 @@ public class ChecksummedSequentialWriter extends 
SequentialWriter
         ByteBuffer toAppend = buffer.duplicate();
         toAppend.position(0);
         toAppend.limit(buffer.position());
-        crcMetadata.appendDirect(toAppend);
+        crcMetadata.appendDirect(toAppend, false);
     }
 
     public void writeFullChecksum(Descriptor descriptor)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java 
b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 464f9d2..2b59545 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
 import com.google.common.base.Charsets;
@@ -86,6 +87,58 @@ public class DataIntegrityMetadata
         }
     }
 
+    /**
+     * Creates a validator comparing the Data component of {@code desc} with its stored Digest.
+     * The caller owns the returned validator and must close it.
+     *
+     * @throws IOException if the stored digest cannot be read
+     */
+    public static FileDigestValidator fileDigestValidator(Descriptor desc) throws IOException
+    {
+        FileDigestValidator validator = new FileDigestValidator(desc);
+        return validator;
+    }
+
+    public static class FileDigestValidator implements Closeable
+    {
+        private final Checksum checksum;
+        private final RandomAccessReader digestReader;
+        private final RandomAccessReader dataReader;
+        private final Descriptor descriptor;
+        private long storedDigestValue;
+        private long calculatedDigestValue;
+
+        public FileDigestValidator(Descriptor descriptor) throws IOException
+        {
+            this.descriptor = descriptor;
+            checksum = descriptor.version.hasAllAdlerChecksums() ? new 
Adler32() : CRC32Factory.instance.create();
+            digestReader = RandomAccessReader.open(new 
File(descriptor.filenameFor(Component.DIGEST)));
+            dataReader = RandomAccessReader.open(new 
File(descriptor.filenameFor(Component.DATA)));
+            try
+            {
+                storedDigestValue = Long.parseLong(digestReader.readLine());
+            }
+            catch (Exception e)
+            {
+                // Attempting to create a FileDigestValidator without a DIGEST 
file will fail
+                throw new IOException("Corrupted SSTable : " + 
descriptor.filenameFor(Component.DATA));
+            }
+
+        }
+
+        // Validate the entire file
+        public void validate() throws IOException
+        {
+            CheckedInputStream checkedInputStream = new 
CheckedInputStream(dataReader, checksum);
+            byte[] chunk = new byte[64 * 1024];
+
+            while( checkedInputStream.read(chunk) > 0 ) { }
+            calculatedDigestValue = 
checkedInputStream.getChecksum().getValue();
+            if (storedDigestValue != calculatedDigestValue) {
+                throw new IOException("Corrupted SSTable : " + 
descriptor.filenameFor(Component.DATA));
+            }
+        }
+
+        public void close()
+        {
+            this.digestReader.close();
+        }
+    }
+
+
     public static class ChecksumWriter
     {
         private final Adler32 incrementalChecksum = new Adler32();
@@ -116,45 +169,28 @@ public class DataIntegrityMetadata
 
         // CompressedSequentialWriters serialize the partial checksums inline 
with the compressed data chunks they
         // corroborate, whereas ChecksummedSequentialWriters serialize them to 
a different file.
-        public void append(byte[] buffer, int start, int end, boolean 
checksumIncrementalResult)
+        public void appendDirect(ByteBuffer bb, boolean 
checksumIncrementalResult)
         {
             try
             {
-                int incrementalChecksumValue;
 
-                incrementalChecksum.update(buffer, start, end);
-                incrementalChecksumValue = (int) 
incrementalChecksum.getValue();
-                incrementalOut.writeInt(incrementalChecksumValue);
-                incrementalChecksum.reset();
+                ByteBuffer toAppend = bb.duplicate();
+                toAppend.mark();
+                FBUtilities.directCheckSum(incrementalChecksum, toAppend);
+                toAppend.reset();
 
-                fullChecksum.update(buffer, start, end);
+                int incrementalChecksumValue = (int) 
incrementalChecksum.getValue();
+                incrementalOut.writeInt(incrementalChecksumValue);
 
+                FBUtilities.directCheckSum(fullChecksum, toAppend);
                 if (checksumIncrementalResult)
                 {
                     ByteBuffer byteBuffer = ByteBuffer.allocate(4);
                     byteBuffer.putInt(incrementalChecksumValue);
                     fullChecksum.update(byteBuffer.array(), 0, 
byteBuffer.array().length);
                 }
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-        }
-
-        public void appendDirect(ByteBuffer bb)
-        {
-            try
-            {
-                ByteBuffer toAppend = bb.duplicate();
-                toAppend.mark();
-                FBUtilities.directCheckSum(incrementalChecksum, toAppend);
-                toAppend.reset();
-
-                incrementalOut.writeInt((int) incrementalChecksum.getValue());
                 incrementalChecksum.reset();
 
-                FBUtilities.directCheckSum(fullChecksum, toAppend);
             }
             catch (IOException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 2c365cb..7e76b2a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2358,6 +2358,18 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return status.statusCode;
     }
 
+    /**
+     * Verifies each table returned by {@code getValidColumnFamilies(false, false, keyspaceName, columnFamilies)}.
+     *
+     * @param extendedVerify forwarded to {@code ColumnFamilyStore#verify(boolean)}
+     * @return the status code of the last table whose verification was not SUCCESSFUL, or
+     *         SUCCESSFUL's code when every table succeeded
+     */
+    public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.AllSSTableOpStatus overall = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
+        for (ColumnFamilyStore table : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
+        {
+            CompactionManager.AllSSTableOpStatus tableStatus = table.verify(extendedVerify);
+            // a later non-successful status replaces an earlier one
+            overall = tableStatus == CompactionManager.AllSSTableOpStatus.SUCCESSFUL ? overall : tableStatus;
+        }
+        return overall.statusCode;
+    }
+
     public int upgradeSSTables(String keyspaceName, boolean 
excludeCurrentVersion, String... columnFamilies) throws IOException, 
ExecutionException, InterruptedException
     {
         CompactionManager.AllSSTableOpStatus status = 
CompactionManager.AllSSTableOpStatus.SUCCESSFUL;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 3d04058..4406ec6 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -240,6 +240,14 @@ public interface StorageServiceMBean extends NotificationEmitter
     public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
 
     /**
+     * Verify the data checksums of the sstables of the given keyspace's tables.
+     * If the columnFamilies array is empty, all tables of the keyspace are verified.
+     * If extendedVerify is true, each entire sstable is read so that every cell is validated as well.
+     * @return a status code; callers such as NodeProbe treat any non-zero value as a failure
+     */
+    public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+
+    /**
      * Rewrite all sstables to the latest version.
      * Unlike scrub, it doesn't skip bad rows and do not snapshot sstables first.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index edb2478..5a1d6b4 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -207,6 +207,15 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies);
     }
 
+    /**
+     * Verifies the given tables of {@code keyspaceName} (all tables if none are given) by delegating to
+     * {@code ssProxy}; returns the resulting status code, which is non-zero if any table failed to verify.
+     */
+    public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+    {
+        return ssProxy.verify(extendedVerify, keyspaceName, columnFamilies);
+    }
+
     public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
         return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies);
@@ -217,7 +226,7 @@ public class NodeProbe implements AutoCloseable
         if (forceKeyspaceCleanup(keyspaceName, columnFamilies) != 0)
         {
             failed = true;
-            out.println("Aborted cleaning up atleast one table in keyspace "+keyspaceName+", check server logs for more information.");
+            out.println("Aborted cleaning up at least one table in keyspace "+keyspaceName+", check server logs for more information.");
         }
     }
 
@@ -226,10 +235,23 @@ public class NodeProbe implements AutoCloseable
         if (scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies) != 0)
         {
             failed = true;
-            out.println("Aborted scrubbing atleast one table in keyspace "+keyspaceName+", check server logs for more information.");
+            out.println("Aborted scrubbing at least one table in keyspace "+keyspaceName+", check server logs for more information.");
+        }
+    }
+
+    /**
+     * Runs {@link #verify(boolean, String, String...)}; on a non-zero status, marks this probe as failed
+     * and prints to {@code out} that at least one table could not be verified.
+     */
+    public void verify(PrintStream out, boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+    {
+        if (verify(extendedVerify, keyspaceName, columnFamilies) != 0)
+        {
+            failed = true;
+            out.println("Aborted verifying at least one table in keyspace "+keyspaceName+", check server logs for more information.");
         }
     }
 
     public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
         if (upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies) != 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 4ef2469..e6d4df6 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -87,6 +87,7 @@ public class NodeTool
                 ClearSnapshot.class,
                 Compact.class,
                 Scrub.class,
+                Verify.class,
                 Flush.class,
                 UpgradeSSTable.class,
                 DisableAutoCompaction.class,
@@ -1298,6 +1299,36 @@ public class NodeTool
         }
     }
 
+    @Command(name = "verify", description = "Verify (check data checksum for) one or more tables")
+    public static class Verify extends NodeToolCmd
+    {
+        @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
+        private List<String> args = new ArrayList<>();
+
+        @Option(title = "extended_verify",
+            name = {"-e", "--extended-verify"},
+            description = "Verify each cell data, beyond simply checking sstable checksums")
+        private boolean extendedVerify = false; // false: only checksums are checked
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            List<String> keyspaces = parseOptionalKeyspace(args, probe);
+            String[] cfnames = parseOptionalColumnFamilies(args);
+
+            for (String keyspace : keyspaces)
+            {
+                try
+                {
+                    probe.verify(System.out, extendedVerify, keyspace, cfnames); // on failure, prints a message and marks the probe as failed
+                } catch (Exception e)
+                {
+                    throw new RuntimeException("Error occurred during verifying", e);
+                }
+            }
+        }
+    }
+
     @Command(name = "disableautocompaction", description = "Disable autocompaction for the given keyspace and table")
     public static class DisableAutoCompaction extends NodeToolCmd
     {
@@ -2435,7 +2466,7 @@ public class NodeTool
     @Command(name = "stop", description = "Stop compaction")
     public static class Stop extends NodeToolCmd
     {
-        @Arguments(title = "compaction_type", usage = "<compaction type>", description = "Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, INDEX_BUILD", required = true)
+        @Arguments(title = "compaction_type", usage = "<compaction type>", description = "Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, VERIFY, INDEX_BUILD", required = true)
         private OperationType compactionType = OperationType.UNKNOWN;
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
new file mode 100644
index 0000000..a4f3e80
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.tools;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.OutputHandler;
+import org.apache.commons.cli.*;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
+
+/**
+ * Standalone {@code sstableverify} tool: verifies the sstables of a single table given on the command line.
+ */
+public class StandaloneVerifier
+{
+    private static final String TOOL_NAME = "sstableverify";
+    private static final String VERBOSE_OPTION  = "verbose";
+    private static final String EXTENDED_OPTION = "extended";
+    private static final String DEBUG_OPTION  = "debug";
+    private static final String HELP_OPTION  = "help";
+
+    /**
+     * Verifies every sstable of the {@code <keyspace> <table>} named on the command line.
+     * Sstables that have both a data and a primary index component are opened without validation (they might
+     * be broken) and each is then checked by a {@link Verifier}, extended if {@code -e} was given.
+     * Exits the JVM with status 1 if the table is unknown or any sstable could not be loaded or verified, 0 otherwise.
+     */
+    public static void main(String[] args)
+    {
+        Options options = Options.parseArgs(args);
+        try
+        {
+            // load keyspace descriptions.
+            Schema.instance.loadFromDisk(false);
+
+            boolean hasFailed = false;
+
+            if (Schema.instance.getCFMetaData(options.keyspaceName, options.cfName) == null)
+                throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s",
+                                                                 options.keyspaceName,
+                                                                 options.cfName));
+
+            // Do not load sstables since they might be broken
+            Keyspace keyspace = Keyspace.openWithoutSSTables(options.keyspaceName);
+            ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(options.cfName);
+
+            OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
+            Directories.SSTableLister lister = cfs.directories.sstableLister().skipTemporary(true);
+
+            boolean extended = options.extended;
+
+            List<SSTableReader> sstables = new ArrayList<>();
+
+            // Open every sstable that has both a data and a primary index component
+            for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
+            {
+                Set<Component> components = entry.getValue();
+                if (!components.contains(Component.DATA) || !components.contains(Component.PRIMARY_INDEX))
+                    continue;
+
+                try
+                {
+                    SSTableReader sstable = SSTableReader.openNoValidation(entry.getKey(), components, cfs);
+                    sstables.add(sstable);
+                }
+                catch (Exception e)
+                {
+                    JVMStabilityInspector.inspectThrowable(e);
+                    System.err.println(String.format("Error Loading %s: %s", entry.getKey(), e.getMessage()));
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+                    // an sstable that cannot even be opened has not been verified: report failure
+                    hasFailed = true;
+                }
+            }
+
+            // Verify the opened sstables
+            for (SSTableReader sstable : sstables)
+            {
+                try
+                {
+                    Verifier verifier = new Verifier(cfs, sstable, handler, true);
+                    try
+                    {
+                        verifier.verify(extended);
+                    }
+                    catch (CorruptSSTableException cs)
+                    {
+                        System.err.println(String.format("Error verifying %s: %s", sstable, cs.getMessage()));
+                        hasFailed = true;
+                    }
+                    finally
+                    {
+                        verifier.close();
+                    }
+                }
+                catch (Exception e)
+                {
+                    // any other error also leaves this sstable unverified, so it must affect the exit status
+                    JVMStabilityInspector.inspectThrowable(e);
+                    System.err.println(String.format("Error verifying %s: %s", sstable, e.getMessage()));
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+                    hasFailed = true;
+                }
+            }
+
+            CompactionManager.instance.finishCompactionsAndShutdown(5, TimeUnit.MINUTES);
+
+            System.exit(hasFailed ? 1 : 0); // We need that to stop non daemonized threads
+        }
+        catch (Exception e)
+        {
+            System.err.println(e.getMessage());
+            if (options.debug)
+                e.printStackTrace(System.err);
+            System.exit(1);
+        }
+    }
+
+    /** Command-line options of the tool. */
+    private static class Options
+    {
+        public final String keyspaceName;
+        public final String cfName;
+
+        public boolean debug;
+        public boolean verbose;
+        public boolean extended;
+
+        private Options(String keyspaceName, String cfName)
+        {
+            this.keyspaceName = keyspaceName;
+            this.cfName = cfName;
+        }
+
+        /**
+         * Parses {@code cmdArgs}. Prints the usage and exits the JVM with status 0 on {@code --help},
+         * or with status 1 on a parse error or when not exactly two positional arguments are given.
+         */
+        public static Options parseArgs(String[] cmdArgs)
+        {
+            CommandLineParser parser = new GnuParser();
+            CmdLineOptions options = getCmdLineOptions();
+            try
+            {
+                CommandLine cmd = parser.parse(options, cmdArgs, false);
+
+                if (cmd.hasOption(HELP_OPTION))
+                {
+                    printUsage(options);
+                    System.exit(0);
+                }
+
+                String[] args = cmd.getArgs();
+                if (args.length != 2)
+                    errorMsg(args.length < 2 ? "Missing arguments" : "Too many arguments", options);
+
+                String keyspaceName = args[0];
+                String cfName = args[1];
+
+                Options opts = new Options(keyspaceName, cfName);
+
+                opts.debug = cmd.hasOption(DEBUG_OPTION);
+                opts.verbose = cmd.hasOption(VERBOSE_OPTION);
+                opts.extended = cmd.hasOption(EXTENDED_OPTION);
+
+                return opts;
+            }
+            catch (ParseException e)
+            {
+                errorMsg(e.getMessage(), options);
+                return null;
+            }
+        }
+
+        /** Prints {@code msg} to stderr followed by the usage text, then exits the JVM with status 1. */
+        private static void errorMsg(String msg, CmdLineOptions options)
+        {
+            System.err.println(msg);
+            printUsage(options);
+            System.exit(1);
+        }
+
+        private static CmdLineOptions getCmdLineOptions()
+        {
+            CmdLineOptions options = new CmdLineOptions();
+            options.addOption(null, DEBUG_OPTION,          "display stack traces");
+            options.addOption("e",  EXTENDED_OPTION,       "extended verification");
+            options.addOption("v",  VERBOSE_OPTION,        "verbose output");
+            options.addOption("h",  HELP_OPTION,           "display this help message");
+            return options;
+        }
+
+        public static void printUsage(CmdLineOptions options)
+        {
+            String usage = String.format("%s [options] <keyspace> <column_family>", TOOL_NAME);
+            StringBuilder header = new StringBuilder();
+            header.append("--\n");
+            header.append("Verify the sstable for the provided table." );
+            header.append("\n--\n");
+            header.append("Options are:");
+            new HelpFormatter().printHelp(usage, header.toString(), options, "");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d7909dc/test/unit/org/apache/cassandra/db/VerifyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
new file mode 100644
index 0000000..848978b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -0,0 +1,356 @@
+package org.apache.cassandra.db;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import com.google.common.base.Charsets;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.Verifier;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.*;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
+
+import static org.apache.cassandra.Util.cellname;
+import static org.apache.cassandra.Util.column;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link Verifier}: simple and extended verification of intact standard and counter tables, both
+ * compressed and uncompressed, plus detection of a wrong digest and of a corrupted row whose digest was recomputed.
+ */
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class VerifyTest
+{
+    public static final String KEYSPACE = "Keyspace1";
+    public static final String CF = "Standard1";
+    public static final String CF2 = "Standard2";
+    public static final String CF3 = "Standard3";
+    public static final String CF4 = "Standard4";
+    public static final String COUNTER_CF = "Counter1";
+    public static final String COUNTER_CF2 = "Counter2";
+    public static final String COUNTER_CF3 = "Counter3";
+    public static final String COUNTER_CF4 = "Counter4";
+    public static final String CORRUPT_CF = "Corrupt1";
+    public static final String CORRUPT_CF2 = "Corrupt2";
+    public static final String CORRUPTCOUNTER_CF = "CounterCorrupt1";
+    public static final String CORRUPTCOUNTER_CF2 = "CounterCorrupt2";
+
+    public static final String CF_UUID = "UUIDKeys";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        CompressionParameters compressionParameters = new CompressionParameters(SnappyCompressor.instance, 32768, new HashMap<String, String>());
+
+        SchemaLoader.loadSchema();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    SimpleStrategy.class,
+                                    KSMetaData.optsWithRF(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF).compressionParameters(compressionParameters),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF2).compressionParameters(compressionParameters),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF3),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF4),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CORRUPT_CF),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CORRUPT_CF2),
+                                    CFMetaData.denseCFMetaData(KEYSPACE, COUNTER_CF, BytesType.instance).defaultValidator(CounterColumnType.instance).compressionParameters(compressionParameters),
+                                    CFMetaData.denseCFMetaData(KEYSPACE, COUNTER_CF2, BytesType.instance).defaultValidator(CounterColumnType.instance).compressionParameters(compressionParameters),
+                                    CFMetaData.denseCFMetaData(KEYSPACE, COUNTER_CF3, BytesType.instance).defaultValidator(CounterColumnType.instance),
+                                    CFMetaData.denseCFMetaData(KEYSPACE, COUNTER_CF4, BytesType.instance).defaultValidator(CounterColumnType.instance),
+                                    CFMetaData.denseCFMetaData(KEYSPACE, CORRUPTCOUNTER_CF, BytesType.instance).defaultValidator(CounterColumnType.instance),
+                                    CFMetaData.denseCFMetaData(KEYSPACE, CORRUPTCOUNTER_CF2, BytesType.instance).defaultValidator(CounterColumnType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_UUID).keyValidator(UUIDType.instance));
+    }
+
+    @Test
+    public void testVerifyCorrect() throws IOException
+    {
+        assertVerifySucceeds(CF, false, false);
+    }
+
+    @Test
+    public void testVerifyCounterCorrect() throws IOException
+    {
+        assertVerifySucceeds(COUNTER_CF, true, false);
+    }
+
+    @Test
+    public void testExtendedVerifyCorrect() throws IOException
+    {
+        assertVerifySucceeds(CF2, false, true);
+    }
+
+    @Test
+    public void testExtendedVerifyCounterCorrect() throws IOException
+    {
+        assertVerifySucceeds(COUNTER_CF2, true, true);
+    }
+
+    @Test
+    public void testVerifyCorrectUncompressed() throws IOException
+    {
+        assertVerifySucceeds(CF3, false, false);
+    }
+
+    @Test
+    public void testVerifyCounterCorrectUncompressed() throws IOException
+    {
+        assertVerifySucceeds(COUNTER_CF3, true, false);
+    }
+
+    @Test
+    public void testExtendedVerifyCorrectUncompressed() throws IOException
+    {
+        assertVerifySucceeds(CF4, false, true);
+    }
+
+    @Test
+    public void testExtendedVerifyCounterCorrectUncompressed() throws IOException
+    {
+        assertVerifySucceeds(COUNTER_CF4, true, true);
+    }
+
+    @Test
+    public void testVerifyIncorrectDigest() throws IOException, WriteTimeoutException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF);
+
+        fillCF(cfs, KEYSPACE, CORRUPT_CF, 2);
+
+        // read all rows back once before inspecting the sstable; the result itself is not used
+        cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
+
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+
+        // replace the stored digest with one that is off by one
+        String digestPath = sstable.descriptor.filenameFor(Component.DIGEST);
+        long correctChecksum;
+        try (RandomAccessFile file = new RandomAccessFile(digestPath, "r"))
+        {
+            correctChecksum = Long.parseLong(file.readLine());
+        }
+        writeChecksum(correctChecksum + 1, digestPath);
+
+        Verifier verifier = new Verifier(cfs, sstable, false);
+        try
+        {
+            verifier.verify(false);
+            fail("Expected a CorruptSSTableException to be thrown");
+        }
+        catch (CorruptSSTableException expected)
+        {
+            // expected: the digest no longer matches the data file
+        }
+        finally
+        {
+            verifier.close();
+        }
+    }
+
+    @Test
+    public void testVerifyCorruptRowCorrectDigest() throws IOException, WriteTimeoutException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF2);
+
+        fillCF(cfs, KEYSPACE, CORRUPT_CF2, 2);
+
+        // read all rows back once before inspecting the sstable; the result itself is not used
+        cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
+
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+
+        // overwrite the first two bytes of whichever row comes first in the data file with garbage
+        long row0Start = sstable.getPosition(RowPosition.ForKey.get(ByteBufferUtil.bytes("0"), sstable.partitioner), SSTableReader.Operator.EQ).position;
+        long row1Start = sstable.getPosition(RowPosition.ForKey.get(ByteBufferUtil.bytes("1"), sstable.partitioner), SSTableReader.Operator.EQ).position;
+        long startPosition = Math.min(row0Start, row1Start);
+
+        try (RandomAccessFile file = new RandomAccessFile(sstable.getFilename(), "rw"))
+        {
+            file.seek(startPosition);
+            file.writeBytes(StringUtils.repeat('z', 2));
+        }
+
+        // Update the Digest to have the right Checksum, so only a check of the data itself can find the damage
+        writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(Component.DIGEST));
+
+        Verifier verifier = new Verifier(cfs, sstable, false);
+        try
+        {
+            // First a simple verify checking digest, which should succeed
+            try
+            {
+                verifier.verify(false);
+            }
+            catch (CorruptSSTableException err)
+            {
+                fail("Simple verify should have succeeded as digest matched");
+            }
+
+            // Now try extended verify, which must detect the garbage bytes
+            try
+            {
+                verifier.verify(true);
+                fail("Expected a CorruptSSTableException to be thrown");
+            }
+            catch (CorruptSSTableException expected)
+            {
+                // expected
+            }
+        }
+        finally
+        {
+            verifier.close();
+        }
+    }
+
+    /**
+     * Fills {@code columnFamily} with two rows, flushes it, and asserts that verifying the first of its sstables
+     * (simple or extended) does not throw {@link CorruptSSTableException}.
+     *
+     * @param columnFamily the table to fill and verify
+     * @param counter      true for counter tables, which are filled with counter mutations
+     * @param extended     true for the extended verification, false for the simple one
+     */
+    private void assertVerifySucceeds(String columnFamily, boolean counter, boolean extended) throws IOException, WriteTimeoutException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily);
+
+        if (counter)
+            fillCounterCF(cfs, KEYSPACE, columnFamily, 2);
+        else
+            fillCF(cfs, KEYSPACE, columnFamily, 2);
+
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+
+        Verifier verifier = new Verifier(cfs, sstable, false);
+        try
+        {
+            verifier.verify(extended);
+        }
+        catch (CorruptSSTableException err)
+        {
+            fail("Unexpected CorruptSSTableException");
+        }
+        finally
+        {
+            verifier.close();
+        }
+    }
+
+    /**
+     * Writes {@code rowsPerSSTable} rows with keys "0", "1", ..., each holding cells c1 and c2, then flushes the table.
+     */
+    protected void fillCF(ColumnFamilyStore cfs, String keyspace, String columnFamily, int rowsPerSSTable)
+    {
+        for (int i = 0; i < rowsPerSSTable; i++)
+        {
+            String key = String.valueOf(i);
+            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(keyspace, columnFamily);
+            cf.addColumn(column("c1", "1", 1L));
+            cf.addColumn(column("c2", "2", 1L));
+            Mutation rm = new Mutation(keyspace, ByteBufferUtil.bytes(key), cf);
+            rm.apply();
+        }
+
+        cfs.forceBlockingFlush();
+    }
+
+    /**
+     * Adds 100 to counter cell "Column1" of {@code rowsPerSSTable} rows with keys "0", "1", ..., then flushes the table.
+     */
+    protected void fillCounterCF(ColumnFamilyStore cfs, String keyspace, String columnFamily, int rowsPerSSTable) throws WriteTimeoutException
+    {
+        for (int i = 0; i < rowsPerSSTable; i++)
+        {
+            String key = String.valueOf(i);
+            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(keyspace, columnFamily);
+            Mutation rm = new Mutation(keyspace, ByteBufferUtil.bytes(key), cf);
+            rm.addCounter(columnFamily, cellname("Column1"), 100);
+            CounterMutation cm = new CounterMutation(rm, ConsistencyLevel.ONE);
+            cm.apply();
+        }
+
+        cfs.forceBlockingFlush();
+    }
+
+    /**
+     * Returns the Adler-32 checksum of the entire contents of {@code filename}.
+     */
+    protected long simpleFullChecksum(String filename) throws IOException
+    {
+        try (CheckedInputStream cinStream = new CheckedInputStream(new FileInputStream(filename), new Adler32()))
+        {
+            byte[] b = new byte[128];
+            while (cinStream.read(b) >= 0)
+            {
+                // reading is what updates the checksum; the bytes themselves are discarded
+            }
+            return cinStream.getChecksum().getValue();
+        }
+    }
+
+    /**
+     * Overwrites {@code filePath} with the decimal representation of {@code checksum}.
+     *
+     * @throws FSWriteError if the file cannot be written
+     */
+    protected void writeChecksum(long checksum, String filePath)
+    {
+        File outFile = new File(filePath);
+        try (BufferedWriter out = Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8))
+        {
+            out.write(String.valueOf(checksum));
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, outFile);
+        }
+    }
+}

Reply via email to