Repository: qpid-jms
Updated Branches:
  refs/heads/master aef8ade2c -> 7a9d2cefd


QPIDJMS-197 Force fallback producer to be synchronous sender always.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7a9d2cef
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7a9d2cef
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7a9d2cef

Branch: refs/heads/master
Commit: 7a9d2cefdd1179567af878713a9ffd5c2ebc3b89
Parents: aef8ade
Author: Timothy Bish <[email protected]>
Authored: Thu Aug 4 10:49:10 2016 -0400
Committer: Timothy Bish <[email protected]>
Committed: Thu Aug 4 10:49:10 2016 -0400

----------------------------------------------------------------------
 .../amqp/AmqpAnonymousFallbackProducer.java     |  5 ++
 .../integration/ProducerIntegrationTest.java    | 63 ++++++++++++++++++++
 2 files changed, 68 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7a9d2cef/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index 3d18eb0..b44e3b3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -72,6 +72,11 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
     public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult 
request) throws IOException, JMSException {
         LOG.trace("Started send chain for anonymous producer: {}", 
getProducerId());
 
+        // Force sends marked as asynchronous to be sent synchronously so that the temporary
+        // producer instance can handle failures and perform necessary completion work on
+        // the send.
+        envelope.setSendAsync(false);
+
         AmqpProducer producer = null;
         if (connection.isAnonymousProducerCache()) {
             producer = producerCache.get(envelope.getDestination());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7a9d2cef/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index f3c41a4..699858a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -1774,4 +1774,67 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout = 20000)
+    public void 
testAnonymousProducerAsyncSendFailureHandledWhenAnonymousRelayNodeIsNotSupported()
 throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            // DO NOT add capability to indicate server support for 
ANONYMOUS-RELAY
+
+            Connection connection = testFixture.establishConnecton(testPeer, 
"?jms.forceAsyncSend=true");
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect no AMQP traffic when we create the anonymous producer, as it will wait
+            // for an actual send to occur on the producer before anything occurs on the wire
+
+            //Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // Expect a new message sent by the above producer to cause 
creation of a new
+            // sender link to the given destination, then closing the link 
after the message is sent.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher, nullValue(), false, new 
Rejected(), true);
+            testPeer.expectDetach(true, true, true);
+
+            // Producer should act as synchronous regardless of asynchronous 
send setting.
+            Message message = session.createMessage();
+            try {
+                producer.send(dest, message);
+                fail("Send should fail");
+            } catch (JMSException jmsEx) {
+                LOG.debug("Caught expected error from failed send.");
+            }
+
+            //Repeat the send and observe another attach->transfer->detach.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            producer.send(dest, message);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to