consolidate handling of consumer flow handling and subsequent transfer and drain flow responses
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b5e981c4 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b5e981c4 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b5e981c4 Branch: refs/heads/master Commit: b5e981c49d11cc925a4d55ffe5b5d5bf8c00e5d2 Parents: 1b75b7e Author: Robert Gemmell <[email protected]> Authored: Tue Dec 9 14:40:03 2014 +0000 Committer: Robert Gemmell <[email protected]> Committed: Tue Dec 9 14:40:03 2014 +0000 ---------------------------------------------------------------------- .../qpid/jms/test/testpeer/TestAmqpPeer.java | 155 +++++++++++-------- 1 file changed, 93 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5e981c4/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 0152cbf..cca5ab5 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -707,61 +707,7 @@ public class TestAmqpPeer implements AutoCloseable public void expectLinkFlow(boolean drain, boolean sendDrainFlowResponse, Matcher<UnsignedInteger> creditMatcher) { - Matcher<Boolean> drainMatcher = null; - if(drain) - { - drainMatcher = equalTo(true); - } - else - { - drainMatcher = Matchers.anyOf(equalTo(false), nullValue()); - } - - final FlowMatcher flowMatcher = new FlowMatcher() - .withLinkCredit(creditMatcher) - .withHandle(Matchers.notNullValue()) - .withDrain(drainMatcher); - - if(drain && sendDrainFlowResponse) - { - final FlowFrame drainResponse = new FlowFrame(); - drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded - drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded - drainResponse.setLinkCredit(UnsignedInteger.ZERO); - drainResponse.setDrain(true); - - // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. - final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null); - flowResponseSender.setValueProvider(new ValueProvider() - { - @Override - public void setValues() - { - flowResponseSender.setChannel(flowMatcher.getActualChannel()); - drainResponse.setHandle(calculateLinkHandle(flowMatcher)); - drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher)); - drainResponse.setNextOutgoingId(flowMatcher.getReceivedNextIncomingId()); // Assuming no 'in-flight' messages. - drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId()); - } - }); - - flowMatcher.onSuccess(flowResponseSender); - } - - addHandler(flowMatcher); - } - - private UnsignedInteger calculateLinkHandle(final FlowMatcher flowMatcher) { - UnsignedInteger h = (UnsignedInteger) flowMatcher.getReceivedHandle(); - - return h.add(UnsignedInteger.valueOf(LINK_HANDLE_OFFSET)); - } - - private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) { - UnsignedInteger dc = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount(); - UnsignedInteger lc = (UnsignedInteger) flowMatcher.getReceivedLinkCredit(); - - return dc.add(lc); + expectLinkFlowRespondWithTransfer(null, null, null, null, null, 0, drain, sendDrainFlowResponse, creditMatcher, null); } public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType, @@ -770,7 +716,8 @@ public class TestAmqpPeer implements AutoCloseable final ApplicationPropertiesDescribedType appPropertiesDescribedType, final DescribedType content) { - expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content, 1); + expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, + appPropertiesDescribedType, content, 1); } public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType, @@ -780,19 +727,54 @@ public class TestAmqpPeer implements AutoCloseable final DescribedType content, final int count) { - if(count <= 0) + expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, + appPropertiesDescribedType, content, count, false, false, + Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count)), 1); + } + + public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType, + final MessageAnnotationsDescribedType messageAnnotationsDescribedType, + final PropertiesDescribedType propertiesDescribedType, + final ApplicationPropertiesDescribedType appPropertiesDescribedType, + final DescribedType content, + final int count, + final boolean drain, + final boolean sendDrainFlowResponse, + Matcher<UnsignedInteger> creditMatcher, + final Integer nextIncomingId) + { + if (nextIncomingId == null && count > 0) + { + throw new IllegalArgumentException("The remote NextIncomingId must be specified if transfers have been requested"); + } + + Matcher<Boolean> drainMatcher = null; + if(drain) + { + drainMatcher = equalTo(true); + } + else { - throw new IllegalArgumentException("Message count must be >= 1"); + drainMatcher = Matchers.anyOf(equalTo(false), nullValue()); } - int nextIncomingId = 1; // TODO: we shouldn't assume this will be the first transfer on the session + Matcher<UnsignedInteger> remoteNextIncomingIdMatcher = null; + if(nextIncomingId != null) + { + remoteNextIncomingIdMatcher = Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId)); + } + else + { + remoteNextIncomingIdMatcher = Matchers.greaterThanOrEqualTo(UnsignedInteger.ONE); + } final FlowMatcher flowMatcher = new FlowMatcher() .withLinkCredit(Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count))) - .withDrain(Matchers.anyOf(equalTo(false), nullValue())) - .withNextIncomingId(Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId))); + .withDrain(drainMatcher) + .withNextIncomingId(remoteNextIncomingIdMatcher); CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); + boolean addComposite = false; for(int i = 0; i < count; i++) { @@ -822,14 +804,63 @@ public class TestAmqpPeer implements AutoCloseable } }); + addComposite = true; composite.add(transferResponseSender); } - flowMatcher.onSuccess(composite); + if(drain && sendDrainFlowResponse) + { + final FlowFrame drainResponse = new FlowFrame(); + drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded + drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded + drainResponse.setLinkCredit(UnsignedInteger.ZERO); + drainResponse.setDrain(true); + + // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. + final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null); + flowResponseSender.setValueProvider(new ValueProvider() + { + @Override + public void setValues() + { + flowResponseSender.setChannel(flowMatcher.getActualChannel()); + drainResponse.setHandle(calculateLinkHandle(flowMatcher)); + drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher)); + drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher, count)); + drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId()); + } + }); + + addComposite = true; + composite.add(flowResponseSender); + } + + if(addComposite) { + flowMatcher.onSuccess(composite); + } addHandler(flowMatcher); } + private UnsignedInteger calculateLinkHandle(final FlowMatcher flowMatcher) { + UnsignedInteger h = (UnsignedInteger) flowMatcher.getReceivedHandle(); + + return h.add(UnsignedInteger.valueOf(LINK_HANDLE_OFFSET)); + } + + private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) { + UnsignedInteger dc = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount(); + UnsignedInteger lc = (UnsignedInteger) flowMatcher.getReceivedLinkCredit(); + + return dc.add(lc); + } + + private UnsignedInteger calculateNewOutgoingId(FlowMatcher flowMatcher, int sentCount) { + UnsignedInteger nid = (UnsignedInteger) flowMatcher.getReceivedNextIncomingId(); + + return nid.add(UnsignedInteger.valueOf(sentCount)); + } + private Binary prepareTransferPayload(final HeaderDescribedType headerDescribedType, final MessageAnnotationsDescribedType messageAnnotationsDescribedType, final PropertiesDescribedType propertiesDescribedType, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
