This is an automated email from the ASF dual-hosted git repository. havret pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
The following commit(s) were added to refs/heads/main by this push: new 2e925c9 AMQNET-746 NmsContext event listeners 2e925c9 is described below commit 2e925c9cd1027c3b64b08ed542fbe083548b807b Author: lukeabsent <travelthrou...@interia.pl> AuthorDate: Tue Dec 21 10:22:06 2021 +0100 AMQNET-746 NmsContext event listeners --- src/NMS.AMQP/NmsContext.cs | 49 +++++-- .../Async/NMSContextEventListenersTestAsync.cs | 153 +++++++++++++++++++++ .../Integration/FailoverIntegrationTest.cs | 13 +- 3 files changed, 198 insertions(+), 17 deletions(-) diff --git a/src/NMS.AMQP/NmsContext.cs b/src/NMS.AMQP/NmsContext.cs index d8b905e..7838f21 100644 --- a/src/NMS.AMQP/NmsContext.cs +++ b/src/NMS.AMQP/NmsContext.cs @@ -34,11 +34,8 @@ namespace Apache.NMS.AMQP private NmsMessageProducer sharedProducer; private bool autoStart = true; - public NmsContext(NmsConnection connection, AcknowledgementMode acknowledgementMode) + public NmsContext(NmsConnection connection, AcknowledgementMode acknowledgementMode) : this(connection, acknowledgementMode, new AtomicLong(1)) { - this.connection = connection; - this.AcknowledgementMode = acknowledgementMode; - this.connectionRefCount = new AtomicLong(1); } private NmsContext(NmsConnection connection, AcknowledgementMode acknowledgementMode, @@ -462,7 +459,7 @@ namespace Apache.NMS.AMQP private NmsSession GetSession() { if (session == null) { - using( syncRoot.Lock()) + using(syncRoot.Lock()) { if (session == null) { @@ -516,16 +513,40 @@ namespace Apache.NMS.AMQP public bool AutoStart { get => autoStart; set => autoStart = value; } - public event SessionTxEventDelegate TransactionStartedListener; - - public event SessionTxEventDelegate TransactionCommittedListener; - - public event SessionTxEventDelegate TransactionRolledBackListener; - - public event ExceptionListener ExceptionListener; + public event SessionTxEventDelegate TransactionStartedListener + { + add => GetSession().TransactionStartedListener += value; + remove => GetSession().TransactionStartedListener -= value; + } + + public event SessionTxEventDelegate TransactionCommittedListener + { + add => GetSession().TransactionCommittedListener += value; + remove => GetSession().TransactionCommittedListener -= value; + } + + public event SessionTxEventDelegate TransactionRolledBackListener + { + add => GetSession().TransactionRolledBackListener += value; + remove => GetSession().TransactionRolledBackListener -= value; + } + + public event ExceptionListener ExceptionListener + { + add => connection.ExceptionListener += value; + remove => connection.ExceptionListener -= value; + } - public event ConnectionInterruptedListener ConnectionInterruptedListener; + public event ConnectionInterruptedListener ConnectionInterruptedListener + { + add => connection.ConnectionInterruptedListener += value; + remove => connection.ConnectionInterruptedListener -= value; + } - public event ConnectionResumedListener ConnectionResumedListener; + public event ConnectionResumedListener ConnectionResumedListener + { + add => connection.ConnectionResumedListener += value; + remove => connection.ConnectionResumedListener -= value; + } } } \ No newline at end of file diff --git a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSContextEventListenersTestAsync.cs b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSContextEventListenersTestAsync.cs new file mode 100644 index 0000000..54a6c44 --- /dev/null +++ b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSContextEventListenersTestAsync.cs @@ -0,0 +1,153 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Amqp.Framing; +using Amqp.Transactions; +using Apache.NMS; +using Apache.NMS.AMQP; +using NMS.AMQP.Test.TestAmqp; +using NMS.AMQP.Test.TestAmqp.BasicTypes; +using NUnit.Framework; + +namespace NMS.AMQP.Test.Integration.Async +{ + public class NMSContextEventListenersTestAsync : IntegrationTestFixture + { + [Test, Timeout(20_000)] + public async Task TestRemotelyEndConnectionListenerInvoked() + { + using (TestAmqpPeer testPeer = new TestAmqpPeer()) + { + ManualResetEvent done = new ManualResetEvent(false); + + // Don't set a ClientId, so that the underlying AMQP connection isn't established yet + var context = await EstablishNMSContextAsync(testPeer: testPeer, setClientId: false); + + // Tell the test peer to close the connection when executing its last handler + testPeer.RemotelyCloseConnection(expectCloseResponse: true, errorCondition: ConnectionError.CONNECTION_FORCED, errorMessage: "buba"); + + context.ExceptionListener += exception => done.Set(); + + // Trigger the underlying AMQP connection + await context.StartAsync(); + + Assert.IsTrue(done.WaitOne(TimeSpan.FromSeconds(5)), "Connection should report failure"); + + await context.CloseAsync(); + } + } + + [Test, Timeout(20_000), Category("Windows")] + public async Task TestConnectionInterruptedInvokedWhenConnectionToBrokerLost() + { + using (TestAmqpPeer originalPeer = new TestAmqpPeer()) + { + ManualResetEvent connectionInterruptedInvoked = new ManualResetEvent(false); + + originalPeer.ExpectSaslAnonymous(); + originalPeer.ExpectOpen(); + originalPeer.ExpectBegin(); + originalPeer.ExpectBegin(); + + var conFactory = new ConnectionFactory(FailoverIntegrationTest.CreateFailoverUri(null, null, originalPeer)); + var context = conFactory.CreateContext(AcknowledgementMode.AutoAcknowledge); + context.ConnectionInterruptedListener += () => connectionInterruptedInvoked.Set(); + + await context.StartAsync(); + + originalPeer.Close(); + + Assert.IsTrue(connectionInterruptedInvoked.WaitOne(TimeSpan.FromSeconds(10))); + } + } + + [Test, Timeout(20_000), Category("Windows")] + public async Task TestConnectionResumedInvokedWhenConnectionToBrokerLost() + { + using (TestAmqpPeer originalPeer = new TestAmqpPeer()) + using (TestAmqpPeer finalPeer = new TestAmqpPeer()) + { + ManualResetEvent connectionResumedInvoked = new ManualResetEvent(false); + + originalPeer.ExpectSaslAnonymous(); + originalPeer.ExpectOpen(); + originalPeer.ExpectBegin(); + originalPeer.ExpectBegin(); + + finalPeer.ExpectSaslAnonymous(); + finalPeer.ExpectOpen(); + finalPeer.ExpectBegin(); + finalPeer.ExpectBegin(); + + var conFactory = + new ConnectionFactory( + FailoverIntegrationTest.CreateFailoverUri(null, null, originalPeer, finalPeer)); + var context = conFactory.CreateContext(); + + context.ConnectionResumedListener += () => connectionResumedInvoked.Set(); + + await context.StartAsync(); + + originalPeer.Close(); + Assert.IsTrue(connectionResumedInvoked.WaitOne(TimeSpan.FromSeconds(10))); + } + } + + + [Test, Timeout(20_000)] + public async Task TestProducedMessagesOnTransactedSessionCarryTxnId() + { + ManualResetEvent transactionRolledBackEvent = new ManualResetEvent(false); + + using (TestAmqpPeer testPeer = new TestAmqpPeer()) + { + var connection = await EstablishNMSContextAsync(testPeer,acknowledgementMode:AcknowledgementMode.Transactional); + + + testPeer.ExpectBegin(); + testPeer.ExpectCoordinatorAttach(); + await connection.StartAsync(); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + byte[] txnId = { 1, 2, 3, 4 }; + testPeer.ExpectDeclare(txnId); + + // ISession session = connection.CreateSession(AcknowledgementMode.Transactional); + connection.TransactionRolledBackListener += session => transactionRolledBackEvent.Set(); + IQueue queue = await connection.GetQueueAsync("myQueue"); + + // Create a producer to use in provoking creation of the AMQP transaction + testPeer.ExpectSenderAttach(); + + var producer = await connection.CreateProducerAsync(); + + // Expect the message which was sent under the current transaction. Check it carries + // TransactionalState with the above txnId but has no outcome. Respond with a + // TransactionalState with Accepted outcome. + testPeer.ExpectTransfer(messageMatcher: Assert.NotNull, + stateMatcher: state => + { + Assert.IsInstanceOf<TransactionalState>(state); + TransactionalState transactionalState = (TransactionalState) state; + CollectionAssert.AreEqual(txnId, transactionalState.TxnId); + Assert.IsNull(transactionalState.Outcome); + }, + responseState: new TransactionalState() { TxnId = txnId, Outcome = new Accepted() }, + responseSettled: true); + testPeer.ExpectDischarge(txnId, dischargeState: true); + + await producer.SendAsync(queue, await connection.CreateMessageAsync()); + + testPeer.ExpectEnd(); + testPeer.ExpectClose(); + await connection.CloseAsync(); + + testPeer.WaitForAllMatchersToComplete(1000); + + Assert.IsTrue(transactionRolledBackEvent.WaitOne(1000)); + } + } + + } +} \ No newline at end of file diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs index 3705eb8..07dfdc0 100644 --- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs +++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs @@ -1125,6 +1125,14 @@ namespace NMS.AMQP.Test.Integration } private NmsConnection EstablishAnonymousConnection(string connectionParams, string failoverParams, params TestAmqpPeer[] peers) + { + var remoteUri = CreateFailoverUri(connectionParams, failoverParams, peers); + + NmsConnectionFactory factory = new NmsConnectionFactory(remoteUri); + return (NmsConnection) factory.CreateConnection(); + } + + internal static string CreateFailoverUri(string connectionParams, string failoverParams, params TestAmqpPeer[] peers) { if (peers.Length == 0) { @@ -1153,11 +1161,10 @@ namespace NMS.AMQP.Test.Integration remoteUri += ")?" + failoverParams; } - NmsConnectionFactory factory = new NmsConnectionFactory(remoteUri); - return (NmsConnection) factory.CreateConnection(); + return remoteUri; } - private string CreatePeerUri(TestAmqpPeer peer, string parameters = null) + internal static string CreatePeerUri(TestAmqpPeer peer, string parameters = null) { return $"amqp://127.0.0.1:{peer.ServerPort}/{(parameters != null ? "?" + parameters : "")}"; }