Author: chirino
Date: Tue Nov 13 20:51:13 2012
New Revision: 1408952

URL: http://svn.apache.org/viewvc?rev=1408952&view=rev
Log:
Fixing AMQP implementation bugs.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java

Modified: 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1408952&r1=1408951&r2=1408952&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
 Tue Nov 13 20:51:13 2012
@@ -949,6 +949,10 @@ class AmqpProtocolHandler extends Protoc
       val state = proton_delivery.getRemoteState();
       state match {
         case null =>
+          if( !proton_delivery.remotelySettled() ) {
+              proton_delivery.disposition(new Accepted());
+          }
+          settle(proton_delivery, Consumed, false);
         case accepted:Accepted =>
           if( !proton_delivery.remotelySettled() ) {
               proton_delivery.disposition(new Accepted());

Modified: 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java?rev=1408952&r1=1408951&r2=1408952&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java
 Tue Nov 13 20:51:13 2012
@@ -162,12 +162,13 @@ public class AmqpSender extends AmqpLink
     @Override
     protected void processDelivery(Delivery delivery) {
         final MessageDelivery md  = (MessageDelivery) delivery.getContext();
-        if( delivery.remotelySettled() && delivery.getTag().length > 0 ) {
-            checkinTag(delivery.getTag());
-        }
-        final DeliveryState state = delivery.getRemoteState();
-        if( state!=null ) {
-            if( state instanceof Accepted) {
+        if( delivery.remotelySettled() ) {
+            if( delivery.getTag().length > 0 ) {
+                checkinTag(delivery.getTag());
+            }
+
+            final DeliveryState state = delivery.getRemoteState();
+            if( state==null || state instanceof Accepted) {
                 if( !delivery.remotelySettled() ) {
                     delivery.disposition(new Accepted());
                 }

Modified: 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java?rev=1408952&r1=1408951&r2=1408952&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java
 Tue Nov 13 20:51:13 2012
@@ -201,7 +201,7 @@ public abstract class MessageDelivery ex
         addWatch(new Watch() {
             @Override
             public boolean execute() {
-                if( delivery!=null && (delivery.isSettled() || delivery.remotelySettled()) ) {
+                if( delivery!=null && delivery.isSettled() ) {
                     cb.onSuccess(delivery.getRemoteState());
                     return true;
                 }


Reply via email to