Author: chirino
Date: Thu Jan 19 05:15:52 2012
New Revision: 1233180
URL: http://svn.apache.org/viewvc?rev=1233180&view=rev
Log:
Some follow-on work for APLO-128: improve the synchronization used by the
log impl and use better defaults
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
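The synchronization rework is easiest to see in the RecordLog diff below: appends are buffered and confined to a single write thread, a new flushed_offset AtomicLong tracks how much of the log has actually reached the file channel, and readers force a flush only when they ask for bytes still sitting in the write buffer (check_read_flush). The following is a condensed, hypothetical sketch of that handshake, not the Apollo code; a ByteArrayOutputStream stands in for the real FileChannel and all names are illustrative:

import java.io.ByteArrayOutputStream
import java.util.concurrent.atomic.AtomicLong

// Sketch: buffered single-writer appender with lock-free read-side check.
class AppenderSketch {
  private val disk = new ByteArrayOutputStream()          // bytes already "on disk"
  private val write_buffer = new ByteArrayOutputStream()  // pending, unflushed bytes
  private var append_offset = 0L                          // includes unflushed bytes
  val flushed_offset = new AtomicLong(0)                  // readable without the lock

  def append(data: Array[Byte]): Long = this.synchronized {
    val record_position = append_offset
    write_buffer.write(data)
    append_offset += data.length
    record_position
  }

  def flush(): Unit = this.synchronized {
    val bytes = write_buffer.toByteArray
    if (bytes.nonEmpty) {
      disk.write(bytes)
      flushed_offset.addAndGet(bytes.length)
      write_buffer.reset()
    }
  }

  // Analogue of check_read_flush: cheap atomic test first, take the
  // appender lock only when the requested range is not yet flushed.
  def check_read_flush(end_offset: Long): Unit =
    if (flushed_offset.get() < end_offset) flush()

  def read(pos: Long, len: Int): Array[Byte] = {
    check_read_flush(pos + len)
    disk.toByteArray.slice(pos.toInt, pos.toInt + len)
  }
}

object AppenderSketchMain {
  def main(args: Array[String]): Unit = {
    val log = new AppenderSketch
    val pos = log.append("hello".getBytes("UTF-8"))
    println(new String(log.read(pos, 5), "UTF-8")) // flushes on demand: hello
  }
}

The point of the AtomicLong is that the common read path costs one volatile read instead of taking the appender's lock on every read, as the old synchronized LogReader methods did.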
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1233180&r1=1233179&r2=1233180&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Thu Jan 19 05:15:52 2012
@@ -40,6 +40,7 @@ import org.apache.activemq.apollo.dto.Js
import java.util.Map
import org.iq80.leveldb._
import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait._
+import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -215,7 +216,14 @@ class LevelDBClient(store: LevelDBStore)
def log_size = {
import OptionSupport._
- Option(config.log_size).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(1024 * 1024 * 100)
+ Option(config.log_size).map(MemoryPropertyEditor.parse(_)).map{size=>
+ if(size > Int.MaxValue) {
+ warn("leveldb log_size was configured to be '"+config.log_size+"' but
the maximum supported log size is 2G")
+ Int.MaxValue
+ } else {
+ size.toInt
+ }
+ }.getOrElse(1024 * 1024 * 100)
}
def start() = {
@@ -243,7 +251,7 @@ class LevelDBClient(store: LevelDBStore)
config.index_max_open_files.foreach( index_options.maxOpenFiles(_) )
config.index_block_restart_interval.foreach( index_options.blockRestartInterval(_) )
- index_options.paranoidChecks(config.paranoid_checks.getOrElse(true))
+ index_options.paranoidChecks(config.paranoid_checks.getOrElse(false))
Option(config.index_write_buffer_size).map(MemoryPropertyEditor.parse(_).toInt).foreach( index_options.writeBufferSize(_) )
Option(config.index_block_size).map(MemoryPropertyEditor.parse(_).toInt).foreach( index_options.blockSize(_) )
Option(config.index_compression).foreach(x => index_options.compressionType( x match {
@@ -254,7 +262,7 @@ class LevelDBClient(store: LevelDBStore)
index_options.cacheSize(Option(config.index_cache_size).map(MemoryPropertyEditor.parse(_).toLong).getOrElse(1024*1024*256L))
index_options.logger(new Logger() {
- def log(msg: String) = debug(store.store_kind+": "+msg)
+ def log(msg: String) = debug(store.store_kind+": "+msg.stripSuffix("\n"))
})
log = create_log
@@ -513,7 +521,7 @@ class LevelDBClient(store: LevelDBStore)
} catch {
case e:Throwable =>
if( error==null ) {
- warn(e, "DB operation failed. (entering recovery mode)")
+ warn(e, "DB operation failed. (entering recovery mode): "+e)
}
error = e
}
@@ -880,7 +888,7 @@ class LevelDBClient(store: LevelDBStore)
}
}
- case class UsageCounter(info:RecordLog#LogInfo) {
+ case class UsageCounter(info:LogInfo) {
var count = 0L
var size = 0L
var first_reference_queue:QueueRecord = _
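Before moving on to RecordLog.scala, a note on the log_size change above: MemoryPropertyEditor.parse returns a Long, and a configured value above 2G would previously be silently truncated by toInt; the new code warns and clamps to Int.MaxValue instead. Below is a hypothetical, self-contained sketch of that behavior, where parseBytes is a stand-in for MemoryPropertyEditor.parse rather than the actual Apollo API:

// Sketch of the warn-and-clamp default added in this commit.
object LogSizeSketch {
  val DEFAULT_LOG_SIZE: Int = 1024 * 1024 * 100 // the 100M default

  // Hypothetical stand-in for MemoryPropertyEditor.parse ("100M", "4G", ...).
  def parseBytes(value: String): Long = {
    val v = value.trim.toUpperCase
    if (v.endsWith("G")) v.dropRight(1).toLong * 1024L * 1024 * 1024
    else if (v.endsWith("M")) v.dropRight(1).toLong * 1024 * 1024
    else if (v.endsWith("K")) v.dropRight(1).toLong * 1024
    else v.toLong
  }

  def logSize(configured: Option[String]): Int =
    configured.map(parseBytes).map { size =>
      if (size > Int.MaxValue) {
        println("warn: configured log_size exceeds the 2G maximum, clamping")
        Int.MaxValue
      } else {
        size.toInt
      }
    }.getOrElse(DEFAULT_LOG_SIZE)

  def main(args: Array[String]): Unit = {
    println(logSize(None))       // 104857600
    println(logSize(Some("4G"))) // warns, then 2147483647
  }
}

The 2G cap matches the warning text in the diff; within a single log file the reader arithmetic works on Int offsets, so larger files could not be addressed anyway.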
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1233180&r1=1233179&r2=1233180&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Thu Jan 19 05:15:52 2012
@@ -22,12 +22,12 @@ import java.{util=>ju}
import java.util.zip.CRC32
import java.util.Map.Entry
import collection.immutable.TreeMap
-import org.fusesource.hawtdispatch.BaseRetained
import java.util.concurrent.atomic.AtomicLong
import java.io._
import org.apache.activemq.apollo.util.FileSupport._
import org.apache.activemq.apollo.util.{Log, LRUCache}
import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
+import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
object RecordLog extends Log {
@@ -42,7 +42,13 @@ object RecordLog extends Log {
val LOG_HEADER_PREFIX = '*'.toByte
val LOG_HEADER_SIZE = 10
- val BUFFER_SIZE = 1024
+
+ val BUFFER_SIZE = 1024*512
+ val BYPASS_BUFFER_SIZE = 1024*16
+
+ case class LogInfo(file:File, position:Long, length:Long) {
+ def limit = position+length
+ }
}
case class RecordLog(directory: File, logSuffix:String) {
@@ -55,9 +61,6 @@ case class RecordLog(directory: File, lo
var paranoidChecks = false
var sync = false
- case class LogInfo(file:File, position:Long, length:AtomicLong) {
- def limit = position+length.get
- }
var log_infos = TreeMap[Long, LogInfo]()
object log_mutex
@@ -65,7 +68,7 @@ case class RecordLog(directory: File, lo
def delete(id:Long) = {
log_mutex.synchronized {
// We can't delete the current appender.
- if( current_appender.start != id ) {
+ if( current_appender.position != id ) {
log_infos.get(id).foreach { info =>
onDelete(info.file)
log_infos = log_infos.filterNot(_._1 == id)
@@ -84,7 +87,20 @@ case class RecordLog(directory: File, lo
(checksum.getValue & 0xFFFFFFFF).toInt
}
- class LogAppender(file:File, start:Long) extends LogReader(file, start) {
+ var write_thread:Thread = _
+ def is_write_thread_executing = if(write_thread==null) {
+ write_thread = Thread.currentThread()
+ true
+ } else {
+ write_thread eq Thread.currentThread()
+ }
+
+ def assert_on_write_thread = if ( !is_write_thread_executing) {
+ val current: Thread = Thread.currentThread()
+ throw new Exception("current: "+current.getName+", expected: "+write_thread.getName)
+ }
+
+ class LogAppender(file:File, position:Long) extends LogReader(file, position) {
override def open = new RandomAccessFile(file, "rw")
@@ -93,9 +109,13 @@ case class RecordLog(directory: File, lo
super.dispose()
}
- val length = new AtomicLong(0)
-
- def limit = start+length.get()
+ var append_offset = 0L
+ val flushed_offset = new AtomicLong(0)
+
+ def append_position = {
+ assert_on_write_thread
+ position+append_offset
+ }
// set the file size ahead of time so that we don't have to sync the file
// meta-data on every log sync.
@@ -104,101 +124,131 @@ case class RecordLog(directory: File, lo
channel.force(true)
channel.position(0)
- val os = new DataByteArrayOutputStream((BUFFER_SIZE)+LOG_HEADER_PREFIX)
+ val write_buffer = new DataByteArrayOutputStream((BUFFER_SIZE)+BUFFER_SIZE)
def force = {
// only need to update the file metadata if the file size changes..
+ assert_on_write_thread
flush
if(sync) {
- channel.force(length.get() > logSize)
- }
- }
-
- def flush = {
- if( os.position() > 0 ) {
- val buffer = os.toBuffer.toByteBuffer
- val pos = length.get()-buffer.remaining
- trace("wrote at "+pos+" "+os.toBuffer)
- channel.write(buffer, pos)
- if( buffer.hasRemaining ) {
- throw new IOException("Short write")
- }
- os.reset()
+ channel.force(append_offset > logSize)
}
}
/**
* returns the offset position of the data record.
*/
- def append(id:Byte, data: Buffer): Long = {
- val rc = limit
+ def append(id:Byte, data: Buffer): Long = this.synchronized {
+ assert_on_write_thread
+ val record_position = append_position
val data_length = data.length
val total_length = LOG_HEADER_SIZE + data_length
- if( os.position() + total_length > BUFFER_SIZE ) {
+ if( write_buffer.position() + total_length > BUFFER_SIZE ) {
flush
}
- if( total_length > (BUFFER_SIZE<<2) ) {
+ val cs: Int = checksum(data)
+// trace("Writing at: "+record_position+" len: "+data_length+" with
checksum: "+cs)
+
+ if( total_length > BYPASS_BUFFER_SIZE ) {
// Write the header and flush..
- os.writeByte(LOG_HEADER_PREFIX)
- os.writeByte(id)
- os.writeInt(checksum(data))
- os.writeInt(data_length)
+ write_buffer.writeByte(LOG_HEADER_PREFIX)
+ write_buffer.writeByte(id)
+ write_buffer.writeInt(cs)
+ write_buffer.writeInt(data_length)
- length.addAndGet(LOG_HEADER_PREFIX)
+ append_offset += LOG_HEADER_SIZE
flush
// Directly write the data to the channel since it's large.
val buffer = data.toByteBuffer
- val pos = length.get()+LOG_HEADER_PREFIX
- trace("wrote at "+pos+" "+data)
+ val pos = append_offset+LOG_HEADER_SIZE
+ flushed_offset.addAndGet(buffer.remaining)
channel.write(buffer, pos)
if( buffer.hasRemaining ) {
throw new IOException("Short write")
}
- length.addAndGet(data_length)
+ append_offset += data_length
} else {
- os.writeByte(LOG_HEADER_PREFIX)
- os.writeByte(id)
- os.writeInt(checksum(data))
- os.writeInt(data_length)
- os.write(data.data, data.offset, data_length)
- length.addAndGet(total_length)
+ write_buffer.writeByte(LOG_HEADER_PREFIX)
+ write_buffer.writeByte(id)
+ write_buffer.writeInt(cs)
+ write_buffer.writeInt(data_length)
+ write_buffer.write(data.data, data.offset, data_length)
+ append_offset += total_length
+ }
+ record_position
+ }
+
+ def flush = this.synchronized {
+ assert_on_write_thread
+ if( write_buffer.position() > 0 ) {
+ val buffer = write_buffer.toBuffer.toByteBuffer
+ val pos = append_offset-buffer.remaining
+ flushed_offset.addAndGet(buffer.remaining)
+ channel.write(buffer, pos)
+ if( buffer.hasRemaining ) {
+ throw new IOException("Short write")
+ }
+ write_buffer.reset()
+ }
+ }
+
+// override def read(record_position: Long, length: Int) = this.synchronized {
+// super.read(record_position, length)
+// }
+//
+// override def read(record_position: Long) = this.synchronized {
+// super.read(record_position)
+// }
+//
+// override def check(record_position: Long) = this.synchronized {
+// super.check(record_position)
+// }
+
+ override def check_read_flush(end_offset:Int) = {
+ if( flushed_offset.get() < end_offset ) {
+ this.synchronized {
+ println("read flush")
+ flush
+ }
}
- rc
}
}
- case class LogReader(file:File, start:Long) extends BaseRetained {
-
- val fd = open
+ case class LogReader(file:File, position:Long) extends BaseRetained {
def open = new RandomAccessFile(file, "r")
-
- def channel = fd.getChannel
+
+ val fd = open
+ val channel = fd.getChannel
override def dispose() {
fd.close()
}
- def read(pos:Long, length:Int) = this.synchronized {
- val offset = (pos-start).toInt
+ def check_read_flush(end_offset:Int) = {}
+
+ def read(record_position:Long, length:Int) = {
+ val offset = (record_position-position).toInt
+ check_read_flush(offset+LOG_HEADER_SIZE+length)
+
if(paranoidChecks) {
+
val record = new Buffer(LOG_HEADER_SIZE+length)
+
if( channel.read(record.toByteBuffer, offset) != record.length ) {
- val data2 = new Buffer(LOG_HEADER_SIZE+length)
- channel.read(data2.toByteBuffer, offset)
- throw new IOException("short record at position: "+pos+" in file:
"+file+", offset: "+offset)
+ throw new IOException("short record at position: "+record_position+"
in file: "+file+", offset: "+offset)
}
val is = new DataByteArrayInputStream(record)
val prefix = is.readByte()
if( prefix != LOG_HEADER_PREFIX ) {
- throw new IOException("invalid record at position: "+pos+" in file:
"+file+", offset: "+offset)
+ throw new IOException("invalid record at position:
"+record_position+" in file: "+file+", offset: "+offset)
}
val id = is.readByte()
@@ -209,7 +259,7 @@ case class RecordLog(directory: File, lo
// If you're reading the whole record we can verify the data checksum
if( expectedLength == length ) {
if( expectedChecksum != checksum(data) ) {
- throw new IOException("checksum does not match at position:
"+pos+" in file: "+file+", offset: "+offset)
+ throw new IOException("checksum does not match at position:
"+record_position+" in file: "+file+", offset: "+offset)
}
}
@@ -217,14 +267,14 @@ case class RecordLog(directory: File, lo
} else {
val data = new Buffer(length)
if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != data.length ) {
- throw new IOException("short record at position: "+pos+" in file: "+file+", offset: "+offset)
+ throw new IOException("short record at position: "+record_position+" in file: "+file+", offset: "+offset)
}
data
}
}
- def read(pos:Long) = this.synchronized {
- val offset = (pos-start).toInt
+ def read(record_position:Long) = {
+ val offset = (record_position-position).toInt
val header = new Buffer(LOG_HEADER_SIZE)
channel.read(header.toByteBuffer, offset)
val is = header.bigEndianEditor();
@@ -247,11 +297,11 @@ case class RecordLog(directory: File, lo
throw new IOException("checksum does not match")
}
}
- (id, data, pos+LOG_HEADER_SIZE+length)
+ (id, data, record_position+LOG_HEADER_SIZE+length)
}
- def check(pos:Long):Option[Long] = this.synchronized {
- var offset = (pos-start).toInt
+ def check(record_position:Long):Option[Long] = {
+ var offset = (record_position-position).toInt
val header = new Buffer(LOG_HEADER_SIZE)
channel.read(header.toByteBuffer, offset)
val is = header.bigEndianEditor();
@@ -289,12 +339,12 @@ case class RecordLog(directory: File, lo
if( expectedChecksum != checksum ) {
return None
}
- return Some(pos+LOG_HEADER_SIZE+length)
+ return Some(record_position+LOG_HEADER_SIZE+length)
}
- def verifyAndGetEndPosition:Long = this.synchronized {
- var pos = start;
- val limit = start+channel.size()
+ def verifyAndGetEndPosition:Long = {
+ var pos = position;
+ val limit = position+channel.size()
while(pos < limit) {
check(pos) match {
case Some(next) => pos = next
@@ -310,16 +360,20 @@ case class RecordLog(directory: File, lo
}
def create_appender(position: Long): Any = {
- current_appender = create_log_appender(position)
+ assert_on_write_thread
log_mutex.synchronized {
- log_infos += position -> new LogInfo(current_appender.file, position, current_appender.length)
+ if(current_appender!=null) {
+ log_infos += position -> new LogInfo(current_appender.file, current_appender.position, current_appender.append_offset)
+ }
+ current_appender = create_log_appender(position)
+ log_infos += position -> new LogInfo(current_appender.file, position, 0)
}
}
def open = {
log_mutex.synchronized {
log_infos = LevelDBClient.find_sequence_files(directory, logSuffix).map { case (position,file) =>
- position -> LogInfo(file, position, new AtomicLong(file.length()))
+ position -> LogInfo(file, position, file.length())
}
val appendPos = if( log_infos.isEmpty ) {
@@ -328,13 +382,14 @@ case class RecordLog(directory: File, lo
val (_, file) = log_infos.last
val r = LogReader(file.file, file.position)
try {
- val rc = r.verifyAndGetEndPosition
- file.length.set(rc - file.position)
- if( file.file.length != file.length.get() ) {
+ val actualLength = r.verifyAndGetEndPosition
+ val updated = file.copy(length = actualLength - file.position)
+ log_infos = log_infos + (updated.position->updated)
+ if( updated.file.length != file.length ) {
// we need to truncate.
- using(new RandomAccessFile(file.file, "rw")) ( _.setLength(file.length.get()) )
+ using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length))
}
- rc
+ actualLength
} finally {
r.release()
}
@@ -350,8 +405,8 @@ case class RecordLog(directory: File, lo
}
}
- def appender_limit = current_appender.limit
- def appender_start = current_appender.start
+ def appender_limit = current_appender.append_position
+ def appender_start = current_appender.position
def next_log(position:Long) = LevelDBClient.create_sequence_file(directory, position, logSuffix)
@@ -361,10 +416,11 @@ case class RecordLog(directory: File, lo
} finally {
current_appender.flush
log_mutex.synchronized {
- if ( current_appender.length.get >= logSize ) {
+ assert_on_write_thread
+ if ( current_appender.append_offset >= logSize ) {
current_appender.release()
on_log_rotate()
- create_appender(current_appender.limit)
+ create_appender(current_appender.append_position)
}
}
}
@@ -380,12 +436,12 @@ case class RecordLog(directory: File, lo
def log_info(pos:Long) = log_mutex.synchronized(log_infos.range(0L, pos+1).lastOption.map(_._2))
- private def get_reader[T](pos:Long)(func: (LogReader)=>T) = {
+ private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = {
val lookup = log_mutex.synchronized {
- val info = log_info(pos)
+ val info = log_info(record_position)
info.map { info=>
- if(info.position == current_appender.start) {
+ if(info.position == current_appender.position) {
current_appender.retain()
(info, current_appender)
} else {
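A closing note on a structural change threaded through both files: LogInfo moves from an inner case class of RecordLog, where it is a path-dependent type only nameable from outside as RecordLog#LogInfo, to the RecordLog companion object, where it is a plain importable type whose length is an immutable Long rather than an AtomicLong. That is what lets UsageCounter in LevelDBClient take a simple LogInfo. The following hypothetical Outer/Info sketch illustrates the difference; the names are illustrative, not from the Apollo source:

// Sketch: a case class in the companion object is a plain, importable
// type; the same case class inside the class is path-dependent.
object Outer {
  case class Info(length: Long) // like the new RecordLog.LogInfo
}

class Outer {
  case class InnerInfo(length: Long) // like the old inner LogInfo
}

object TypeSketch {
  def main(args: Array[String]): Unit = {
    val plain = Outer.Info(10L)                   // no Outer instance needed
    val o = new Outer
    val bound: Outer#InnerInfo = o.InnerInfo(10L) // tied to instance 'o'
    println(s"$plain / $bound")
  }
}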