Author: chirino
Date: Wed Jan 25 03:58:36 2012
New Revision: 1235623
URL: http://svn.apache.org/viewvc?rev=1235623&view=rev
Log:
Support using log files > 2GB. Also fixes an Int rollover bug that occurred
if a log file grew larger than 2GB.
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1235623&r1=1235622&r2=1235623&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Wed Jan 25 03:58:36 2012
@@ -216,16 +216,7 @@ class LevelDBClient(store: LevelDBStore)
}
def log_size = {
- Option(config.log_size).map(MemoryPropertyEditor.parse(_)).map{size=>
- if(size == MemoryPropertyEditor.parse("2G")) {
- Int.MaxValue // which is 2G - 1 (close enough!)
- } else if(size > Int.MaxValue) {
- warn("leveldb log_size was configured to be '"+config.log_size+"' but the maximum supported log size is 2G")
- Int.MaxValue
- } else {
- size.toInt
- }
- }.getOrElse(1024 * 1024 * 100)
+ Option(config.log_size).map(MemoryPropertyEditor.parse(_)).getOrElse(1024 * 1024 * 100L)
}
def start() = {
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1235623&r1=1235622&r2=1235623&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Wed Jan 25 03:58:36 2012
@@ -70,7 +70,7 @@ case class RecordLog(directory: File, lo
directory.mkdirs()
- var logSize = 1024 * 1024 * 100
+ var logSize = 1024 * 1024 * 100L
var current_appender:LogAppender = _
var verify_checksums = false
var sync = false
@@ -122,9 +122,11 @@ case class RecordLog(directory: File, lo
channel.position(logSize-1)
channel.write(new Buffer(1).toByteBuffer)
channel.force(true)
- channel.position(0)
+ if( sync ) {
+ channel.position(0)
+ }
- val write_buffer = new DataByteArrayOutputStream((BUFFER_SIZE)+BUFFER_SIZE)
+ val write_buffer = new DataByteArrayOutputStream(BUFFER_SIZE+LOG_HEADER_SIZE)
def force = {
flush
@@ -194,7 +196,7 @@ case class RecordLog(directory: File, lo
}
}
- override def check_read_flush(end_offset:Int) = {
+ override def check_read_flush(end_offset:Long) = {
if( flushed_offset.get() < end_offset ) {
this.synchronized {
println("read flush")
@@ -216,10 +218,12 @@ case class RecordLog(directory: File, lo
fd.close()
}
- def check_read_flush(end_offset:Int) = {}
+ def check_read_flush(end_offset:Long) = {}
def read(record_position:Long, length:Int) = {
- val offset = (record_position-position).toInt
+ val offset = record_position-position
+ assert(offset >=0 )
+
check_read_flush(offset+LOG_HEADER_SIZE+length)
if(verify_checksums) {
@@ -273,7 +277,7 @@ case class RecordLog(directory: File, lo
}
def read(record_position:Long) = {
- val offset = (record_position-position).toInt
+ val offset = record_position-position
val header = new Buffer(LOG_HEADER_SIZE)
channel.read(header.toByteBuffer, offset)
val is = header.bigEndianEditor();
@@ -300,7 +304,7 @@ case class RecordLog(directory: File, lo
}
def check(record_position:Long):Option[(Long, Option[Long])] = {
- var offset = (record_position-position).toInt
+ var offset = record_position-position
val header = new Buffer(LOG_HEADER_SIZE)
channel.read(header.toByteBuffer, offset)
val is = header.bigEndianEditor();