Thx for the heads up.. Fixed now. On Fri, Nov 29, 2013 at 7:44 AM, Gary Tully <[email protected]> wrote: > Hiram, there is a regression in > org.apache.activemq.store.LevelDBStorePerDestinationTest > seems nothing is going to terminate the connection in this case. > Skipped the test as it was hanging the ci builds. > see: > https://git-wip-us.apache.org/repos/asf?p=activemq.git;a=commit;h=b9f0783a > > On 25 November 2013 18:22, <[email protected]> wrote: >> Updated Branches: >> refs/heads/trunk 00cb9a566 -> b0e91d47f >> >> >> Have the leveldb store thorw SuppressReplyExceptions instead of IOExceptions >> so that the clients retry try the operations instead of giving up. Also >> retry the problemantic getMessage() call which seems to fail at times. >> >> Project: http://git-wip-us.apache.org/repos/asf/activemq/repo >> Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b0e91d47 >> Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b0e91d47 >> Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b0e91d47 >> >> Branch: refs/heads/trunk >> Commit: b0e91d47f5fced59c89a34d993f4d87c7986b04b >> Parents: 00cb9a5 >> Author: Hiram Chirino <[email protected]> >> Authored: Mon Nov 25 13:17:58 2013 -0500 >> Committer: Hiram Chirino <[email protected]> >> Committed: Mon Nov 25 13:17:58 2013 -0500 >> >> ---------------------------------------------------------------------- >> .../activemq/broker/SuppressReplyException.java | 8 +++++++ >> .../org/apache/activemq/leveldb/DBManager.scala | 2 +- >> .../apache/activemq/leveldb/LevelDBClient.scala | 23 +++++++++++++++++--- >> .../apache/activemq/leveldb/LevelDBStore.scala | 6 ++--- >> 4 files changed, 32 insertions(+), 7 deletions(-) >> ---------------------------------------------------------------------- >> >> >> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java >> ---------------------------------------------------------------------- >> diff --git >> a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java >> >> b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java >> index eb54a12..f2c6502 100644 >> --- >> a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java >> +++ >> b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java >> @@ -26,6 +26,14 @@ import java.io.IOException; >> * >> */ >> public class SuppressReplyException extends RuntimeException { >> + public SuppressReplyException(Throwable cause) { >> + super(cause); >> + } >> + >> + public SuppressReplyException(String reason) { >> + super(reason); >> + } >> + >> public SuppressReplyException(String reason, IOException cause) { >> super(reason, cause); >> } >> >> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala >> ---------------------------------------------------------------------- >> diff --git >> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala >> >> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala >> index e467379..00260d9 100644 >> --- >> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala >> +++ >> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala >> @@ -860,7 +860,7 @@ class DBManager(val parent:LevelDBStore) { >> def getMessage(x: MessageId):Message = { >> val id = >> Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x) >> val locator = id.getDataLocator() >> - val msg = client.getMessage(locator) >> + val msg = client.getMessageWithRetry(locator) >> msg.setMessageId(id) >> msg >> } >> >> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala >> ---------------------------------------------------------------------- >> diff --git >> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala >> >> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala >> index 15f7bb0..c0cedce 100755 >> --- >> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala >> +++ >> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala >> @@ -52,6 +52,7 @@ import org.apache.activemq.leveldb.MessageRecord >> import org.apache.activemq.leveldb.EntryLocator >> import org.apache.activemq.leveldb.DataLocator >> import org.fusesource.hawtbuf.ByteArrayOutputStream >> +import org.apache.activemq.broker.SuppressReplyException >> >> /** >> * @author <a href="http://hiramchirino.com">Hiram Chirino</a> >> @@ -545,7 +546,7 @@ class LevelDBClient(store: LevelDBStore) { >> Thread.sleep(100); >> } >> } >> - throw failure; >> + throw new SuppressReplyException(failure); >> } >> try { >> func >> @@ -1244,7 +1245,7 @@ class LevelDBClient(store: LevelDBStore) { >> collectionCursor(collectionKey, encodeLong(seq)) { (key, value) => >> val seq = decodeLong(key) >> var locator = DataLocator(store, value.getValueLocation, >> value.getValueLength) >> - val msg = getMessage(locator) >> + val msg = getMessageWithRetry(locator) >> msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) >> msg.getMessageId().setDataLocator(locator) >> msg.setRedeliveryCounter(decodeQueueEntryMeta(value)) >> @@ -1270,7 +1271,7 @@ class LevelDBClient(store: LevelDBStore) { >> func(XaAckRecord(collectionKey, seq, ack, sub)) >> } else { >> var locator = DataLocator(store, value.getValueLocation, >> value.getValueLength) >> - val msg = getMessage(locator) >> + val msg = getMessageWithRetry(locator) >> msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) >> msg.getMessageId().setDataLocator(locator) >> func(msg) >> @@ -1287,6 +1288,22 @@ class LevelDBClient(store: LevelDBStore) { >> } >> } >> >> + def getMessageWithRetry(locator:AnyRef):Message = { >> + var retry = 0 >> + var rc = getMessage(locator); >> + while( rc == null ) { >> + if( retry > 10 ) >> + return null; >> + Thread.sleep(retry*10) >> + rc = getMessage(locator); >> + retry+=1 >> + } >> + if( retry > 0 ) { >> + info("Recovered from 'failed getMessage' on retry: "+retry) >> + } >> + rc >> + } >> + >> def getMessage(locator:AnyRef):Message = { >> assert(locator!=null) >> val buffer = locator match { >> >> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala >> ---------------------------------------------------------------------- >> diff --git >> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala >> >> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala >> index 322656f..e4c7a02 100644 >> --- >> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala >> +++ >> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala >> @@ -17,7 +17,7 @@ >> >> package org.apache.activemq.leveldb >> >> -import org.apache.activemq.broker.{LockableServiceSupport, >> BrokerServiceAware, ConnectionContext} >> +import org.apache.activemq.broker.{SuppressReplyException, >> LockableServiceSupport, BrokerServiceAware, ConnectionContext} >> import org.apache.activemq.command._ >> import org.apache.activemq.openwire.OpenWireFormat >> import org.apache.activemq.usage.SystemUsage >> @@ -186,7 +186,7 @@ class LevelDBStore extends LockableServiceSupport with >> BrokerServiceAware with P >> >> def check_running = { >> if( this.isStopped ) { >> - throw new IOException("Store has been stopped") >> + throw new SuppressReplyException("Store has been stopped") >> } >> } >> >> @@ -437,7 +437,7 @@ class LevelDBStore extends LockableServiceSupport with >> BrokerServiceAware with P >> def verify_running = { >> if( isStopping || isStopped ) { >> try { >> - throw new IOException("Not running") >> + throw new SuppressReplyException("Not running") >> } catch { >> case e:IOException => >> if( broker_service!=null ) { >> > > > > -- > http://redhat.com > http://blog.garytully.com
-- Hiram Chirino Engineering | Red Hat, Inc. [email protected] | fusesource.com | redhat.com skype: hiramchirino | twitter: @hiramchirino blog: Hiram Chirino's Bit Mojo
