QPID-7752: Producer message flow control for AMQP 1.0 should not affect flow of transactions
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e1405a35 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e1405a35 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e1405a35 Branch: refs/heads/master Commit: e1405a357696c28ebd9c4fb6b2c67a798cce8384 Parents: b8981a7 Author: Alex Rudyy <[email protected]> Authored: Mon Apr 24 15:08:05 2017 +0100 Committer: Alex Rudyy <[email protected]> Committed: Tue Apr 25 11:41:14 2017 +0100 ---------------------------------------------------------------------- .../v1_0/AbstractReceivingLinkEndpoint.java | 11 ----------- .../qpid/server/protocol/v1_0/Session_1_0.java | 19 +++++++++---------- .../v1_0/StandardReceivingLinkEndpoint.java | 11 +++++++++++ 3 files changed, 20 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1405a35/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java index fd19d73..abc82a9 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java @@ -45,7 +45,6 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend { private final SectionDecoder _sectionDecoder; private UnsignedInteger _lastDeliveryId; - private ReceivingDestination _receivingDestination; private Map<Binary, Object> 
_unsettledMap = new LinkedHashMap<>(); private Map<Binary, TransientState> _unsettledIds = new LinkedHashMap<>(); private boolean _creditWindow; @@ -157,16 +156,6 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend protected abstract Error messageTransfer(final Transfer transfer); - public ReceivingDestination getReceivingDestination() - { - return _receivingDestination; - } - - public void setDestination(final ReceivingDestination receivingDestination) - { - _receivingDestination = receivingDestination; - } - @Override public void receiveFlow(final Flow flow) { setAvailable(flow.getAvailable()); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1405a35/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 4ad15b8..9a39007 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -1240,8 +1240,8 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget for (LinkEndpoint<? extends BaseSource, ? 
extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet()) { - if (linkEndpoint instanceof AbstractReceivingLinkEndpoint - && isQueueDestinationForLink(queue, ((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination())) + if (linkEndpoint instanceof StandardReceivingLinkEndpoint + && isQueueDestinationForLink(queue, ((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination())) { linkEndpoint.setStopped(true); } @@ -1280,8 +1280,8 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget } for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet()) { - if (linkEndpoint instanceof AbstractReceivingLinkEndpoint - && isQueueDestinationForLink(queue, ((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination())) + if (linkEndpoint instanceof StandardReceivingLinkEndpoint + && isQueueDestinationForLink(queue, ((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination())) { linkEndpoint.setStopped(false); } @@ -1311,7 +1311,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet()) { - if (linkEndpoint instanceof AbstractReceivingLinkEndpoint) + if (linkEndpoint instanceof StandardReceivingLinkEndpoint) { linkEndpoint.setStopped(true); } @@ -1343,8 +1343,8 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget } for (LinkEndpoint<? extends BaseSource, ? 
extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet()) { - if (linkEndpoint instanceof AbstractReceivingLinkEndpoint - && !_blockingEntities.contains(((AbstractReceivingLinkEndpoint) linkEndpoint).getReceivingDestination())) + if (linkEndpoint instanceof StandardReceivingLinkEndpoint + && !_blockingEntities.contains(((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination())) { linkEndpoint.setStopped(false); } @@ -1631,10 +1631,9 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget } else { - if (endpoint.getRole() == Role.RECEIVER + if (endpoint instanceof StandardReceivingLinkEndpoint && (_blockingEntities.contains(Session_1_0.this) - || (endpoint instanceof StandardReceivingLinkEndpoint - && _blockingEntities.contains(((AbstractReceivingLinkEndpoint) endpoint).getReceivingDestination())))) + || _blockingEntities.contains(((StandardReceivingLinkEndpoint) endpoint).getReceivingDestination()))) { endpoint.setStopped(true); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e1405a35/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java index 339a167..53fedb9 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java @@ -77,6 +77,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint private boolean _resumedMessage; private Binary _messageDeliveryTag; private Map<Binary, Outcome> _unsettledMap 
= Collections.synchronizedMap(new HashMap<Binary, Outcome>()); + private ReceivingDestination _receivingDestination; public StandardReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, Target> link) @@ -494,6 +495,16 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint _localUnsettled = new HashMap(_unsettledMap); } + public ReceivingDestination getReceivingDestination() + { + return _receivingDestination; + } + + public void setDestination(final ReceivingDestination receivingDestination) + { + _receivingDestination = receivingDestination; + } + @Override protected void recoverLink(final Attach attach) throws AmqpErrorException { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
