Repository: qpid-jms Updated Branches: refs/heads/master f8470ec1b -> acecfbe19
QPIDJMS-159 Message producer respond to drain of credit. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/acecfbe1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/acecfbe1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/acecfbe1 Branch: refs/heads/master Commit: acecfbe194036ffdf3d6ee11b6a945b4520c7711 Parents: f8470ec Author: Timothy Bish <[email protected]> Authored: Thu Mar 24 16:43:35 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Thu Mar 24 16:43:35 2016 -0400 ---------------------------------------------------------------------- .../jms/provider/amqp/AmqpFixedProducer.java | 5 ++ .../integration/ProducerIntegrationTest.java | 46 +++++++++++++++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 54 ++++++++++++++++++++ 3 files changed, 105 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/acecfbe1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index f548096..3dbb85a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -201,6 +201,11 @@ public class AmqpFixedProducer extends AmqpProducer { } } + // If a drain was requested, we just sent what we had so respond with drained + if (getEndpoint().getDrain()) { + getEndpoint().drained(); + } + // Once the pending sends queue is drained we can propagate the close request. if (blocked.isEmpty() && isAwaitingClose() && !isClosed()) { super.close(closeRequest); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/acecfbe1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index f65a47a..4b35a4f 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -68,6 +68,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.hamcrest.Matcher; +import org.hamcrest.Matchers; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1248,4 +1249,49 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(1000); } } + + @Test(timeout = 20000) + public void testCreditDrainedAfterSend() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setSendTimeout(500); + + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(getTestName()); + MessageProducer producer = session.createProducer(destination); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + // After the first send lets drain off the credit from the sender asking for one + // more message if it has one. + testPeer.expectTransferRespondWithDrain(messageMatcher, 1); + testPeer.expectLinkFlow(true, false, Matchers.equalTo(UnsignedInteger.ZERO)); + testPeer.expectDetach(true, true, true); + testPeer.expectClose(); + + producer.send(session.createMessage()); + + // We don't have any credit now since we were drained, so the send should + // block until more credit is issued. + try { + producer.send(session.createMessage()); + fail("Should have timed out waiting for credit to send."); + } catch (JmsSendTimedOutException jmsEx) { + LOG.info("Caught expected send timeout."); + } + + producer.close(); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/acecfbe1/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 7ffdb2e..296c108 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -1549,6 +1549,60 @@ public class TestAmqpPeer implements AutoCloseable addHandler(transferMatcher); } + public void expectTransferRespondWithDrain(Matcher<Binary> expectedPayloadMatcher, int drainAmount) + { + Matcher<Boolean> settledMatcher = Matchers.anyOf(equalTo(false), nullValue()); + + final TransferMatcher transferMatcher = new TransferMatcher(); + transferMatcher.setPayloadMatcher(expectedPayloadMatcher); + transferMatcher.withSettled(settledMatcher); + transferMatcher.withState(nullValue()); + + CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); + final DispositionFrame dispositionResponse = new DispositionFrame().setRole(Role.RECEIVER).setSettled(true).setState(new Accepted()); + + // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. + final FrameSender dispositionFrameSender = new FrameSender(this, FrameType.AMQP, -1, dispositionResponse, null); + dispositionFrameSender.setValueProvider(new ValueProvider() + { + @Override + public void setValues() + { + dispositionFrameSender.setChannel(transferMatcher.getActualChannel()); + dispositionResponse.setFirst(transferMatcher.getReceivedDeliveryId()); + } + }); + + final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded + .setIncomingWindow(UnsignedInteger.valueOf(2048)) + .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded + .setOutgoingWindow(UnsignedInteger.valueOf(2048)) + .setLinkCredit(UnsignedInteger.valueOf(drainAmount)); + + // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. + final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null); + flowFrameSender.setValueProvider(new ValueProvider() + { + @Override + public void setValues() + { + flowFrameSender.setChannel(transferMatcher.getActualChannel()); + flowFrame.setHandle(transferMatcher.getReceivedHandle()); + flowFrame.setDeliveryCount(UnsignedInteger.ONE); + flowFrame.setDrain(true); + } + }); + + flowFrameSender.setSendDelay(0); + + composite.add(flowFrameSender); + composite.add(dispositionFrameSender); + + transferMatcher.onCompletion(composite); + + addHandler(transferMatcher); + } + public void expectDeclare(Binary txnId) { TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
