Author: jbellis
Date: Wed Nov  3 18:48:18 2010
New Revision: 1030605

URL: http://svn.apache.org/viewvc?rev=1030605&view=rev
Log:
enable skipping bad rows on LazilyCompacted path.
patch by jbellis; reviewed by Sylvain Lebresne for CASSANDRA-1702

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/FileUtils.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1030605&r1=1030604&r2=1030605&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Nov  3 18:48:18 2010
@@ -15,6 +15,7 @@ dev
  * include jna dependency in RPM package (CASSANDRA-1690)
  * add --skip-keys option to stress.py (CASSANDRA-1696)
  * improve cli handling of non-string column names (CASSANDRA-1701)
+ * enable skipping bad rows on LazilyCompacted path (CASSANDRA-1702)
 
 
 0.7.0-beta3

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1030605&r1=1030604&r2=1030605&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnSerializer.java
 Wed Nov  3 18:48:18 2010
@@ -64,6 +64,9 @@ public class ColumnSerializer implements
     public Column deserialize(DataInput dis) throws IOException
     {
         ByteBuffer name = FBUtilities.readShortByteArray(dis);
+        if (name.remaining() <= 0)
+            throw new CorruptColumnException("invalid column name length " + 
name.remaining());
+
         int b = dis.readUnsignedByte();
         if ((b & EXPIRATION_MASK) != 0)
         {
@@ -97,4 +100,12 @@ public class ColumnSerializer implements
             }
         }
     }
+
+    private static class CorruptColumnException extends IOException
+    {
+        public CorruptColumnException(String s)
+        {
+            super(s);
+        }
+    }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1030605&r1=1030604&r2=1030605&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
 Wed Nov  3 18:48:18 2010
@@ -272,16 +272,21 @@ public class CompactionManager implement
             writer = new SSTableWriter(newFilename, expectedBloomFilterSize, 
cfs.metadata, cfs.partitioner);
             while (nni.hasNext())
             {
-                AbstractCompactedRow row = nni.next();
+                writer.mark();
                 try
                 {
+                    AbstractCompactedRow row = nni.next();
                     writer.append(row);
                 }
-                catch (IOException ex)
+                catch (Exception e)
+                {
+                    logger.error("non-fatal error during compaction", e);
+                    writer.reset();
+                }
+                catch (IOError e)
                 {
-                    writer.abort();
-                    // rethrow the exception so that caller knows compaction 
failed.
-                    throw ex;
+                    logger.error("non-fatal error during compaction", e);
+                    writer.reset();
                 }
                 totalkeysWritten++;
             }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1030605&r1=1030604&r2=1030605&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
 Wed Nov  3 18:48:18 2010
@@ -38,6 +38,7 @@ import org.apache.cassandra.dht.IPartiti
 import org.apache.cassandra.io.AbstractCompactedRow;
 import org.apache.cassandra.io.ICompactionInfo;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.service.StorageService;
@@ -55,6 +56,7 @@ public class SSTableWriter extends SSTab
     private SegmentedFile.Builder dbuilder;
     private final BufferedRandomAccessFile dataFile;
     private DecoratedKey lastWrittenKey;
+    private FileMark dataMark;
 
     public SSTableWriter(String filename, long keyCount) throws IOException
     {
@@ -99,6 +101,25 @@ public class SSTableWriter extends SSTab
         }
     }
 
+    public void mark()
+    {
+        dataMark = dataFile.mark();
+        iwriter.mark();
+    }
+
+    public void reset()
+    {
+        try
+        {
+            dataFile.reset(dataMark);
+            iwriter.reset();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
     private long beforeAppend(DecoratedKey decoratedKey) throws IOException
     {
         if (decoratedKey == null)
@@ -121,8 +142,8 @@ public class SSTableWriter extends SSTab
 
         if (logger.isTraceEnabled())
             logger.trace("wrote " + decoratedKey + " at " + dataPosition);
-        dbuilder.addPotentialBoundary(dataPosition);
         iwriter.afterAppend(decoratedKey, dataPosition);
+        dbuilder.addPotentialBoundary(dataPosition);
     }
 
     public void append(AbstractCompactedRow row) throws IOException
@@ -176,7 +197,9 @@ public class SSTableWriter extends SSTab
         iwriter.close();
 
         // main data
+        long position = dataFile.getFilePointer();
         dataFile.close(); // calls force
+        FileUtils.truncate(dataFile.getPath(), position);
 
         // write sstable statistics
         writeStatistics(descriptor, estimatedRowSize, estimatedColumnCount);
@@ -358,7 +381,8 @@ public class SSTableWriter extends SSTab
         public final SegmentedFile.Builder builder;
         public final IndexSummary summary;
         public final BloomFilter bf;
-        
+        private FileMark mark;
+
         IndexWriter(Descriptor desc, IPartitioner part, long keyCount) throws 
IOException
         {
             this.desc = desc;
@@ -396,11 +420,25 @@ public class SSTableWriter extends SSTab
             stream.close();
 
             // index
-            indexFile.getChannel().force(true);
-            indexFile.close();
+            long position = indexFile.getFilePointer();
+            indexFile.close(); // calls force
+            FileUtils.truncate(indexFile.getPath(), position);
 
             // finalize in-memory index state
             summary.complete();
         }
+
+        public void mark()
+        {
+            mark = indexFile.mark();
+        }
+
+        public void reset() throws IOException
+        {
+            // we can't un-set the bloom filter addition, but extra keys in 
there are harmless.
+            // we can't reset dbuilder either, but that is the last thing 
called in afterappend so
+            // we assume that if that worked then we won't be trying to reset.
+            indexFile.reset(mark);
+        }
     }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/FileUtils.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/FileUtils.java?rev=1030605&r1=1030604&r2=1030605&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/FileUtils.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/FileUtils.java
 Wed Nov  3 18:48:18 2010
@@ -19,7 +19,9 @@
 package org.apache.cassandra.io.util;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.text.DecimalFormat;
 import java.util.Comparator;
 import java.util.List;
@@ -27,9 +29,6 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.sun.jna.LastErrorException;
-import org.apache.cassandra.utils.CLibrary;
-
 
 public class FileUtils
 {
@@ -65,6 +64,20 @@ public class FileUtils
             throw new IOException(String.format("Failed to rename %s to %s", 
from.getPath(), to.getPath()));
     }
 
+    public static void truncate(String path, long size) throws IOException
+    {
+        RandomAccessFile file;
+        try
+        {
+            file = new RandomAccessFile(path, "rw");
+        }
+        catch (FileNotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
+        file.getChannel().truncate(size);
+    }
+
     public static class FileComparator implements Comparator<File>
     {
         public int compare(File f, File f2)


Reply via email to