Author: rgodfrey
Date: Tue Jun 2 11:04:44 2015
New Revision: 1683079
URL: http://svn.apache.org/r1683079
Log:
QPID-6566 : Fix issues highlighted by the Joram JMS tests
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1683079&r1=1683078&r2=1683079&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
Tue Jun 2 11:04:44 2015
@@ -64,9 +64,13 @@ public abstract class AbstractConsumerTa
sendNextMessage();
}
+ processStateChanged();
+
processClosed();
}
+ protected abstract void processStateChanged();
+
protected abstract void processClosed();
@Override
@@ -169,7 +173,6 @@ public abstract class AbstractConsumerTa
public final long send(final ConsumerImpl consumer, MessageInstance entry,
boolean batch)
{
_queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
-
getSessionModel().getConnectionModel().notifyWork();
return entry.getMessage().getSize();
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1683079&r1=1683078&r2=1683079&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
Tue Jun 2 11:04:44 2015
@@ -666,4 +666,10 @@ public class ConsumerTarget_0_10 extends
{
}
+
+ @Override
+ protected void processStateChanged()
+ {
+
+ }
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1683079&r1=1683078&r2=1683079&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
Tue Jun 2 11:04:44 2015
@@ -524,6 +524,12 @@ public abstract class ConsumerTarget_0_8
}
}
+ @Override
+ protected void processStateChanged()
+ {
+
+ }
+
public void flushBatched()
{
_channel.getConnection().setDeferFlush(false);
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1683079&r1=1683078&r2=1683079&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
Tue Jun 2 11:04:44 2015
@@ -550,4 +550,15 @@ public class Connection_1_0 implements C
}
}
+
+ @Override
+ public String toString()
+ {
+ return "Connection_1_0["
+ + _connectionId
+ + " "
+ + _protocolEngine.getRemoteAddress().toString()
+ + (_vhost == null ? "" : (" vh : " + _vhost.getName()))
+ + ']';
+ }
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1683079&r1=1683078&r2=1683079&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
Tue Jun 2 11:04:44 2015
@@ -23,6 +23,9 @@ package org.apache.qpid.server.protocol.
import java.nio.ByteBuffer;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
@@ -54,6 +57,7 @@ import org.apache.qpid.server.util.Conne
class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_1_0.class);
private final boolean _acquires;
private SendingLink_1_0 _link;
@@ -63,6 +67,7 @@ class ConsumerTarget_1_0 extends Abstrac
private final AMQPDescribedTypeRegistry _typeRegistry;
private final SectionEncoder _sectionEncoder;
private ConsumerImpl _consumer;
+ private boolean _queueEmpty;
public ConsumerTarget_1_0(final SendingLink_1_0 link,
boolean acquires)
@@ -322,10 +327,7 @@ class ConsumerTarget_1_0 extends Abstrac
{
synchronized(_link.getLock())
{
- if(_link.drained())
- {
- updateState(State.ACTIVE, State.SUSPENDED);
- }
+ _queueEmpty = true;
}
}
@@ -547,4 +549,21 @@ class ConsumerTarget_1_0 extends Abstrac
{
}
+
+ @Override
+ protected void processStateChanged()
+ {
+ synchronized (_link.getLock())
+ {
+ if(_queueEmpty)
+ {
+ _queueEmpty = false;
+
+ if(_link.drained())
+ {
+ updateState(State.ACTIVE, State.SUSPENDED);
+ }
+ }
+ }
+ }
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1683079&r1=1683078&r2=1683079&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Tue Jun 2 11:04:44 2015
@@ -250,6 +250,8 @@ public class Session_1_0 implements Sess
sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(previousLink));
link = previousLink;
endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+ registerConsumer(previousLink);
+
}
}
else
@@ -971,4 +973,10 @@ public class Session_1_0 implements Sess
}
}
+
+ @Override
+ public String toString()
+ {
+ return "Session_1_0[" + _connection + ": " + _endpoint.getSendingChannel() + ']';
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]