Author: jbellis
Date: Mon Apr 26 18:31:06 2010
New Revision: 938178

URL: http://svn.apache.org/viewvc?rev=938178&view=rev
Log:
add crc to commitlogheader
patch by jbellis; reviewed by gdusbabek for CASSANDRA-999

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java?rev=938178&r1=938177&r2=938178&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java Mon Apr 26 18:31:06 2010
@@ -19,10 +19,13 @@
 package org.apache.cassandra.db.commitlog;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.io.ICompactSerializer;
@@ -195,27 +198,83 @@ class CommitLogHeader
         public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException
         {
             assert clHeader.lastFlushedAt.size() <= clHeader.cfCount;
+            // Layout: cfCount, cfMap length, lastFlushedAt size, CRC of those three ints,
+            // then the cfMap bytes, the (key, value) int pairs, and a second CRC.
+            // The CRC is cumulative: the second value also covers the three leading ints.
+            Checksum checksum = new CRC32();
+
+            // write the first checksum after the fixed-size part, so we won't OOM allocating a bogus cfmap buffer
             dos.writeInt(clHeader.cfCount); // 4
             dos.writeInt(clHeader.serializedCfMap.length); // 4
-            dos.write(clHeader.serializedCfMap); // colMap.length
             dos.writeInt(clHeader.lastFlushedAt.size()); // 4
+            updateChecksumInt(checksum, clHeader.cfCount);
+            updateChecksumInt(checksum, clHeader.serializedCfMap.length);
+            updateChecksumInt(checksum, clHeader.lastFlushedAt.size());
+            dos.writeLong(checksum.getValue());
+
+            // write the 2nd checksum after the cfmap and lastflushedat map
+            dos.write(clHeader.serializedCfMap); // colMap.length
+            checksum.update(clHeader.serializedCfMap, 0, clHeader.serializedCfMap.length);
             for (Map.Entry<Integer, Integer> entry : clHeader.lastFlushedAt.entrySet())
             {
                 dos.writeInt(entry.getKey()); // 4
+                updateChecksumInt(checksum, entry.getKey());
                 dos.writeInt(entry.getValue()); // 4
+                updateChecksumInt(checksum, entry.getValue());
             }
+            dos.writeLong(checksum.getValue());
         }
 
+        /**
+         * Feeds all four bytes of v into checksum, most significant byte first
+         * (the byte order DataOutputStream.writeInt emits). Checksum.update(int)
+         * consumes only the low eight bits of its argument, so passing an int
+         * directly would leave the upper three bytes unprotected.
+         */
+        private void updateChecksumInt(Checksum checksum, int v)
+        {
+            checksum.update(v >>> 24);
+            checksum.update(v >>> 16);
+            checksum.update(v >>> 8);
+            checksum.update(v);
+        }
+
         public CommitLogHeader deserialize(DataInputStream dis) throws IOException
         {
-            int colCount = dis.readInt();
-            byte[] map = new byte[dis.readInt()];
-            dis.readFully(map);
-            int size = dis.readInt();
+            // Mirrors serialize: the CRC accumulates over everything read so far and is
+            // compared against the stored value at two points.
+            Checksum checksum = new CRC32();
+
+            int cfCount = dis.readInt();
+            updateChecksumInt(checksum, cfCount);
+            int cfMapLength = dis.readInt();
+            updateChecksumInt(checksum, cfMapLength);
+            int lastFlushedAtSize = dis.readInt();
+            updateChecksumInt(checksum, lastFlushedAtSize);
+            // verify the fixed-size part before trusting cfMapLength for an allocation
+            if (checksum.getValue() != dis.readLong())
+            {
+                throw new IOException("Invalid or corrupt commitlog header");
+            }
+
+            byte[] cfMap = new byte[cfMapLength];
+            dis.readFully(cfMap);
+            checksum.update(cfMap, 0, cfMap.length);
             Map<Integer, Integer> lastFlushedAt = new HashMap<Integer, Integer>();
-            for (int i = 0; i < size; i++)
-                lastFlushedAt.put(dis.readInt(), dis.readInt());
-            return new CommitLogHeader(lastFlushedAt, map, colCount);
+            for (int i = 0; i < lastFlushedAtSize; i++)
+            {
+                int key = dis.readInt();
+                updateChecksumInt(checksum, key);
+                int value = dis.readInt();
+                updateChecksumInt(checksum, value);
+                lastFlushedAt.put(key, value);
+            }
+            if (checksum.getValue() != dis.readLong())
+            {
+                throw new IOException("Invalid or corrupt commitlog header");
+            }
+
+            return new CommitLogHeader(lastFlushedAt, cfMap, cfCount);
         }
     }
 }


Reply via email to