Author: jbellis
Date: Tue Feb 9 19:22:35 2010
New Revision: 908163
URL: http://svn.apache.org/viewvc?rev=908163&view=rev
Log:
r/m underscores from CommitLog. patch by jbellis
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=908163&r1=908162&r2=908163&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Tue Feb 9 19:22:35 2010
@@ -40,8 +40,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
/*
* Commit Log tracks every write operation into the system. The aim
@@ -77,8 +75,8 @@
public class CommitLog
{
private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after
log gets this big
- private static final Logger logger_ = Logger.getLogger(CommitLog.class);
- private static final Map<String, CommitLogHeader> clHeaders_ = new
HashMap<String, CommitLogHeader>();
+ private static final Logger logger = Logger.getLogger(CommitLog.class);
+ private static final Map<String, CommitLogHeader> clHeaders = new
HashMap<String, CommitLogHeader>();
public static CommitLog instance()
{
@@ -133,7 +131,7 @@
public static int getSegmentCount()
{
- return clHeaders_.size();
+ return clHeaders.size();
}
static long getCreationTime(String file)
@@ -148,10 +146,10 @@
}
/* Current commit log file */
- private String logFile_;
+ private String logFile;
/* header for current commit log */
- private CommitLogHeader clHeader_;
- private BufferedRandomAccessFile logWriter_;
+ private CommitLogHeader clHeader;
+ private BufferedRandomAccessFile logWriter;
private final ExecutorService executor = new CommitLogExecutorService();
/*
@@ -160,7 +158,7 @@
*/
private void setNextFileName()
{
- logFile_ = DatabaseDescriptor.getLogFileLocation() + File.separator +
+ logFile = DatabaseDescriptor.getLogFileLocation() + File.separator +
"CommitLog-" + System.currentTimeMillis() + ".log";
}
@@ -175,7 +173,7 @@
setNextFileName();
try
{
- logWriter_ = CommitLog.createWriter(logFile_);
+ logWriter = CommitLog.createWriter(logFile);
writeCommitLogHeader();
}
catch (IOException e)
@@ -220,7 +218,7 @@
String getLogFile()
{
- return logFile_;
+ return logFile;
}
private CommitLogHeader readCommitLogHeader(BufferedRandomAccessFile
logReader) throws IOException
@@ -239,19 +237,19 @@
private void writeCommitLogHeader() throws IOException
{
int cfSize = Table.TableMetadata.getColumnFamilyCount();
- clHeader_ = new CommitLogHeader(cfSize);
- writeCommitLogHeader(logWriter_, clHeader_.toByteArray());
+ clHeader = new CommitLogHeader(cfSize);
+ writeCommitLogHeader(logWriter, clHeader.toByteArray());
}
/** writes header at the beginning of the file, then seeks back to current
position */
private void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
{
- long currentPos = logWriter_.getFilePointer();
- logWriter_.seek(0);
+ long currentPos = logWriter.getFilePointer();
+ logWriter.seek(0);
- writeCommitLogHeader(logWriter_, bytes);
+ writeCommitLogHeader(logWriter, bytes);
- logWriter_.seek(currentPos);
+ logWriter.seek(currentPos);
}
private static void writeCommitLogHeader(BufferedRandomAccessFile
logWriter, byte[] bytes) throws IOException
@@ -277,14 +275,14 @@
break;
reader.seek(lowPos);
- if (logger_.isDebugEnabled())
- logger_.debug("Replaying " + file + " starting at " + lowPos);
+ if (logger.isDebugEnabled())
+ logger.debug("Replaying " + file + " starting at " + lowPos);
/* read the logs populate RowMutation and apply */
while (!reader.isEOF())
{
- if (logger_.isDebugEnabled())
- logger_.debug("Reading mutation at " +
reader.getFilePointer());
+ if (logger.isDebugEnabled())
+ logger.debug("Reading mutation at " +
reader.getFilePointer());
long claimedCRC32;
byte[] bytes;
@@ -312,8 +310,8 @@
/* deserialize the commit log entry */
final RowMutation rm =
RowMutation.serializer().deserialize(new DataInputStream(bufIn));
- if (logger_.isDebugEnabled())
- logger_.debug(String.format("replaying mutation for %s.%s:
%s",
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("replaying mutation for %s.%s:
%s",
rm.getTable(),
rm.key(),
"{" +
StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
@@ -389,10 +387,10 @@
for (ColumnFamily columnFamily : rm.getColumnFamilies())
{
int id = table.getColumnFamilyId(columnFamily.name());
- if (!clHeader_.isDirty(id))
+ if (!clHeader.isDirty(id))
{
- clHeader_.turnOn(id, logWriter_.getFilePointer());
- seekAndWriteCommitLogHeader(clHeader_.toByteArray());
+ clHeader.turnOn(id, logWriter.getFilePointer());
+ seekAndWriteCommitLogHeader(clHeader.toByteArray());
}
}
}
@@ -403,7 +401,7 @@
{
public CommitLogContext call() throws Exception
{
- return new CommitLogContext(logFile_,
logWriter_.getFilePointer());
+ return new CommitLogContext(logFile,
logWriter.getFilePointer());
}
};
try
@@ -473,19 +471,19 @@
*/
private void discardCompletedSegments(CommitLog.CommitLogContext cLogCtx,
int id) throws IOException
{
- if (logger_.isDebugEnabled())
- logger_.debug("discard completed log segments for " + cLogCtx + ",
column family " + id + ". CFIDs are " +
Table.TableMetadata.getColumnFamilyIDString());
+ if (logger.isDebugEnabled())
+ logger.debug("discard completed log segments for " + cLogCtx + ",
column family " + id + ". CFIDs are " +
Table.TableMetadata.getColumnFamilyIDString());
/* retrieve the commit log header associated with the file in the
context */
- if (clHeaders_.get(cLogCtx.file) == null)
+ if (clHeaders.get(cLogCtx.file) == null)
{
- if (logFile_.equals(cLogCtx.file))
+ if (logFile.equals(cLogCtx.file))
{
/* this means we are dealing with the current commit log. */
- clHeaders_.put(cLogCtx.file, clHeader_);
+ clHeaders.put(cLogCtx.file, clHeader);
}
else
{
- logger_.error("Unknown commitlog file " + cLogCtx.file);
+ logger.error("Unknown commitlog file " + cLogCtx.file);
return;
}
}
@@ -494,10 +492,10 @@
* log replay assumes that we only have to look at entries past the
last
* flush position, so verify that this flush happens after the last.
*/
- assert cLogCtx.position >=
clHeaders_.get(cLogCtx.file).getPosition(id);
+ assert cLogCtx.position >= clHeaders.get(cLogCtx.file).getPosition(id);
/* Sort the commit logs based on creation time */
- List<String> oldFiles = new ArrayList<String>(clHeaders_.keySet());
+ List<String> oldFiles = new ArrayList<String>(clHeaders.keySet());
Collections.sort(oldFiles, new CommitLogFileComparator());
/*
@@ -508,17 +506,17 @@
*/
for (String oldFile : oldFiles)
{
- CommitLogHeader header = clHeaders_.get(oldFile);
+ CommitLogHeader header = clHeaders.get(oldFile);
if (oldFile.equals(cLogCtx.file))
{
// we can't just mark the segment where the flush happened
clean,
// since there may have been writes to it between when the
flush
// started and when it finished. so mark the flush position as
// the replay point for this CF, instead.
- if (logger_.isDebugEnabled())
- logger_.debug("Marking replay position " +
cLogCtx.position + " on commit log " + oldFile);
+ if (logger.isDebugEnabled())
+ logger.debug("Marking replay position " + cLogCtx.position
+ " on commit log " + oldFile);
header.turnOn(id, cLogCtx.position);
- if (oldFile.equals(logFile_))
+ if (oldFile.equals(logFile))
{
seekAndWriteCommitLogHeader(header.toByteArray());
}
@@ -532,14 +530,14 @@
header.turnOff(id);
if (header.isSafeToDelete())
{
- logger_.info("Deleting obsolete commit log:" + oldFile);
+ logger.info("Deleting obsolete commit log:" + oldFile);
DeletionService.submitDelete(oldFile);
- clHeaders_.remove(oldFile);
+ clHeaders.remove(oldFile);
}
else
{
- if (logger_.isDebugEnabled())
- logger_.debug("Not safe to delete commit log " + oldFile +
"; dirty is " + header.dirtyString());
+ if (logger.isDebugEnabled())
+ logger.debug("Not safe to delete commit log " + oldFile +
"; dirty is " + header.dirtyString());
writeOldCommitLogHeader(oldFile, header);
}
}
@@ -554,19 +552,19 @@
private boolean maybeRollLog() throws IOException
{
- if (logWriter_.length() >= SEGMENT_SIZE)
+ if (logWriter.length() >= SEGMENT_SIZE)
{
/* Rolls the current log file over to a new one. */
setNextFileName();
- String oldLogFile = logWriter_.getPath();
- logWriter_.close();
+ String oldLogFile = logWriter.getPath();
+ logWriter.close();
/* point reader/writer to a new commit log file. */
- logWriter_ = CommitLog.createWriter(logFile_);
+ logWriter = CommitLog.createWriter(logFile);
/* squirrel away the old commit log header */
- clHeaders_.put(oldLogFile, new CommitLogHeader(clHeader_));
- clHeader_.clear();
- writeCommitLogHeader(logWriter_, clHeader_.toByteArray());
+ clHeaders.put(oldLogFile, new CommitLogHeader(clHeader));
+ clHeader.clear();
+ writeCommitLogHeader(logWriter, clHeader.toByteArray());
return true;
}
return false;
@@ -574,7 +572,7 @@
void sync() throws IOException
{
- logWriter_.sync();
+ logWriter.sync();
}
class LogRecordAdder implements Callable<CommitLog.CommitLogContext>
@@ -593,33 +591,33 @@
long currentPosition = -1L;
try
{
- currentPosition = logWriter_.getFilePointer();
- CommitLogContext cLogCtx = new CommitLogContext(logFile_,
currentPosition);
+ currentPosition = logWriter.getFilePointer();
+ CommitLogContext cLogCtx = new CommitLogContext(logFile,
currentPosition);
maybeUpdateHeader(rowMutation);
Checksum checkum = new CRC32();
if (serializedRow instanceof DataOutputBuffer)
{
DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
- logWriter_.writeLong(buffer.getLength());
- logWriter_.write(buffer.getData(), 0, buffer.getLength());
+ logWriter.writeLong(buffer.getLength());
+ logWriter.write(buffer.getData(), 0, buffer.getLength());
checkum.update(buffer.getData(), 0, buffer.getLength());
}
else
{
assert serializedRow instanceof byte[];
byte[] bytes = (byte[]) serializedRow;
- logWriter_.writeLong(bytes.length);
- logWriter_.write(bytes);
+ logWriter.writeLong(bytes.length);
+ logWriter.write(bytes);
checkum.update(bytes, 0, bytes.length);
}
- logWriter_.writeLong(checkum.getValue());
+ logWriter.writeLong(checkum.getValue());
maybeRollLog();
return cLogCtx;
}
catch (IOException e)
{
if ( currentPosition != -1 )
- logWriter_.seek(currentPosition);
+ logWriter.seek(currentPosition);
throw e;
}
}