Repository: qpid-jms Updated Branches: refs/heads/master dfa2b6739 -> aef8ade2c
QPIDJMS-196 Anonymous fallback producer send failure not handled. Ensure that on send failure with the anonymous fallback producer the producer is closed if caching is off and the send failure is propagated to the MessageProducer send call as expected. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/aef8ade2 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/aef8ade2 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/aef8ade2 Branch: refs/heads/master Commit: aef8ade2c101b9931bbcf25a1efe2c619ca89e15 Parents: dfa2b67 Author: Timothy Bish <[email protected]> Authored: Wed Aug 3 18:21:17 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed Aug 3 18:21:17 2016 -0400 ---------------------------------------------------------------------- .../amqp/AmqpAnonymousFallbackProducer.java | 7 ++- .../integration/ProducerIntegrationTest.java | 64 ++++++++++++++++++++ 2 files changed, 69 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aef8ade2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java index 71f26e7..3d18eb0 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java @@ -191,8 +191,11 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { @Override public void onFailure(Throwable result) { - // Ensure that cache get purged of any failed producers. - AmqpAnonymousFallbackProducer.this.producerCache.remove(producer.getResourceInfo().getDestination()); + LOG.trace("Send phase of anonymous send failed: {} ", getProducerId()); + if (!connection.isAnonymousProducerCache()) { + AnonymousCloseRequest close = new AnonymousCloseRequest(this); + producer.close(close); + } super.onFailure(result); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aef8ade2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index 8a03b87..f3c41a4 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -52,6 +52,7 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; @@ -65,9 +66,11 @@ import org.apache.qpid.jms.test.Wait; import org.apache.qpid.jms.test.testpeer.ListDescribedType; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; +import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability; import org.apache.qpid.jms.test.testpeer.describedtypes.Modified; import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected; import org.apache.qpid.jms.test.testpeer.describedtypes.Released; +import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher; @@ -1710,4 +1713,65 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(1000); } } + + @Test(timeout = 20000) + public void testAnonymousProducerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + // DO NOT add capability to indicate server support for ANONYMOUS-RELAY + + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + + // Expect no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + //Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination, then closing the link after the message is sent. + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(topicName)); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher, nullValue(), false, new Rejected(), true); + testPeer.expectDetach(true, true, true); + + Message message = session.createMessage(); + try { + producer.send(dest, message); + fail("Send should fail"); + } catch (JMSException jmsEx) { + LOG.debug("Caught expected error from failed send."); + } + + //Repeat the send and observe another attach->transfer->detach. + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.expectDetach(true, true, true); + + producer.send(dest, message); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
