Repository: qpid-jms Updated Branches: refs/heads/master aef8ade2c -> 7a9d2cefd
QPIDJMS-197 Force fallback producer to be synchronous sender always. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7a9d2cef Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7a9d2cef Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7a9d2cef Branch: refs/heads/master Commit: 7a9d2cefdd1179567af878713a9ffd5c2ebc3b89 Parents: aef8ade Author: Timothy Bish <[email protected]> Authored: Thu Aug 4 10:49:10 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Thu Aug 4 10:49:10 2016 -0400 ---------------------------------------------------------------------- .../amqp/AmqpAnonymousFallbackProducer.java | 5 ++ .../integration/ProducerIntegrationTest.java | 63 ++++++++++++++++++++ 2 files changed, 68 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7a9d2cef/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java index 3d18eb0..b44e3b3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java @@ -72,6 +72,11 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { LOG.trace("Started send chain for anonymous producer: {}", getProducerId()); + // Force sends marked as asynchronous to be sent synchronous so that the temporary + // producer instance can handle failures and perform necessary completion work on + // the send. + envelope.setSendAsync(false); + AmqpProducer producer = null; if (connection.isAnonymousProducerCache()) { producer = producerCache.get(envelope.getDestination()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7a9d2cef/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index f3c41a4..699858a 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -1774,4 +1774,67 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(1000); } } + + @Test(timeout = 20000) + public void testAnonymousProducerAsyncSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + // DO NOT add capability to indicate server support for ANONYMOUS-RELAY + + Connection connection = testFixture.establishConnecton(testPeer, "?jms.forceAsyncSend=true"); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + + // Expect no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + //Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination, then closing the link after the message is sent. + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(topicName)); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher, nullValue(), false, new Rejected(), true); + testPeer.expectDetach(true, true, true); + + // Producer should act as synchronous regardless of asynchronous send setting. + Message message = session.createMessage(); + try { + producer.send(dest, message); + fail("Send should fail"); + } catch (JMSException jmsEx) { + LOG.debug("Caught expected error from failed send."); + } + + //Repeat the send and observe another attach->transfer->detach. + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.expectDetach(true, true, true); + + producer.send(dest, message); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
