Author: rgodfrey
Date: Tue Jun  2 11:04:44 2015
New Revision: 1683079

URL: http://svn.apache.org/r1683079
Log:
QPID-6566 : Fix issues highlighted by the Joram JMS tests

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1683079&r1=1683078&r2=1683079&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Tue Jun  2 11:04:44 2015
@@ -64,9 +64,13 @@ public abstract class AbstractConsumerTa
             sendNextMessage();
         }
 
+        processStateChanged();
+
         processClosed();
     }
 
+    protected abstract void processStateChanged();
+
     protected abstract void processClosed();
 
     @Override
@@ -169,7 +173,6 @@ public abstract class AbstractConsumerTa
     public final long send(final ConsumerImpl consumer, MessageInstance entry, 
boolean batch)
     {
         _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
-
         getSessionModel().getConnectionModel().notifyWork();
         return entry.getMessage().getSize();
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1683079&r1=1683078&r2=1683079&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Tue Jun  2 11:04:44 2015
@@ -666,4 +666,10 @@ public class ConsumerTarget_0_10 extends
     {
 
     }
+
+    @Override
+    protected void processStateChanged()
+    {
+
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1683079&r1=1683078&r2=1683079&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Tue Jun  2 11:04:44 2015
@@ -524,6 +524,12 @@ public abstract class ConsumerTarget_0_8
         }
     }
 
+    @Override
+    protected void processStateChanged()
+    {
+
+    }
+
     public void flushBatched()
     {
         _channel.getConnection().setDeferFlush(false);

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1683079&r1=1683078&r2=1683079&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Tue Jun  2 11:04:44 2015
@@ -550,4 +550,15 @@ public class Connection_1_0 implements C
         }
 
     }
+
+    @Override
+    public String toString()
+    {
+        return "Connection_1_0["
+               +  _connectionId
+               + " "
+               + _protocolEngine.getRemoteAddress().toString()
+               + (_vhost == null ? "" : (" vh : " + _vhost.getName()))
+               + ']';
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1683079&r1=1683078&r2=1683079&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Jun  2 11:04:44 2015
@@ -23,6 +23,9 @@ package org.apache.qpid.server.protocol.
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.amqp_1_0.codec.ValueHandler;
 import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
 import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
@@ -54,6 +57,7 @@ import org.apache.qpid.server.util.Conne
 
 class ConsumerTarget_1_0 extends AbstractConsumerTarget
 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsumerTarget_1_0.class);
     private final boolean _acquires;
     private SendingLink_1_0 _link;
 
@@ -63,6 +67,7 @@ class ConsumerTarget_1_0 extends Abstrac
     private final AMQPDescribedTypeRegistry _typeRegistry;
     private final SectionEncoder _sectionEncoder;
     private ConsumerImpl _consumer;
+    private boolean _queueEmpty;
 
     public ConsumerTarget_1_0(final SendingLink_1_0 link,
                               boolean acquires)
@@ -322,10 +327,7 @@ class ConsumerTarget_1_0 extends Abstrac
     {
         synchronized(_link.getLock())
         {
-            if(_link.drained())
-            {
-                updateState(State.ACTIVE, State.SUSPENDED);
-            }
+            _queueEmpty = true;
         }
     }
 
@@ -547,4 +549,21 @@ class ConsumerTarget_1_0 extends Abstrac
     {
 
     }
+
+    @Override
+    protected void processStateChanged()
+    {
+        synchronized (_link.getLock())
+        {
+            if(_queueEmpty)
+            {
+                _queueEmpty = false;
+
+                if(_link.drained())
+                {
+                    updateState(State.ACTIVE, State.SUSPENDED);
+                }
+            }
+        }
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1683079&r1=1683078&r2=1683079&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Jun  2 11:04:44 2015
@@ -250,6 +250,8 @@ public class Session_1_0 implements Sess
                 sendingLinkEndpoint.setLinkEventListener(new 
SubjectSpecificSendingLinkListener(previousLink));
                 link = previousLink;
                 
endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+                registerConsumer(previousLink);
+
             }
         }
         else
@@ -971,4 +973,10 @@ public class Session_1_0 implements Sess
 
         }
     }
+
+    @Override
+    public String toString()
+    {
+        return "Session_1_0[" + _connection + ": " + 
_endpoint.getSendingChannel() + ']';
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to