Author: jbellis
Date: Wed Apr 28 19:05:10 2010
New Revision: 939053
URL: http://svn.apache.org/viewvc?rev=939053&view=rev
Log:
redo "add crc to commitlogheader," with fix for header serialization.
patch by jbellis; reviewed by gdusbabek for CASSANDRA-999
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java?rev=939053&r1=939052&r2=939053&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java Wed Apr 28 19:05:10 2010
@@ -19,10 +19,13 @@
package org.apache.cassandra.db.commitlog;
import java.io.*;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.io.ICompactSerializer;
@@ -31,7 +34,7 @@ import org.apache.cassandra.utils.Pair;
class CommitLogHeader
{
- private static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer();
+ static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer();
static int getLowestPosition(CommitLogHeader clheader)
{
@@ -52,7 +55,6 @@ class CommitLogHeader
private final byte[] serializedCfMap; // serialized. only needed during commit log recovery.
private final int cfCount; // we keep this in case cfcount changes in the interim (size of lastFlushedAt is not a good indication).
- private transient final int maxSerializedSize;
private transient Map<Pair<String, String>, Integer> cfIdMap; // only needed during recovery. created from this.serializedCfMap.
CommitLogHeader()
@@ -71,8 +73,6 @@ class CommitLogHeader
this.lastFlushedAt = lastFlushedAt;
this.serializedCfMap = serializedCfMap;
assert lastFlushedAt.size() <= cfCount;
- // (size of lastFlushedAt) + (size of map buf) + (size of cfCount int)
- maxSerializedSize = (8 * cfCount + 4) + (serializedCfMap.length + 4) + (4);
}
boolean isDirty(int cfId)
@@ -150,14 +150,11 @@ class CommitLogHeader
byte[] toByteArray() throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream(maxSerializedSize);
- DataOutputStream dos = new DataOutputStream(bos);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
serializer.serialize(this, dos);
- byte[] src = bos.toByteArray();
- assert src.length <= maxSerializedSize;
- byte[] dst = new byte[maxSerializedSize];
- System.arraycopy(src, 0, dst, 0, src.length);
- return dst;
+ dos.flush();
+ return bos.toByteArray();
}
// we use cf ids. getting the cf names would be pretty pretty expensive.
@@ -195,27 +192,70 @@ class CommitLogHeader
public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException
{
assert clHeader.lastFlushedAt.size() <= clHeader.cfCount;
+ Checksum checksum = new CRC32();
+
+ // write the first checksum after the fixed-size part, so we won't OOM allocating a bogus cfmap buffer
dos.writeInt(clHeader.cfCount); // 4
dos.writeInt(clHeader.serializedCfMap.length); // 4
- dos.write(clHeader.serializedCfMap); // colMap.length
dos.writeInt(clHeader.lastFlushedAt.size()); // 4
+ checksum.update(clHeader.cfCount);
+ checksum.update(clHeader.serializedCfMap.length);
+ checksum.update(clHeader.lastFlushedAt.size());
+ dos.writeLong(checksum.getValue());
+
+ // write the 2nd checksum after the cfmap and lastflushedat map
+ dos.write(clHeader.serializedCfMap); // colMap.length
+ checksum.update(clHeader.serializedCfMap, 0, clHeader.serializedCfMap.length);
for (Map.Entry<Integer, Integer> entry : clHeader.lastFlushedAt.entrySet())
{
dos.writeInt(entry.getKey()); // 4
+ checksum.update(entry.getKey());
dos.writeInt(entry.getValue()); // 4
+ checksum.update(entry.getValue());
+ }
+ dos.writeLong(checksum.getValue());
+
+ // keep the size constant by padding for missing flushed-at entries. these do not affect checksum.
+ for (int i = clHeader.lastFlushedAt.entrySet().size(); i < clHeader.cfCount; i++)
+ {
+ dos.writeInt(0);
+ dos.writeInt(0);
}
}
public CommitLogHeader deserialize(DataInputStream dis) throws IOException
{
- int colCount = dis.readInt();
- byte[] map = new byte[dis.readInt()];
- dis.readFully(map);
- int size = dis.readInt();
+ Checksum checksum = new CRC32();
+
+ int cfCount = dis.readInt();
+ checksum.update(cfCount);
+ int cfMapLength = dis.readInt();
+ checksum.update(cfMapLength);
+ int lastFlushedAtSize = dis.readInt();
+ checksum.update(lastFlushedAtSize);
+ if (checksum.getValue() != dis.readLong())
+ {
+ throw new IOException("Invalid or corrupt commitlog header");
+ }
+
+ byte[] cfMap = new byte[cfMapLength];
+ dis.readFully(cfMap);
+ checksum.update(cfMap, 0, cfMap.length);
Map<Integer, Integer> lastFlushedAt = new HashMap<Integer, Integer>();
- for (int i = 0; i < size; i++)
- lastFlushedAt.put(dis.readInt(), dis.readInt());
- return new CommitLogHeader(lastFlushedAt, map, colCount);
+ for (int i = 0; i < lastFlushedAtSize; i++)
+ {
+ int key = dis.readInt();
+ checksum.update(key);
+ int value = dis.readInt();
+ checksum.update(value);
+ lastFlushedAt.put(key, value);
+ }
+ if (checksum.getValue() != dis.readLong())
+ {
+ throw new IOException("Invalid or corrupt commitlog header");
+ }
+
+ return new CommitLogHeader(lastFlushedAt, cfMap, cfCount);
}
}
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java?rev=939053&r1=939052&r2=939053&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java Wed Apr 28 19:05:10 2010
@@ -80,6 +80,16 @@ public class CommitLogHeaderTest extends
assert one.length == two.length;
}
+
+ @Test
+ public void byteArray() throws IOException
+ {
+ SchemaLoader.loadSchemaFromYaml();
+ CommitLogHeader clh = new CommitLogHeader();
+ assert clh.getCfIdMap().size() > 0;
+ CommitLogHeader clh2 = CommitLogHeader.serializer.deserialize(new DataInputStream(new ByteArrayInputStream(clh.toByteArray())));
+ assert clh.getCfIdMap().equals(clh2.getCfIdMap());
+ }
@Test
public void cfMapSerialization() throws IOException