Have the leveldb store throw SuppressReplyExceptions instead of IOExceptions so 
that the clients retry the operations instead of giving up.  Also retry the 
problematic getMessage() call which seems to fail at times.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/48a9edc8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/48a9edc8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/48a9edc8

Branch: refs/heads/activemq-5.9
Commit: 48a9edc8ccdd737af471044112e4bed9fddbc94b
Parents: 2119a70
Author: Hiram Chirino <[email protected]>
Authored: Mon Nov 25 13:17:58 2013 -0500
Committer: Hadrian Zbarcea <[email protected]>
Committed: Wed Mar 12 12:11:24 2014 -0400

----------------------------------------------------------------------
 .../activemq/broker/SuppressReplyException.java |  8 +++++++
 .../org/apache/activemq/leveldb/DBManager.scala |  2 +-
 .../apache/activemq/leveldb/LevelDBClient.scala | 23 +++++++++++++++++---
 .../apache/activemq/leveldb/LevelDBStore.scala  |  6 ++---
 4 files changed, 32 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/48a9edc8/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
index eb54a12..f2c6502 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
@@ -26,6 +26,14 @@ import java.io.IOException;
  *
  */
 public class SuppressReplyException extends RuntimeException {
+    public SuppressReplyException(Throwable cause) {
+        super(cause);
+    }
+
+    public SuppressReplyException(String reason) {
+        super(reason);
+    }
+
     public SuppressReplyException(String reason, IOException cause) {
         super(reason, cause);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/48a9edc8/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index e467379..00260d9 100644
--- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -860,7 +860,7 @@ class DBManager(val parent:LevelDBStore) {
   def getMessage(x: MessageId):Message = {
     val id = 
Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x)
     val locator = id.getDataLocator()
-    val msg = client.getMessage(locator)
+    val msg = client.getMessageWithRetry(locator)
     msg.setMessageId(id)
     msg
   }

http://git-wip-us.apache.org/repos/asf/activemq/blob/48a9edc8/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index 15f7bb0..c0cedce 100755
--- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -52,6 +52,7 @@ import org.apache.activemq.leveldb.MessageRecord
 import org.apache.activemq.leveldb.EntryLocator
 import org.apache.activemq.leveldb.DataLocator
 import org.fusesource.hawtbuf.ByteArrayOutputStream
+import org.apache.activemq.broker.SuppressReplyException
 
 /**
  * @author <a href="http://hiramchirino.com";>Hiram Chirino</a>
@@ -545,7 +546,7 @@ class LevelDBClient(store: LevelDBStore) {
           Thread.sleep(100);
         }
       }
-      throw failure;
+      throw new SuppressReplyException(failure);
     }
     try {
       func
@@ -1244,7 +1245,7 @@ class LevelDBClient(store: LevelDBStore) {
     collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
       val seq = decodeLong(key)
       var locator = DataLocator(store, value.getValueLocation, 
value.getValueLength)
-      val msg = getMessage(locator)
+      val msg = getMessageWithRetry(locator)
       msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
       msg.getMessageId().setDataLocator(locator)
       msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
@@ -1270,7 +1271,7 @@ class LevelDBClient(store: LevelDBStore) {
         func(XaAckRecord(collectionKey, seq, ack, sub))
       } else {
         var locator = DataLocator(store, value.getValueLocation, 
value.getValueLength)
-        val msg = getMessage(locator)
+        val msg = getMessageWithRetry(locator)
         msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
         msg.getMessageId().setDataLocator(locator)
         func(msg)
@@ -1287,6 +1288,22 @@ class LevelDBClient(store: LevelDBStore) {
     }
   }
 
+  def getMessageWithRetry(locator:AnyRef):Message = {
+    var retry = 0
+    var rc = getMessage(locator);
+    while( rc == null ) {
+      if( retry > 10 )
+        return null;
+      Thread.sleep(retry*10)
+      rc = getMessage(locator);
+      retry+=1
+    }
+    if( retry > 0 ) {
+      info("Recovered from 'failed getMessage' on retry: "+retry)
+    }
+    rc
+  }
+  
   def getMessage(locator:AnyRef):Message = {
     assert(locator!=null)
     val buffer = locator match {

http://git-wip-us.apache.org/repos/asf/activemq/blob/48a9edc8/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 322656f..e4c7a02 100644
--- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -17,7 +17,7 @@
 
 package org.apache.activemq.leveldb
 
-import org.apache.activemq.broker.{LockableServiceSupport, BrokerServiceAware, 
ConnectionContext}
+import org.apache.activemq.broker.{SuppressReplyException, 
LockableServiceSupport, BrokerServiceAware, ConnectionContext}
 import org.apache.activemq.command._
 import org.apache.activemq.openwire.OpenWireFormat
 import org.apache.activemq.usage.SystemUsage
@@ -186,7 +186,7 @@ class LevelDBStore extends LockableServiceSupport with 
BrokerServiceAware with P
 
   def check_running = {
     if( this.isStopped ) {
-      throw new IOException("Store has been stopped")
+      throw new SuppressReplyException("Store has been stopped")
     }
   }
 
@@ -437,7 +437,7 @@ class LevelDBStore extends LockableServiceSupport with 
BrokerServiceAware with P
   def verify_running = {
     if( isStopping || isStopped ) {
       try {
-        throw new IOException("Not running")
+        throw new SuppressReplyException("Not running")
       } catch {
         case e:IOException =>
           if( broker_service!=null ) {

Reply via email to