This is an automated email from the ASF dual-hosted git repository. michaelpearce pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
The following commit(s) were added to refs/heads/master by this push: new 590fc54 AMQNET-619: Handle remote detach properly for sender link new fe647a9 Merge pull request #42 from Havret/handle_sender_link_remote_detach_properly 590fc54 is described below commit 590fc549d06c347cbeb2fda685ff51113e8fe7a5 Author: Havret <h4v...@gmail.com> AuthorDate: Sat Oct 5 10:21:42 2019 +0200 AMQNET-619: Handle remote detach properly for sender link --- src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs | 17 +++++++++- .../Integration/FailoverIntegrationTest.cs | 39 ++++++++++++++++++++++ test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs | 24 ++++++++++--- 3 files changed, 75 insertions(+), 5 deletions(-) diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs index d767577..d3982dd 100644 --- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs +++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs @@ -54,7 +54,7 @@ namespace Apache.NMS.AMQP.Provider.Amqp string linkName = info.Id + ":" + target.Address; var taskCompletionSource = new TaskCompletionSource<bool>(); - senderLink = new SenderLink(session.UnderlyingSession, linkName, frame, (link, attach) => { taskCompletionSource.SetResult(true); }); + senderLink = new SenderLink(session.UnderlyingSession, linkName, frame, HandleOpened(taskCompletionSource)); senderLink.AddClosedCallback((sender, error) => { @@ -73,6 +73,21 @@ namespace Apache.NMS.AMQP.Provider.Amqp return taskCompletionSource.Task; } + + private OnAttached HandleOpened(TaskCompletionSource<bool> tsc) => (link, attach) => + { + if (IsClosePending(attach)) + return; + + tsc.SetResult(true); + }; + + private static bool IsClosePending(Attach attach) + { + // When no link terminus was created, the peer will now detach/close us otherwise + // we need to validate the returned remote target prior to open completion. + return attach.Target == null; + } private Source CreateSource() => new Source { diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs index 816dc84..cb3f4cc 100644 --- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs +++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs @@ -1028,6 +1028,45 @@ namespace NMS.AMQP.Test.Integration } } + [Test, Timeout(20_000)] + public void TestCreateProducerFailsWhenLinkRefused() + { + using (TestAmqpPeer testPeer = new TestAmqpPeer()) + { + testPeer.ExpectSaslAnonymous(); + testPeer.ExpectOpen(); + testPeer.ExpectBegin(); + + NmsConnection connection = EstablishAnonymousConnection(testPeer); + connection.Start(); + + testPeer.ExpectBegin(); + ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); + + string topicName = "myTopic"; + ITopic topic = session.GetTopic(topicName); + + // Expect a link to a topic node, which we will then refuse + testPeer.ExpectSenderAttach(targetMatcher: source => + { + Assert.AreEqual(topicName, source.Address); + Assert.IsFalse(source.Dynamic); + Assert.AreEqual((uint) TerminusDurability.NONE, source.Durable); + }, sourceMatcher: Assert.NotNull, refuseLink: true); + + //Expect the detach response to the test peer closing the producer link after refusal. + testPeer.ExpectDetach(expectClosed: true, sendResponse: false, replyClosed: false); + + Assert.Catch<NMSException>(() => session.CreateProducer(topic)); + + // Shut it down + testPeer.ExpectClose(); + connection.Close(); + + testPeer.WaitForAllMatchersToComplete(1000); + } + } + private NmsConnection EstablishAnonymousConnection(params TestAmqpPeer[] peers) { return EstablishAnonymousConnection(null, null, peers); diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs index 5646215..f1981e5 100644 --- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs +++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs @@ -421,7 +421,11 @@ namespace NMS.AMQP.Test.TestAmqp ExpectSenderAttach(sourceMatcher: Assert.NotNull, targetMatcher: Assert.NotNull); } - public void ExpectSenderAttach(Action<Source> sourceMatcher, Action<Target> targetMatcher, uint creditAmount = 100, bool senderSettled = false) + public void ExpectSenderAttach(Action<Source> sourceMatcher, + Action<Target> targetMatcher, + bool refuseLink = false, + uint creditAmount = 100, + bool senderSettled = false) { var attachMatcher = new FrameMatcher<Attach>() .WithAssertion(attach => Assert.IsNotNull(attach.LinkName)) @@ -440,15 +444,26 @@ namespace NMS.AMQP.Test.TestAmqp RcvSettleMode = ReceiverSettleMode.First, Handle = context.Command.Handle, LinkName = context.Command.LinkName, - Source = context.Command.Source, - Target = context.Command.Target + Source = context.Command.Source }; + if (refuseLink) + attach.Target = null; + else + attach.Target = context.Command.Target; + lastInitiatedLinkHandle = context.Command.Handle; context.SendCommand(attach); - var flow = new Flow() + if (refuseLink) + { + var detach = new Detach { Closed = true, Handle = context.Command.Handle }; + context.SendCommand(detach); + } + else + { + var flow = new Flow { NextIncomingId = 1, IncomingWindow = 2048, @@ -460,6 +475,7 @@ namespace NMS.AMQP.Test.TestAmqp }; context.SendCommand(flow); + } }); AddMatcher(attachMatcher);