Repository: qpid-jms
Updated Branches:
  refs/heads/master dfa2b6739 -> aef8ade2c


QPIDJMS-196 Anonymous fallback producer send failure not handled.

Ensure that when a send fails on the anonymous fallback producer, the
producer is closed (if producer caching is off) and the send failure is
propagated to the MessageProducer send call as expected.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/aef8ade2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/aef8ade2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/aef8ade2

Branch: refs/heads/master
Commit: aef8ade2c101b9931bbcf25a1efe2c619ca89e15
Parents: dfa2b67
Author: Timothy Bish <[email protected]>
Authored: Wed Aug 3 18:21:17 2016 -0400
Committer: Timothy Bish <[email protected]>
Committed: Wed Aug 3 18:21:17 2016 -0400

----------------------------------------------------------------------
 .../amqp/AmqpAnonymousFallbackProducer.java     |  7 ++-
 .../integration/ProducerIntegrationTest.java    | 64 ++++++++++++++++++++
 2 files changed, 69 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aef8ade2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index 71f26e7..3d18eb0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -191,8 +191,11 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
 
         @Override
         public void onFailure(Throwable result) {
-            // Ensure that cache get purged of any failed producers.
-            
AmqpAnonymousFallbackProducer.this.producerCache.remove(producer.getResourceInfo().getDestination());
+            LOG.trace("Send phase of anonymous send failed: {} ", 
getProducerId());
+            if (!connection.isAnonymousProducerCache()) {
+                AnonymousCloseRequest close = new AnonymousCloseRequest(this);
+                producer.close(close);
+            }
             super.onFailure(result);
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aef8ade2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 8a03b87..f3c41a4 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -52,6 +52,7 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
@@ -65,9 +66,11 @@ import org.apache.qpid.jms.test.Wait;
 import org.apache.qpid.jms.test.testpeer.ListDescribedType;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
+import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
+import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
 import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
 import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
@@ -1710,4 +1713,65 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout = 20000)
+    public void 
testAnonymousProducerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() 
throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            // DO NOT add capability to indicate server support for 
ANONYMOUS-RELAY
+
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
+            // for an actual send to occur on the producer before anything 
occurs on the wire
+
+            //Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // Expect a new message sent by the above producer to cause 
creation of a new
+            // sender link to the given destination, then closing the link 
after the message is sent.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher, nullValue(), false, new 
Rejected(), true);
+            testPeer.expectDetach(true, true, true);
+
+            Message message = session.createMessage();
+            try {
+                producer.send(dest, message);
+                fail("Send should fail");
+            } catch (JMSException jmsEx) {
+                LOG.debug("Caught expected error from failed send.");
+            }
+
+            //Repeat the send and observe another attach->transfer->detach.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            producer.send(dest, message);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to