Author: jbellis
Date: Tue Mar 1 16:33:04 2011
New Revision: 1075890
URL: http://svn.apache.org/viewvc?rev=1075890&view=rev
Log:
merge 1075870 (make nodetool scrub more robust)
Modified:
cassandra/branches/cassandra-0.7.3/ (props changed)
cassandra/branches/cassandra-0.7.3/CHANGES.txt
cassandra/branches/cassandra-0.7.3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/branches/cassandra-0.7.3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/branches/cassandra-0.7.3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/branches/cassandra-0.7.3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/branches/cassandra-0.7.3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/branches/cassandra-0.7.3/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
Propchange: cassandra/branches/cassandra-0.7.3/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 1 16:33:04 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1071777
-/cassandra/branches/cassandra-0.7:1026516,1035666,1050269,1075198,1075594,1075627
+/cassandra/branches/cassandra-0.7:1026516,1035666,1050269,1075198,1075594,1075627,1075870
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/cassandra/trunk:1026516-1026734,1028929
Modified: cassandra/branches/cassandra-0.7.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.3/CHANGES.txt?rev=1075890&r1=1075889&r2=1075890&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.3/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7.3/CHANGES.txt Tue Mar 1 16:33:04 2011
@@ -15,7 +15,7 @@
* update memtable_throughput to be a long (CASSANDRA-2158)
* fix for compaction and cleanup writing old-format data into new-version
sstable (CASSANDRA-2211, -2216)
- * add nodetool scrub (CASSANDRA-2217)
+ * add nodetool scrub (CASSANDRA-2217, -2240)
* fix sstable2json large-row pagination (CASSANDRA-2188)
* fix EOFing on requests for the last bytes in a file (CASSANDRA-2213)
* fix BufferedRandomAccessFile bugs (CASSANDRA-2218, -2241)
@@ -34,6 +34,9 @@
* fix starting up on Windows when CASSANDRA_HOME contains whitespace
(CASSANDRA-2237)
* add [get|set][row|key]cacheSavePeriod to JMX (CASSANDRA-2100)
+ * fix Hadoop ColumnFamilyOutputFormat dropping of mutations
+ when batch fills up (CASSANDRA-2255)
+ * move file deletions off of scheduledtasks executor (CASSANDRA-2253)
0.7.2
Propchange: cassandra/branches/cassandra-0.7.3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 1 16:33:04 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516,1035666,1050269,1075198,1075594,1075627
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516,1035666,1050269,1075198,1075594,1075627,1075870
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1026734,1028929
Propchange: cassandra/branches/cassandra-0.7.3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 1 16:33:04 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516,1035666,1050269,1075198,1075594,1075627
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516,1035666,1050269,1075198,1075594,1075627,1075870
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1026734,1028929
Propchange: cassandra/branches/cassandra-0.7.3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 1 16:33:04 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516,1035666,1050269,1075198,1075594,1075627
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516,1035666,1050269,1075198,1075594,1075627,1075870
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1026734,1028929
Propchange: cassandra/branches/cassandra-0.7.3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 1 16:33:04 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516,1035666,1050269,1075198,1075594,1075627
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516,1035666,1050269,1075198,1075594,1075627,1075870
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1026734,1028929
Propchange: cassandra/branches/cassandra-0.7.3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 1 16:33:04 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516,1035666,1050269,1075198,1075594,1075627
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516,1035666,1050269,1075198,1075594,1075627,1075870
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1026734,1028929
Modified: cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1075890&r1=1075889&r2=1075890&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/db/CompactionManager.java Tue Mar 1 16:33:04 2011
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
import java.io.DataOutput;
import java.io.File;
+import java.io.IOError;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
@@ -525,70 +526,132 @@ public class CompactionManager implement
String indexFilename =
sstable.descriptor.filenameFor(Component.PRIMARY_INDEX);
BufferedRandomAccessFile indexFile =
BufferedRandomAccessFile.getUncachingReader(indexFilename);
ByteBuffer nextIndexKey =
ByteBufferUtil.readWithShortLength(indexFile);
- assert indexFile.readLong() == 0;
+ {
+ // throw away variable so we don't have a side effect in the
assert
+ long firstRowPositionFromIndex = indexFile.readLong();
+ assert firstRowPositionFromIndex == 0 :
firstRowPositionFromIndex;
+ }
SSTableWriter writer = maybeCreateWriter(cfs,
compactionFileLocation, expectedBloomFilterSize, null);
executor.beginCompaction(cfs.columnFamily, new ScrubInfo(dataFile,
sstable));
+ int goodRows = 0, badRows = 0;
while (!dataFile.isEOF())
{
long rowStart = dataFile.getFilePointer();
if (logger.isDebugEnabled())
logger.debug("Reading row at " + rowStart);
- DecoratedKey key =
SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor,
ByteBufferUtil.readWithShortLength(dataFile));
+
+ DecoratedKey key = null;
+ long dataSize = -1;
+ try
+ {
+ key = SSTableReader.decodeKey(sstable.partitioner,
sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+ dataSize = sstable.descriptor.hasIntRowSize ?
dataFile.readInt() : dataFile.readLong();
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("row %s is %s bytes",
ByteBufferUtil.bytesToHex(key.key), dataSize));
+ }
+ catch (Throwable th)
+ {
+ throwIfFatal(th);
+ // check for null key below
+ }
+
ByteBuffer currentIndexKey = nextIndexKey;
- nextIndexKey = indexFile.isEOF() ? null :
ByteBufferUtil.readWithShortLength(indexFile);
- long nextRowPositionFromIndex = indexFile.isEOF() ?
dataFile.length() : indexFile.readLong();
+ long nextRowPositionFromIndex;
+ try
+ {
+ nextIndexKey = indexFile.isEOF() ? null :
ByteBufferUtil.readWithShortLength(indexFile);
+ nextRowPositionFromIndex = indexFile.isEOF() ?
dataFile.length() : indexFile.readLong();
+ }
+ catch (Throwable th)
+ {
+ logger.warn("Error reading index file", th);
+ nextIndexKey = null;
+ nextRowPositionFromIndex = dataFile.length();
+ }
- long dataSize = sstable.descriptor.hasIntRowSize ?
dataFile.readInt() : dataFile.readLong();
long dataStart = dataFile.getFilePointer();
- if (logger.isDebugEnabled())
- logger.debug(String.format("row %s is %s bytes",
ByteBufferUtil.bytesToHex(key.key), dataSize));
+ long dataStartFromIndex = currentIndexKey == null
+ ? -1
+ : rowStart + 2 +
currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
+ long dataSizeFromIndex = nextRowPositionFromIndex -
dataStartFromIndex;
+ assert currentIndexKey != null || indexFile.isEOF();
+ if (logger.isDebugEnabled() && currentIndexKey != null)
+ logger.debug(String.format("Index doublecheck: row %s is
%s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex));
- SSTableIdentityIterator row = new
SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
writer.mark();
try
{
+ if (key == null)
+ throw new IOError(new IOException("Unable to read row
key from data file"));
+ if (dataSize > dataFile.length())
+ throw new IOError(new IOException("Impossible row size
" + dataSize));
+ SSTableIdentityIterator row = new
SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
writer.append(getCompactedRow(row, cfs,
sstable.descriptor, true));
+ goodRows++;
+ if (!key.key.equals(currentIndexKey) || dataStart !=
dataStartFromIndex)
+ logger.warn("Row scrubbed successfully but index file
contains a different key or row size; consider rebuilding the index as
described in
http://www.mail-archive.com/[email protected]/msg03325.html");
}
- catch (Exception e)
+ catch (Throwable th)
{
- logger.warn("Error reading row " +
ByteBufferUtil.bytesToHex(key.key) + "(stacktrace follows)", e);
+ throwIfFatal(th);
+ logger.warn("Non-fatal error reading row (stacktrace
follows)", th);
writer.reset();
-
- long dataStartFromIndex = rowStart + 2 +
currentIndexKey.remaining();
- if (!key.key.equals(currentIndexKey) || dataStart !=
dataStartFromIndex)
+
+ if (currentIndexKey != null
+ && (key == null || !key.key.equals(currentIndexKey) ||
dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
{
- logger.info(String.format("Retrying %s as key %s from
row index",
-
ByteBufferUtil.bytesToHex(key.key),
ByteBufferUtil.bytesToHex(currentIndexKey)));
+ logger.info(String.format("Retrying from row index;
data is %s bytes starting at %s",
+ dataSizeFromIndex,
dataStartFromIndex));
key = SSTableReader.decodeKey(sstable.partitioner,
sstable.descriptor, currentIndexKey);
- long dataSizeFromIndex = nextRowPositionFromIndex -
dataStartFromIndex;
- row = new SSTableIdentityIterator(sstable, dataFile,
key, dataStartFromIndex, dataSizeFromIndex, true);
try
{
+ SSTableIdentityIterator row = new
SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex,
dataSizeFromIndex, true);
writer.append(getCompactedRow(row, cfs,
sstable.descriptor, true));
+ goodRows++;
}
- catch (Exception e2)
+ catch (Throwable th2)
{
- logger.info("Retry failed too. Skipping to next
row (retry's stacktrace follows)", e2);
+ throwIfFatal(th2);
+ logger.warn("Retry failed too. Skipping to next
row (retry's stacktrace follows)", th2);
writer.reset();
dataFile.seek(nextRowPositionFromIndex);
+ badRows++;
}
}
else
{
- logger.info("Skipping to next row");
- dataFile.seek(nextRowPositionFromIndex);
+ logger.warn("Row is unreadable; skipping to next");
+ if (currentIndexKey != null)
+ dataFile.seek(nextRowPositionFromIndex);
+ badRows++;
}
}
}
- SSTableReader newSstable =
writer.closeAndOpenReader(sstable.maxDataAge);
- cfs.replaceCompactedSSTables(Arrays.asList(sstable),
Arrays.asList(newSstable));
- logger.info("Scrub of " + sstable + " complete");
+ if (writer.getFilePointer() > 0)
+ {
+ SSTableReader newSstable =
writer.closeAndOpenReader(sstable.maxDataAge);
+ cfs.replaceCompactedSSTables(Arrays.asList(sstable),
Arrays.asList(newSstable));
+ logger.info("Scrub of " + sstable + " complete: " + goodRows +
" rows in new sstable");
+ if (badRows > 0)
+ logger.warn("Unable to recover " + badRows + " that were
skipped. You can attempt manual recovery from the pre-scrub snapshot. You can
also run nodetool repair to transfer the data from a healthy replica, if any");
+ }
+ else
+ {
+ cfs.markCompacted(Arrays.asList(sstable));
+ logger.warn("No valid rows found while scrubbing " + sstable +
"; it is marked for deletion now. If you want to attempt manual recovery, you
can find a copy in the pre-scrub snapshot");
+ }
}
}
+ private void throwIfFatal(Throwable th)
+ {
+ if (th instanceof Error && !(th instanceof AssertionError || th
instanceof IOError))
+ throw (Error) th;
+ }
+
/**
* This function goes over each file and removes the keys that the node is
not responsible for
* and only keeps keys that this node is responsible for.
Modified: cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=1075890&r1=1075889&r2=1075890&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Tue Mar 1 16:33:04 2011
@@ -89,22 +89,31 @@ public class IndexHelper
return indexList;
}
+ public static Filter defreezeBloomFilter(FileDataInput file, boolean
usesOldBloomFilter) throws IOException
+ {
+ return defreezeBloomFilter(file, Integer.MAX_VALUE,
usesOldBloomFilter);
+ }
+
/**
* De-freeze the bloom filter.
*
* @param file - source file
+ * @param maxSize - sanity check: if filter claims to be larger than this
 it is bogus
* @param useOldBuffer - do we need to reuse old buffer?
*
* @return bloom filter summarizing the column information
* @throws java.io.IOException if an I/O error occurs.
+ * Guarantees that file's current position will be just after the bloom
filter, even if
+ * the filter cannot be deserialized, UNLESS EOFException is thrown.
*/
- public static Filter defreezeBloomFilter(FileDataInput file, boolean
useOldBuffer) throws IOException
+ public static Filter defreezeBloomFilter(FileDataInput file, long maxSize,
boolean useOldBuffer) throws IOException
{
int size = file.readInt();
+ if (size > maxSize || size <= 0)
+ throw new EOFException("bloom filter claims to be longer than
entire row size");
ByteBuffer bytes = file.readBytes(size);
DataInputStream stream = new
DataInputStream(ByteBufferUtil.inputStream(bytes));
-
return useOldBuffer
? LegacyBloomFilter.serializer().deserialize(stream)
: BloomFilter.serializer().deserialize(stream);
Modified: cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1075890&r1=1075889&r2=1075890&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/branches/cassandra-0.7.3/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Tue Mar 1 16:33:04 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.io.sstable;
import java.io.DataOutput;
+import java.io.EOFException;
import java.io.IOError;
import java.io.IOException;
import java.util.ArrayList;
@@ -83,11 +84,14 @@ public class SSTableIdentityIterator imp
{
try
{
- IndexHelper.defreezeBloomFilter(file,
sstable.descriptor.usesOldBloomFilter);
+ IndexHelper.defreezeBloomFilter(file, dataSize,
sstable.descriptor.usesOldBloomFilter);
}
catch (Exception e)
{
- logger.info("Invalid bloom filter in " + sstable + "; will
rebuild it");
+ if (e instanceof EOFException)
+ throw (EOFException) e;
+
+ logger.debug("Invalid bloom filter in {}; will rebuild
it", sstable);
// deFreeze should have left the file position ready to
deserialize index
}
try
@@ -96,7 +100,7 @@ public class SSTableIdentityIterator imp
}
catch (Exception e)
{
- logger.info("Invalid row summary in " + sstable + "; will
rebuild it");
+ logger.debug("Invalid row summary in {}; will rebuild it",
sstable);
}
file.seek(this.dataStart);
}
Modified: cassandra/branches/cassandra-0.7.3/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.3/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1075890&r1=1075889&r2=1075890&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.3/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original)
+++ cassandra/branches/cassandra-0.7.3/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Tue Mar 1 16:33:04 2011
@@ -91,8 +91,8 @@ public class LazilyCompactedRowTest exte
assertEquals(out1.getLength(), rowSize1 + 8);
assertEquals(out2.getLength(), rowSize2 + 8);
// bloom filter
- IndexHelper.defreezeBloomFilter(in1, false);
- IndexHelper.defreezeBloomFilter(in2, false);
+ IndexHelper.defreezeBloomFilter(in1, rowSize1, false);
+ IndexHelper.defreezeBloomFilter(in2, rowSize2, false);
// index
int indexSize1 = in1.readInt();
int indexSize2 = in2.readInt();