Author: jbellis
Date: Thu Feb 11 03:49:49 2010
New Revision: 908830
URL: http://svn.apache.org/viewvc?rev=908830&view=rev
Log:
encapsulate commitlog file operations in CommitLogSegment
patch by jbellis; reviewed by junrao for CASSANDRA-783
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
(with props)
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Thu Feb 11 03:49:49 2010
@@ -40,6 +40,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
@@ -366,14 +367,14 @@
Table.flusherLock.writeLock().lock();
try
{
- final CommitLog.CommitLogContext ctx =
CommitLog.instance().getContext(); // this is harmless if !writeCommitLog
-
if (oldMemtable.isFrozen())
{
return null;
}
- logger_.info(columnFamily_ + " has reached its threshold;
switching in a fresh Memtable");
oldMemtable.freeze();
+
+ final CommitLogSegment.CommitLogContext ctx = writeCommitLog ?
CommitLog.instance().getContext() : null;
+ logger_.info(columnFamily_ + " has reached its threshold;
switching in a fresh Memtable at " + ctx);
final Condition condition = submitFlush(oldMemtable);
memtable_ = new Memtable(table_, columnFamily_);
// a second executor that makes sure the onMemtableFlushes get
called in the right order,
@@ -387,7 +388,7 @@
{
// if we're not writing to the commit log, we are
replaying the log, so marking
// the log header with "you can discard anything
written before the context" is not valid
- onMemtableFlush(ctx);
+ CommitLog.instance().discardCompletedSegments(table_,
columnFamily_, ctx);
}
}
});
@@ -533,19 +534,6 @@
}
/*
- * This method is called when the Memtable is frozen and ready to be
flushed
- * to disk. This method informs the CommitLog that a particular
ColumnFamily
- * is being flushed to disk.
- */
- void onMemtableFlush(CommitLog.CommitLogContext cLogCtx) throws IOException
- {
- if (cLogCtx.isValidContext())
- {
- CommitLog.instance().onMemtableFlush(table_, columnFamily_,
cLogCtx);
- }
- }
-
- /*
* Called after the Memtable flushes its in-memory data, or we add a file
* via bootstrap. This information is
* cached in the ColumnFamilyStore. This is useful for reads because the
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RecoveryManager.java
Thu Feb 11 03:49:49 2010
@@ -53,7 +53,7 @@
Arrays.sort(files, new FileUtils.FileComparator());
logger_.info("Replaying " + StringUtils.join(files, ", "));
- CommitLog.instance().recover(files);
+ CommitLog.recover(files);
FileUtils.delete(files);
logger_.info("Log replay complete");
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu
Feb 11 03:49:49 2010
@@ -28,6 +28,7 @@
import com.google.common.collect.Iterables;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.SSTableDeletingReference;
import org.apache.cassandra.io.SSTableReader;
@@ -396,7 +397,7 @@
{
if (writeCommitLog)
{
- Future<CommitLog.CommitLogContext> future =
CommitLog.instance().add(mutation, serializedMutation);
+ Future<CommitLogSegment.CommitLogContext> future =
CommitLog.instance().add(mutation, serializedMutation);
if (waitForCommitLog)
{
try
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Thu Feb 11 03:49:49 2010
@@ -23,9 +23,7 @@
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.DeletionService;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.concurrent.StageManager;
@@ -68,15 +66,12 @@
* inherited the old's dirty bitflags, getting a zero for any given bit in the
anding
* means that either the CF was clean in the old CL or it has been flushed
since the
* switch in the new.)
- *
- * The CommitLog class itself is "mostly a singleton." open() always returns
one
- * instance, but log replay will bypass that.
*/
public class CommitLog
{
private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after
log gets this big
+
private static final Logger logger = Logger.getLogger(CommitLog.class);
- private static final Map<String, CommitLogHeader> clHeaders = new
HashMap<String, CommitLogHeader>();
public static CommitLog instance()
{
@@ -88,81 +83,21 @@
public static final CommitLog instance = new CommitLog();
}
- public static class CommitLogContext
- {
- /* Commit Log associated with this operation */
- public final String file;
- /* Offset within the Commit Log where this row as added */
- public final long position;
-
- public CommitLogContext(String file, long position)
- {
- this.file = file;
- this.position = position;
- }
-
- public boolean isValidContext()
- {
- return (position != -1L);
- }
-
- @Override
- public String toString()
- {
- return "CommitLogContext(" +
- "file='" + file + '\'' +
- ", position=" + position +
- ')';
- }
- }
-
- public static class CommitLogFileComparator implements Comparator<String>
- {
- public int compare(String f, String f2)
- {
- return (int)(getCreationTime(f) - getCreationTime(f2));
- }
- }
+ private final Deque<CommitLogSegment> segments = new
ArrayDeque<CommitLogSegment>();
public static void setSegmentSize(int size)
{
SEGMENT_SIZE = size;
}
- public static int getSegmentCount()
+ public int getSegmentCount()
{
- return clHeaders.size();
- }
-
- static long getCreationTime(String file)
- {
- String[] entries = FBUtilities.strip(file, "-.");
- return Long.parseLong(entries[entries.length - 2]);
- }
-
- private static BufferedRandomAccessFile createWriter(String file) throws
IOException
- {
- return new BufferedRandomAccessFile(file, "rw");
+ return segments.size();
}
- /* Current commit log file */
- private String logFile;
- /* header for current commit log */
- private CommitLogHeader clHeader;
- private BufferedRandomAccessFile logWriter;
private final ExecutorService executor = new CommitLogExecutorService();
- /*
- * Generates a file name of the format CommitLog-<table>-<timestamp>.log
in the
- * directory specified by the Database Descriptor.
- */
- private void setNextFileName()
- {
- logFile = DatabaseDescriptor.getLogFileLocation() + File.separator +
- "CommitLog-" + System.currentTimeMillis() + ".log";
- }
-
- /*
+ /**
* param @ table - name of table for which we are maintaining
* this commit log.
* param @ recoverymode - is commit log being instantiated in
@@ -170,17 +105,11 @@
*/
private CommitLog()
{
- setNextFileName();
- try
- {
- logWriter = CommitLog.createWriter(logFile);
- writeCommitLogHeader();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
-
+ // all old segments are recovered and deleted before CommitLog is
instantiated.
+ // All we need to do is create a new one.
+ int cfSize = Table.TableMetadata.getColumnFamilyCount();
+ segments.add(new CommitLogSegment(cfSize));
+
if (DatabaseDescriptor.getCommitLogSync() ==
DatabaseDescriptor.CommitLogSync.periodic)
{
final Runnable syncer = new WrappedRunnable()
@@ -216,50 +145,7 @@
}
}
- String getLogFile()
- {
- return logFile;
- }
-
- private CommitLogHeader readCommitLogHeader(BufferedRandomAccessFile
logReader) throws IOException
- {
- int size = (int)logReader.readLong();
- byte[] bytes = new byte[size];
- logReader.read(bytes);
- ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
- return CommitLogHeader.serializer().deserialize(new
DataInputStream(byteStream));
- }
-
- /*
- * This is invoked on startup via the ctor. It basically
- * writes a header with all bits set to zero.
- */
- private void writeCommitLogHeader() throws IOException
- {
- int cfSize = Table.TableMetadata.getColumnFamilyCount();
- clHeader = new CommitLogHeader(cfSize);
- writeCommitLogHeader(logWriter, clHeader.toByteArray());
- }
-
- /** writes header at the beginning of the file, then seeks back to current
position */
- private void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
- {
- long currentPos = logWriter.getFilePointer();
- logWriter.seek(0);
-
- writeCommitLogHeader(logWriter, bytes);
-
- logWriter.seek(currentPos);
- }
-
- private static void writeCommitLogHeader(BufferedRandomAccessFile
logWriter, byte[] bytes) throws IOException
- {
- logWriter.writeLong(bytes.length);
- logWriter.write(bytes);
- logWriter.sync();
- }
-
- public void recover(File[] clogs) throws IOException
+ public static void recover(File[] clogs) throws IOException
{
Set<Table> tablesRecovered = new HashSet<Table>();
assert
StageManager.getStage(StageManager.MUTATION_STAGE).getCompletedTaskCount() == 0;
@@ -268,7 +154,7 @@
{
int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
BufferedRandomAccessFile reader = new
BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
- final CommitLogHeader clHeader = readCommitLogHeader(reader);
+ final CommitLogHeader clHeader =
CommitLogHeader.readCommitLogHeader(reader);
/* seek to the lowest position where any CF has non-flushed data */
int lowPos = CommitLogHeader.getLowestPosition(clHeader);
if (lowPos == 0)
@@ -357,13 +243,13 @@
}
}
- // flush replayed tables, allowing commitlog segments to be removed
+ // flush replayed tables
List<Future<?>> futures = new ArrayList<Future<?>>();
for (Table table : tablesRecovered)
{
futures.addAll(table.flush());
}
- // wait for flushes to finish before continuing with startup
+ // wait for flushes to finish
for (Future<?> future : futures)
{
try
@@ -377,31 +263,18 @@
}
}
- /*
- * Update the header of the commit log if a new column family
- * is encountered for the first time.
- */
- private void maybeUpdateHeader(RowMutation rm) throws IOException
+ private CommitLogSegment currentSegment()
{
- Table table = Table.open(rm.getTable());
- for (ColumnFamily columnFamily : rm.getColumnFamilies())
- {
- int id = table.getColumnFamilyId(columnFamily.name());
- if (!clHeader.isDirty(id))
- {
- clHeader.turnOn(id, logWriter.getFilePointer());
- seekAndWriteCommitLogHeader(clHeader.toByteArray());
- }
- }
+ return segments.getLast();
}
- public CommitLogContext getContext() throws IOException
+ public CommitLogSegment.CommitLogContext getContext() throws IOException
{
- Callable<CommitLogContext> task = new Callable<CommitLogContext>()
+ Callable<CommitLogSegment.CommitLogContext> task = new
Callable<CommitLogSegment.CommitLogContext>()
{
- public CommitLogContext call() throws Exception
+ public CommitLogSegment.CommitLogContext call() throws Exception
{
- return new CommitLogContext(logFile,
logWriter.getFilePointer());
+ return currentSegment().getContext();
}
};
try
@@ -424,9 +297,9 @@
* of any problems. This way we can assume that the subsequent commit log
* entry will override the garbage left over by the previous write.
*/
- public Future<CommitLogContext> add(RowMutation rowMutation, Object
serializedRow) throws IOException
+ public Future<CommitLogSegment.CommitLogContext> add(RowMutation
rowMutation, Object serializedRow) throws IOException
{
- Callable<CommitLogContext> task = new LogRecordAdder(rowMutation,
serializedRow);
+ Callable<CommitLogSegment.CommitLogContext> task = new
LogRecordAdder(rowMutation, serializedRow);
return executor.submit(task);
}
@@ -436,15 +309,14 @@
* The bit flag associated with this column family is set in the
* header and this is used to decide if the log file can be deleted.
*/
- public void onMemtableFlush(final String tableName, final String cf, final
CommitLog.CommitLogContext cLogCtx) throws IOException
+ public void discardCompletedSegments(final String tableName, final String
cf, final CommitLogSegment.CommitLogContext context) throws IOException
{
Callable task = new Callable()
{
public Object call() throws IOException
{
- Table table = Table.open(tableName);
- int id = table.getColumnFamilyId(cf);
- discardCompletedSegments(cLogCtx, id);
+ int id = Table.open(tableName).getColumnFamilyId(cf);
+ discardCompletedSegmentsInternal(context, id);
return null;
}
};
@@ -462,41 +334,23 @@
}
}
- /*
- * Delete log segments whose contents have been turned into SSTables.
+ /**
+ * Delete log segments whose contents have been turned into SSTables. NOT
threadsafe.
*
- * param @ cLogCtx The commitLog context .
+ * param @ context The commitLog context .
* param @ id id of the columnFamily being flushed to disk.
*
*/
- private void discardCompletedSegments(CommitLog.CommitLogContext cLogCtx,
int id) throws IOException
+ private void
discardCompletedSegmentsInternal(CommitLogSegment.CommitLogContext context, int
id) throws IOException
{
if (logger.isDebugEnabled())
- logger.debug("discard completed log segments for " + cLogCtx + ",
column family " + id + ". CFIDs are " +
Table.TableMetadata.getColumnFamilyIDString());
- /* retrieve the commit log header associated with the file in the
context */
- if (clHeaders.get(cLogCtx.file) == null)
- {
- if (logFile.equals(cLogCtx.file))
- {
- /* this means we are dealing with the current commit log. */
- clHeaders.put(cLogCtx.file, clHeader);
- }
- else
- {
- logger.error("Unknown commitlog file " + cLogCtx.file);
- return;
- }
- }
+ logger.debug("discard completed log segments for " + context + ",
column family " + id + ". CFIDs are " +
Table.TableMetadata.getColumnFamilyIDString());
/*
* log replay assumes that we only have to look at entries past the
last
* flush position, so verify that this flush happens after the last.
*/
- assert cLogCtx.position >= clHeaders.get(cLogCtx.file).getPosition(id);
-
- /* Sort the commit logs based on creation time */
- List<String> oldFiles = new ArrayList<String>(clHeaders.keySet());
- Collections.sort(oldFiles, new CommitLogFileComparator());
+ assert context.position >
context.getSegment().getHeader().getPosition(id) : "discard called on obsolete
context " + context;
/*
* Loop through all the commit log files in the history. Now process
@@ -504,78 +358,47 @@
* these files the header needs to modified by resetting the dirty
* bit corresponding to the flushed CF.
*/
- for (String oldFile : oldFiles)
+ for (CommitLogSegment segment : segments)
{
- CommitLogHeader header = clHeaders.get(oldFile);
- if (oldFile.equals(cLogCtx.file))
+ CommitLogHeader header = segment.getHeader();
+ if (segment.equals(context.getSegment()))
{
// we can't just mark the segment where the flush happened
clean,
// since there may have been writes to it between when the
flush
// started and when it finished. so mark the flush position as
// the replay point for this CF, instead.
if (logger.isDebugEnabled())
- logger.debug("Marking replay position " + cLogCtx.position
+ " on commit log " + oldFile);
- header.turnOn(id, cLogCtx.position);
- if (oldFile.equals(logFile))
- {
- seekAndWriteCommitLogHeader(header.toByteArray());
- }
- else
- {
- writeOldCommitLogHeader(oldFile, header);
- }
+ logger.debug("Marking replay position " + context.position
+ " on commit log " + segment);
+ header.turnOn(id, context.position);
+ segment.writeHeader();
break;
}
header.turnOff(id);
if (header.isSafeToDelete())
{
- logger.info("Deleting obsolete commit log:" + oldFile);
- DeletionService.submitDelete(oldFile);
- clHeaders.remove(oldFile);
+ logger.info("Discarding obsolete commit log:" + segment);
+ segment.close();
+ DeletionService.submitDelete(segment.getPath());
+ // usually this will be the first (remaining) segment, but not
always, if segment A contains
+ // writes to a CF that is unflushed but is followed by segment
B whose CFs are all flushed.
+ segments.remove(segment);
}
else
{
if (logger.isDebugEnabled())
- logger.debug("Not safe to delete commit log " + oldFile +
"; dirty is " + header.dirtyString());
- writeOldCommitLogHeader(oldFile, header);
+ logger.debug("Not safe to delete commit log " + segment +
"; dirty is " + header.dirtyString());
+ segment.writeHeader();
}
}
}
- private void writeOldCommitLogHeader(String oldFile, CommitLogHeader
header) throws IOException
- {
- BufferedRandomAccessFile logWriter = CommitLog.createWriter(oldFile);
- writeCommitLogHeader(logWriter, header.toByteArray());
- logWriter.close();
- }
-
- private boolean maybeRollLog() throws IOException
- {
- if (logWriter.length() >= SEGMENT_SIZE)
- {
- /* Rolls the current log file over to a new one. */
- setNextFileName();
- String oldLogFile = logWriter.getPath();
- logWriter.close();
-
- /* point reader/writer to a new commit log file. */
- logWriter = CommitLog.createWriter(logFile);
- /* squirrel away the old commit log header */
- clHeaders.put(oldLogFile, new CommitLogHeader(clHeader));
- clHeader.clear();
- writeCommitLogHeader(logWriter, clHeader.toByteArray());
- return true;
- }
- return false;
- }
-
void sync() throws IOException
{
- logWriter.sync();
+ currentSegment().sync();
}
- class LogRecordAdder implements Callable<CommitLog.CommitLogContext>
+ class LogRecordAdder implements Callable<CommitLogSegment.CommitLogContext>
{
final RowMutation rowMutation;
final Object serializedRow;
@@ -586,40 +409,18 @@
this.serializedRow = serializedRow;
}
- public CommitLog.CommitLogContext call() throws Exception
+ public CommitLogSegment.CommitLogContext call() throws Exception
{
- long currentPosition = -1L;
- try
+ CommitLogSegment.CommitLogContext context =
currentSegment().write(rowMutation, serializedRow);
+
+ // roll log if necessary
+ if (currentSegment().length() >= SEGMENT_SIZE)
{
- currentPosition = logWriter.getFilePointer();
- CommitLogContext cLogCtx = new CommitLogContext(logFile,
currentPosition);
- maybeUpdateHeader(rowMutation);
- Checksum checkum = new CRC32();
- if (serializedRow instanceof DataOutputBuffer)
- {
- DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
- logWriter.writeLong(buffer.getLength());
- logWriter.write(buffer.getData(), 0, buffer.getLength());
- checkum.update(buffer.getData(), 0, buffer.getLength());
- }
- else
- {
- assert serializedRow instanceof byte[];
- byte[] bytes = (byte[]) serializedRow;
- logWriter.writeLong(bytes.length);
- logWriter.write(bytes);
- checkum.update(bytes, 0, bytes.length);
- }
- logWriter.writeLong(checkum.getValue());
- maybeRollLog();
- return cLogCtx;
- }
- catch (IOException e)
- {
- if ( currentPosition != -1 )
- logWriter.seek(currentPosition);
- throw e;
+ sync();
+ segments.add(new
CommitLogSegment(currentSegment().getHeader().getColumnFamilyCount()));
}
+
+ return context;
}
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogExecutorService.java
Thu Feb 11 03:49:49 2010
@@ -11,9 +11,9 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.WrappedRunnable;
-public class CommitLogExecutorService extends AbstractExecutorService
implements CommitLogExecutorServiceMBean
+class CommitLogExecutorService extends AbstractExecutorService implements
CommitLogExecutorServiceMBean
{
- BlockingQueue<CheaterFutureTask> queue;
+ private final BlockingQueue<CheaterFutureTask> queue;
private volatile long completedTaskCount = 0;
@@ -92,8 +92,8 @@
queue.take().run();
}
- private ArrayList<CheaterFutureTask> incompleteTasks = new
ArrayList<CheaterFutureTask>();
- private ArrayList taskValues = new ArrayList(); // TODO not sure how to
generify this
+ private final ArrayList<CheaterFutureTask> incompleteTasks = new
ArrayList<CheaterFutureTask>();
+ private final ArrayList taskValues = new ArrayList(); // TODO not sure how
to generify this
private void processWithSyncBatch() throws Exception
{
CheaterFutureTask firstTask = queue.take();
@@ -188,26 +188,26 @@
{
throw new UnsupportedOperationException();
}
-}
-
-class CheaterFutureTask<V> extends FutureTask<V>
-{
- private Callable rawCallable;
- public CheaterFutureTask(Callable<V> callable)
+ private static class CheaterFutureTask<V> extends FutureTask<V>
{
- super(callable);
- rawCallable = callable;
- }
+ private final Callable rawCallable;
- public Callable getRawCallable()
- {
- return rawCallable;
- }
+ public CheaterFutureTask(Callable<V> callable)
+ {
+ super(callable);
+ rawCallable = callable;
+ }
- @Override
- public void set(V v)
- {
- super.set(v);
+ public Callable getRawCallable()
+ {
+ return rawCallable;
+ }
+
+ @Override
+ public void set(V v)
+ {
+ super.set(v);
+ }
}
-}
\ No newline at end of file
+}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
Thu Feb 11 03:49:49 2010
@@ -24,6 +24,7 @@
import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.utils.BitSetSerializer;
class CommitLogHeader
@@ -70,14 +71,7 @@
this.dirty = dirty;
this.lastFlushedAt = lastFlushedAt;
}
-
- CommitLogHeader(CommitLogHeader clHeader)
- {
- dirty = (BitSet)clHeader.dirty.clone();
- lastFlushedAt = new int[clHeader.lastFlushedAt.length];
- System.arraycopy(clHeader.lastFlushedAt, 0, lastFlushedAt, 0,
lastFlushedAt.length);
- }
-
+
boolean isDirty(int index)
{
return dirty.get(index);
@@ -105,12 +99,6 @@
return dirty.isEmpty();
}
- void clear()
- {
- dirty.clear();
- Arrays.fill(lastFlushedAt, 0);
- }
-
byte[] toByteArray() throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -152,6 +140,20 @@
return sb.toString();
}
+ static CommitLogHeader readCommitLogHeader(BufferedRandomAccessFile
logReader) throws IOException
+ {
+ int size = (int)logReader.readLong();
+ byte[] bytes = new byte[size];
+ logReader.read(bytes);
+ ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
+ return serializer().deserialize(new DataInputStream(byteStream));
+ }
+
+ public int getColumnFamilyCount()
+ {
+ return lastFlushedAt.length;
+ }
+
static class CommitLogHeaderSerializer implements
ICompactSerializer<CommitLogHeader>
{
public void serialize(CommitLogHeader clHeader, DataOutputStream dos)
throws IOException
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=908830&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
Thu Feb 11 03:49:49 2010
@@ -0,0 +1,193 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+
+public class CommitLogSegment
+{
+ private static final Logger logger =
Logger.getLogger(CommitLogSegment.class);
+
+ private final BufferedRandomAccessFile logWriter;
+ private final CommitLogHeader header;
+
+ public CommitLogSegment(int cfCount)
+ {
+ this.header = new CommitLogHeader(cfCount);
+ String logFile = DatabaseDescriptor.getLogFileLocation() +
File.separator + "CommitLog-" + System.currentTimeMillis() + ".log";
+ logger.info("Creating new commitlog segment " + logFile);
+
+ try
+ {
+ logWriter = createWriter(logFile);
+ writeCommitLogHeader(header.toByteArray());
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public void writeHeader() throws IOException
+ {
+ seekAndWriteCommitLogHeader(header.toByteArray());
+ }
+
+ /** writes header at the beginning of the file, then seeks back to current
position */
+ void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
+ {
+ long currentPos = logWriter.getFilePointer();
+ logWriter.seek(0);
+
+ writeCommitLogHeader(bytes);
+
+ logWriter.seek(currentPos);
+ }
+
+ private void writeCommitLogHeader(byte[] bytes) throws IOException
+ {
+ logWriter.writeLong(bytes.length);
+ logWriter.write(bytes);
+ logWriter.sync();
+ }
+
+ private static BufferedRandomAccessFile createWriter(String file) throws
IOException
+ {
+ return new BufferedRandomAccessFile(file, "rw", 128 * 1024);
+ }
+
+ public CommitLogSegment.CommitLogContext write(RowMutation rowMutation,
Object serializedRow) throws IOException
+ {
+ long currentPosition = -1L;
+ try
+ {
+ currentPosition = logWriter.getFilePointer();
+ CommitLogSegment.CommitLogContext cLogCtx = new
CommitLogSegment.CommitLogContext(currentPosition);
+ Table table = Table.open(rowMutation.getTable());
+
+ // update header
+ for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+ {
+ int id = table.getColumnFamilyId(columnFamily.name());
+ if (!header.isDirty(id))
+ {
+ header.turnOn(id, logWriter.getFilePointer());
+ seekAndWriteCommitLogHeader(header.toByteArray());
+ }
+ }
+
+ // write mutation, w/ checksum
+ Checksum checkum = new CRC32();
+ if (serializedRow instanceof DataOutputBuffer)
+ {
+ DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
+ logWriter.writeLong(buffer.getLength());
+ logWriter.write(buffer.getData(), 0, buffer.getLength());
+ checkum.update(buffer.getData(), 0, buffer.getLength());
+ }
+ else
+ {
+ assert serializedRow instanceof byte[];
+ byte[] bytes = (byte[]) serializedRow;
+ logWriter.writeLong(bytes.length);
+ logWriter.write(bytes);
+ checkum.update(bytes, 0, bytes.length);
+ }
+ logWriter.writeLong(checkum.getValue());
+
+ return cLogCtx;
+ }
+ catch (IOException e)
+ {
+ if (currentPosition != -1)
+ logWriter.seek(currentPosition);
+ throw e;
+ }
+ }
+
+ public void sync() throws IOException
+ {
+ logWriter.sync();
+ }
+
+ public CommitLogContext getContext()
+ {
+ return new CommitLogContext(logWriter.getFilePointer());
+ }
+
+ public CommitLogHeader getHeader()
+ {
+ return header;
+ }
+
+ public String getPath()
+ {
+ return logWriter.getPath();
+ }
+
+ public long length()
+ {
+ try
+ {
+ return logWriter.length();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public void close()
+ {
+ try
+ {
+ logWriter.close();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CommitLogSegment(" + logWriter.getPath() + ')';
+ }
+
+ public class CommitLogContext
+ {
+ public final long position;
+
+ public CommitLogContext(long position)
+ {
+ assert position >= 0;
+ this.position = position;
+ }
+
+ public CommitLogSegment getSegment()
+ {
+ return CommitLogSegment.this;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CommitLogContext(" +
+ "file='" + logWriter.getPath() + '\'' +
+ ", position=" + position +
+ ')';
+ }
+ }
+}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java?rev=908830&r1=908829&r2=908830&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
Thu Feb 11 03:49:49 2010
@@ -32,7 +32,7 @@
@Test
public void testCleanup() throws IOException, ExecutionException,
InterruptedException
{
- assert CommitLog.getSegmentCount() == 0;
+ assert CommitLog.instance().getSegmentCount() == 1;
CommitLog.setSegmentSize(1000);
Table table = Table.open("Keyspace1");
@@ -49,14 +49,14 @@
rm.add(new QueryPath("Standard2", null, "Column1".getBytes()),
value, 0);
rm.apply();
}
- assert CommitLog.getSegmentCount() > 1;
+ assert CommitLog.instance().getSegmentCount() > 1;
// nothing should get removed after flushing just Standard1
store1.forceBlockingFlush();
- assert CommitLog.getSegmentCount() > 1;
+ assert CommitLog.instance().getSegmentCount() > 1;
// after flushing Standard2 we should be able to clean out all segments
store2.forceBlockingFlush();
- assert CommitLog.getSegmentCount() == 1;
+ assert CommitLog.instance().getSegmentCount() == 1;
}
}