Author: chirino
Date: Fri May 24 22:33:32 2013
New Revision: 1486242
URL: http://svn.apache.org/r1486242
Log:
This should fix the problem with the delayed leveldb index updates.
Modified:
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Modified:
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java?rev=1486242&r1=1486241&r2=1486242&view=diff
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
(original)
+++
activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java
Fri May 24 22:33:32 2013
@@ -108,7 +108,4 @@ public interface LevelDBStoreViewMBean {
@MBeanInfo("Compacts disk usage")
void compact();
- @MBeanInfo("Are delayed index updates occurring?")
- boolean getDelayedIndexUpdates();
-
}
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1486242&r1=1486241&r2=1486242&view=diff
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
(original)
+++
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
Fri May 24 22:33:32 2013
@@ -331,7 +331,6 @@ class DelayableUOW(val manager:DBManager
val s = size
if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
asyncCapacityUsed = s
- countDownFuture.set(null)
manager.parent.blocking_executor.execute(^{
complete_listeners.foreach(_())
})
@@ -353,11 +352,11 @@ class DelayableUOW(val manager:DBManager
asyncCapacityUsed = 0
} else {
manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
- countDownFuture.set(null)
manager.parent.blocking_executor.execute(^{
complete_listeners.foreach(_())
})
}
+ countDownFuture.set(null)
for( (id, action) <- actions ) {
if( !action.enqueues.isEmpty ) {
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1486242&r1=1486241&r2=1486242&view=diff
==============================================================================
---
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
(original)
+++
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Fri May 24 22:33:32 2013
@@ -98,7 +98,6 @@ class LevelDBStoreView(val store:LevelDB
def getParanoidChecks = paranoidChecks
def getSync = sync
def getVerifyChecksums = verifyChecksums
- def getDelayedIndexUpdates = delayedIndexUpdates
def getUowClosedCounter = db.uowClosedCounter
def getUowCanceledCounter = db.uowCanceledCounter
@@ -184,7 +183,6 @@ class LevelDBStore extends LockableServi
val topics = collection.mutable.HashMap[ActiveMQTopic,
LevelDBStore#LevelDBTopicMessageStore]()
val topicsById = collection.mutable.HashMap[Long,
LevelDBStore#LevelDBTopicMessageStore]()
val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
- var delayedIndexUpdates = false
def init() = {}
@@ -708,35 +706,9 @@ class LevelDBStore extends LockableServi
}
def recoverNextMessages(maxReturned: Int, listener:
MessageRecoveryListener): Unit = {
- var found = false
- var counter = 0;
- while( !found ) {
val limiting = LimitingRecoveryListener(maxReturned, listener)
val excluding = PreparedExcluding(limiting)
cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
- if( limiting.recovered > 0 ) {
- if( !delayedIndexUpdates && counter>0 ) {
- info("This machine seems to have delayed index updates.")
- delayedIndexUpdates = true
- }
- found = true
- } else {
- // Seems like on some systems it takes a while for leveldb index
updates
- // to become visible for read. Need to figure out why this is, but
until
- // then, lets loop until we can read it.
- if( counter > 10 ) {
- found = true
- } else {
- counter+=1
- // lets try to sync up /w the write thread..
- val t = new CountDownLatch(1)
- client.writeExecutorExec {
- t.countDown()
- }
- t.await()
- }
- }
- }
}
override def setBatch(id: MessageId): Unit = {