Author: chirino
Date: Thu Feb 16 14:10:32 2012
New Revision: 1244981
URL: http://svn.apache.org/viewvc?rev=1244981&view=rev
Log:
Fixes APLO-161 : LevelDB based storage slows down once you have several hundred
log files holding message data.
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1244981&r1=1244980&r2=1244981&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Thu Feb 16 14:10:32 2012
@@ -1007,7 +1007,7 @@ class LevelDBClient(store: LevelDBStore)
// TODO:
// Perhaps we should snapshot_index if the current snapshot is old.
//
-
+ import collection.JavaConversions._
last_index_snapshot_pos
val empty_journals = log.log_infos.keySet.toSet -- log_refs.keySet
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1244981&r1=1244980&r2=1244981&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
Thu Feb 16 14:10:32 2012
@@ -270,6 +270,7 @@ class LevelDBStore(val config:LevelDBSto
rc.log_append_pos = client.log.appender_limit
rc.index_snapshot_pos = client.last_index_snapshot_pos
rc.log_stats = {
+ import collection.JavaConversions._
var row_layout = "%-20s | %-10s | %-10s\n"
row_layout.format("Log File", "Msg Refs", "File Size")+
client.log.log_infos.map{case (id,info)=> id ->
client.log_refs.get(id).map(_.get)}.toSeq.flatMap { case (id, refs)=>
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1244981&r1=1244980&r2=1244981&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
Thu Feb 16 14:10:32 2012
@@ -21,13 +21,13 @@ import java.{util=>ju}
import java.util.zip.CRC32
import java.util.Map.Entry
-import collection.immutable.TreeMap
import java.util.concurrent.atomic.AtomicLong
import java.io._
import org.apache.activemq.apollo.util.FileSupport._
import org.apache.activemq.apollo.util.{Log, LRUCache}
import org.fusesource.hawtdispatch.BaseRetained
import org.fusesource.hawtbuf.{DataByteArrayInputStream,
DataByteArrayOutputStream, Buffer}
+import java.util.TreeMap
object RecordLog extends Log {
@@ -76,16 +76,16 @@ case class RecordLog(directory: File, lo
var sync = false
- var log_infos = TreeMap[Long, LogInfo]()
+ val log_infos = new TreeMap[Long, LogInfo]()
object log_mutex
def delete(id:Long) = {
log_mutex.synchronized {
// We can't delete the current appender.
if( current_appender.position != id ) {
- log_infos.get(id).foreach { info =>
+ Option(log_infos.get(id)).foreach { info =>
onDelete(info.file)
- log_infos = log_infos.filterNot(_._1 == id)
+ log_infos.remove(id)
}
}
}
@@ -377,28 +377,29 @@ case class RecordLog(directory: File, lo
def create_appender(position: Long): Any = {
log_mutex.synchronized {
if(current_appender!=null) {
- log_infos += position -> new LogInfo(current_appender.file,
current_appender.position, current_appender.append_offset)
+ log_infos.put (position, new LogInfo(current_appender.file,
current_appender.position, current_appender.append_offset))
}
current_appender = create_log_appender(position)
- log_infos += position -> current_appender.info
+ log_infos.put(position, current_appender.info)
}
}
def open = {
log_mutex.synchronized {
- log_infos = LevelDBClient.find_sequence_files(directory, logSuffix).map
{ case (position,file) =>
- position -> LogInfo(file, position, file.length())
+ log_infos.clear()
+ LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case
(position,file) =>
+ log_infos.put(position, LogInfo(file, position, file.length()))
}
val appendPos = if( log_infos.isEmpty ) {
0L
} else {
- val (_, file) = log_infos.last
+ val file = log_infos.lastEntry().getValue
val r = LogReader(file.file, file.position)
try {
val actualLength = r.verifyAndGetEndPosition
val updated = file.copy(length = actualLength - file.position)
- log_infos = log_infos + (updated.position->updated)
+ log_infos.put(updated.position, updated)
if( updated.file.length != file.length ) {
// we need to truncate.
using(new RandomAccessFile(file.file, "rw")) (
_.setLength(updated.length))
@@ -453,7 +454,7 @@ case class RecordLog(directory: File, lo
}
}
- def log_info(pos:Long) = log_mutex.synchronized(log_infos.range(0L,
pos+1).lastOption.map(_._2))
+ def log_info(pos:Long) = log_mutex.synchronized {
Option(log_infos.floorEntry(pos)).map(_.getValue) }
private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = {