Repository: qpid-jms
Updated Branches:
  refs/heads/master f8470ec1b -> acecfbe19


QPIDJMS-159 Message producer respond to drain of credit.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/acecfbe1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/acecfbe1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/acecfbe1

Branch: refs/heads/master
Commit: acecfbe194036ffdf3d6ee11b6a945b4520c7711
Parents: f8470ec
Author: Timothy Bish <[email protected]>
Authored: Thu Mar 24 16:43:35 2016 -0400
Committer: Timothy Bish <[email protected]>
Committed: Thu Mar 24 16:43:35 2016 -0400

----------------------------------------------------------------------
 .../jms/provider/amqp/AmqpFixedProducer.java    |  5 ++
 .../integration/ProducerIntegrationTest.java    | 46 +++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 54 ++++++++++++++++++++
 3 files changed, 105 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/acecfbe1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index f548096..3dbb85a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -201,6 +201,11 @@ public class AmqpFixedProducer extends AmqpProducer {
             }
         }
 
+        // If a drain was requested, we just sent what we had so respond with drained
+        if (getEndpoint().getDrain()) {
+            getEndpoint().drained();
+        }
+
         // Once the pending sends queue is drained we can propagate the close request.
         if (blocked.isEmpty() && isAwaitingClose() && !isClosed()) {
             super.close(closeRequest);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/acecfbe1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index f65a47a..4b35a4f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -68,6 +68,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1248,4 +1249,49 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout = 20000)
+    public void testCreditDrainedAfterSend() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(500);
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            MessageProducer producer = session.createProducer(destination);
+
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            // After the first send lets drain off the credit from the sender asking for one
+            // more message if it has one.
+            testPeer.expectTransferRespondWithDrain(messageMatcher, 1);
+            testPeer.expectLinkFlow(true, false, Matchers.equalTo(UnsignedInteger.ZERO));
+            testPeer.expectDetach(true, true, true);
+            testPeer.expectClose();
+
+            producer.send(session.createMessage());
+
+            // We don't have any credit now since we were drained, so the send should
+            // block until more credit is issued.
+            try {
+                producer.send(session.createMessage());
+                fail("Should have timed out waiting for credit to send.");
+            } catch (JmsSendTimedOutException jmsEx) {
+                LOG.info("Caught expected send timeout.");
+            }
+
+            producer.close();
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/acecfbe1/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 7ffdb2e..296c108 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1549,6 +1549,60 @@ public class TestAmqpPeer implements AutoCloseable
         addHandler(transferMatcher);
     }
 
+    public void expectTransferRespondWithDrain(Matcher<Binary> expectedPayloadMatcher, int drainAmount)
+    {
+        Matcher<Boolean> settledMatcher = Matchers.anyOf(equalTo(false), nullValue());
+
+        final TransferMatcher transferMatcher = new TransferMatcher();
+        transferMatcher.setPayloadMatcher(expectedPayloadMatcher);
+        transferMatcher.withSettled(settledMatcher);
+        transferMatcher.withState(nullValue());
+
+        CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
+        final DispositionFrame dispositionResponse = new DispositionFrame().setRole(Role.RECEIVER).setSettled(true).setState(new Accepted());
+
+        // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+        final FrameSender dispositionFrameSender = new FrameSender(this, FrameType.AMQP, -1, dispositionResponse, null);
+        dispositionFrameSender.setValueProvider(new ValueProvider()
+        {
+            @Override
+            public void setValues()
+            {
+                dispositionFrameSender.setChannel(transferMatcher.getActualChannel());
+                dispositionResponse.setFirst(transferMatcher.getReceivedDeliveryId());
+            }
+        });
+
+        final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
+            .setIncomingWindow(UnsignedInteger.valueOf(2048))
+            .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
+            .setOutgoingWindow(UnsignedInteger.valueOf(2048))
+            .setLinkCredit(UnsignedInteger.valueOf(drainAmount));
+
+        // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+        final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null);
+        flowFrameSender.setValueProvider(new ValueProvider()
+        {
+            @Override
+            public void setValues()
+            {
+                flowFrameSender.setChannel(transferMatcher.getActualChannel());
+                flowFrame.setHandle(transferMatcher.getReceivedHandle());
+                flowFrame.setDeliveryCount(UnsignedInteger.ONE);
+                flowFrame.setDrain(true);
+            }
+        });
+
+        flowFrameSender.setSendDelay(0);
+
+        composite.add(flowFrameSender);
+        composite.add(dispositionFrameSender);
+
+        transferMatcher.onCompletion(composite);
+
+        addHandler(transferMatcher);
+    }
+
     public void expectDeclare(Binary txnId)
     {
         TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to