Thanks for the heads-up. Fixed now.

On Fri, Nov 29, 2013 at 7:44 AM, Gary Tully <[email protected]> wrote:
> Hiram, there is a regression in
> org.apache.activemq.store.LevelDBStorePerDestinationTest
> It seems nothing is going to terminate the connection in this case.
> I skipped the test as it was hanging the CI builds.
> see: 
> https://git-wip-us.apache.org/repos/asf?p=activemq.git;a=commit;h=b9f0783a
>
> On 25 November 2013 18:22,  <[email protected]> wrote:
>> Updated Branches:
>>   refs/heads/trunk 00cb9a566 -> b0e91d47f
>>
>>
>> Have the leveldb store throw SuppressReplyExceptions instead of IOExceptions 
>> so that the clients retry the operations instead of giving up.  Also 
>> retry the problematic getMessage() call which seems to fail at times.
>>
>> Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b0e91d47
>> Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b0e91d47
>> Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b0e91d47
>>
>> Branch: refs/heads/trunk
>> Commit: b0e91d47f5fced59c89a34d993f4d87c7986b04b
>> Parents: 00cb9a5
>> Author: Hiram Chirino <[email protected]>
>> Authored: Mon Nov 25 13:17:58 2013 -0500
>> Committer: Hiram Chirino <[email protected]>
>> Committed: Mon Nov 25 13:17:58 2013 -0500
>>
>> ----------------------------------------------------------------------
>>  .../activemq/broker/SuppressReplyException.java |  8 +++++++
>>  .../org/apache/activemq/leveldb/DBManager.scala |  2 +-
>>  .../apache/activemq/leveldb/LevelDBClient.scala | 23 +++++++++++++++++---
>>  .../apache/activemq/leveldb/LevelDBStore.scala  |  6 ++---
>>  4 files changed, 32 insertions(+), 7 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>> ----------------------------------------------------------------------
>> diff --git 
>> a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>>  
>> b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>> index eb54a12..f2c6502 100644
>> --- 
>> a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>> +++ 
>> b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>> @@ -26,6 +26,14 @@ import java.io.IOException;
>>   *
>>   */
>>  public class SuppressReplyException extends RuntimeException {
>> +    public SuppressReplyException(Throwable cause) {
>> +        super(cause);
>> +    }
>> +
>> +    public SuppressReplyException(String reason) {
>> +        super(reason);
>> +    }
>> +
>>      public SuppressReplyException(String reason, IOException cause) {
>>          super(reason, cause);
>>      }
>>
>> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
>> ----------------------------------------------------------------------
>> diff --git 
>> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
>>  
>> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
>> index e467379..00260d9 100644
>> --- 
>> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
>> +++ 
>> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
>> @@ -860,7 +860,7 @@ class DBManager(val parent:LevelDBStore) {
>>    def getMessage(x: MessageId):Message = {
>>      val id = 
>> Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x)
>>      val locator = id.getDataLocator()
>> -    val msg = client.getMessage(locator)
>> +    val msg = client.getMessageWithRetry(locator)
>>      msg.setMessageId(id)
>>      msg
>>    }
>>
>> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>> ----------------------------------------------------------------------
>> diff --git 
>> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>>  
>> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>> index 15f7bb0..c0cedce 100755
>> --- 
>> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>> +++ 
>> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>> @@ -52,6 +52,7 @@ import org.apache.activemq.leveldb.MessageRecord
>>  import org.apache.activemq.leveldb.EntryLocator
>>  import org.apache.activemq.leveldb.DataLocator
>>  import org.fusesource.hawtbuf.ByteArrayOutputStream
>> +import org.apache.activemq.broker.SuppressReplyException
>>
>>  /**
>>   * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
>> @@ -545,7 +546,7 @@ class LevelDBClient(store: LevelDBStore) {
>>            Thread.sleep(100);
>>          }
>>        }
>> -      throw failure;
>> +      throw new SuppressReplyException(failure);
>>      }
>>      try {
>>        func
>> @@ -1244,7 +1245,7 @@ class LevelDBClient(store: LevelDBStore) {
>>      collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
>>        val seq = decodeLong(key)
>>        var locator = DataLocator(store, value.getValueLocation, 
>> value.getValueLength)
>> -      val msg = getMessage(locator)
>> +      val msg = getMessageWithRetry(locator)
>>        msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
>>        msg.getMessageId().setDataLocator(locator)
>>        msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
>> @@ -1270,7 +1271,7 @@ class LevelDBClient(store: LevelDBStore) {
>>          func(XaAckRecord(collectionKey, seq, ack, sub))
>>        } else {
>>          var locator = DataLocator(store, value.getValueLocation, 
>> value.getValueLength)
>> -        val msg = getMessage(locator)
>> +        val msg = getMessageWithRetry(locator)
>>          msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
>>          msg.getMessageId().setDataLocator(locator)
>>          func(msg)
>> @@ -1287,6 +1288,22 @@ class LevelDBClient(store: LevelDBStore) {
>>      }
>>    }
>>
>> +  def getMessageWithRetry(locator:AnyRef):Message = {
>> +    var retry = 0
>> +    var rc = getMessage(locator);
>> +    while( rc == null ) {
>> +      if( retry > 10 )
>> +        return null;
>> +      Thread.sleep(retry*10)
>> +      rc = getMessage(locator);
>> +      retry+=1
>> +    }
>> +    if( retry > 0 ) {
>> +      info("Recovered from 'failed getMessage' on retry: "+retry)
>> +    }
>> +    rc
>> +  }
>> +
>>    def getMessage(locator:AnyRef):Message = {
>>      assert(locator!=null)
>>      val buffer = locator match {
>>
>> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>> ----------------------------------------------------------------------
>> diff --git 
>> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>>  
>> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>> index 322656f..e4c7a02 100644
>> --- 
>> a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>> +++ 
>> b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>> @@ -17,7 +17,7 @@
>>
>>  package org.apache.activemq.leveldb
>>
>> -import org.apache.activemq.broker.{LockableServiceSupport, 
>> BrokerServiceAware, ConnectionContext}
>> +import org.apache.activemq.broker.{SuppressReplyException, 
>> LockableServiceSupport, BrokerServiceAware, ConnectionContext}
>>  import org.apache.activemq.command._
>>  import org.apache.activemq.openwire.OpenWireFormat
>>  import org.apache.activemq.usage.SystemUsage
>> @@ -186,7 +186,7 @@ class LevelDBStore extends LockableServiceSupport with 
>> BrokerServiceAware with P
>>
>>    def check_running = {
>>      if( this.isStopped ) {
>> -      throw new IOException("Store has been stopped")
>> +      throw new SuppressReplyException("Store has been stopped")
>>      }
>>    }
>>
>> @@ -437,7 +437,7 @@ class LevelDBStore extends LockableServiceSupport with 
>> BrokerServiceAware with P
>>    def verify_running = {
>>      if( isStopping || isStopped ) {
>>        try {
>> -        throw new IOException("Not running")
>> +        throw new SuppressReplyException("Not running")
>>        } catch {
>>          case e:IOException =>
>>            if( broker_service!=null ) {
>>
>
>
>
> --
> http://redhat.com
> http://blog.garytully.com



-- 
Hiram Chirino

Engineering | Red Hat, Inc.

[email protected] | fusesource.com | redhat.com

skype: hiramchirino | twitter: @hiramchirino

blog: Hiram Chirino's Bit Mojo

Reply via email to