Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala&r1=1245945&r2=1291054&rev=1291054&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/RecordLog.scala Sun Feb 19 20:24:15 2012 @@ -1,3 +1,5 @@ +package org.apache.activemq.apollo.broker.store.leveldb + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -14,10 +16,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.apollo.broker.store.leveldb -import java.{lang=>jl} -import java.{util=>ju} +import java.{lang => jl} +import java.{util => ju} import java.util.zip.CRC32 import java.util.Map.Entry @@ -42,56 +43,59 @@ object RecordLog extends Log { val LOG_HEADER_PREFIX = '*'.toByte val UOW_END_RECORD = -1.toByte - + val LOG_HEADER_SIZE = 10 - val BUFFER_SIZE = 1024*512 - val BYPASS_BUFFER_SIZE = 1024*16 + val BUFFER_SIZE = 1024 * 512 + val BYPASS_BUFFER_SIZE = 1024 * 16 - case class LogInfo(file:File, position:Long, length:Long) { - def limit = position+length + case class LogInfo(file: File, position: Long, length: Long) { + def limit = position + length } - - def encode_long(a1:Long) = { + + def encode_long(a1: Long) = { val out = new DataByteArrayOutputStream(8) out.writeLong(a1) out.toBuffer } - def decode_long(value:Buffer):Long = { + def decode_long(value: Buffer): Long = { val in = new DataByteArrayInputStream(value) in.readLong() } - + } -case class RecordLog(directory: File, logSuffix:String) { +case class RecordLog(directory: File, logSuffix: String) { + import RecordLog._ directory.mkdirs() var logSize = 1024 * 1024 * 100L - var current_appender:LogAppender = _ + var current_appender: LogAppender = _ var verify_checksums = false var sync = false val log_infos = new TreeMap[Long, LogInfo]() + object log_mutex - def delete(id:Long) = { + def delete(id: Long) = { log_mutex.synchronized { // We can't delete the current appender. - if( current_appender.position != id ) { - Option(log_infos.get(id)).foreach { info => - onDelete(info.file) - log_infos.remove(id) + if (current_appender.position != id) { + Option(log_infos.get(id)).foreach { + info => + onDelete(info.file) + log_infos.remove(id) } } } } - protected def onDelete(file:File) = { + protected def onDelete(file: File) = { file.delete() } @@ -101,7 +105,7 @@ case class RecordLog(directory: File, lo (checksum.getValue & 0xFFFFFFFF).toInt } - class LogAppender(file:File, position:Long) extends LogReader(file, position) { + class LogAppender(file: File, position: Long) extends LogReader(file, position) { val info = new LogInfo(file, position, 0) @@ -116,23 +120,23 @@ case class RecordLog(directory: File, lo val flushed_offset = new AtomicLong(0) def append_position = { - position+append_offset + position + append_offset } // set the file size ahead of time so that we don't have to sync the file // meta-data on every log sync. - channel.position(logSize-1) + channel.position(logSize - 1) channel.write(new Buffer(1).toByteBuffer) channel.force(true) - if( sync ) { + if (sync) { channel.position(0) } - val write_buffer = new DataByteArrayOutputStream(BUFFER_SIZE+LOG_HEADER_SIZE) + val write_buffer = new DataByteArrayOutputStream(BUFFER_SIZE + LOG_HEADER_SIZE) def force = { flush - if(sync) { + if (sync) { // only need to update the file metadata if the file size changes.. channel.force(append_offset > logSize) } @@ -141,19 +145,19 @@ case class RecordLog(directory: File, lo /** * returns the offset position of the data record. */ - def append(id:Byte, data: Buffer) = this.synchronized { + def append(id: Byte, data: Buffer) = this.synchronized { val record_position = append_position val data_length = data.length val total_length = LOG_HEADER_SIZE + data_length - - if( write_buffer.position() + total_length > BUFFER_SIZE ) { + + if (write_buffer.position() + total_length > BUFFER_SIZE) { flush } val cs: Int = checksum(data) -// trace("Writing at: "+record_position+" len: "+data_length+" with checksum: "+cs) + // trace("Writing at: "+record_position+" len: "+data_length+" with checksum: "+cs) - if( false && total_length > BYPASS_BUFFER_SIZE ) { + if (false && total_length > BYPASS_BUFFER_SIZE) { // Write the header and flush.. write_buffer.writeByte(LOG_HEADER_PREFIX) @@ -166,10 +170,10 @@ case class RecordLog(directory: File, lo // Directly write the data to the channel since it's large. val buffer = data.toByteBuffer - val pos = append_offset+LOG_HEADER_SIZE + val pos = append_offset + LOG_HEADER_SIZE flushed_offset.addAndGet(buffer.remaining) channel.write(buffer, pos) - if( buffer.hasRemaining ) { + if (buffer.hasRemaining) { throw new IOException("Short write") } append_offset += data_length @@ -182,24 +186,24 @@ case class RecordLog(directory: File, lo write_buffer.write(data.data, data.offset, data_length) append_offset += total_length } - (record_position,info) + (record_position, info) } def flush = this.synchronized { - if( write_buffer.position() > 0 ) { + if (write_buffer.position() > 0) { val buffer = write_buffer.toBuffer.toByteBuffer - val pos = append_offset-buffer.remaining + val pos = append_offset - buffer.remaining flushed_offset.addAndGet(buffer.remaining) channel.write(buffer, pos) - if( buffer.hasRemaining ) { + if (buffer.hasRemaining) { throw new IOException("Short write") } write_buffer.reset() } } - override def check_read_flush(end_offset:Long) = { - if( flushed_offset.get() < end_offset ) { + override def check_read_flush(end_offset: Long) = { + if (flushed_offset.get() < end_offset) { this.synchronized { flush } @@ -208,7 +212,7 @@ case class RecordLog(directory: File, lo } - case class LogReader(file:File, position:Long) extends BaseRetained { + case class LogReader(file: File, position: Long) extends BaseRetained { def open = new RandomAccessFile(file, "r") @@ -219,38 +223,39 @@ case class RecordLog(directory: File, lo fd.close() } - def check_read_flush(end_offset:Long) = {} - - def read(record_position:Long, length:Int) = { - val offset = record_position-position - assert(offset >=0 ) - - check_read_flush(offset+LOG_HEADER_SIZE+length) - - val record = new Buffer(LOG_HEADER_SIZE+length) - if( channel.read(record.toByteBuffer, offset) != record.length ) { - throw new IOException("short record at position: "+record_position+" in file: "+file+", offset: "+offset) + def check_read_flush(end_offset: Long) = {} + + def read(record_position: Long, length: Int) = { + val offset = record_position - position + assert(offset >= 0) + + check_read_flush(offset + LOG_HEADER_SIZE + length) + + val record = new Buffer(LOG_HEADER_SIZE + length) + if (channel.read(record.toByteBuffer, offset) != record.length) { + throw new IOException("short record at position: " + record_position + " in file: " + file + ", offset: " + offset) } - if(verify_checksums) { + if (verify_checksums) { def record_is_not_changing = { - using(open) { fd => - val channel = fd.getChannel - val new_record = new Buffer(LOG_HEADER_SIZE+length) - channel.read(new_record.toByteBuffer, offset) - var same = record == new_record - println(same) - same + using(open) { + fd => + val channel = fd.getChannel + val new_record = new Buffer(LOG_HEADER_SIZE + length) + channel.read(new_record.toByteBuffer, offset) + var same = record == new_record + println(same) + same } } val is = new DataByteArrayInputStream(record) val prefix = is.readByte() - if( prefix != LOG_HEADER_PREFIX ) { + if (prefix != LOG_HEADER_PREFIX) { assert(record_is_not_changing) - throw new IOException("invalid record at position: "+record_position+" in file: "+file+", offset: "+offset) + throw new IOException("invalid record at position: " + record_position + " in file: " + file + ", offset: " + offset) } val kind = is.readByte() @@ -259,10 +264,10 @@ case class RecordLog(directory: File, lo val data = is.readBuffer(length) // If your reading the whole record we can verify the data checksum - if( expectedLength == length ) { - if( expectedChecksum != checksum(data) ) { + if (expectedLength == length) { + if (expectedChecksum != checksum(data)) { assert(record_is_not_changing) - throw new IOException("checksum does not match at position: "+record_position+" in file: "+file+", offset: "+offset) + throw new IOException("checksum does not match at position: " + record_position + " in file: " + file + ", offset: " + offset) } } @@ -274,13 +279,13 @@ case class RecordLog(directory: File, lo } } - def read(record_position:Long) = { - val offset = record_position-position + def read(record_position: Long) = { + val offset = record_position - position val header = new Buffer(LOG_HEADER_SIZE) channel.read(header.toByteBuffer, offset) val is = header.bigEndianEditor(); val prefix = is.readByte() - if( prefix != LOG_HEADER_PREFIX ) { + if (prefix != LOG_HEADER_PREFIX) { // Does not look like a record. throw new IOException("invalid record position") } @@ -289,32 +294,32 @@ case class RecordLog(directory: File, lo val length = is.readInt() val data = new Buffer(length) - if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != length ) { + if (channel.read(data.toByteBuffer, offset + LOG_HEADER_SIZE) != length) { throw new IOException("short record") } - if(verify_checksums) { - if( expectedChecksum != checksum(data) ) { + if (verify_checksums) { + if (expectedChecksum != checksum(data)) { throw new IOException("checksum does not match") } } - (id, data, record_position+LOG_HEADER_SIZE+length) + (id, data, record_position + LOG_HEADER_SIZE + length) } - def check(record_position:Long):Option[(Long, Option[Long])] = { - var offset = record_position-position + def check(record_position: Long): Option[(Long, Option[Long])] = { + var offset = record_position - position val header = new Buffer(LOG_HEADER_SIZE) channel.read(header.toByteBuffer, offset) val is = header.bigEndianEditor(); val prefix = is.readByte() - if( prefix != LOG_HEADER_PREFIX ) { + if (prefix != LOG_HEADER_PREFIX) { return None // Does not look like a record. } val kind = is.readByte() val expectedChecksum = is.readInt() val length = is.readInt() - val chunk = new Buffer(1024*4) + val chunk = new Buffer(1024 * 4) val chunkbb = chunk.toByteBuffer offset += LOG_HEADER_SIZE @@ -323,12 +328,12 @@ case class RecordLog(directory: File, lo // with a bad record length val checksumer = new CRC32 var remaining = length - while( remaining > 0 ) { - val chunkSize = remaining.min(1024*4); + while (remaining > 0) { + val chunkSize = remaining.min(1024 * 4); chunkbb.position(0) chunkbb.limit(chunkSize) channel.read(chunkbb, offset) - if( chunkbb.hasRemaining ) { + if (chunkbb.hasRemaining) { return None } checksumer.update(chunk.data, 0, chunkSize) @@ -336,27 +341,28 @@ case class RecordLog(directory: File, lo remaining -= chunkSize } - val checksum = ( checksumer.getValue & 0xFFFFFFFF).toInt - if( expectedChecksum != checksum ) { + val checksum = (checksumer.getValue & 0xFFFFFFFF).toInt + if (expectedChecksum != checksum) { return None } - val uow_start_pos = if(kind == UOW_END_RECORD && length==8) Some(decode_long(chunk)) else None - return Some(record_position+LOG_HEADER_SIZE+length, uow_start_pos) + val uow_start_pos = if (kind == UOW_END_RECORD && length == 8) Some(decode_long(chunk)) else None + return Some(record_position + LOG_HEADER_SIZE + length, uow_start_pos) } - def verifyAndGetEndPosition:Long = { + def verifyAndGetEndPosition: Long = { var pos = position; var current_uow_start = pos - val limit = position+channel.size() - while(pos < limit) { + val limit = position + channel.size() + while (pos < limit) { check(pos) match { case Some((next, uow_start_pos)) => - uow_start_pos.foreach { uow_start_pos => - if( uow_start_pos == current_uow_start ) { - current_uow_start = next - } else { - return current_uow_start - } + uow_start_pos.foreach { + uow_start_pos => + if (uow_start_pos == current_uow_start) { + current_uow_start = next + } else { + return current_uow_start + } } pos = next case None => @@ -373,8 +379,8 @@ case class RecordLog(directory: File, lo def create_appender(position: Long): Any = { log_mutex.synchronized { - if(current_appender!=null) { - log_infos.put (position, new LogInfo(current_appender.file, current_appender.position, current_appender.append_offset)) + if (current_appender != null) { + log_infos.put(position, new LogInfo(current_appender.file, current_appender.position, current_appender.append_offset)) } current_appender = create_log_appender(position) log_infos.put(position, current_appender.info) @@ -384,11 +390,12 @@ case class RecordLog(directory: File, lo def open = { log_mutex.synchronized { log_infos.clear() - LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case (position,file) => - log_infos.put(position, LogInfo(file, position, file.length())) + LevelDBClient.find_sequence_files(directory, logSuffix).foreach { + case (position, file) => + log_infos.put(position, LogInfo(file, position, file.length())) } - val appendPos = if( log_infos.isEmpty ) { + val appendPos = if (log_infos.isEmpty) { 0L } else { val file = log_infos.lastEntry().getValue @@ -397,9 +404,9 @@ case class RecordLog(directory: File, lo val actualLength = r.verifyAndGetEndPosition val updated = file.copy(length = actualLength - file.position) log_infos.put(updated.position, updated) - if( updated.file.length != file.length ) { + if (updated.file.length != file.length) { // we need to truncate. - using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length)) + using(new RandomAccessFile(file.file, "rw"))(_.setLength(updated.length)) } actualLength } finally { @@ -418,23 +425,24 @@ case class RecordLog(directory: File, lo } def appender_limit = current_appender.append_position + def appender_start = current_appender.position - def next_log(position:Long) = LevelDBClient.create_sequence_file(directory, position, logSuffix) + def next_log(position: Long) = LevelDBClient.create_sequence_file(directory, position, logSuffix) - def appender[T](func: (LogAppender)=>T):T= { + def appender[T](func: (LogAppender) => T): T = { val intial_position = current_appender.append_position try { val rc = func(current_appender) - if( current_appender.append_position != intial_position ) { + if (current_appender.append_position != intial_position) { // Record a UOW_END_RECORD so that on recovery we only replay full units of work. - current_appender.append(UOW_END_RECORD,encode_long(intial_position)) + current_appender.append(UOW_END_RECORD, encode_long(intial_position)) } rc } finally { current_appender.flush log_mutex.synchronized { - if ( current_appender.append_offset >= logSize ) { + if (current_appender.append_offset >= logSize) { current_appender.release() on_log_rotate() create_appender(current_appender.append_position) @@ -443,7 +451,7 @@ case class RecordLog(directory: File, lo } } - var on_log_rotate: ()=>Unit = ()=>{} + var on_log_rotate: () => Unit = () => {} private val reader_cache = new LRUCache[File, LogReader](100) { protected override def onCacheEviction(entry: Entry[File, LogReader]) = { @@ -451,51 +459,56 @@ case class RecordLog(directory: File, lo } } - def log_info(pos:Long) = log_mutex.synchronized { Option(log_infos.floorEntry(pos)).map(_.getValue) } + def log_info(pos: Long) = log_mutex.synchronized { + Option(log_infos.floorEntry(pos)).map(_.getValue) + } - private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = { + private def get_reader[T](record_position: Long)(func: (LogReader) => T) = { val lookup = log_mutex.synchronized { val info = log_info(record_position) - info.map { info=> - if(info.position == current_appender.position) { - current_appender.retain() - (info, current_appender) - } else { - (info, null) - } + info.map { + info => + if (info.position == current_appender.position) { + current_appender.retain() + (info, current_appender) + } else { + (info, null) + } } } - lookup.map { case (info, appender) => - val reader = if( appender!=null ) { - // read from the current appender. - appender - } else { - // Checkout a reader from the cache... - reader_cache.synchronized { - var reader = reader_cache.get(info.file) - if(reader==null) { - reader = LogReader(info.file, info.position) - reader_cache.put(info.file, reader) + lookup.map { + case (info, appender) => + val reader = if (appender != null) { + // read from the current appender. + appender + } else { + // Checkout a reader from the cache... + reader_cache.synchronized { + var reader = reader_cache.get(info.file) + if (reader == null) { + reader = LogReader(info.file, info.position) + reader_cache.put(info.file, reader) + } + reader.retain() + reader } - reader.retain() - reader } - } - try { - func(reader) - } finally { - reader.release - } + try { + func(reader) + } finally { + reader.release + } } } - def read(pos:Long) = { + def read(pos: Long) = { get_reader(pos)(_.read(pos)) } - def read(pos:Long, length:Int) = { + + def read(pos: Long, length: Int) = { get_reader(pos)(_.read(pos, length)) }
Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala&r1=1245945&r2=1291054&rev=1291054&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStoreTest.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBStoreTest.scala Sun Feb 19 20:24:15 2012 @@ -1,3 +1,5 @@ +package org.apache.activemq.apollo.broker.store.leveldb + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -14,7 +16,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.apollo.broker.store.leveldb import dto.LevelDBStoreDTO import org.apache.activemq.apollo.broker.store.{Store, StoreFunSuiteSupport} @@ -24,7 +25,7 @@ import org.apache.activemq.apollo.broker */ class LevelDBStoreTest extends StoreFunSuiteSupport { - def create_store(flushDelay:Long):Store = { + def create_store(flushDelay: Long): Store = { new LevelDBStore({ val rc = new LevelDBStoreDTO rc.directory = data_directory Copied: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala (from r1245945, activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala?p2=activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala&p1=activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala&r1=1245945&r2=1291054&rev=1291054&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/PureJavaLevelDBStoreTest.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/PureJavaLevelDBStoreTest.scala Sun Feb 19 20:24:15 2012 @@ -1,4 +1,4 @@ -package org.apache.activemq.apollo.broker.store.leveldb.leveldb +package org.apache.activemq.apollo.broker.store.leveldb /** * Licensed to the Apache Software Foundation (ASF) under one or more @@ -16,6 +16,7 @@ package org.apache.activemq.apollo.broke * See the License for the specific language governing permissions and * limitations under the License. */ + import org.apache.activemq.apollo.broker.store.{Store, StoreFunSuiteSupport} import org.apache.activemq.apollo.broker.store.leveldb.LevelDBStore import org.apache.activemq.apollo.broker.store.leveldb.dto.LevelDBStoreDTO @@ -26,7 +27,7 @@ import org.apache.activemq.apollo.util.F */ class PureJavaLevelDBStoreTest extends StoreFunSuiteSupport { - def create_store(flushDelay:Long):Store = { + def create_store(flushDelay: Long): Store = { new LevelDBStore({ val rc = new LevelDBStoreDTO rc.index_factory = "org.iq80.leveldb.impl.Iq80DBFactory"
