Author: jbellis
Date: Thu Oct 6 15:22:04 2011
New Revision: 1179662
URL: http://svn.apache.org/viewvc?rev=1179662&view=rev
Log:
close scrubbed sstable fd before deleting it
patch by jbellis; reviewed by slebresne for CASSANDRA-3318
Modified:
cassandra/branches/cassandra-1.0.0/CHANGES.txt
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1179662&r1=1179661&r2=1179662&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Thu Oct 6 15:22:04 2011
@@ -1,4 +1,5 @@
1.0.0-final
+ * close scrubbed sstable fd before deleting it (CASSANDRA-3318)
* fix bug preventing obsolete commitlog segments from being removed
(CASSANDRA-3269)
* tolerate whitespace in seed CSV (CASSANDRA-3263)
Modified:
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1179662&r1=1179661&r2=1179662&view=diff
==============================================================================
---
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
(original)
+++
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Thu Oct 6 15:22:04 2011
@@ -483,10 +483,13 @@ public class CompactionManager implement
// row header (key or data size) is corrupt. (This means our position
in the index file will be one row
// "ahead" of the data file.)
final RandomAccessReader dataFile = sstable.openDataReader(true);
-
- String indexFilename =
sstable.descriptor.filenameFor(Component.PRIMARY_INDEX);
- RandomAccessReader indexFile = RandomAccessReader.open(new
File(indexFilename), true);
+ RandomAccessReader indexFile = RandomAccessReader.open(new
File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
ScrubInfo scrubInfo = new ScrubInfo(dataFile, sstable);
+ executor.beginCompaction(scrubInfo);
+
+ SSTableWriter writer = null;
+ SSTableReader newSstable = null;
+ int goodRows = 0, badRows = 0, emptyRows = 0;
try
{
@@ -497,170 +500,155 @@ public class CompactionManager implement
assert firstRowPositionFromIndex == 0 :
firstRowPositionFromIndex;
}
- SSTableReader newSstable = null;
-
- // errors when creating the writer may leave empty temp files.
- SSTableWriter writer = maybeCreateWriter(cfs,
- compactionFileLocation,
- expectedBloomFilterSize,
- null,
-
Collections.singletonList(sstable));
-
- int goodRows = 0, badRows = 0, emptyRows = 0;
+ // TODO errors when creating the writer may leave empty temp files.
+ writer = maybeCreateWriter(cfs, compactionFileLocation,
expectedBloomFilterSize, null, Collections.singletonList(sstable));
- executor.beginCompaction(scrubInfo);
-
- try
+ while (!dataFile.isEOF())
{
- while (!dataFile.isEOF())
+ long rowStart = dataFile.getFilePointer();
+ if (logger.isDebugEnabled())
+ logger.debug("Reading row at " + rowStart);
+
+ DecoratedKey key = null;
+ long dataSize = -1;
+ try
{
- long rowStart = dataFile.getFilePointer();
+ key = SSTableReader.decodeKey(sstable.partitioner,
sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+ dataSize = sstable.descriptor.hasIntRowSize ?
dataFile.readInt() : dataFile.readLong();
if (logger.isDebugEnabled())
- logger.debug("Reading row at " + rowStart);
+ logger.debug(String.format("row %s is %s bytes",
ByteBufferUtil.bytesToHex(key.key), dataSize));
+ }
+ catch (Throwable th)
+ {
+ throwIfFatal(th);
+ // check for null key below
+ }
- DecoratedKey key = null;
- long dataSize = -1;
- try
- {
- key = SSTableReader.decodeKey(sstable.partitioner,
sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
- dataSize = sstable.descriptor.hasIntRowSize ?
dataFile.readInt() : dataFile.readLong();
- if (logger.isDebugEnabled())
- logger.debug(String.format("row %s is %s bytes",
ByteBufferUtil.bytesToHex(key.key), dataSize));
- }
- catch (Throwable th)
- {
- throwIfFatal(th);
- // check for null key below
- }
+ ByteBuffer currentIndexKey = nextIndexKey;
+ long nextRowPositionFromIndex;
+ try
+ {
+ nextIndexKey = indexFile.isEOF() ? null :
ByteBufferUtil.readWithShortLength(indexFile);
+ nextRowPositionFromIndex = indexFile.isEOF() ?
dataFile.length() : indexFile.readLong();
+ }
+ catch (Throwable th)
+ {
+ logger.warn("Error reading index file", th);
+ nextIndexKey = null;
+ nextRowPositionFromIndex = dataFile.length();
+ }
- ByteBuffer currentIndexKey = nextIndexKey;
- long nextRowPositionFromIndex;
- try
- {
- nextIndexKey = indexFile.isEOF() ? null :
ByteBufferUtil.readWithShortLength(indexFile);
- nextRowPositionFromIndex = indexFile.isEOF() ?
dataFile.length() : indexFile.readLong();
- }
- catch (Throwable th)
- {
- logger.warn("Error reading index file", th);
- nextIndexKey = null;
- nextRowPositionFromIndex = dataFile.length();
- }
-
- long dataStart = dataFile.getFilePointer();
- long dataStartFromIndex = currentIndexKey == null
- ? -1
- : rowStart + 2 +
currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
- long dataSizeFromIndex = nextRowPositionFromIndex -
dataStartFromIndex;
- assert currentIndexKey != null || indexFile.isEOF();
- if (logger.isDebugEnabled() && currentIndexKey != null)
- logger.debug(String.format("Index doublecheck: row %s
is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex));
+ long dataStart = dataFile.getFilePointer();
+ long dataStartFromIndex = currentIndexKey == null
+ ? -1
+ : rowStart + 2 +
currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
+ long dataSizeFromIndex = nextRowPositionFromIndex -
dataStartFromIndex;
+ assert currentIndexKey != null || indexFile.isEOF();
+ if (logger.isDebugEnabled() && currentIndexKey != null)
+ logger.debug(String.format("Index doublecheck: row %s is
%s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex));
- writer.mark();
- try
+ writer.mark();
+ try
+ {
+ if (key == null)
+ throw new IOError(new IOException("Unable to read row
key from data file"));
+ if (dataSize > dataFile.length())
+ throw new IOError(new IOException("Impossible row size
" + dataSize));
+ SSTableIdentityIterator row = new
SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+ AbstractCompactedRow compactedRow =
controller.getCompactedRow(row);
+ if (compactedRow.isEmpty())
{
- if (key == null)
- throw new IOError(new IOException("Unable to read
row key from data file"));
- if (dataSize > dataFile.length())
- throw new IOError(new IOException("Impossible row
size " + dataSize));
- SSTableIdentityIterator row = new
SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
- AbstractCompactedRow compactedRow =
controller.getCompactedRow(row);
- if (compactedRow.isEmpty())
- {
- emptyRows++;
- }
- else
- {
- writer.append(compactedRow);
- goodRows++;
- }
- if (!key.key.equals(currentIndexKey) || dataStart !=
dataStartFromIndex)
- logger.warn("Index file contained a different key
or row size; using key from data file");
+ emptyRows++;
}
- catch (Throwable th)
+ else
{
- throwIfFatal(th);
- logger.warn("Non-fatal error reading row (stacktrace
follows)", th);
- writer.resetAndTruncate();
-
- if (currentIndexKey != null
- && (key == null ||
!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize
!= dataSizeFromIndex))
+ writer.append(compactedRow);
+ goodRows++;
+ }
+ if (!key.key.equals(currentIndexKey) || dataStart !=
dataStartFromIndex)
+ logger.warn("Index file contained a different key or
row size; using key from data file");
+ }
+ catch (Throwable th)
+ {
+ throwIfFatal(th);
+ logger.warn("Non-fatal error reading row (stacktrace
follows)", th);
+ writer.resetAndTruncate();
+
+ if (currentIndexKey != null
+ && (key == null || !key.key.equals(currentIndexKey) ||
dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
+ {
+ logger.info(String.format("Retrying from row index;
data is %s bytes starting at %s",
+ dataSizeFromIndex,
dataStartFromIndex));
+ key = SSTableReader.decodeKey(sstable.partitioner,
sstable.descriptor, currentIndexKey);
+ try
{
- logger.info(String.format("Retrying from row
index; data is %s bytes starting at %s",
- dataSizeFromIndex,
dataStartFromIndex));
- key = SSTableReader.decodeKey(sstable.partitioner,
sstable.descriptor, currentIndexKey);
- try
+ SSTableIdentityIterator row = new
SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex,
dataSizeFromIndex, true);
+ AbstractCompactedRow compactedRow =
controller.getCompactedRow(row);
+ if (compactedRow.isEmpty())
{
- SSTableIdentityIterator row = new
SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex,
dataSizeFromIndex, true);
- AbstractCompactedRow compactedRow =
controller.getCompactedRow(row);
- if (compactedRow.isEmpty())
- {
- emptyRows++;
- }
- else
- {
- writer.append(compactedRow);
- goodRows++;
- }
+ emptyRows++;
}
- catch (Throwable th2)
+ else
{
- throwIfFatal(th2);
- // Skipping rows is dangerous for counters
(see CASSANDRA-2759)
- if (isCommutative)
- throw new IOError(th2);
-
- logger.warn("Retry failed too. Skipping to
next row (retry's stacktrace follows)", th2);
- writer.resetAndTruncate();
- dataFile.seek(nextRowPositionFromIndex);
- badRows++;
+ writer.append(compactedRow);
+ goodRows++;
}
}
- else
+ catch (Throwable th2)
{
+ throwIfFatal(th2);
// Skipping rows is dangerous for counters (see
CASSANDRA-2759)
if (isCommutative)
- throw new IOError(th);
+ throw new IOError(th2);
- logger.warn("Row at " + dataStart + " is
unreadable; skipping to next");
- if (currentIndexKey != null)
- dataFile.seek(nextRowPositionFromIndex);
+ logger.warn("Retry failed too. Skipping to next
row (retry's stacktrace follows)", th2);
+ writer.resetAndTruncate();
+ dataFile.seek(nextRowPositionFromIndex);
badRows++;
}
}
+ else
+ {
+ // Skipping rows is dangerous for counters (see
CASSANDRA-2759)
+ if (isCommutative)
+ throw new IOError(th);
+
+ logger.warn("Row at " + dataStart + " is unreadable;
skipping to next");
+ if (currentIndexKey != null)
+ dataFile.seek(nextRowPositionFromIndex);
+ badRows++;
+ }
}
-
- if (writer.getFilePointer() > 0)
- newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
- }
- finally
- {
- writer.cleanupIfNecessary();
}
- if (newSstable != null)
- {
- cfs.replaceCompactedSSTables(Arrays.asList(sstable),
Arrays.asList(newSstable));
- logger.info("Scrub of " + sstable + " complete: " + goodRows +
" rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
- if (badRows > 0)
- logger.warn("Unable to recover " + badRows + " rows that
were skipped. You can attempt manual recovery from the pre-scrub snapshot.
You can also run nodetool repair to transfer the data from a healthy replica,
if any");
- }
- else
- {
- cfs.markCompacted(Arrays.asList(sstable));
- if (badRows > 0)
- logger.warn("No valid rows found while scrubbing " +
sstable + "; it is marked for deletion now. If you want to attempt manual
recovery, you can find a copy in the pre-scrub snapshot");
- else
- logger.info("Scrub of " + sstable + " complete; looks like
all " + emptyRows + " rows were tombstoned");
- }
+ if (writer.getFilePointer() > 0)
+ newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
}
finally
{
+ if (writer != null)
+ writer.cleanupIfNecessary();
FileUtils.closeQuietly(dataFile);
FileUtils.closeQuietly(indexFile);
executor.finishCompaction(scrubInfo);
}
+
+ if (newSstable == null)
+ {
+ cfs.markCompacted(Arrays.asList(sstable));
+ if (badRows > 0)
+ logger.warn("No valid rows found while scrubbing " + sstable +
"; it is marked for deletion now. If you want to attempt manual recovery, you
can find a copy in the pre-scrub snapshot");
+ else
+ logger.info("Scrub of " + sstable + " complete; looks like all
" + emptyRows + " rows were tombstoned");
+ }
+ else
+ {
+ cfs.replaceCompactedSSTables(Arrays.asList(sstable),
Arrays.asList(newSstable));
+ logger.info("Scrub of " + sstable + " complete: " + goodRows + "
rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
+ if (badRows > 0)
+ logger.warn("Unable to recover " + badRows + " rows that were
skipped. You can attempt manual recovery from the pre-scrub snapshot. You can
also run nodetool repair to transfer the data from a healthy replica, if any");
+ }
}
private void throwIfFatal(Throwable th)