Author: jbellis
Date: Sat Jun 19 16:11:49 2010
New Revision: 956250
URL: http://svn.apache.org/viewvc?rev=956250&view=rev
Log:
split commitlog header into separate file and add size checksum to mutations.
patch by mdennis and jbellis for CASSANDRA-1179
Added:
cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
  - copied, changed from r956168, cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java
cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Sat Jun 19 16:11:49 2010
@@ -30,6 +30,8 @@ dev
* avoid reading large rows into memory during compaction (CASSANDRA-16)
* added hadoop OutputFormat (CASSANDRA-1101)
* efficient Streaming (no more anticompaction) (CASSANDRA-579)
+ * split commitlog header into separate file and add size checksum to
+ mutations (CASSANDRA-1179)
0.6.3
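
The mutation-side change is a CRC32 over the length prefix itself, written between the size and the serialized row. On replay, a size whose checksum does not match (or that is non-positive) marks the end of the synced portion of the segment, instead of being trusted as an allocation size. A minimal read-side sketch of the entry layout shown in the CommitLog.java and CommitLogSegment.java diffs below; readCheckedEntry is a hypothetical helper, the real logic is inline in CommitLog.recover():

    import java.io.DataInput;
    import java.io.IOException;
    import java.util.zip.CRC32;
    import java.util.zip.Checksum;

    // Sketch only: returns the entry bytes, or null when a checksum says the
    // entry was never fully synced (recover() breaks or continues instead).
    static byte[] readCheckedEntry(DataInput in) throws IOException
    {
        Checksum checksum = new CRC32();
        int size = in.readInt();
        long claimedSizeChecksum = in.readLong();
        checksum.update(size); // Checksum.update(int) hashes a single byte
        if (checksum.getValue() != claimedSizeChecksum || size <= 0)
            return null; // size prefix was never synced; don't trust it
        byte[] bytes = new byte[size];
        in.readFully(bytes);
        long claimedCRC32 = in.readLong();
        checksum.update(bytes, 0, bytes.length); // running: size byte, then data
        return claimedCRC32 == checksum.getValue() ? bytes : null;
    }
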
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Sat Jun 19 16:11:49 2010
@@ -880,7 +880,7 @@ public class DatabaseDescriptor
return dataFileDirectory;
}
- public static String getLogFileLocation()
+ public static String getCommitLogLocation()
{
return conf.commitlog_directory;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Sat Jun 19 16:11:49 2010
@@ -155,9 +155,8 @@ public class CommitLog
public static void recover() throws IOException
{
- String directory = DatabaseDescriptor.getLogFileLocation();
- File file = new File(directory);
- File[] files = file.listFiles(new FilenameFilter()
+ String directory = DatabaseDescriptor.getCommitLogLocation();
+ File[] files = new File(directory).listFiles(new FilenameFilter()
{
public boolean accept(File dir, String name)
{
@@ -170,7 +169,11 @@ public class CommitLog
Arrays.sort(files, new FileUtils.FileComparator());
logger.info("Replaying " + StringUtils.join(files, ", "));
recover(files);
- FileUtils.delete(files);
+ for (File f : files)
+ {
+ FileUtils.delete(CommitLogHeader.getHeaderPathFromSegmentPath(f.getAbsolutePath())); // may not actually exist
+ FileUtils.deleteWithConfirm(f);
+ }
logger.info("Log replay complete");
}
@@ -180,28 +183,24 @@ public class CommitLog
final AtomicInteger counter = new AtomicInteger(0);
for (File file : clogs)
{
+ CommitLogHeader clHeader = null;
int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
- final CommitLogHeader clHeader;
+ int replayPosition = 0;
try
{
- clHeader = CommitLogHeader.readCommitLogHeader(reader);
+ clHeader = CommitLogHeader.readCommitLogHeader(CommitLogHeader.getHeaderPathFromSegmentPath(file.getAbsolutePath()));
+ replayPosition = clHeader.getReplayPosition();
}
- catch (EOFException eofe)
+ catch (IOException ioe)
{
- logger.info("Attempted to recover an incomplete
CommitLogHeader. Everything is ok, don't panic.");
- continue;
+ logger.info("Attempted to read an incomplete, missing or
corrupt CommitLogHeader. Everything is ok, don't panic. CommitLog will be
replayed from the beginning", ioe);
}
+ reader.seek(replayPosition);
- /* seek to the lowest position where any CF has non-flushed data */
- int lowPos = CommitLogHeader.getLowestPosition(clHeader);
- if (lowPos == 0)
- break;
-
- reader.seek(lowPos);
if (logger.isDebugEnabled())
- logger.debug("Replaying " + file + " starting at " + lowPos);
+ logger.debug("Replaying " + file + " starting at " +
reader.getFilePointer());
/* read the logs populate RowMutation and apply */
while (!reader.isEOF())
@@ -211,29 +210,36 @@ public class CommitLog
long claimedCRC32;
byte[] bytes;
+
+ Checksum checksum = new CRC32();
try
{
- bytes = new byte[reader.readInt()]; // readInt can throw EOFException too
+ // any of the reads may hit EOF
+ int size = reader.readInt();
+ long claimedSizeChecksum = reader.readLong();
+ checksum.update(size);
+ if (checksum.getValue() != claimedSizeChecksum || size <=
0)
+ break; // entry wasn't synced correctly/fully. that's
ok.
+
+ bytes = new byte[size];
reader.readFully(bytes);
claimedCRC32 = reader.readLong();
}
- catch (EOFException e)
+ catch(EOFException eof)
{
- // last CL entry didn't get completely written. that's ok.
- break;
+ break; // last CL entry didn't get completely written. that's ok.
}
- ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes);
- Checksum checksum = new CRC32();
checksum.update(bytes, 0, bytes.length);
if (claimedCRC32 != checksum.getValue())
{
- // this part of the log must not have been fsynced. probably the rest is bad too,
- // but just in case there is no harm in trying them.
+ // this entry must not have been fsynced. probably the rest is bad too,
+ // but just in case there is no harm in trying them (since we still read on an entry boundary)
continue;
}
/* deserialize the commit log entry */
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes);
final RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
if (logger.isDebugEnabled())
logger.debug(String.format("replaying mutation for %s.%s:
%s",
@@ -244,6 +250,7 @@ public class CommitLog
tablesRecovered.add(table);
final Collection<ColumnFamily> columnFamilies = new
ArrayList<ColumnFamily>(rm.getColumnFamilies());
final long entryLocation = reader.getFilePointer();
+ final CommitLogHeader finalHeader = clHeader;
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws IOException
@@ -259,7 +266,7 @@ public class CommitLog
// null means the cf has been dropped
continue;
- if (clHeader.isDirty(columnFamily.id()) &&
entryLocation >= clHeader.getPosition(columnFamily.id()))
+ if (finalHeader == null || (finalHeader.isDirty(columnFamily.id()) && entryLocation >= finalHeader.getPosition(columnFamily.id())))
newRm.add(columnFamily);
}
if (!newRm.isEmpty())
@@ -424,6 +431,7 @@ public class CommitLog
{
logger.info("Discarding obsolete commit log:" + segment);
segment.close();
+ DeletionService.submitDelete(segment.getHeaderPath());
DeletionService.submitDelete(segment.getPath());
// usually this will be the first (remaining) segment, but not always, if segment A contains
// writes to a CF that is unflushed but is followed by segment B whose CFs are all flushed.
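
With the header split into its own file, replay no longer depends on it: a missing, short, or corrupt header just means replaying the whole segment. Condensed from the hunks above (a fragment, names as in recover()):

    int replayPosition = 0;
    try
    {
        CommitLogHeader clHeader = CommitLogHeader.readCommitLogHeader(
            CommitLogHeader.getHeaderPathFromSegmentPath(file.getAbsolutePath()));
        replayPosition = clHeader.getReplayPosition();
    }
    catch (IOException ioe)
    {
        // header unreadable: fall back to replaying from offset 0
    }
    reader.seek(replayPosition);
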
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java Sat Jun 19 16:11:49 2010
@@ -19,39 +19,30 @@
package org.apache.cassandra.db.commitlog;
import java.io.*;
-import java.nio.ByteBuffer;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.utils.Pair;
-
-class CommitLogHeader
-{
- static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer();
+import org.apache.cassandra.io.ICompactSerializer2;
- static int getLowestPosition(CommitLogHeader clheader)
+public class CommitLogHeader
+{
+ public static String getHeaderPathFromSegment(CommitLogSegment segment)
{
- return clheader.lastFlushedAt.size() == 0 ? 0 : Collections.min(clheader.lastFlushedAt.values(), new Comparator<Integer>(){
- public int compare(Integer o1, Integer o2)
- {
- if (o1 == 0)
- return 1;
- else if (o2 == 0)
- return -1;
- else
- return o1 - o2;
- }
- });
+ return getHeaderPathFromSegmentPath(segment.getPath());
+ }
+
+ public static String getHeaderPathFromSegmentPath(String segmentPath)
+ {
+ return segmentPath + ".header";
}
- private Map<Integer, Integer> lastFlushedAt; // position at which each CF
was last flushed
+ public static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer();
+
+ private Map<Integer, Integer> cfDirtiedAt; // position at which each CF
was last flushed
private final int cfCount; // we keep this in case cfcount changes in the
interim (size of lastFlushedAt is not a good indication).
CommitLogHeader()
@@ -64,46 +55,37 @@ class CommitLogHeader
* also builds an index of position to column family
* Id.
*/
- private CommitLogHeader(Map<Integer, Integer> lastFlushedAt, int cfCount)
+ private CommitLogHeader(Map<Integer, Integer> cfDirtiedAt, int cfCount)
{
this.cfCount = cfCount;
- this.lastFlushedAt = lastFlushedAt;
- assert lastFlushedAt.size() <= cfCount;
+ this.cfDirtiedAt = cfDirtiedAt;
+ assert cfDirtiedAt.size() <= cfCount;
}
boolean isDirty(int cfId)
{
- return lastFlushedAt.containsKey(cfId);
+ return cfDirtiedAt.containsKey(cfId);
}
int getPosition(int index)
{
- Integer x = lastFlushedAt.get(index);
+ Integer x = cfDirtiedAt.get(index);
return x == null ? 0 : x;
}
void turnOn(int cfId, long position)
{
- lastFlushedAt.put(cfId, (int)position);
+ cfDirtiedAt.put(cfId, (int)position);
}
void turnOff(int cfId)
{
- lastFlushedAt.remove(cfId);
+ cfDirtiedAt.remove(cfId);
}
boolean isSafeToDelete() throws IOException
{
- return lastFlushedAt.isEmpty();
- }
-
- byte[] toByteArray() throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- serializer.serialize(this, dos);
- dos.flush();
- return bos.toByteArray();
+ return cfDirtiedAt.isEmpty();
}
// we use cf ids. getting the cf names would be pretty pretty expensive.
@@ -111,7 +93,7 @@ class CommitLogHeader
{
StringBuilder sb = new StringBuilder("");
sb.append("CLH(dirty+flushed={");
- for (Map.Entry<Integer, Integer> entry : lastFlushedAt.entrySet())
+ for (Map.Entry<Integer, Integer> entry : cfDirtiedAt.entrySet())
{
sb.append(entry.getKey()).append(":
").append(entry.getValue()).append(", ");
}
@@ -122,36 +104,67 @@ class CommitLogHeader
public String dirtyString()
{
StringBuilder sb = new StringBuilder();
- for (Map.Entry<Integer, Integer> entry : lastFlushedAt.entrySet())
+ for (Map.Entry<Integer, Integer> entry : cfDirtiedAt.entrySet())
sb.append(entry.getKey()).append(", ");
return sb.toString();
}
- static CommitLogHeader readCommitLogHeader(BufferedRandomAccessFile logReader) throws IOException
+ static void writeCommitLogHeader(CommitLogHeader header, String headerFile) throws IOException
+ {
+ DataOutputStream out = null;
+ try
+ {
+ /*
+ * FileOutputStream doesn't sync on flush/close.
+ * As headers are "optional" now there is no reason to sync it.
+ * This provides nearly double the performance of BRAF, more under heavey load.
+ */
+ out = new DataOutputStream(new FileOutputStream(headerFile));
+ serializer.serialize(header, out);
+ }
+ finally
+ {
+ if (out != null)
+ out.close();
+ }
+ }
+
+ static CommitLogHeader readCommitLogHeader(String headerFile) throws IOException
+ {
+ DataInputStream reader = null;
+ try
+ {
+ reader = new DataInputStream(new FileInputStream(headerFile));
+ return serializer.deserialize(reader);
+ }
+ finally
+ {
+ if (reader != null)
+ reader.close();
+ }
+ }
+
+ int getReplayPosition()
{
- int statedSize = logReader.readInt();
- byte[] bytes = new byte[statedSize];
- logReader.readFully(bytes);
- ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
- return serializer.deserialize(new DataInputStream(byteStream));
- return cfDirtiedAt.isEmpty() ? 0 : Collections.min(cfDirtiedAt.values());
}
- static class CommitLogHeaderSerializer implements ICompactSerializer<CommitLogHeader>
+ static class CommitLogHeaderSerializer implements ICompactSerializer2<CommitLogHeader>
{
- public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException
+ public void serialize(CommitLogHeader clHeader, DataOutput dos) throws IOException
{
- assert clHeader.lastFlushedAt.size() <= clHeader.cfCount;
+ assert clHeader.cfDirtiedAt.size() <= clHeader.cfCount;
Checksum checksum = new CRC32();
// write the first checksum after the fixed-size part, so we won't read garbage lastFlushedAt data.
dos.writeInt(clHeader.cfCount); // 4
- dos.writeInt(clHeader.lastFlushedAt.size()); // 4
+ dos.writeInt(clHeader.cfDirtiedAt.size()); // 4
checksum.update(clHeader.cfCount);
- checksum.update(clHeader.lastFlushedAt.size());
+ checksum.update(clHeader.cfDirtiedAt.size());
dos.writeLong(checksum.getValue());
// write the 2nd checksum after the lastflushedat map
- for (Map.Entry<Integer, Integer> entry :
clHeader.lastFlushedAt.entrySet())
+ for (Map.Entry<Integer, Integer> entry :
clHeader.cfDirtiedAt.entrySet())
{
dos.writeInt(entry.getKey()); // 4
checksum.update(entry.getKey());
@@ -161,14 +174,14 @@ class CommitLogHeader
dos.writeLong(checksum.getValue());
// keep the size constant by padding for missing flushed-at entries. these do not affect checksum.
- for (int i = clHeader.lastFlushedAt.entrySet().size(); i <
clHeader.cfCount; i++)
+ for (int i = clHeader.cfDirtiedAt.entrySet().size(); i <
clHeader.cfCount; i++)
{
dos.writeInt(0);
dos.writeInt(0);
}
}
- public CommitLogHeader deserialize(DataInputStream dis) throws IOException
+ public CommitLogHeader deserialize(DataInput dis) throws IOException
{
Checksum checksum = new CRC32();
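
Note the semantic change hiding in the rename: getLowestPosition() special-cased a stored position of 0 as never being the minimum, while getReplayPosition() is a plain Collections.min(), so a CF dirtied at offset 0 now pulls replay back to the start of the segment. In miniature (mirrors the updated CommitLogHeaderTest below, which lives in the same package as these package-private methods):

    CommitLogHeader clh = new CommitLogHeader();
    clh.turnOn(2, 34);                   // CF id 2 dirtied at offset 34
    assert clh.getReplayPosition() == 34;
    clh.turnOn(100, 0);                  // CF id 100 dirty from offset 0
    assert clh.getReplayPosition() == 0; // plain minimum; zero now counts
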
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Sat Jun 19 16:11:49 2010
@@ -1,6 +1,4 @@
-package org.apache.cassandra.db.commitlog;
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,6 +18,7 @@ package org.apache.cassandra.db.commitlo
*
*/
+package org.apache.cassandra.db.commitlog;
import java.io.File;
import java.io.IOError;
@@ -27,15 +26,13 @@ import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
-import org.apache.cassandra.config.CFMetaData;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -49,13 +46,13 @@ public class CommitLogSegment
public CommitLogSegment()
{
this.header = new CommitLogHeader();
- String logFile = DatabaseDescriptor.getLogFileLocation() + File.separator + "CommitLog-" + System.currentTimeMillis() + ".log";
+ String logFile = DatabaseDescriptor.getCommitLogLocation() + File.separator + "CommitLog-" + System.currentTimeMillis() + ".log";
logger.info("Creating new commitlog segment " + logFile);
try
{
logWriter = createWriter(logFile);
- writeCommitLogHeader(header.toByteArray());
+ writeHeader();
}
catch (IOException e)
{
@@ -70,25 +67,7 @@ public class CommitLogSegment
public void writeHeader() throws IOException
{
- seekAndWriteCommitLogHeader(header.toByteArray());
- }
-
- /** writes header at the beginning of the file, then seeks back to current position */
- void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
- {
- long currentPos = logWriter.getFilePointer();
- logWriter.seek(0);
-
- writeCommitLogHeader(bytes);
-
- logWriter.seek(currentPos);
- }
-
- private void writeCommitLogHeader(byte[] bytes) throws IOException
- {
- logWriter.writeInt(bytes.length);
- logWriter.write(bytes);
- logWriter.sync();
+ CommitLogHeader.writeCommitLogHeader(header, getHeaderPath());
}
private static BufferedRandomAccessFile createWriter(String file) throws IOException
@@ -121,29 +100,30 @@ public class CommitLogSegment
if (!header.isDirty(id))
{
header.turnOn(id, logWriter.getFilePointer());
- seekAndWriteCommitLogHeader(header.toByteArray());
+ writeHeader();
}
}
}
- // write mutation, w/ checksum
- Checksum checkum = new CRC32();
+ // write mutation, w/ checksum on the size and data
+ byte[] bytes;
+ Checksum checksum = new CRC32();
if (serializedRow instanceof DataOutputBuffer)
{
- DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
- logWriter.writeInt(buffer.getLength());
- logWriter.write(buffer.getData(), 0, buffer.getLength());
- checkum.update(buffer.getData(), 0, buffer.getLength());
+ bytes = ((DataOutputBuffer) serializedRow).getData();
}
else
{
assert serializedRow instanceof byte[];
- byte[] bytes = (byte[]) serializedRow;
- logWriter.writeInt(bytes.length);
- logWriter.write(bytes);
- checkum.update(bytes, 0, bytes.length);
+ bytes = (byte[]) serializedRow;
}
- logWriter.writeLong(checkum.getValue());
+
+ checksum.update(bytes.length);
+ logWriter.writeInt(bytes.length);
+ logWriter.writeLong(checksum.getValue());
+ logWriter.write(bytes);
+ checksum.update(bytes, 0, bytes.length);
+ logWriter.writeLong(checksum.getValue());
return cLogCtx;
}
@@ -175,6 +155,11 @@ public class CommitLogSegment
return logWriter.getPath();
}
+ public String getHeaderPath()
+ {
+ return CommitLogHeader.getHeaderPathFromSegment(this);
+ }
+
public long length()
{
try
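
For reference, the on-disk entry layout write() now produces, as a self-contained sketch (writeCheckedEntry is a hypothetical name; the real code writes through logWriter):

    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.zip.CRC32;
    import java.util.zip.Checksum;

    // Layout: size (int), CRC32 of size (long), data (byte[size]),
    // CRC32 of size-then-data (long). The second checksum keeps running
    // from the first, so it covers the size byte as well as the data.
    static void writeCheckedEntry(DataOutput out, byte[] bytes) throws IOException
    {
        Checksum checksum = new CRC32();
        checksum.update(bytes.length);
        out.writeInt(bytes.length);
        out.writeLong(checksum.getValue());
        out.write(bytes);
        checksum.update(bytes, 0, bytes.length);
        out.writeLong(checksum.getValue());
    }
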
Modified: cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java Sat Jun 19 16:11:49 2010
@@ -45,7 +45,7 @@ public class CleanupHelper extends Schem
{
// clean up commitlog
String[] directoryNames = {
- DatabaseDescriptor.getLogFileLocation(),
+ DatabaseDescriptor.getCommitLogLocation(),
};
for (String dirName : directoryNames)
{
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java Sat Jun 19 16:11:49 2010
@@ -16,18 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.cassandra.db;
+import java.io.*;
+import java.util.concurrent.ExecutionException;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.junit.Test;
+
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.filter.QueryPath;
-import org.junit.Test;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.ExecutionException;
+import org.apache.cassandra.db.commitlog.CommitLogHeader;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.utils.Pair;
public class CommitLogTest extends CleanupHelper
{
@@ -63,13 +67,119 @@ public class CommitLogTest extends Clean
}
@Test
- public void testRecoveryWithPartiallyWrittenHeader() throws Exception
+ public void testRecoveryWithEmptyHeader() throws Exception
+ {
+ testRecovery(new byte[0], new byte[10]);
+ }
+
+ @Test
+ public void testRecoveryWithShortHeader() throws Exception
+ {
+ testRecovery(new byte[2], new byte[10]);
+ }
+
+ @Test
+ public void testRecoveryWithGarbageHeader() throws Exception
+ {
+ byte[] garbage = new byte[100];
+ (new java.util.Random()).nextBytes(garbage);
+ testRecovery(garbage, garbage);
+ }
+
+ @Test
+ public void testRecoveryWithEmptyLog() throws Exception
+ {
+ CommitLog.recover(new File[] {tmpFiles().right});
+ }
+
+ @Test
+ public void testRecoveryWithShortLog() throws Exception
+ {
+ // force EOF while reading log
+ testRecoveryWithBadSizeArgument(100, 10);
+ }
+
+ @Test
+ public void testRecoveryWithShortSize() throws Exception
+ {
+ testRecovery(new byte[0], new byte[2]);
+ }
+
+ @Test
+ public void testRecoveryWithShortCheckSum() throws Exception
+ {
+ testRecovery(new byte[0], new byte[6]);
+ }
+
+ @Test
+ public void testRecoveryWithGarbageLog() throws Exception
+ {
+ byte[] garbage = new byte[100];
+ (new java.util.Random()).nextBytes(garbage);
+ testRecovery(new byte[0], garbage);
+ }
+
+ @Test
+ public void testRecoveryWithBadSizeChecksum() throws Exception
+ {
+ Checksum checksum = new CRC32();
+ checksum.update(100);
+ testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue());
+ }
+
+ @Test
+ public void testRecoveryWithZeroSegmentSizeArgument() throws Exception
+ {
+ // many different combinations of 4 bytes (garbage) will be read as zero by readInt()
+ testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF
+ }
+
+ @Test
+ public void testRecoveryWithNegativeSizeArgument() throws Exception
+ {
+ // garbage from a partial/bad flush could be read as a negative size even if there is no EOF
+ testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
+ }
+
+ protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
+ {
+ Checksum checksum = new CRC32();
+ checksum.update(size);
+ testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue());
+ }
+
+ protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception
+ {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputStream dout = new DataOutputStream(out);
+ dout.writeInt(size);
+ dout.writeLong(checksum);
+ dout.write(new byte[dataSize]);
+ dout.close();
+ testRecovery(new byte[0], out.toByteArray());
+ }
+
+ protected Pair<File, File> tmpFiles() throws IOException
+ {
+ File logFile = File.createTempFile("testRecoveryWithPartiallyWrittenHeaderTestFile", null);
+ File headerFile = new File(CommitLogHeader.getHeaderPathFromSegmentPath(logFile.getAbsolutePath()));
+ logFile.deleteOnExit();
+ headerFile.deleteOnExit();
+ assert logFile.length() == 0;
+ assert headerFile.length() == 0;
+ return new Pair<File, File>(headerFile, logFile);
+ }
+
+ protected void testRecovery(byte[] headerData, byte[] logData) throws Exception
{
- File tmpFile = File.createTempFile("testRecoveryWithPartiallyWrittenHeaderTestFile", null);
- tmpFile.deleteOnExit();
- OutputStream out = new FileOutputStream(tmpFile);
- out.write(new byte[6]);
+ Pair<File, File> tmpFiles = tmpFiles();
+ File logFile = tmpFiles.right;
+ File headerFile = tmpFiles.left;
+ OutputStream lout = new FileOutputStream(logFile);
+ OutputStream hout = new FileOutputStream(headerFile);
+ lout.write(logData);
+ hout.write(headerData);
//statics make it annoying to test things correctly
- CommitLog.instance().recover(new File[] {tmpFile}); //CASSANDRA-1119 throws on failure
+ CommitLog.recover(new File[] {logFile}); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure
}
}
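
These tests lean on the sidecar naming convention from CommitLogHeader: a segment's header sits next to it with a ".header" suffix. For example (the path is hypothetical):

    String segmentPath = "/var/lib/cassandra/commitlog/CommitLog-1276963909000.log";
    // getHeaderPathFromSegmentPath simply appends ".header"
    assert CommitLogHeader.getHeaderPathFromSegmentPath(segmentPath)
               .equals(segmentPath + ".header");
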
Copied: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java (from r956168, cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java)
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java?p2=cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java&p1=cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java&r1=956168&r2=956250&rev=956250&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java Sat Jun 19 16:11:49 2010
@@ -1,46 +1,23 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
package org.apache.cassandra.db;
+import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
-import org.apache.cassandra.Util;
import org.junit.Test;
import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.Util;
import static org.apache.cassandra.Util.column;
import static org.apache.cassandra.db.TableTest.assertColumns;
-public class RecoveryManagerTest extends CleanupHelper
+public class RecoveryManager3Test extends CleanupHelper
{
@Test
- public void testNothing() throws IOException {
- // TODO nothing to recover
- CommitLog.recover();
- }
-
- @Test
- public void testOne() throws IOException, ExecutionException, InterruptedException
+ public void testMissingHeader() throws IOException, ExecutionException, InterruptedException
{
Table table1 = Table.open("Keyspace1");
Table table2 = Table.open("Keyspace2");
@@ -64,6 +41,14 @@ public class RecoveryManagerTest extends
table1.getColumnFamilyStore("Standard1").clearUnsafe();
table2.getColumnFamilyStore("Standard3").clearUnsafe();
+ // nuke the header
+ for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
+ {
+ if (file.getName().endsWith(".header"))
+ if (!file.delete())
+ throw new AssertionError();
+ }
+
CommitLog.recover();
assertColumns(Util.getColumnFamily(table1, dk, "Standard1"), "col1");
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java Sat Jun 19 16:11:49 2010
@@ -34,8 +34,7 @@ import static org.apache.cassandra.db.Ta
public class RecoveryManagerTest extends CleanupHelper
{
@Test
- public void testNothing() throws IOException {
- // TODO nothing to recover
+ public void testNothingToRecover() throws IOException {
CommitLog.recover();
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java?rev=956250&r1=956249&r2=956250&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/commitlog/CommitLogHeaderTest.java Sat Jun 19 16:11:49 2010
@@ -18,23 +18,13 @@
package org.apache.cassandra.db.commitlog;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.Pair;
-import org.junit.Before;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import org.junit.Test;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
+import org.apache.cassandra.SchemaLoader;
public class CommitLogHeaderTest extends SchemaLoader
{
@@ -43,41 +33,42 @@ public class CommitLogHeaderTest extends
public void testEmptyHeader()
{
CommitLogHeader clh = new CommitLogHeader();
- assert CommitLogHeader.getLowestPosition(clh) == 0;
+ assert clh.getReplayPosition() == 0;
}
@Test
public void lowestPositionWithZero()
{
- // zero should never be the lowest position unless all positions are zero.
CommitLogHeader clh = new CommitLogHeader();
clh.turnOn(2, 34);
- assert CommitLogHeader.getLowestPosition(clh) == 34;
+ assert clh.getReplayPosition() == 34;
clh.turnOn(100, 0);
- assert CommitLogHeader.getLowestPosition(clh) == 34;
+ assert clh.getReplayPosition() == 0;
clh.turnOn(65, 2);
- assert CommitLogHeader.getLowestPosition(clh) == 2;
+ assert clh.getReplayPosition() == 0;
}
@Test
public void lowestPositionEmpty()
{
CommitLogHeader clh = new CommitLogHeader();
- assert CommitLogHeader.getLowestPosition(clh) == 0;
+ assert clh.getReplayPosition() == 0;
}
@Test
public void constantSize() throws IOException
{
- CommitLogHeader clh = new CommitLogHeader();
- clh.turnOn(2, 34);
- byte[] one = clh.toByteArray();
-
- clh = new CommitLogHeader();
+ CommitLogHeader clh0 = new CommitLogHeader();
+ clh0.turnOn(2, 34);
+ ByteArrayOutputStream out0 = new ByteArrayOutputStream();
+ CommitLogHeader.serializer.serialize(clh0, new DataOutputStream(out0));
+
+ CommitLogHeader clh1 = new CommitLogHeader();
for (int i = 0; i < 5; i++)
- clh.turnOn(i, 1000 * i);
- byte[] two = clh.toByteArray();
-
- assert one.length == two.length;
+ clh1.turnOn(i, 1000 * i);
+ ByteArrayOutputStream out1 = new ByteArrayOutputStream();
+ CommitLogHeader.serializer.serialize(clh1, new DataOutputStream(out1));
+
+ assert out0.toByteArray().length == out1.toByteArray().length;
}
}