This is an automated email from the ASF dual-hosted git repository.

havret pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e925c9  AMQNET-746 NmsContext event listeners
2e925c9 is described below

commit 2e925c9cd1027c3b64b08ed542fbe083548b807b
Author: lukeabsent <travelthrou...@interia.pl>
AuthorDate: Tue Dec 21 10:22:06 2021 +0100

    AMQNET-746 NmsContext event listeners
---
 src/NMS.AMQP/NmsContext.cs                         |  49 +++++--
 .../Async/NMSContextEventListenersTestAsync.cs     | 153 +++++++++++++++++++++
 .../Integration/FailoverIntegrationTest.cs         |  13 +-
 3 files changed, 198 insertions(+), 17 deletions(-)

diff --git a/src/NMS.AMQP/NmsContext.cs b/src/NMS.AMQP/NmsContext.cs
index d8b905e..7838f21 100644
--- a/src/NMS.AMQP/NmsContext.cs
+++ b/src/NMS.AMQP/NmsContext.cs
@@ -34,11 +34,8 @@ namespace Apache.NMS.AMQP
         private NmsMessageProducer sharedProducer;
         private bool autoStart = true;
 
-        public NmsContext(NmsConnection connection, AcknowledgementMode 
acknowledgementMode)
+        public NmsContext(NmsConnection connection, AcknowledgementMode 
acknowledgementMode) : this(connection, acknowledgementMode, new AtomicLong(1))
         {
-            this.connection = connection;
-            this.AcknowledgementMode = acknowledgementMode;
-            this.connectionRefCount = new AtomicLong(1);
         }
 
         private NmsContext(NmsConnection connection, AcknowledgementMode 
acknowledgementMode,
@@ -462,7 +459,7 @@ namespace Apache.NMS.AMQP
         private NmsSession GetSession() {
             if (session == null)
             {
-                using( syncRoot.Lock())
+                using(syncRoot.Lock())
                 {
                     if (session == null)
                     {
@@ -516,16 +513,40 @@ namespace Apache.NMS.AMQP
         
         public bool AutoStart { get => autoStart; set => autoStart = value; }
         
-        public event SessionTxEventDelegate TransactionStartedListener;
-        
-        public event SessionTxEventDelegate TransactionCommittedListener;
-        
-        public event SessionTxEventDelegate TransactionRolledBackListener;
-        
-        public event ExceptionListener ExceptionListener;
+        public event SessionTxEventDelegate TransactionStartedListener
+        {
+            add => GetSession().TransactionStartedListener += value;
+            remove => GetSession().TransactionStartedListener -= value;
+        }
+
+        public event SessionTxEventDelegate TransactionCommittedListener
+        {
+            add => GetSession().TransactionCommittedListener += value;
+            remove => GetSession().TransactionCommittedListener -= value;
+        }
+
+        public event SessionTxEventDelegate TransactionRolledBackListener
+        {
+            add => GetSession().TransactionRolledBackListener += value;
+            remove => GetSession().TransactionRolledBackListener -= value;
+        }
+
+        public event ExceptionListener ExceptionListener
+        {
+            add => connection.ExceptionListener += value;
+            remove => connection.ExceptionListener -= value;
+        }
         
-        public event ConnectionInterruptedListener 
ConnectionInterruptedListener;
+        public event ConnectionInterruptedListener 
ConnectionInterruptedListener
+        {
+            add => connection.ConnectionInterruptedListener += value;
+            remove => connection.ConnectionInterruptedListener -= value;
+        }
         
-        public event ConnectionResumedListener ConnectionResumedListener;
+        public event ConnectionResumedListener ConnectionResumedListener
+        {
+            add => connection.ConnectionResumedListener += value;
+            remove => connection.ConnectionResumedListener -= value;
+        }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSContextEventListenersTestAsync.cs b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSContextEventListenersTestAsync.cs
new file mode 100644
index 0000000..54a6c44
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSContextEventListenersTestAsync.cs
@@ -0,0 +1,153 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Amqp.Framing;
+using Amqp.Transactions;
+using Apache.NMS;
+using Apache.NMS.AMQP;
+using NMS.AMQP.Test.TestAmqp;
+using NMS.AMQP.Test.TestAmqp.BasicTypes;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration.Async
+{
+    public class NMSContextEventListenersTestAsync : IntegrationTestFixture
+    {
+        [Test, Timeout(20_000)]
+        public async Task TestRemotelyEndConnectionListenerInvoked()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                ManualResetEvent done = new ManualResetEvent(false);
+
+                // Don't set a ClientId, so that the underlying AMQP connection isn't established yet
+                var context = await EstablishNMSContextAsync(testPeer: 
testPeer, setClientId: false);
+
+                // Tell the test peer to close the connection when executing its last handler
+                testPeer.RemotelyCloseConnection(expectCloseResponse: true, 
errorCondition: ConnectionError.CONNECTION_FORCED, errorMessage: "buba");
+
+                context.ExceptionListener += exception => done.Set();
+
+                // Trigger the underlying AMQP connection
+                await context.StartAsync();
+
+                Assert.IsTrue(done.WaitOne(TimeSpan.FromSeconds(5)), 
"Connection should report failure");
+
+                await context.CloseAsync();
+            }
+        }
+        
+        [Test, Timeout(20_000), Category("Windows")]
+        public async Task 
TestConnectionInterruptedInvokedWhenConnectionToBrokerLost()
+        {
+            using (TestAmqpPeer originalPeer = new TestAmqpPeer())
+            {
+                ManualResetEvent connectionInterruptedInvoked = new 
ManualResetEvent(false);
+
+                originalPeer.ExpectSaslAnonymous();
+                originalPeer.ExpectOpen();
+                originalPeer.ExpectBegin();
+                originalPeer.ExpectBegin();
+
+                var conFactory = new 
ConnectionFactory(FailoverIntegrationTest.CreateFailoverUri(null, null, 
originalPeer));
+                var context = 
conFactory.CreateContext(AcknowledgementMode.AutoAcknowledge);
+                context.ConnectionInterruptedListener += () => 
connectionInterruptedInvoked.Set();
+
+                await context.StartAsync();
+                
+                originalPeer.Close();
+
+                
Assert.IsTrue(connectionInterruptedInvoked.WaitOne(TimeSpan.FromSeconds(10)));
+            }
+        }
+        
+        [Test, Timeout(20_000), Category("Windows")]
+        public async Task 
TestConnectionResumedInvokedWhenConnectionToBrokerLost()
+        {
+            using (TestAmqpPeer originalPeer = new TestAmqpPeer())
+            using (TestAmqpPeer finalPeer = new TestAmqpPeer())
+            {
+                ManualResetEvent connectionResumedInvoked = new 
ManualResetEvent(false);
+
+                originalPeer.ExpectSaslAnonymous();
+                originalPeer.ExpectOpen();
+                originalPeer.ExpectBegin();
+                originalPeer.ExpectBegin();
+
+                finalPeer.ExpectSaslAnonymous();
+                finalPeer.ExpectOpen();
+                finalPeer.ExpectBegin();
+                finalPeer.ExpectBegin();
+
+                var conFactory =
+                    new ConnectionFactory(
+                        FailoverIntegrationTest.CreateFailoverUri(null, null, 
originalPeer, finalPeer));
+                var context = conFactory.CreateContext();
+
+                context.ConnectionResumedListener += () => 
connectionResumedInvoked.Set();
+
+                await context.StartAsync();
+                
+                originalPeer.Close();
+                
Assert.IsTrue(connectionResumedInvoked.WaitOne(TimeSpan.FromSeconds(10)));
+            }
+        }
+        
+            
+        [Test, Timeout(20_000)]
+        public async Task TestProducedMessagesOnTransactedSessionCarryTxnId()
+        {
+            ManualResetEvent transactionRolledBackEvent = new 
ManualResetEvent(false);
+            
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                var connection = await 
EstablishNMSContextAsync(testPeer,acknowledgementMode:AcknowledgementMode.Transactional);
+                
+
+                testPeer.ExpectBegin();
+                testPeer.ExpectCoordinatorAttach();
+                await connection.StartAsync();
+                
+                // First expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                byte[] txnId = { 1, 2, 3, 4 };
+                testPeer.ExpectDeclare(txnId);
+
+                // ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                connection.TransactionRolledBackListener += session => 
transactionRolledBackEvent.Set();
+                IQueue queue = await connection.GetQueueAsync("myQueue");
+
+                // Create a producer to use in provoking creation of the AMQP transaction
+                testPeer.ExpectSenderAttach();
+
+                var producer = await connection.CreateProducerAsync();
+
+                // Expect the message which was sent under the current transaction. Check it carries
+                // TransactionalState with the above txnId but has no outcome. Respond with a
+                // TransactionalState with Accepted outcome.
+                testPeer.ExpectTransfer(messageMatcher: Assert.NotNull,
+                    stateMatcher: state =>
+                    {
+                        Assert.IsInstanceOf<TransactionalState>(state);
+                        TransactionalState transactionalState = 
(TransactionalState) state;
+                        CollectionAssert.AreEqual(txnId, 
transactionalState.TxnId);
+                        Assert.IsNull(transactionalState.Outcome);
+                    },
+                    responseState: new TransactionalState() { TxnId = txnId, 
Outcome = new Accepted() },
+                    responseSettled: true);
+                testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+                await producer.SendAsync(queue, await 
connection.CreateMessageAsync());
+
+                testPeer.ExpectEnd();
+                testPeer.ExpectClose();
+                await connection.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+                
+                Assert.IsTrue(transactionRolledBackEvent.WaitOne(1000));
+            }
+        }
+       
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index 3705eb8..07dfdc0 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -1125,6 +1125,14 @@ namespace NMS.AMQP.Test.Integration
         }
 
         private NmsConnection EstablishAnonymousConnection(string 
connectionParams, string failoverParams, params TestAmqpPeer[] peers)
+        {
+            var remoteUri = CreateFailoverUri(connectionParams, 
failoverParams, peers);
+
+            NmsConnectionFactory factory = new NmsConnectionFactory(remoteUri);
+            return (NmsConnection) factory.CreateConnection();
+        }
+
+        internal static string CreateFailoverUri(string connectionParams, 
string failoverParams, params TestAmqpPeer[] peers)
         {
             if (peers.Length == 0)
             {
@@ -1153,11 +1161,10 @@ namespace NMS.AMQP.Test.Integration
                 remoteUri += ")?" + failoverParams;
             }
 
-            NmsConnectionFactory factory = new NmsConnectionFactory(remoteUri);
-            return (NmsConnection) factory.CreateConnection();
+            return remoteUri;
         }
 
-        private string CreatePeerUri(TestAmqpPeer peer, string parameters = 
null)
+        internal static string CreatePeerUri(TestAmqpPeer peer, string 
parameters = null)
         {
             return $"amqp://127.0.0.1:{peer.ServerPort}/{(parameters != null ? 
"?" + parameters : "")}";
         }

Reply via email to