Repository: qpid-jms Updated Branches: refs/heads/master 92112063a -> 51a27a4ab
update to allow test to receive multiple messages, plus some extra logging and renaming for clarity Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/c8397730 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/c8397730 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/c8397730 Branch: refs/heads/master Commit: c83977303dec6ded3b78200129a199b3c61a6f1a Parents: 9211206 Author: Robert Gemmell <[email protected]> Authored: Mon Nov 17 15:36:09 2014 +0000 Committer: Robert Gemmell <[email protected]> Committed: Mon Nov 17 15:36:09 2014 +0000 ---------------------------------------------------------------------- .../qpid/jms/test/testpeer/FrameSender.java | 12 +-- .../FrameWithNoPayloadMatchingHandler.java | 1 + .../FrameWithPayloadMatchingHandler.java | 1 + .../qpid/jms/test/testpeer/TestAmqpPeer.java | 88 ++++++++++++++------ 4 files changed, 72 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c8397730/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java index 1a139cd..4dbdef7 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameSender.java @@ -24,19 +24,19 @@ class FrameSender implements AmqpPeerRunnable { private final TestAmqpPeer _testAmqpPeer; private final FrameType _type; - private final ListDescribedType _listDescribedType; - private final Binary _payload; + private final ListDescribedType _frameDescribedType; + private final Binary _framePayload; private ValueProvider _valueProvider; private int _channel; private boolean _deferWrite = false; - FrameSender(TestAmqpPeer testAmqpPeer, FrameType type, int channel, ListDescribedType listDescribedType, Binary payload) + FrameSender(TestAmqpPeer testAmqpPeer, FrameType type, int channel, ListDescribedType frameDescribedType, Binary framePayload) { _testAmqpPeer = testAmqpPeer; _type = type; _channel = channel; - _listDescribedType = listDescribedType; - _payload = payload; + _frameDescribedType = frameDescribedType; + _framePayload = framePayload; } @Override @@ -47,7 +47,7 @@ class FrameSender implements AmqpPeerRunnable _valueProvider.setValues(); } - _testAmqpPeer.sendFrame(_type, _channel, _listDescribedType, _payload, _deferWrite); + _testAmqpPeer.sendFrame(_type, _channel, _frameDescribedType, _framePayload, _deferWrite); } public FrameSender setValueProvider(ValueProvider valueProvider) http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c8397730/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithNoPayloadMatchingHandler.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithNoPayloadMatchingHandler.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithNoPayloadMatchingHandler.java index 84e949f..a37c831 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithNoPayloadMatchingHandler.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithNoPayloadMatchingHandler.java @@ -37,6 +37,7 @@ public class FrameWithNoPayloadMatchingHandler extends AbstractFrameFieldAndPayl @Override protected void verifyPayload(Binary payload) { + _logger.debug("About to check that there is no payload"); if(payload != null && payload.getLength() > 0) { throw new IllegalArgumentException("Expected no payload but received payload of length: " + payload.getLength()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c8397730/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithPayloadMatchingHandler.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithPayloadMatchingHandler.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithPayloadMatchingHandler.java index 3aceb8b..51c09db 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithPayloadMatchingHandler.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameWithPayloadMatchingHandler.java @@ -47,6 +47,7 @@ public class FrameWithPayloadMatchingHandler extends AbstractFrameFieldAndPayloa @Override protected void verifyPayload(Binary payload) { + _logger.debug("About to check the payload" + "\n Received: {}", payload); if(_payloadMatcher != null) { assertThat("Payload should match", payload, _payloadMatcher); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c8397730/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index adc4fe9..cfefe74 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -237,15 +237,15 @@ public class TestAmqpPeer implements AutoCloseable _driverRunnable.sendBytes(header); } - public void sendFrame(FrameType type, int channel, DescribedType describedType, Binary payload, boolean deferWrite) + public void sendFrame(FrameType type, int channel, DescribedType frameDescribedType, Binary framePayload, boolean deferWrite) { if(channel < 0) { throw new IllegalArgumentException("Frame must be sent on a channel >= 0"); } - LOGGER.debug("About to send: {}", describedType); - byte[] output = AmqpDataFramer.encodeFrame(type, channel, describedType, payload); + LOGGER.debug("About to send: {}", frameDescribedType); + byte[] output = AmqpDataFramer.encodeFrame(type, channel, frameDescribedType, framePayload); if(deferWrite && _deferredBytes == null) { @@ -696,21 +696,76 @@ public class TestAmqpPeer implements AutoCloseable } public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType, + final MessageAnnotationsDescribedType messageAnnotationsDescribedType, + final PropertiesDescribedType propertiesDescribedType, + final ApplicationPropertiesDescribedType appPropertiesDescribedType, + final DescribedType content) + { + expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content, 1); + } + + public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType, final MessageAnnotationsDescribedType messageAnnotationsDescribedType, final PropertiesDescribedType propertiesDescribedType, final ApplicationPropertiesDescribedType appPropertiesDescribedType, - final DescribedType content) + final DescribedType content, + final int count) { + if(count <= 0) + { + throw new IllegalArgumentException("Message count must be >= 1"); + } + + int nextIncomingId = 0; // TODO: we shouldn't assume this will be the first transfer on the session + final FlowMatcher flowMatcher = new FlowMatcher() - .withLinkCredit(Matchers.greaterThan(UnsignedInteger.ZERO)); + .withLinkCredit(Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count))) + .withNextIncomingId(Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId))); + + CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); - final TransferFrame transferResponse = new TransferFrame() + for(int i = 0; i < count; i++) + { + final int nextId = nextIncomingId + i; + + String tagString = "theDeliveryTag" + nextId; + Binary dtag = new Binary(tagString.getBytes()); + + final TransferFrame transferResponse = new TransferFrame() .setHandle(UnsignedInteger.valueOf(_nextLinkHandle - 1)) // TODO: this needs to be the value used in the attach response - .setDeliveryId(UnsignedInteger.ZERO) // TODO: we shouldn't assume this is the first transfer on the session - .setDeliveryTag(new Binary("theDeliveryTag".getBytes())) + .setDeliveryId(UnsignedInteger.valueOf(nextId)) + .setDeliveryTag(dtag) .setMessageFormat(UnsignedInteger.ZERO) .setSettled(false); + Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType, + propertiesDescribedType, appPropertiesDescribedType, content); + + // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. + final FrameSender transferResponseSender = new FrameSender(this, FrameType.AMQP, -1, transferResponse, payload); + transferResponseSender.setValueProvider(new ValueProvider() + { + @Override + public void setValues() + { + transferResponseSender.setChannel(flowMatcher.getActualChannel()); + } + }); + + composite.add(transferResponseSender); + } + + flowMatcher.onSuccess(composite); + + addHandler(flowMatcher); + } + + private Binary prepareTransferPayload(final HeaderDescribedType headerDescribedType, + final MessageAnnotationsDescribedType messageAnnotationsDescribedType, + final PropertiesDescribedType propertiesDescribedType, + final ApplicationPropertiesDescribedType appPropertiesDescribedType, + final DescribedType content) + { Data payloadData = Proton.data(1024); if(headerDescribedType != null) @@ -738,22 +793,7 @@ public class TestAmqpPeer implements AutoCloseable payloadData.putDescribedType(content); } - Binary payload = payloadData.encode(); - - // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. - final FrameSender transferResponseSender = new FrameSender(this, FrameType.AMQP, -1, transferResponse, payload); - transferResponseSender.setValueProvider(new ValueProvider() - { - @Override - public void setValues() - { - transferResponseSender.setChannel(flowMatcher.getActualChannel()); - } - }); - - flowMatcher.onSuccess(transferResponseSender); - - addHandler(flowMatcher); + return payloadData.encode(); } public void expectTransfer(Matcher<Binary> expectedPayloadMatcher) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
