Author: chirino
Date: Thu May 23 18:33:06 2013
New Revision: 1485810
URL: http://svn.apache.org/r1485810
Log:
related to AMQ-4296 : Fixes leveldb store cursoring. It was recovering too
many messages and sometimes not the right messages.
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1485810&r1=1485809&r2=1485810&view=diff
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
(original)
+++
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
Thu May 23 18:33:06 2013
@@ -667,17 +667,24 @@ class DBManager(val parent:LevelDBStore)
}
def cursorMessages(key:Long, listener:MessageRecoveryListener,
startPos:Long) = {
- var nextPos = startPos;
- client.queueCursor(key, nextPos) { msg =>
+ var lastmsgid:MessageId = null
+ client.queueCursor(key, startPos) { msg =>
if( listener.hasSpace ) {
- listener.recoverMessage(msg)
- nextPos += 1
- true
+ if( listener.recoverMessage(msg) ) {
+ lastmsgid = msg.getMessageId
+ true
+ } else {
+ false
+ }
} else {
false
}
}
- nextPos
+ if( lastmsgid==null ) {
+ startPos
+ } else {
+ lastmsgid.getEntryLocator.asInstanceOf[EntryLocator].seq+1
+ }
}
def getXAActions(key:Long) = {
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1485810&r1=1485809&r2=1485810&view=diff
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
(original)
+++
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Thu May 23 18:33:06 2013
@@ -678,24 +678,26 @@ class LevelDBStore extends LockableServi
}
def recover(listener: MessageRecoveryListener): Unit = {
- cursorPosition = db.cursorMessages(key, preparedExcluding(listener), 0)
+ cursorPosition = db.cursorMessages(key, PreparedExcluding(listener), 0)
}
- def preparedExcluding(listener: MessageRecoveryListener) = new
MessageRecoveryListener {
+ case class PreparedExcluding(listener: MessageRecoveryListener) extends
MessageRecoveryListener {
def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
def hasSpace = listener.hasSpace
def recoverMessageReference(ref: MessageId) = {
if (!preparedAcks.contains(ref)) {
listener.recoverMessageReference(ref)
+ } else {
+ true
}
- true
}
def recoverMessage(message: Message) = {
if (!preparedAcks.contains(message.getMessageId)) {
listener.recoverMessage(message)
+ } else {
+ true
}
- true
}
}
@@ -704,7 +706,8 @@ class LevelDBStore extends LockableServi
}
def recoverNextMessages(maxReturned: Int, listener:
MessageRecoveryListener): Unit = {
- cursorPosition = db.cursorMessages(key,
preparedExcluding(LimitingRecoveryListener(maxReturned, listener)),
cursorPosition)
+ val excluding = PreparedExcluding(LimitingRecoveryListener(maxReturned,
listener))
+ cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
}
override def setBatch(id: MessageId): Unit = {
@@ -714,7 +717,7 @@ class LevelDBStore extends LockableServi
}
case class LimitingRecoveryListener(max: Int, listener:
MessageRecoveryListener) extends MessageRecoveryListener {
- private var recovered: Int = 0
+ var recovered: Int = 0
def hasSpace = recovered < max
def recoverMessage(message: Message) = {
recovered += 1;
@@ -849,7 +852,7 @@ class LevelDBStore extends LockableServi
def recoverNextMessages(clientId: String, subscriptionName: String,
maxReturned: Int, listener: MessageRecoveryListener): Unit = {
lookup(clientId, subscriptionName).foreach { sub =>
- sub.cursorPosition = db.cursorMessages(key,
preparedExcluding(LimitingRecoveryListener(maxReturned, listener)),
sub.cursorPosition.max(sub.lastAckPosition+1))
+ sub.cursorPosition = db.cursorMessages(key,
PreparedExcluding(LimitingRecoveryListener(maxReturned, listener)),
sub.cursorPosition.max(sub.lastAckPosition+1))
}
}