Hiram, there is a regression in org.apache.activemq.store.LevelDBStorePerDestinationTest: it seems nothing is going to terminate the connection in this case. I skipped the test as it was hanging the CI builds. See: https://git-wip-us.apache.org/repos/asf?p=activemq.git;a=commit;h=b9f0783a
On 25 November 2013 18:22, <chir...@apache.org> wrote: > Updated Branches: > refs/heads/trunk 00cb9a566 -> b0e91d47f > > > Have the leveldb store thorw SuppressReplyExceptions instead of IOExceptions > so that the clients retry try the operations instead of giving up. Also > retry the problemantic getMessage() call which seems to fail at times. > > Project: http://git-wip-us.apache.org/repos/asf/activemq/repo > Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b0e91d47 > Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b0e91d47 > Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b0e91d47 > > Branch: refs/heads/trunk > Commit: b0e91d47f5fced59c89a34d993f4d87c7986b04b > Parents: 00cb9a5 > Author: Hiram Chirino <hi...@hiramchirino.com> > Authored: Mon Nov 25 13:17:58 2013 -0500 > Committer: Hiram Chirino <hi...@hiramchirino.com> > Committed: Mon Nov 25 13:17:58 2013 -0500 > > ---------------------------------------------------------------------- > .../activemq/broker/SuppressReplyException.java | 8 +++++++ > .../org/apache/activemq/leveldb/DBManager.scala | 2 +- > .../apache/activemq/leveldb/LevelDBClient.scala | 23 +++++++++++++++++--- > .../apache/activemq/leveldb/LevelDBStore.scala | 6 ++--- > 4 files changed, 32 insertions(+), 7 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java > ---------------------------------------------------------------------- > diff --git > a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java > > b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java > index eb54a12..f2c6502 100644 > --- > a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java > +++ > 
b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java > @@ -26,6 +26,14 @@ import java.io.IOException; > * > */ > public class SuppressReplyException extends RuntimeException { > + public SuppressReplyException(Throwable cause) { > + super(cause); > + } > + > + public SuppressReplyException(String reason) { > + super(reason); > + } > + > public SuppressReplyException(String reason, IOException cause) { > super(reason, cause); > } > > http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala > ---------------------------------------------------------------------- > diff --git > a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala > > b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala > index e467379..00260d9 100644 > --- > a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala > +++ > b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala > @@ -860,7 +860,7 @@ class DBManager(val parent:LevelDBStore) { > def getMessage(x: MessageId):Message = { > val id = > Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x) > val locator = id.getDataLocator() > - val msg = client.getMessage(locator) > + val msg = client.getMessageWithRetry(locator) > msg.setMessageId(id) > msg > } > > http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala > ---------------------------------------------------------------------- > diff --git > a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala > > b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala > index 15f7bb0..c0cedce 100755 > --- > a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala > +++ 
> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala > @@ -52,6 +52,7 @@ import org.apache.activemq.leveldb.MessageRecord > import org.apache.activemq.leveldb.EntryLocator > import org.apache.activemq.leveldb.DataLocator > import org.fusesource.hawtbuf.ByteArrayOutputStream > +import org.apache.activemq.broker.SuppressReplyException > > /** > * @author <a href="http://hiramchirino.com">Hiram Chirino</a> > @@ -545,7 +546,7 @@ class LevelDBClient(store: LevelDBStore) { > Thread.sleep(100); > } > } > - throw failure; > + throw new SuppressReplyException(failure); > } > try { > func > @@ -1244,7 +1245,7 @@ class LevelDBClient(store: LevelDBStore) { > collectionCursor(collectionKey, encodeLong(seq)) { (key, value) => > val seq = decodeLong(key) > var locator = DataLocator(store, value.getValueLocation, > value.getValueLength) > - val msg = getMessage(locator) > + val msg = getMessageWithRetry(locator) > msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) > msg.getMessageId().setDataLocator(locator) > msg.setRedeliveryCounter(decodeQueueEntryMeta(value)) > @@ -1270,7 +1271,7 @@ class LevelDBClient(store: LevelDBStore) { > func(XaAckRecord(collectionKey, seq, ack, sub)) > } else { > var locator = DataLocator(store, value.getValueLocation, > value.getValueLength) > - val msg = getMessage(locator) > + val msg = getMessageWithRetry(locator) > msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) > msg.getMessageId().setDataLocator(locator) > func(msg) > @@ -1287,6 +1288,22 @@ class LevelDBClient(store: LevelDBStore) { > } > } > > + def getMessageWithRetry(locator:AnyRef):Message = { > + var retry = 0 > + var rc = getMessage(locator); > + while( rc == null ) { > + if( retry > 10 ) > + return null; > + Thread.sleep(retry*10) > + rc = getMessage(locator); > + retry+=1 > + } > + if( retry > 0 ) { > + info("Recovered from 'failed getMessage' on retry: "+retry) > + } > + rc > + } > + > def 
getMessage(locator:AnyRef):Message = { > assert(locator!=null) > val buffer = locator match { > > http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala > ---------------------------------------------------------------------- > diff --git > a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala > > b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala > index 322656f..e4c7a02 100644 > --- > a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala > +++ > b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala > @@ -17,7 +17,7 @@ > > package org.apache.activemq.leveldb > > -import org.apache.activemq.broker.{LockableServiceSupport, > BrokerServiceAware, ConnectionContext} > +import org.apache.activemq.broker.{SuppressReplyException, > LockableServiceSupport, BrokerServiceAware, ConnectionContext} > import org.apache.activemq.command._ > import org.apache.activemq.openwire.OpenWireFormat > import org.apache.activemq.usage.SystemUsage > @@ -186,7 +186,7 @@ class LevelDBStore extends LockableServiceSupport with > BrokerServiceAware with P > > def check_running = { > if( this.isStopped ) { > - throw new IOException("Store has been stopped") > + throw new SuppressReplyException("Store has been stopped") > } > } > > @@ -437,7 +437,7 @@ class LevelDBStore extends LockableServiceSupport with > BrokerServiceAware with P > def verify_running = { > if( isStopping || isStopped ) { > try { > - throw new IOException("Not running") > + throw new SuppressReplyException("Not running") > } catch { > case e:IOException => > if( broker_service!=null ) { > -- http://redhat.com http://blog.garytully.com