Author: jbellis
Date: Wed Nov 3 18:48:18 2010
New Revision: 1030605
URL: http://svn.apache.org/viewvc?rev=1030605&view=rev
Log:
enable skipping bad rows on LazilyCompacted path.
patch by jbellis; reviewed by Sylvain Lebresne for CASSANDRA-1702
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/FileUtils.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1030605&r1=1030604&r2=1030605&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Nov 3 18:48:18 2010
@@ -15,6 +15,7 @@ dev
* include jna dependency in RPM package (CASSANDRA-1690)
* add --skip-keys option to stress.py (CASSANDRA-1696)
* improve cli handling of non-string column names (CASSANDRA-1701)
+ * enable skipping bad rows on LazilyCompacted path (CASSANDRA-1702)
0.7.0-beta3
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1030605&r1=1030604&r2=1030605&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java
Wed Nov 3 18:48:18 2010
@@ -64,6 +64,9 @@ public class ColumnSerializer implements
public Column deserialize(DataInput dis) throws IOException
{
ByteBuffer name = FBUtilities.readShortByteArray(dis);
+ if (name.remaining() <= 0)
+ throw new CorruptColumnException("invalid column name length " +
name.remaining());
+
int b = dis.readUnsignedByte();
if ((b & EXPIRATION_MASK) != 0)
{
@@ -97,4 +100,12 @@ public class ColumnSerializer implements
}
}
}
+
+ /**
+ * Raised by deserialize when a column read from the input is structurally
+ * invalid, e.g. its name has a non-positive length. It extends IOException,
+ * so code that already handles IOException also handles it.
+ */
+ private static class CorruptColumnException extends IOException
+ {
+ public CorruptColumnException(String message)
+ {
+ super(message);
+ }
+ }
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1030605&r1=1030604&r2=1030605&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
Wed Nov 3 18:48:18 2010
@@ -272,16 +272,21 @@ public class CompactionManager implement
writer = new SSTableWriter(newFilename, expectedBloomFilterSize,
cfs.metadata, cfs.partitioner);
while (nni.hasNext())
{
- AbstractCompactedRow row = nni.next();
+ writer.mark();
try
{
+ AbstractCompactedRow row = nni.next();
writer.append(row);
}
- catch (IOException ex)
+ catch (Exception e)
+ {
+ logger.error("non-fatal error during compaction", e);
+ writer.reset();
+ }
+ catch (IOError e)
{
- writer.abort();
- // rethrow the exception so that caller knows compaction failed.
- throw ex;
+ logger.error("non-fatal error during compaction", e);
+ writer.reset();
}
totalkeysWritten++;
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1030605&r1=1030604&r2=1030605&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Wed Nov 3 18:48:18 2010
@@ -38,6 +38,7 @@ import org.apache.cassandra.dht.IPartiti
import org.apache.cassandra.io.AbstractCompactedRow;
import org.apache.cassandra.io.ICompactionInfo;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.SegmentedFile;
import org.apache.cassandra.service.StorageService;
@@ -55,6 +56,7 @@ public class SSTableWriter extends SSTab
private SegmentedFile.Builder dbuilder;
private final BufferedRandomAccessFile dataFile;
private DecoratedKey lastWrittenKey;
+ private FileMark dataMark;
public SSTableWriter(String filename, long keyCount) throws IOException
{
@@ -99,6 +101,25 @@ public class SSTableWriter extends SSTab
}
}
+ public void mark() // captures marks on the data file and (via iwriter) the index file for a later reset()
+ {
+ dataMark = dataFile.mark(); // kept in dataMark; reset() hands it back to dataFile.reset()
+ iwriter.mark(); // the index writer stores its own mark internally
+ }
+
+ /**
+ * Resets the data file to the mark captured by mark() and then asks the
+ * index writer to reset as well. An IOException from either step is
+ * rethrown wrapped in an unchecked IOError.
+ */
+ public void reset()
+ {
+ try
+ {
+ dataFile.reset(dataMark);
+ iwriter.reset();
+ }
+ catch (IOException ioe)
+ {
+ throw new IOError(ioe);
+ }
+ }
+
private long beforeAppend(DecoratedKey decoratedKey) throws IOException
{
if (decoratedKey == null)
@@ -121,8 +142,8 @@ public class SSTableWriter extends SSTab
if (logger.isTraceEnabled())
logger.trace("wrote " + decoratedKey + " at " + dataPosition);
- dbuilder.addPotentialBoundary(dataPosition);
iwriter.afterAppend(decoratedKey, dataPosition);
+ dbuilder.addPotentialBoundary(dataPosition);
}
public void append(AbstractCompactedRow row) throws IOException
@@ -176,7 +197,9 @@ public class SSTableWriter extends SSTab
iwriter.close();
// main data
+ long position = dataFile.getFilePointer();
dataFile.close(); // calls force
+ FileUtils.truncate(dataFile.getPath(), position);
// write sstable statistics
writeStatistics(descriptor, estimatedRowSize, estimatedColumnCount);
@@ -358,7 +381,8 @@ public class SSTableWriter extends SSTab
public final SegmentedFile.Builder builder;
public final IndexSummary summary;
public final BloomFilter bf;
-
+ private FileMark mark;
+
IndexWriter(Descriptor desc, IPartitioner part, long keyCount) throws
IOException
{
this.desc = desc;
@@ -396,11 +420,25 @@ public class SSTableWriter extends SSTab
stream.close();
// index
- indexFile.getChannel().force(true);
- indexFile.close();
+ long position = indexFile.getFilePointer();
+ indexFile.close(); // calls force
+ FileUtils.truncate(indexFile.getPath(), position);
// finalize in-memory index state
summary.complete();
}
+
+ public void mark() // captures a mark on the index file for a later reset()
+ {
+ mark = indexFile.mark(); // stored in the mark field, which reset() passes back to indexFile.reset()
+ }
+
+ /**
+ * Resets the index file to the mark captured by the last call to mark().
+ * Only the index file is reset; see the notes in the body for the state
+ * that is deliberately left untouched.
+ *
+ * @throws IOException if resetting the index file fails
+ */
+ public void reset() throws IOException
+ {
+ // we can't un-set the bloom filter addition, but extra keys in there are harmless.
+ // we can't reset dbuilder either, but that is the last thing called in afterAppend so
+ // we assume that if that worked then we won't be trying to reset.
+ indexFile.reset(mark);
+ }
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/FileUtils.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/FileUtils.java?rev=1030605&r1=1030604&r2=1030605&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/FileUtils.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/FileUtils.java
Wed Nov 3 18:48:18 2010
@@ -19,7 +19,9 @@
package org.apache.cassandra.io.util;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.text.DecimalFormat;
import java.util.Comparator;
import java.util.List;
@@ -27,9 +29,6 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.sun.jna.LastErrorException;
-import org.apache.cassandra.utils.CLibrary;
-
public class FileUtils
{
@@ -65,6 +64,20 @@ public class FileUtils
throw new IOException(String.format("Failed to rename %s to %s",
from.getPath(), to.getPath()));
}
+ /**
+ * Truncates the file at the given path to the given size and releases the
+ * file handle before returning.
+ *
+ * @param path path of the file to truncate; it is opened in "rw" mode
+ * @param size length passed to the file channel's truncate call
+ * @throws IOException if truncating or closing the file fails
+ * @throws RuntimeException wrapping the FileNotFoundException if the file cannot be opened
+ */
+ public static void truncate(String path, long size) throws IOException
+ {
+ RandomAccessFile file;
+ try
+ {
+ file = new RandomAccessFile(path, "rw");
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ try
+ {
+ file.getChannel().truncate(size);
+ }
+ finally
+ {
+ // always release the descriptor, even if truncate fails;
+ // closing the file also closes its associated channel
+ file.close();
+ }
+ }
+
public static class FileComparator implements Comparator<File>
{
public int compare(File f, File f2)