Author: jbellis
Date: Thu Jun 18 20:22:14 2009
New Revision: 786244
URL: http://svn.apache.org/viewvc?rev=786244&view=rev
Log:
Update comments, perform some renames, and remove unused code.
patch by jbellis; reviewed by goffinet for CASSANDRA-237
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogEntry.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=786244&r1=786243&r2=786244&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
Thu Jun 18 20:22:14 2009
@@ -43,19 +43,26 @@
* header represented by the abstraction CommitLogHeader. The header
* contains a bit array and an array of longs and both the arrays are
* of size, #column families for the Table, the Commit Log represents.
+ *
* Whenever a ColumnFamily is written to, for the first time its bit flag
* is set to one in the CommitLogHeader. When it is flushed to disk by the
* Memtable its corresponding bit in the header is set to zero. This helps
* track which CommitLogs can be thrown away as a result of Memtable flushes.
- * However if a ColumnFamily is flushed and again written to disk then its
+ * Additionally, when a ColumnFamily is flushed and written to disk, its
* entry in the array of longs is updated with the offset in the Commit Log
* file where it was written. This helps speed up recovery since we can seek
* to these offsets and start processing the commit log.
- * Every Commit Log is rolled over everytime it reaches its threshold in size.
+ *
+ * Every Commit Log is rolled over every time it reaches its threshold in size;
+ * the new log inherits the "dirty" bits from the old.
+ *
* Over time there could be a number of commit logs that would be generated.
- * Hovever whenever we flush a column family disk and update its bit flag we
- * take this bit array and bitwise & it with the headers of the other commit
- * logs that are older.
+ * To allow cleaning up non-active commit logs, whenever we flush a column family and update its bit flag in
+ * the active CL, we take the dirty bit array and bitwise & it with the headers of the older logs.
+ * If the result is 0, then it is safe to remove the older file. (Since the new CL
+ * inherited the old's dirty bitflags, getting a zero for any given bit in the anding
+ * means that either the CF was clean in the old CL or it has been flushed since the
+ * switch in the new.)
*
* Author : Avinash Lakshman ( [email protected]) & Prashant Malik (
[email protected] )
*/
@@ -66,34 +73,24 @@
private static Lock lock_ = new ReentrantLock();
private static Logger logger_ = Logger.getLogger(CommitLog.class);
private static Map<String, CommitLogHeader> clHeaders_ = new
HashMap<String, CommitLogHeader>();
-
+
public static final class CommitLogContext
{
static CommitLogContext NULL = new CommitLogContext(null, -1L);
/* Commit Log associated with this operation */
- private String file_;
+ public final String file;
/* Offset within the Commit Log where this row as added */
- private long position_;
+ public final long position;
public CommitLogContext(String file, long position)
{
- file_ = file;
- position_ = position;
+ this.file = file;
+ this.position = position;
}
boolean isValidContext()
{
- return (position_ != -1L);
- }
-
- String file()
- {
- return file_;
- }
-
- long position()
- {
- return position_;
+ return (position != -1L);
}
}
@@ -193,13 +190,8 @@
*/
private void setNextFileName()
{
- logFile_ = DatabaseDescriptor.getLogFileLocation() +
- System.getProperty("file.separator") +
- "CommitLog-" +
- table_ +
- "-" +
- System.currentTimeMillis() +
- ".log";
+ logFile_ = DatabaseDescriptor.getLogFileLocation() +
System.getProperty("file.separator") +
+ "CommitLog-" + table_ + "-" + System.currentTimeMillis() +
".log";
}
/*
@@ -316,17 +308,6 @@
doRecovery(filesNeeded, header);
}
- private void printHeader(byte[] header)
- {
- StringBuilder sb = new StringBuilder("");
- for ( byte b : header )
- {
- sb.append(b);
- sb.append(" ");
- }
- logger_.debug(sb.toString());
- }
-
private void doRecovery(Stack<File> filesNeeded, byte[] header) throws
IOException
{
Table table = Table.open(table_);
@@ -395,19 +376,19 @@
}
catch ( IOException e )
{
- logger_.debug( LogUtil.throwableToString(e) );
+ logger_.error("Unexpected error reading " +
file.getName() + "; attempting to continue with the next entry", e);
}
}
reader.close();
- /* apply the rows read */
+ /* apply the rows read -- success will result in the CL file being discarded */
table.flush(true);
}
- catch ( Throwable th )
+ catch (Throwable th)
{
- logger_.info( LogUtil.throwableToString(th) );
+ logger_.error("Fatal error reading " + file.getName(), th);
/* close the reader and delete this commit log. */
reader.close();
- FileUtils.delete( new File[]{file} );
+ FileUtils.delete(new File[]{ file });
}
}
}
@@ -449,8 +430,7 @@
long currentPosition = -1L;
CommitLogContext cLogCtx = null;
DataOutputBuffer cfBuffer = new DataOutputBuffer();
- long fileSize = 0L;
-
+
try
{
/* serialize the row */
@@ -494,11 +474,7 @@
/*
* Check if old commit logs can be deleted. However we cannot
* do this anymore in the Fast Sync mode and hence I think we
- * should get rid of Fast Sync mode altogether. If there is
- * a pathological event where few CF's are rarely being updated
- * then their Memtable never gets flushed.
- * This will prevent commit logs from being deleted. WE NEED to
- * fix this using some hueristic and force flushing such Memtables.
+ * should get rid of Fast Sync mode altogether.
*
* param @ cLogCtx The commitLog context .
* param @ id id of the columnFamily being flushed to disk.
@@ -507,14 +483,14 @@
private void discard(CommitLog.CommitLogContext cLogCtx, int id) throws
IOException
{
/* retrieve the commit log header associated with the file in the
context */
- CommitLogHeader commitLogHeader = clHeaders_.get(cLogCtx.file());
+ CommitLogHeader commitLogHeader = clHeaders_.get(cLogCtx.file);
if(commitLogHeader == null )
{
- if( logFile_.equals(cLogCtx.file()) )
+ if( logFile_.equals(cLogCtx.file) )
{
/* this means we are dealing with the current commit log. */
commitLogHeader = clHeader_;
- clHeaders_.put(cLogCtx.file(), clHeader_);
+ clHeaders_.put(cLogCtx.file, clHeader_);
}
else
return;
@@ -525,7 +501,7 @@
* flush. Right now this cannot happen since Memtables are flushed on
a single
* thread.
*/
- if ( cLogCtx.position() < commitLogHeader.getPosition(id) )
+ if ( cLogCtx.position < commitLogHeader.getPosition(id) )
return;
commitLogHeader.turnOff(id);
/* Sort the commit logs based on creation time */
@@ -542,7 +518,7 @@
*/
for(String oldFile : oldFiles)
{
- if(oldFile.equals(cLogCtx.file()))
+ if(oldFile.equals(cLogCtx.file))
{
/*
* We need to turn on again. This is because we always keep
@@ -550,8 +526,8 @@
* commit log needs to be read. When a flush occurs we turn off
* perform & operation and then turn on with the new position.
*/
- commitLogHeader.turnOn(id, cLogCtx.position());
- writeCommitLogHeader(cLogCtx.file(),
commitLogHeader.toByteArray());
+ commitLogHeader.turnOn(id, cLogCtx.position);
+ writeCommitLogHeader(cLogCtx.file,
commitLogHeader.toByteArray());
break;
}
else
@@ -577,66 +553,30 @@
}
}
- private void checkThresholdAndRollLog()
+ private void checkThresholdAndRollLog() throws IOException
{
- try
- {
- if (logWriter_.getFileSize() >= SEGMENT_SIZE)
- {
- /* Rolls the current log file over to a new one. */
- setNextFileName();
- String oldLogFile = logWriter_.getFileName();
- //history_.add(oldLogFile);
- logWriter_.close();
-
- /* point reader/writer to a new commit log file. */
- // logWriter_ = SequenceFile.writer(logFile_);
- logWriter_ = CommitLog.createWriter(logFile_);
- /* squirrel away the old commit log header */
- clHeaders_.put(oldLogFile, new CommitLogHeader(clHeader_));
- /*
- * We need to zero out positions because the positions in
- * the old file do not make sense in the new one.
- */
- clHeader_.zeroPositions();
- writeCommitLogHeader(clHeader_.toByteArray(), false);
- // Get the list of files in commit log directory if it is
greater than a certain number
- // Force flush all the column families that way we ensure that
a slowly populated column family is not screwing up
- // by accumulating the commit logs .
- }
- }
- catch (IOException e)
+ if (logWriter_.getFileSize() >= SEGMENT_SIZE)
{
- logger_.info(LogUtil.throwableToString(e));
- }
- }
-
- public static void main(String[] args) throws Throwable
- {
- LogUtil.init();
-
- File logDir = new File(DatabaseDescriptor.getLogFileLocation());
- File[] files = logDir.listFiles();
- Arrays.sort( files, new FileUtils.FileComparator() );
+ /* Rolls the current log file over to a new one. */
+ setNextFileName();
+ String oldLogFile = logWriter_.getFileName();
+ //history_.add(oldLogFile);
+ logWriter_.close();
- byte[] bytes = new
byte[CommitLogHeader.size(Integer.parseInt(args[0]))];
- for ( File file : files )
- {
- CommitLog clog = new CommitLog( file );
- clog.readCommitLogHeader(file.getAbsolutePath(), bytes);
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bytes, 0, bytes.length);
- CommitLogHeader clHeader =
CommitLogHeader.serializer().deserialize(bufIn);
+ /* point reader/writer to a new commit log file. */
+ // logWriter_ = SequenceFile.writer(logFile_);
+ logWriter_ = CommitLog.createWriter(logFile_);
+ /* squirrel away the old commit log header */
+ clHeaders_.put(oldLogFile, new CommitLogHeader(clHeader_));
/*
- StringBuilder sb = new StringBuilder("");
- for ( byte b : bytes )
- {
- sb.append(b);
- sb.append(" ");
- }
+ * We need to zero out positions because the positions in
+ * the old file do not make sense in the new one.
*/
- System.out.println("FILE:" + file);
- System.out.println(clHeader.toString());
+ clHeader_.zeroPositions();
+ writeCommitLogHeader(clHeader_.toByteArray(), false);
+ // Get the list of files in commit log directory if it is greater than a certain number
+ // Force flush all the column families that way we ensure that a slowly populated column family is not screwing up
+ // by accumulating the commit logs .
}
}
}
\ No newline at end of file
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java?rev=786244&r1=786243&r2=786244&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
Thu Jun 18 20:22:14 2009
@@ -19,13 +19,10 @@
package org.apache.cassandra.db;
import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.io.*;
/**
* Author : Avinash Lakshman ( [email protected]) & Prashant Malik (
[email protected] )
@@ -34,7 +31,7 @@
class CommitLogHeader
{
private static ICompactSerializer<CommitLogHeader> serializer_;
-
+
static
{
serializer_ = new CommitLogHeaderSerializer();
@@ -115,13 +112,13 @@
return true;
}
- private byte[] header_ = new byte[0];
- private int[] position_ = new int[0];
+ private byte[] dirty = new byte[0]; // columnfamilies with un-flushed data
in this CommitLog
+ private int[] lastFlushedAt = new int[0]; // position at which each CF was
last flushed
CommitLogHeader(int size)
{
- header_ = new byte[size];
- position_ = new int[size];
+ dirty = new byte[size];
+ lastFlushedAt = new int[size];
}
/*
@@ -129,89 +126,84 @@
* also builds an index of position to column family
* Id.
*/
- CommitLogHeader(byte[] header, int[] position)
+ CommitLogHeader(byte[] dirty, int[] lastFlushedAt)
{
- header_ = header;
- position_ = position;
+ this.dirty = dirty;
+ this.lastFlushedAt = lastFlushedAt;
}
CommitLogHeader(CommitLogHeader clHeader)
{
- header_ = new byte[clHeader.header_.length];
- System.arraycopy(clHeader.header_, 0, header_, 0, header_.length);
- position_ = new int[clHeader.position_.length];
- System.arraycopy(clHeader.position_, 0, position_, 0,
position_.length);
+ dirty = new byte[clHeader.dirty.length];
+ System.arraycopy(clHeader.dirty, 0, dirty, 0, dirty.length);
+ lastFlushedAt = new int[clHeader.lastFlushedAt.length];
+ System.arraycopy(clHeader.lastFlushedAt, 0, lastFlushedAt, 0,
lastFlushedAt.length);
}
byte get(int index)
{
- return header_[index];
+ return dirty[index];
}
int getPosition(int index)
{
- return position_[index];
+ return lastFlushedAt[index];
}
void turnOn(int index, long position)
{
- turnOn(header_, index, position);
+ turnOn(dirty, index, position);
}
void turnOn(byte[] bytes, int index, long position)
{
bytes[index] = (byte)1;
- position_[index] = (int)position;
+ lastFlushedAt[index] = (int)position;
}
void turnOff(int index)
{
- turnOff(header_, index);
+ turnOff(dirty, index);
}
void turnOff(byte[] bytes, int index)
{
bytes[index] = (byte)0;
- position_[index] = 0;
+ lastFlushedAt[index] = 0;
}
boolean isSafeToDelete() throws IOException
{
- return isSafeToDelete(header_);
- }
-
- boolean isSafeToDelete(byte[] bytes) throws IOException
- {
- for ( byte b : bytes )
+ for (byte b : dirty)
{
- if ( b == 1 )
+ if (b == 1)
return false;
}
return true;
}
-
+
byte[] getBitSet()
{
- return header_;
+ return dirty;
}
int[] getPositions()
{
- return position_;
+ return lastFlushedAt;
}
void zeroPositions()
{
- int size = position_.length;
- position_ = new int[size];
+ int size = lastFlushedAt.length;
+ lastFlushedAt = new int[size];
}
void and (CommitLogHeader commitLogHeader)
{
- byte[] clh2 = commitLogHeader.header_;
- for ( int i = 0; i < header_.length; ++i )
+ byte[] clh2 = commitLogHeader.dirty;
+ for ( int i = 0; i < dirty.length; ++i )
{
- header_[i] = (byte)(header_[i] & clh2[i]);
+ dirty[i] = (byte)(dirty[i] & clh2[i]);
}
}
@@ -226,16 +218,16 @@
public String toString()
{
StringBuilder sb = new StringBuilder("");
- for ( int i = 0; i < header_.length; ++i )
+ for ( int i = 0; i < dirty.length; ++i )
{
- sb.append(header_[i]);
+ sb.append(dirty[i]);
sb.append(":");
Table table = Table.open( DatabaseDescriptor.getTables().get(0));
sb.append(table.getColumnFamilyName(i));
sb.append(" ");
}
sb.append(" | " );
- for ( int position : position_ )
+ for ( int position : lastFlushedAt)
{
sb.append(position);
sb.append(" ");