This is an automated email from the ASF dual-hosted git repository.
havret pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
The following commit(s) were added to refs/heads/main by this push:
new 2e925c9 AMQNET-746 NmsContext event listeners
2e925c9 is described below
commit 2e925c9cd1027c3b64b08ed542fbe083548b807b
Author: lukeabsent <[email protected]>
AuthorDate: Tue Dec 21 10:22:06 2021 +0100
AMQNET-746 NmsContext event listeners
---
src/NMS.AMQP/NmsContext.cs | 49 +++++--
.../Async/NMSContextEventListenersTestAsync.cs | 153 +++++++++++++++++++++
.../Integration/FailoverIntegrationTest.cs | 13 +-
3 files changed, 198 insertions(+), 17 deletions(-)
diff --git a/src/NMS.AMQP/NmsContext.cs b/src/NMS.AMQP/NmsContext.cs
index d8b905e..7838f21 100644
--- a/src/NMS.AMQP/NmsContext.cs
+++ b/src/NMS.AMQP/NmsContext.cs
@@ -34,11 +34,8 @@ namespace Apache.NMS.AMQP
private NmsMessageProducer sharedProducer;
private bool autoStart = true;
- public NmsContext(NmsConnection connection, AcknowledgementMode
acknowledgementMode)
+ public NmsContext(NmsConnection connection, AcknowledgementMode
acknowledgementMode) : this(connection, acknowledgementMode, new AtomicLong(1))
{
- this.connection = connection;
- this.AcknowledgementMode = acknowledgementMode;
- this.connectionRefCount = new AtomicLong(1);
}
private NmsContext(NmsConnection connection, AcknowledgementMode
acknowledgementMode,
@@ -462,7 +459,7 @@ namespace Apache.NMS.AMQP
private NmsSession GetSession() {
if (session == null)
{
- using( syncRoot.Lock())
+ using(syncRoot.Lock())
{
if (session == null)
{
@@ -516,16 +513,40 @@ namespace Apache.NMS.AMQP
public bool AutoStart { get => autoStart; set => autoStart = value; }
- public event SessionTxEventDelegate TransactionStartedListener;
-
- public event SessionTxEventDelegate TransactionCommittedListener;
-
- public event SessionTxEventDelegate TransactionRolledBackListener;
-
- public event ExceptionListener ExceptionListener;
+ public event SessionTxEventDelegate TransactionStartedListener
+ {
+ add => GetSession().TransactionStartedListener += value;
+ remove => GetSession().TransactionStartedListener -= value;
+ }
+
+ public event SessionTxEventDelegate TransactionCommittedListener
+ {
+ add => GetSession().TransactionCommittedListener += value;
+ remove => GetSession().TransactionCommittedListener -= value;
+ }
+
+ public event SessionTxEventDelegate TransactionRolledBackListener
+ {
+ add => GetSession().TransactionRolledBackListener += value;
+ remove => GetSession().TransactionRolledBackListener -= value;
+ }
+
+ public event ExceptionListener ExceptionListener
+ {
+ add => connection.ExceptionListener += value;
+ remove => connection.ExceptionListener -= value;
+ }
- public event ConnectionInterruptedListener
ConnectionInterruptedListener;
+ public event ConnectionInterruptedListener
ConnectionInterruptedListener
+ {
+ add => connection.ConnectionInterruptedListener += value;
+ remove => connection.ConnectionInterruptedListener -= value;
+ }
- public event ConnectionResumedListener ConnectionResumedListener;
+ public event ConnectionResumedListener ConnectionResumedListener
+ {
+ add => connection.ConnectionResumedListener += value;
+ remove => connection.ConnectionResumedListener -= value;
+ }
}
}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSContextEventListenersTestAsync.cs b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSContextEventListenersTestAsync.cs
new file mode 100644
index 0000000..54a6c44
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSContextEventListenersTestAsync.cs
@@ -0,0 +1,153 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Amqp.Framing;
+using Amqp.Transactions;
+using Apache.NMS;
+using Apache.NMS.AMQP;
+using NMS.AMQP.Test.TestAmqp;
+using NMS.AMQP.Test.TestAmqp.BasicTypes;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration.Async
+{
+ public class NMSContextEventListenersTestAsync : IntegrationTestFixture
+ {
+ [Test, Timeout(20_000)]
+ public async Task TestRemotelyEndConnectionListenerInvoked()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ ManualResetEvent done = new ManualResetEvent(false);
+
+ // Don't set a ClientId, so that the underlying AMQP
connection isn't established yet
+ var context = await EstablishNMSContextAsync(testPeer:
testPeer, setClientId: false);
+
+ // Tell the test peer to close the connection when executing
its last handler
+ testPeer.RemotelyCloseConnection(expectCloseResponse: true,
errorCondition: ConnectionError.CONNECTION_FORCED, errorMessage: "buba");
+
+ context.ExceptionListener += exception => done.Set();
+
+ // Trigger the underlying AMQP connection
+ await context.StartAsync();
+
+ Assert.IsTrue(done.WaitOne(TimeSpan.FromSeconds(5)),
"Connection should report failure");
+
+ await context.CloseAsync();
+ }
+ }
+
+ [Test, Timeout(20_000), Category("Windows")]
+ public async Task
TestConnectionInterruptedInvokedWhenConnectionToBrokerLost()
+ {
+ using (TestAmqpPeer originalPeer = new TestAmqpPeer())
+ {
+ ManualResetEvent connectionInterruptedInvoked = new
ManualResetEvent(false);
+
+ originalPeer.ExpectSaslAnonymous();
+ originalPeer.ExpectOpen();
+ originalPeer.ExpectBegin();
+ originalPeer.ExpectBegin();
+
+ var conFactory = new
ConnectionFactory(FailoverIntegrationTest.CreateFailoverUri(null, null,
originalPeer));
+ var context =
conFactory.CreateContext(AcknowledgementMode.AutoAcknowledge);
+ context.ConnectionInterruptedListener += () =>
connectionInterruptedInvoked.Set();
+
+ await context.StartAsync();
+
+ originalPeer.Close();
+
+
Assert.IsTrue(connectionInterruptedInvoked.WaitOne(TimeSpan.FromSeconds(10)));
+ }
+ }
+
+ [Test, Timeout(20_000), Category("Windows")]
+ public async Task
TestConnectionResumedInvokedWhenConnectionToBrokerLost()
+ {
+ using (TestAmqpPeer originalPeer = new TestAmqpPeer())
+ using (TestAmqpPeer finalPeer = new TestAmqpPeer())
+ {
+ ManualResetEvent connectionResumedInvoked = new
ManualResetEvent(false);
+
+ originalPeer.ExpectSaslAnonymous();
+ originalPeer.ExpectOpen();
+ originalPeer.ExpectBegin();
+ originalPeer.ExpectBegin();
+
+ finalPeer.ExpectSaslAnonymous();
+ finalPeer.ExpectOpen();
+ finalPeer.ExpectBegin();
+ finalPeer.ExpectBegin();
+
+ var conFactory =
+ new ConnectionFactory(
+ FailoverIntegrationTest.CreateFailoverUri(null, null,
originalPeer, finalPeer));
+ var context = conFactory.CreateContext();
+
+ context.ConnectionResumedListener += () =>
connectionResumedInvoked.Set();
+
+ await context.StartAsync();
+
+ originalPeer.Close();
+
Assert.IsTrue(connectionResumedInvoked.WaitOne(TimeSpan.FromSeconds(10)));
+ }
+ }
+
+
+ [Test, Timeout(20_000)]
+ public async Task TestProducedMessagesOnTransactedSessionCarryTxnId()
+ {
+ ManualResetEvent transactionRolledBackEvent = new
ManualResetEvent(false);
+
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var connection = await
EstablishNMSContextAsync(testPeer,acknowledgementMode:AcknowledgementMode.Transactional);
+
+
+ testPeer.ExpectBegin();
+ testPeer.ExpectCoordinatorAttach();
+ await connection.StartAsync();
+
+ // First expect an unsettled 'declare' transfer to the txn
coordinator, and
+ // reply with a declared disposition state containing the
txnId.
+ byte[] txnId = { 1, 2, 3, 4 };
+ testPeer.ExpectDeclare(txnId);
+
+ // ISession session =
connection.CreateSession(AcknowledgementMode.Transactional);
+ connection.TransactionRolledBackListener += session =>
transactionRolledBackEvent.Set();
+ IQueue queue = await connection.GetQueueAsync("myQueue");
+
+ // Create a producer to use in provoking creation of the AMQP
transaction
+ testPeer.ExpectSenderAttach();
+
+ var producer = await connection.CreateProducerAsync();
+
+ // Expect the message which was sent under the current
transaction. Check it carries
+ // TransactionalState with the above txnId but has no outcome.
Respond with a
+ // TransactionalState with Accepted outcome.
+ testPeer.ExpectTransfer(messageMatcher: Assert.NotNull,
+ stateMatcher: state =>
+ {
+ Assert.IsInstanceOf<TransactionalState>(state);
+ TransactionalState transactionalState =
(TransactionalState) state;
+ CollectionAssert.AreEqual(txnId,
transactionalState.TxnId);
+ Assert.IsNull(transactionalState.Outcome);
+ },
+ responseState: new TransactionalState() { TxnId = txnId,
Outcome = new Accepted() },
+ responseSettled: true);
+ testPeer.ExpectDischarge(txnId, dischargeState: true);
+
+ await producer.SendAsync(queue, await
connection.CreateMessageAsync());
+
+ testPeer.ExpectEnd();
+ testPeer.ExpectClose();
+ await connection.CloseAsync();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+
+ Assert.IsTrue(transactionRolledBackEvent.WaitOne(1000));
+ }
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index 3705eb8..07dfdc0 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -1125,6 +1125,14 @@ namespace NMS.AMQP.Test.Integration
}
private NmsConnection EstablishAnonymousConnection(string
connectionParams, string failoverParams, params TestAmqpPeer[] peers)
+ {
+ var remoteUri = CreateFailoverUri(connectionParams,
failoverParams, peers);
+
+ NmsConnectionFactory factory = new NmsConnectionFactory(remoteUri);
+ return (NmsConnection) factory.CreateConnection();
+ }
+
+ internal static string CreateFailoverUri(string connectionParams,
string failoverParams, params TestAmqpPeer[] peers)
{
if (peers.Length == 0)
{
@@ -1153,11 +1161,10 @@ namespace NMS.AMQP.Test.Integration
remoteUri += ")?" + failoverParams;
}
- NmsConnectionFactory factory = new NmsConnectionFactory(remoteUri);
- return (NmsConnection) factory.CreateConnection();
+ return remoteUri;
}
- private string CreatePeerUri(TestAmqpPeer peer, string parameters =
null)
+ internal static string CreatePeerUri(TestAmqpPeer peer, string
parameters = null)
{
return $"amqp://127.0.0.1:{peer.ServerPort}/{(parameters != null ?
"?" + parameters : "")}";
}