Author: jbellis
Date: Sat Jun 19 16:11:49 2010
New Revision: 956250

URL: http://svn.apache.org/viewvc?rev=956250&view=rev
Log:
split commitlog header into separate file and add size checksum to mutations.  
patch by mdennis and jbellis for CASSANDRA-1179
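
For orientation, the commit changes the on-disk framing of each mutation entry to:
[int size][long crc(size)][byte[size] data][long crc(size + data)]. A minimal
write-side sketch of that framing (the class and method names here are
illustrative, not the committed API):

    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.zip.CRC32;
    import java.util.zip.Checksum;

    public class EntryFramingSketch
    {
        // layout per entry: [int size][long crc(size)][byte[size] data][long crc(size + data)]
        public static void appendEntry(DataOutput out, byte[] serializedRow) throws IOException
        {
            Checksum checksum = new CRC32();
            checksum.update(serializedRow.length); // Checksum.update(int) hashes one byte (the low 8 bits)
            out.writeInt(serializedRow.length);
            out.writeLong(checksum.getValue());    // size checksum, validated on replay before allocating
            out.write(serializedRow);
            checksum.update(serializedRow, 0, serializedRow.length);
            out.writeLong(checksum.getValue());    // running checksum over size + data
        }
    }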

Added:
    cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
      - copied, changed from r956168, cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
    cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Sat Jun 19 16:11:49 2010
@@ -30,6 +30,8 @@ dev
  * avoid reading large rows into memory during compaction (CASSANDRA-16)
  * added hadoop OutputFormat (CASSANDRA-1101)
  * efficient Streaming (no more anticompaction) (CASSANDRA-579)
+ * split commitlog header into separate file and add size checksum to
+   mutations (CASSANDRA-1179)
 
 
 0.6.3

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Sat Jun 19 16:11:49 2010
@@ -880,7 +880,7 @@ public class DatabaseDescriptor
         return dataFileDirectory;
     }
 
-    public static String getLogFileLocation()
+    public static String getCommitLogLocation()
     {
         return conf.commitlog_directory;
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Sat Jun 19 16:11:49 2010
@@ -155,9 +155,8 @@ public class CommitLog
 
     public static void recover() throws IOException
     {
-        String directory = DatabaseDescriptor.getLogFileLocation();
-        File file = new File(directory);
-        File[] files = file.listFiles(new FilenameFilter()
+        String directory = DatabaseDescriptor.getCommitLogLocation();
+        File[] files = new File(directory).listFiles(new FilenameFilter()
         {
             public boolean accept(File dir, String name)
             {
@@ -170,7 +169,11 @@ public class CommitLog
         Arrays.sort(files, new FileUtils.FileComparator());
         logger.info("Replaying " + StringUtils.join(files, ", "));
         recover(files);
-        FileUtils.delete(files);
+        for (File f : files)
+        {
+            FileUtils.delete(CommitLogHeader.getHeaderPathFromSegmentPath(f.getAbsolutePath())); // may not actually exist
+            FileUtils.deleteWithConfirm(f);
+        }
         logger.info("Log replay complete");
     }
 
@@ -180,28 +183,24 @@ public class CommitLog
         final AtomicInteger counter = new AtomicInteger(0);
         for (File file : clogs)
         {
+            CommitLogHeader clHeader = null;
             int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
            BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
 
-            final CommitLogHeader clHeader;
+            int replayPosition = 0;
             try
             {
-                clHeader = CommitLogHeader.readCommitLogHeader(reader);
+                clHeader = CommitLogHeader.readCommitLogHeader(CommitLogHeader.getHeaderPathFromSegmentPath(file.getAbsolutePath()));
+                replayPosition = clHeader.getReplayPosition();
             }
-            catch (EOFException eofe)
+            catch (IOException ioe)
             {
-                logger.info("Attempted to recover an incomplete CommitLogHeader.  Everything is ok, don't panic.");
-                continue;
+                logger.info("Attempted to read an incomplete, missing or corrupt CommitLogHeader.  Everything is ok, don't panic.  CommitLog will be replayed from the beginning", ioe);
             }
+            reader.seek(replayPosition);
 
-            /* seek to the lowest position where any CF has non-flushed data */
-            int lowPos = CommitLogHeader.getLowestPosition(clHeader);
-            if (lowPos == 0)
-                break;
-
-            reader.seek(lowPos);
             if (logger.isDebugEnabled())
-                logger.debug("Replaying " + file + " starting at " + lowPos);
+                logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
 
             /* read the logs populate RowMutation and apply */
             while (!reader.isEOF())
@@ -211,29 +210,36 @@ public class CommitLog
 
                 long claimedCRC32;
                 byte[] bytes;
+
+                Checksum checksum = new CRC32();
                 try
                 {
-                    bytes = new byte[reader.readInt()]; // readInt can throw EOFException too
+                    // any of the reads may hit EOF
+                    int size = reader.readInt();
+                    long claimedSizeChecksum = reader.readLong();
+                    checksum.update(size);
+                    if (checksum.getValue() != claimedSizeChecksum || size <= 0)
+                        break; // entry wasn't synced correctly/fully.  that's ok.
+
+                    bytes = new byte[size];
                     reader.readFully(bytes);
                     claimedCRC32 = reader.readLong();
                 }
-                catch (EOFException e)
+                catch(EOFException eof)
                 {
-                    // last CL entry didn't get completely written.  that's ok.
-                    break;
+                    break; // last CL entry didn't get completely written.  that's ok.
                 }
 
-                ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes);
-                Checksum checksum = new CRC32();
                 checksum.update(bytes, 0, bytes.length);
                 if (claimedCRC32 != checksum.getValue())
                 {
-                    // this part of the log must not have been fsynced.  probably the rest is bad too,
-                    // but just in case there is no harm in trying them.
+                    // this entry must not have been fsynced.  probably the rest is bad too,
+                    // but just in case there is no harm in trying them (since we still read on an entry boundary)
                     continue;
                 }
 
                 /* deserialize the commit log entry */
+                ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes);
                final RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
                 if (logger.isDebugEnabled())
                    logger.debug(String.format("replaying mutation for %s.%s: %s",
@@ -244,6 +250,7 @@ public class CommitLog
                 tablesRecovered.add(table);
                final Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
                 final long entryLocation = reader.getFilePointer();
+                final CommitLogHeader finalHeader = clHeader;
                 Runnable runnable = new WrappedRunnable()
                 {
                     public void runMayThrow() throws IOException
@@ -259,7 +266,7 @@ public class CommitLog
                                 // null means the cf has been dropped
                                 continue;
                             
-                            if (clHeader.isDirty(columnFamily.id()) && entryLocation >= clHeader.getPosition(columnFamily.id()))
+                            if (finalHeader == null || (finalHeader.isDirty(columnFamily.id()) && entryLocation >= finalHeader.getPosition(columnFamily.id())))
                                 newRm.add(columnFamily);
                         }
                         if (!newRm.isEmpty())
@@ -424,6 +431,7 @@ public class CommitLog
             {
                 logger.info("Discarding obsolete commit log:" + segment);
                 segment.close();
+                DeletionService.submitDelete(segment.getHeaderPath());
                 DeletionService.submitDelete(segment.getPath());
                // usually this will be the first (remaining) segment, but not always, if segment A contains
                // writes to a CF that is unflushed but is followed by segment B whose CFs are all flushed.

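The replay loop above validates each entry in two stages: the size checksum is
checked before any buffer is allocated (so a torn length can't trigger a huge
allocation), and the full-entry CRC then decides whether the mutation is
applied. A condensed sketch of that logic under the same framing, with
illustrative names (readEntry is not the committed API):

    import java.io.DataInput;
    import java.io.IOException;
    import java.util.zip.CRC32;
    import java.util.zip.Checksum;

    public class ReplaySketch
    {
        // returns the entry bytes, an empty array for a corrupt-but-skippable entry,
        // or null when replay of this segment should stop
        public static byte[] readEntry(DataInput in) throws IOException
        {
            Checksum checksum = new CRC32();
            int size = in.readInt();                  // any of these reads may hit EOF
            long claimedSizeChecksum = in.readLong();
            checksum.update(size);
            if (checksum.getValue() != claimedSizeChecksum || size <= 0)
                return null;                          // size wasn't synced fully: stop replaying

            byte[] bytes = new byte[size];
            in.readFully(bytes);
            long claimedCRC32 = in.readLong();
            checksum.update(bytes, 0, bytes.length);
            if (claimedCRC32 != checksum.getValue())
                return new byte[0];                   // entry corrupt, but still on an entry boundary: keep scanning
            return bytes;                             // intact: caller deserializes the RowMutation
        }
    }
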
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java Sat Jun 19 16:11:49 2010
@@ -19,39 +19,30 @@
 package org.apache.cassandra.db.commitlog;
 
 import java.io.*;
-import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.utils.Pair;
-
-class CommitLogHeader
-{    
-    static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer();
+import org.apache.cassandra.io.ICompactSerializer2;
 
-    static int getLowestPosition(CommitLogHeader clheader)
+public class CommitLogHeader
+{
+    public static String getHeaderPathFromSegment(CommitLogSegment segment)
     {
-        return clheader.lastFlushedAt.size() == 0 ? 0 : Collections.min(clheader.lastFlushedAt.values(), new Comparator<Integer>(){
-            public int compare(Integer o1, Integer o2)
-            {
-                if (o1 == 0)
-                    return 1;
-                else if (o2 == 0)
-                    return -1;
-                else
-                    return o1 - o2;
-            }
-        });
+        return getHeaderPathFromSegmentPath(segment.getPath());
+    }
+
+    public static String getHeaderPathFromSegmentPath(String segmentPath)
+    {
+        return segmentPath + ".header";
     }
 
-    private Map<Integer, Integer> lastFlushedAt; // position at which each CF was last flushed
+    public static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer();
+
+    private Map<Integer, Integer> cfDirtiedAt; // position at which each CF was last flushed
    private final int cfCount; // we keep this in case cfcount changes in the interim (size of lastFlushedAt is not a good indication).
     
     CommitLogHeader()
@@ -64,46 +55,37 @@ class CommitLogHeader
      * also builds an index of position to column family
      * Id.
     */
-    private CommitLogHeader(Map<Integer, Integer> lastFlushedAt, int cfCount)
+    private CommitLogHeader(Map<Integer, Integer> cfDirtiedAt, int cfCount)
     {
         this.cfCount = cfCount;
-        this.lastFlushedAt = lastFlushedAt;
-        assert lastFlushedAt.size() <= cfCount;
+        this.cfDirtiedAt = cfDirtiedAt;
+        assert cfDirtiedAt.size() <= cfCount;
     }
         
     boolean isDirty(int cfId)
     {
-        return lastFlushedAt.containsKey(cfId);
+        return cfDirtiedAt.containsKey(cfId);
     } 
     
     int getPosition(int index)
     {
-        Integer x = lastFlushedAt.get(index);
+        Integer x = cfDirtiedAt.get(index);
         return x == null ? 0 : x;
     }
     
     void turnOn(int cfId, long position)
     {
-        lastFlushedAt.put(cfId, (int)position);
+        cfDirtiedAt.put(cfId, (int)position);
     }
 
     void turnOff(int cfId)
     {
-        lastFlushedAt.remove(cfId);
+        cfDirtiedAt.remove(cfId);
     }
 
     boolean isSafeToDelete() throws IOException
     {
-        return lastFlushedAt.isEmpty();
-    }
-
-    byte[] toByteArray() throws IOException
-    {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(bos);
-        serializer.serialize(this, dos);
-        dos.flush();
-        return bos.toByteArray();
+        return cfDirtiedAt.isEmpty();
     }
     
     // we use cf ids. getting the cf names would be pretty pretty expensive.
@@ -111,7 +93,7 @@ class CommitLogHeader
     {
         StringBuilder sb = new StringBuilder("");
         sb.append("CLH(dirty+flushed={");
-        for (Map.Entry<Integer, Integer> entry : lastFlushedAt.entrySet())
+        for (Map.Entry<Integer, Integer> entry : cfDirtiedAt.entrySet())
         {       
            sb.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
         }
@@ -122,36 +104,67 @@ class CommitLogHeader
     public String dirtyString()
     {
         StringBuilder sb = new StringBuilder();
-        for (Map.Entry<Integer, Integer> entry : lastFlushedAt.entrySet())
+        for (Map.Entry<Integer, Integer> entry : cfDirtiedAt.entrySet())
             sb.append(entry.getKey()).append(", ");
         return sb.toString();
     }
 
-    static CommitLogHeader readCommitLogHeader(BufferedRandomAccessFile logReader) throws IOException
+    static void writeCommitLogHeader(CommitLogHeader header, String headerFile) throws IOException
+    {
+        DataOutputStream out = null;
+        try
+        {
+            /*
+             * FileOutputStream doesn't sync on flush/close.
+             * As headers are "optional" now there is no reason to sync it.
+             * This provides nearly double the performance of BRAF, more under heavy load.
+             */
+            out = new DataOutputStream(new FileOutputStream(headerFile));
+            serializer.serialize(header, out);
+        }
+        finally
+        {
+            if (out != null)
+                out.close();
+        }
+    }
+
+    static CommitLogHeader readCommitLogHeader(String headerFile) throws IOException
+    {
+        DataInputStream reader = null;
+        try
+        {
+            reader = new DataInputStream(new FileInputStream(headerFile));
+            return serializer.deserialize(reader);
+        }
+        finally
+        {
+            if (reader != null)
+                reader.close();
+        }
+    }
+
+    int getReplayPosition()
     {
-        int statedSize = logReader.readInt();
-        byte[] bytes = new byte[statedSize];
-        logReader.readFully(bytes);
-        ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
-        return serializer.deserialize(new DataInputStream(byteStream));
+        return cfDirtiedAt.isEmpty() ? 0 : Collections.min(cfDirtiedAt.values());
     }
 
-    static class CommitLogHeaderSerializer implements ICompactSerializer<CommitLogHeader>
+    static class CommitLogHeaderSerializer implements ICompactSerializer2<CommitLogHeader>
     {
-        public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException
+        public void serialize(CommitLogHeader clHeader, DataOutput dos) throws IOException
         {
-            assert clHeader.lastFlushedAt.size() <= clHeader.cfCount;
+            assert clHeader.cfDirtiedAt.size() <= clHeader.cfCount;
             Checksum checksum = new CRC32();
 
            // write the first checksum after the fixed-size part, so we won't read garbage lastFlushedAt data.
             dos.writeInt(clHeader.cfCount); // 4
-            dos.writeInt(clHeader.lastFlushedAt.size()); // 4
+            dos.writeInt(clHeader.cfDirtiedAt.size()); // 4
             checksum.update(clHeader.cfCount);
-            checksum.update(clHeader.lastFlushedAt.size());
+            checksum.update(clHeader.cfDirtiedAt.size());
             dos.writeLong(checksum.getValue());
 
             // write the 2nd checksum after the lastflushedat map
-            for (Map.Entry<Integer, Integer> entry : clHeader.lastFlushedAt.entrySet())
+            for (Map.Entry<Integer, Integer> entry : clHeader.cfDirtiedAt.entrySet())
             {
                 dos.writeInt(entry.getKey()); // 4
                 checksum.update(entry.getKey());
@@ -161,14 +174,14 @@ class CommitLogHeader
             dos.writeLong(checksum.getValue());
 
            // keep the size constant by padding for missing flushed-at entries.  these do not affect checksum.
-            for (int i = clHeader.lastFlushedAt.entrySet().size(); i < clHeader.cfCount; i++)
+            for (int i = clHeader.cfDirtiedAt.entrySet().size(); i < clHeader.cfCount; i++)
             {
                 dos.writeInt(0);
                 dos.writeInt(0);
             }
         }
 
-        public CommitLogHeader deserialize(DataInputStream dis) throws IOException
+        public CommitLogHeader deserialize(DataInput dis) throws IOException
         {
             Checksum checksum = new CRC32();
 

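Two behaviors from the new header worth calling out: the header file simply
sits next to its segment with a ".header" suffix, and getReplayPosition() is
the minimum dirty position across column families (0 if any CF was dirtied at
offset 0, i.e. replay everything). A small sketch, assuming same-package
access and a loaded schema as in CommitLogHeaderTest, and a hypothetical
segment path:

    // hypothetical path; the ".header" suffix is the committed convention
    String segmentPath = "/var/lib/cassandra/commitlog/CommitLog-1276963909001.log";
    String headerPath = CommitLogHeader.getHeaderPathFromSegmentPath(segmentPath);
    assert headerPath.equals(segmentPath + ".header");

    CommitLogHeader clh = new CommitLogHeader();
    clh.turnOn(2, 34);                    // CF id 2 first dirtied at offset 34
    clh.turnOn(100, 0);                   // CF id 100 dirtied at offset 0
    assert clh.getReplayPosition() == 0;  // min of {34, 0}: replay from the start
    clh.turnOff(100);                     // CF 100 flushed
    assert clh.getReplayPosition() == 34; // only CF 2 still dirty
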
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Sat Jun 19 16:11:49 2010
@@ -1,6 +1,4 @@
-package org.apache.cassandra.db.commitlog;
 /*
- * 
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,6 +18,7 @@ package org.apache.cassandra.db.commitlo
  * 
  */
 
+package org.apache.cassandra.db.commitlog;
 
 import java.io.File;
 import java.io.IOError;
@@ -27,15 +26,13 @@ import java.io.IOException;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
-import org.apache.cassandra.config.CFMetaData;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 
@@ -49,13 +46,13 @@ public class CommitLogSegment
     public CommitLogSegment()
     {
         this.header = new CommitLogHeader();
-        String logFile = DatabaseDescriptor.getLogFileLocation() + File.separator + "CommitLog-" + System.currentTimeMillis() + ".log";
+        String logFile = DatabaseDescriptor.getCommitLogLocation() + File.separator + "CommitLog-" + System.currentTimeMillis() + ".log";
         logger.info("Creating new commitlog segment " + logFile);
 
         try
         {
             logWriter = createWriter(logFile);
-            writeCommitLogHeader(header.toByteArray());
+            writeHeader();
         }
         catch (IOException e)
         {
@@ -70,25 +67,7 @@ public class CommitLogSegment
 
     public void writeHeader() throws IOException
     {
-        seekAndWriteCommitLogHeader(header.toByteArray());
-    }
-
-    /** writes header at the beginning of the file, then seeks back to current position */
-    void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
-    {
-        long currentPos = logWriter.getFilePointer();
-        logWriter.seek(0);
-
-        writeCommitLogHeader(bytes);
-
-        logWriter.seek(currentPos);
-    }
-
-    private void writeCommitLogHeader(byte[] bytes) throws IOException
-    {
-        logWriter.writeInt(bytes.length);
-        logWriter.write(bytes);
-        logWriter.sync();
+        CommitLogHeader.writeCommitLogHeader(header, getHeaderPath());
     }
 
    private static BufferedRandomAccessFile createWriter(String file) throws IOException
@@ -121,29 +100,30 @@ public class CommitLogSegment
                     if (!header.isDirty(id))
                     {
                         header.turnOn(id, logWriter.getFilePointer());
-                        seekAndWriteCommitLogHeader(header.toByteArray());
+                        writeHeader();
                     }
                 }
             }
 
-            // write mutation, w/ checksum
-            Checksum checkum = new CRC32();
+            // write mutation, w/ checksum on the size and data
+            byte[] bytes;
+            Checksum checksum = new CRC32();
             if (serializedRow instanceof DataOutputBuffer)
             {
-                DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
-                logWriter.writeInt(buffer.getLength());
-                logWriter.write(buffer.getData(), 0, buffer.getLength());
-                checkum.update(buffer.getData(), 0, buffer.getLength());
+                bytes = ((DataOutputBuffer) serializedRow).getData();
             }
             else
             {
                 assert serializedRow instanceof byte[];
-                byte[] bytes = (byte[]) serializedRow;
-                logWriter.writeInt(bytes.length);
-                logWriter.write(bytes);
-                checkum.update(bytes, 0, bytes.length);
+                bytes = (byte[]) serializedRow;
             }
-            logWriter.writeLong(checkum.getValue());
+
+            checksum.update(bytes.length);
+            logWriter.writeInt(bytes.length);
+            logWriter.writeLong(checksum.getValue());
+            logWriter.write(bytes);
+            checksum.update(bytes, 0, bytes.length);
+            logWriter.writeLong(checksum.getValue());
 
             return cLogCtx;
         }
@@ -175,6 +155,11 @@ public class CommitLogSegment
         return logWriter.getPath();
     }
 
+    public String getHeaderPath()
+    {
+        return CommitLogHeader.getHeaderPathFromSegment(this);
+    }
+
     public long length()
     {
         try

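One subtlety in the size checksum written above: Checksum.update(int) hashes
only the low eight bits of its argument, so the size checksum chiefly catches
torn/unsynced writes; the size <= 0 check on replay guards part of the rest.
A quick standalone illustration (not from the patch):

    import java.util.zip.CRC32;

    public class SizeChecksumNote
    {
        public static void main(String[] args)
        {
            CRC32 a = new CRC32();
            a.update(100);               // hashes the single byte 0x64
            CRC32 b = new CRC32();
            b.update(0x12345600 + 100);  // also hashes 0x64; the high 24 bits are ignored
            System.out.println(a.getValue() == b.getValue()); // true
        }
    }
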
Modified: cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java Sat Jun 19 16:11:49 2010
@@ -45,7 +45,7 @@ public class CleanupHelper extends Schem
     {
         // clean up commitlog
         String[] directoryNames = {
-                DatabaseDescriptor.getLogFileLocation(),
+                DatabaseDescriptor.getCommitLogLocation(),
         };
         for (String dirName : directoryNames)
         {

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java Sat Jun 19 16:11:49 2010
@@ -16,18 +16,22 @@
 * specific language governing permissions and limitations
 * under the License.
 */
+
 package org.apache.cassandra.db;
 
+import java.io.*;
+import java.util.concurrent.ExecutionException;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.junit.Test;
+
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.filter.QueryPath;
-import org.junit.Test;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.ExecutionException;
+import org.apache.cassandra.db.commitlog.CommitLogHeader;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.utils.Pair;
 
 public class CommitLogTest extends CleanupHelper
 {
@@ -63,13 +67,119 @@ public class CommitLogTest extends Clean
     }
 
     @Test
-    public void testRecoveryWithPartiallyWrittenHeader() throws Exception
+    public void testRecoveryWithEmptyHeader() throws Exception
+    {
+        testRecovery(new byte[0], new byte[10]);
+    }
+
+    @Test
+    public void testRecoveryWithShortHeader() throws Exception
+    {
+        testRecovery(new byte[2], new byte[10]);
+    }
+
+    @Test
+    public void testRecoveryWithGarbageHeader() throws Exception
+    {
+        byte[] garbage = new byte[100];
+        (new java.util.Random()).nextBytes(garbage);
+        testRecovery(garbage, garbage);
+    }
+
+    @Test
+    public void testRecoveryWithEmptyLog() throws Exception
+    {
+        CommitLog.recover(new File[] {tmpFiles().right});
+    }
+
+    @Test
+    public void testRecoveryWithShortLog() throws Exception
+    {
+        // force EOF while reading log
+        testRecoveryWithBadSizeArgument(100, 10);
+    }
+
+    @Test
+    public void testRecoveryWithShortSize() throws Exception
+    {
+        testRecovery(new byte[0], new byte[2]);
+    }
+
+    @Test
+    public void testRecoveryWithShortCheckSum() throws Exception
+    {
+        testRecovery(new byte[0], new byte[6]);
+    }
+
+    @Test
+    public void testRecoveryWithGarbageLog() throws Exception
+    {
+        byte[] garbage = new byte[100];
+        (new java.util.Random()).nextBytes(garbage);
+        testRecovery(new byte[0], garbage);
+    }
+
+    @Test
+    public void testRecoveryWithBadSizeChecksum() throws Exception
+    {
+        Checksum checksum = new CRC32();
+        checksum.update(100);
+        testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue());
+    }
+    
+    @Test
+    public void testRecoveryWithZeroSegmentSizeArgument() throws Exception
+    {
+        // many different combinations of 4 bytes (garbage) will be read as zero by readInt()
+        testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF
+    }
+
+    @Test
+    public void testRecoveryWithNegativeSizeArgument() throws Exception
+    {
+        // garbage from a partial/bad flush could be read as a negative size even if there is no EOF
+        testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
+    }
+
+    protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
+    {
+        Checksum checksum = new CRC32();
+        checksum.update(size);
+        testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue());
+    }
+
+    protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception
+    {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputStream dout = new DataOutputStream(out);
+        dout.writeInt(size);
+        dout.writeLong(checksum);
+        dout.write(new byte[dataSize]);
+        dout.close();
+        testRecovery(new byte[0], out.toByteArray());
+    }
+
+    protected Pair<File, File> tmpFiles() throws IOException
+    {
+        File logFile = File.createTempFile("testRecoveryWithPartiallyWrittenHeaderTestFile", null);
+        File headerFile = new File(CommitLogHeader.getHeaderPathFromSegmentPath(logFile.getAbsolutePath()));
+        logFile.deleteOnExit();
+        headerFile.deleteOnExit();
+        assert logFile.length() == 0;
+        assert headerFile.length() == 0;
+        return new Pair<File, File>(headerFile, logFile);
+    }
+
+    protected void testRecovery(byte[] headerData, byte[] logData) throws Exception
     {
-        File tmpFile = File.createTempFile("testRecoveryWithPartiallyWrittenHeaderTestFile", null);
-        tmpFile.deleteOnExit();
-        OutputStream out = new FileOutputStream(tmpFile);
-        out.write(new byte[6]);
+        Pair<File, File> tmpFiles = tmpFiles();
+        File logFile = tmpFiles.right;
+        File headerFile = tmpFiles.left;
+        OutputStream lout = new FileOutputStream(logFile);
+        OutputStream hout = new FileOutputStream(headerFile);
+        lout.write(logData);
+        hout.write(headerData);
         //statics make it annoying to test things correctly
-        CommitLog.instance().recover(new File[] {tmpFile}); //CASSANDRA-1119 throws on failure
+        CommitLog.recover(new File[] {logFile}); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure
     }
 }

Copied: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java (from r956168, cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java)
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java?p2=cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java&p1=cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java&r1=956168&r2=956250&rev=956250&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java Sat Jun 19 16:11:49 2010
@@ -1,46 +1,23 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
 package org.apache.cassandra.db;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.cassandra.Util;
 import org.junit.Test;
 
 import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 
-import org.apache.cassandra.Util;
 import static org.apache.cassandra.Util.column;
 import static org.apache.cassandra.db.TableTest.assertColumns;
 
-public class RecoveryManagerTest extends CleanupHelper
+public class RecoveryManager3Test extends CleanupHelper
 {
     @Test
-    public void testNothing() throws IOException {
-        // TODO nothing to recover
-        CommitLog.recover();
-    }
-
-    @Test
-    public void testOne() throws IOException, ExecutionException, InterruptedException
+    public void testMissingHeader() throws IOException, ExecutionException, InterruptedException
     {
         Table table1 = Table.open("Keyspace1");
         Table table2 = Table.open("Keyspace2");
@@ -64,6 +41,14 @@ public class RecoveryManagerTest extends
         table1.getColumnFamilyStore("Standard1").clearUnsafe();
         table2.getColumnFamilyStore("Standard3").clearUnsafe();
 
+        // nuke the header
+        for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
+        {
+            if (file.getName().endsWith(".header"))
+                if (!file.delete())
+                    throw new AssertionError();
+        }
+
         CommitLog.recover();
 
         assertColumns(Util.getColumnFamily(table1, dk, "Standard1"), "col1");

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java Sat Jun 19 16:11:49 2010
@@ -34,8 +34,7 @@ import static org.apache.cassandra.db.Ta
 public class RecoveryManagerTest extends CleanupHelper
 {
     @Test
-    public void testNothing() throws IOException {
-        // TODO nothing to recover
+    public void testNothingToRecover() throws IOException {
         CommitLog.recover();
     }
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java Sat Jun 19 16:11:49 2010
@@ -18,23 +18,13 @@
 
 package org.apache.cassandra.db.commitlog;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.Pair;
-import org.junit.Before;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 import org.junit.Test;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
+import org.apache.cassandra.SchemaLoader;
 
 public class CommitLogHeaderTest extends SchemaLoader
 {
@@ -43,41 +33,42 @@ public class CommitLogHeaderTest extends
     public void testEmptyHeader()
     {
         CommitLogHeader clh = new CommitLogHeader();
-        assert CommitLogHeader.getLowestPosition(clh) == 0;
+        assert clh.getReplayPosition() == 0;
     }
     
     @Test
     public void lowestPositionWithZero()
     {
-        // zero should never be the lowest position unless all positions are zero.
         CommitLogHeader clh = new CommitLogHeader();
         clh.turnOn(2, 34);
-        assert CommitLogHeader.getLowestPosition(clh) == 34;
+        assert clh.getReplayPosition() == 34;
         clh.turnOn(100, 0);
-        assert CommitLogHeader.getLowestPosition(clh) == 34;
+        assert clh.getReplayPosition() == 0;
         clh.turnOn(65, 2);
-        assert CommitLogHeader.getLowestPosition(clh) == 2;
+        assert clh.getReplayPosition() == 0;
     }
     
     @Test
     public void lowestPositionEmpty()
     {
         CommitLogHeader clh = new CommitLogHeader();
-        assert CommitLogHeader.getLowestPosition(clh) == 0;
+        assert clh.getReplayPosition() == 0;
     }
     
     @Test
     public void constantSize() throws IOException
     {
-        CommitLogHeader clh = new CommitLogHeader();
-        clh.turnOn(2, 34);
-        byte[] one = clh.toByteArray();
-        
-        clh = new CommitLogHeader();
+        CommitLogHeader clh0 = new CommitLogHeader();
+        clh0.turnOn(2, 34);
+        ByteArrayOutputStream out0 = new ByteArrayOutputStream();
+        CommitLogHeader.serializer.serialize(clh0, new DataOutputStream(out0));
+
+        CommitLogHeader clh1 = new CommitLogHeader();
         for (int i = 0; i < 5; i++)
-            clh.turnOn(i, 1000 * i);
-        byte[] two = clh.toByteArray();
-        
-        assert one.length == two.length;
+            clh1.turnOn(i, 1000 * i);
+        ByteArrayOutputStream out1 = new ByteArrayOutputStream();
+        CommitLogHeader.serializer.serialize(clh1, new DataOutputStream(out1));
+
+        assert out0.toByteArray().length == out1.toByteArray().length;
     }
 }

