This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git
The following commit(s) were added to refs/heads/main by this push:
new c8c0630 PROTON-2638 AwaitAccepted API should work for transaction
settlement
c8c0630 is described below
commit c8c0630100a163c03e4b4340816b2920c1761581
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Oct 28 16:51:22 2022 -0400
PROTON-2638 AwaitAccepted API should work for transaction settlement
Check whether the settlement outcome carried in the transactional state is
Accepted and, if so, report the delivery state as accepted.
---
.../Client/Implementation/ClientDeliveryState.cs | 4 +-
.../ClientReconnectTransactionTest.cs | 240 +++++++++++++++++++++
.../Implementation/ClilentTransactionsTest.cs | 58 +++++
3 files changed, 301 insertions(+), 1 deletion(-)
diff --git a/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs
b/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs
index 8df2ce7..593a54f 100644
--- a/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs
+++ b/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs
@@ -28,7 +28,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
/// </summary>
public abstract class ClientDeliveryState : IDeliveryState
{
- public bool IsAccepted => Type == DeliveryStateType.Accepted;
+ public virtual bool IsAccepted => Type == DeliveryStateType.Accepted;
public abstract DeliveryStateType Type { get; }
@@ -166,6 +166,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
this.txnState.TxnId = txnState.TxnId.Copy();
}
+ public override bool IsAccepted => txnState.Outcome is Accepted;
+
public override DeliveryStateType Type =>
DeliveryStateType.Transactional;
public override Types.Transport.IDeliveryState ProtonDeliveryState =>
txnState;
diff --git
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectTransactionTest.cs
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectTransactionTest.cs
index f864aa0..0c95e04 100644
---
a/test/Proton.Client.Tests/Client/Implementation/ClientReconnectTransactionTest.cs
+++
b/test/Proton.Client.Tests/Client/Implementation/ClientReconnectTransactionTest.cs
@@ -17,6 +17,7 @@
using System.Threading;
using Apache.Qpid.Proton.Client.Exceptions;
+using Apache.Qpid.Proton.Client.TestSupport;
using Apache.Qpid.Proton.Test.Driver;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
@@ -84,5 +85,244 @@ namespace Apache.Qpid.Proton.Client.Implementation
secondPeer.WaitForScriptToComplete();
}
}
+
+ [Test]
+ public void TestTransactionInDoubtAfterReconnect()
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer firstPeer = new
ProtonTestServer(loggerFactory))
+ using (ProtonTestServer secondPeer = new
ProtonTestServer(loggerFactory))
+ {
+ firstPeer.ExpectSASLAnonymousConnect();
+ firstPeer.ExpectOpen().Respond();
+ firstPeer.ExpectBegin().Respond();
+ firstPeer.ExpectCoordinatorAttach().Respond();
+ firstPeer.RemoteFlow().WithLinkCredit(2).Queue();
+ firstPeer.ExpectDeclare().Accept(txnId);
+
firstPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+ firstPeer.RemoteFlow().WithLinkCredit(1).Queue();
+ firstPeer.ExpectTransfer().WithNonNullPayload();
+ firstPeer.DropAfterLastHandler();
+ firstPeer.Start();
+
+ secondPeer.ExpectSASLAnonymousConnect();
+ secondPeer.ExpectOpen().Respond();
+ secondPeer.ExpectBegin().Respond();
+
secondPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+ secondPeer.Start();
+
+ string primaryAddress = firstPeer.ServerAddress;
+ int primaryPort = firstPeer.ServerPort;
+ string backupAddress = secondPeer.ServerAddress;
+ int backupPort = secondPeer.ServerPort;
+
+ logger.LogInformation("Test started, first peer listening on:
{0}:{1}", primaryAddress, primaryPort);
+ logger.LogInformation("Test started, backup peer listening on:
{0}:{1}", backupAddress, backupPort);
+
+ ConnectionOptions options = new ConnectionOptions();
+ options.ReconnectOptions.ReconnectEnabled = true;
+ options.ReconnectOptions.AddReconnectLocation(backupAddress,
backupPort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(primaryAddress,
primaryPort, options);
+ ISession session = connection.OpenSession().OpenTask.Result;
+
+ session.BeginTransaction();
+
+ ISender sender = session.OpenSender("test").OpenTask.Result;
+ sender.Send(IMessage<string>.Create("Hello"));
+
+ firstPeer.WaitForScriptToComplete();
+
+ secondPeer.WaitForScriptToComplete();
+ secondPeer.ExpectClose().Respond();
+
+ try
+ {
+ session.CommitTransaction();
+ Assert.Fail("Should have failed to declare transaction");
+ }
+ catch (ClientTransactionRolledBackException cliEx)
+ {
+ logger.LogInformation("Caught expected error from test", cliEx);
+ }
+
+ connection.Close();
+
+ secondPeer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestSendInTransactionIsNoOpAfterReconnect()
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer firstPeer = new
ProtonTestServer(loggerFactory))
+ using (ProtonTestServer secondPeer = new
ProtonTestServer(loggerFactory))
+ {
+ firstPeer.ExpectSASLAnonymousConnect();
+ firstPeer.ExpectOpen().Respond();
+ firstPeer.ExpectBegin().Respond();
+ firstPeer.ExpectCoordinatorAttach().Respond();
+ firstPeer.RemoteFlow().WithLinkCredit(2).Queue();
+ firstPeer.ExpectDeclare().Accept(txnId);
+
firstPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+ firstPeer.RemoteFlow().WithLinkCredit(1).Queue();
+ firstPeer.ExpectTransfer().WithNonNullPayload();
+ firstPeer.DropAfterLastHandler();
+ firstPeer.Start();
+
+ secondPeer.ExpectSASLAnonymousConnect();
+ secondPeer.ExpectOpen().Respond();
+ secondPeer.ExpectBegin().Respond();
+
secondPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+ secondPeer.RemoteFlow().WithLinkCredit(1).Queue();
+ secondPeer.Start();
+
+ string primaryAddress = firstPeer.ServerAddress;
+ int primaryPort = firstPeer.ServerPort;
+ string backupAddress = secondPeer.ServerAddress;
+ int backupPort = secondPeer.ServerPort;
+
+ logger.LogInformation("Test started, first peer listening on:
{0}:{1}", primaryAddress, primaryPort);
+ logger.LogInformation("Test started, backup peer listening on:
{0}:{1}", backupAddress, backupPort);
+
+ ConnectionOptions options = new ConnectionOptions();
+ options.ReconnectOptions.ReconnectEnabled = true;
+ options.ReconnectOptions.AddReconnectLocation(backupAddress,
backupPort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(primaryAddress,
primaryPort, options);
+ ISession session = connection.OpenSession().OpenTask.Result;
+
+ session.BeginTransaction();
+
+ ISender sender = session.OpenSender("test").OpenTask.Result;
+ sender.Send(IMessage<string>.Create("Hello"));
+
+ firstPeer.WaitForScriptToComplete();
+
+ secondPeer.WaitForScriptToComplete();
+ secondPeer.ExpectClose().Respond();
+
+ sender.Send(IMessage<string>.Create("Hello Again"));
+
+ try
+ {
+ session.CommitTransaction();
+ Assert.Fail("Should have failed to declare transaction");
+ }
+ catch (ClientTransactionRolledBackException cliEx)
+ {
+ logger.LogInformation("Caught expected error from test", cliEx);
+ }
+
+ connection.Close();
+
+ secondPeer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void
TestNewTransactionCanBeCreatedAfterOldInstanceRolledBackByReconnect()
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer firstPeer = new
ProtonTestServer(loggerFactory))
+ using (ProtonTestServer secondPeer = new
ProtonTestServer(loggerFactory))
+ {
+ firstPeer.ExpectSASLAnonymousConnect();
+ firstPeer.ExpectOpen().Respond();
+ firstPeer.ExpectBegin().Respond();
+
firstPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+ firstPeer.ExpectCoordinatorAttach().Respond();
+ firstPeer.RemoteFlow().WithLinkCredit(2).Queue();
+ firstPeer.ExpectDeclare().Accept(txnId);
+ firstPeer.DropAfterLastHandler(5);
+ firstPeer.Start();
+
+ secondPeer.ExpectSASLAnonymousConnect();
+ secondPeer.ExpectOpen().Respond();
+ secondPeer.ExpectBegin().Respond();
+
secondPeer.ExpectAttach().OfSender().WithTarget().WithAddress("test").And().Respond();
+ secondPeer.RemoteFlow().WithLinkCredit(1).Queue();
+ secondPeer.Start();
+
+ string primaryAddress = firstPeer.ServerAddress;
+ int primaryPort = firstPeer.ServerPort;
+ string backupAddress = secondPeer.ServerAddress;
+ int backupPort = secondPeer.ServerPort;
+
+ logger.LogInformation("Test started, first peer listening on:
{0}:{1}", primaryAddress, primaryPort);
+ logger.LogInformation("Test started, backup peer listening on:
{0}:{1}", backupAddress, backupPort);
+
+ ConnectionOptions options = new ConnectionOptions();
+ options.ReconnectOptions.ReconnectEnabled = true;
+ options.ReconnectOptions.AddReconnectLocation(backupAddress,
backupPort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(primaryAddress,
primaryPort, options);
+ ISession session = connection.OpenSession().OpenTask.Result;
+ ISender sender = session.OpenSender("test").OpenTask.Result;
+
+ session.BeginTransaction();
+
+ firstPeer.WaitForScriptToComplete();
+
+ secondPeer.WaitForScriptToComplete();
+ secondPeer.ExpectCoordinatorAttach().Respond();
+ secondPeer.RemoteFlow().WithLinkCredit(2).Queue();
+ secondPeer.ExpectDeclare().Accept(txnId);
+ secondPeer.ExpectTransfer().WithHandle(0)
+ .WithNonNullPayload()
+
.WithState().Transactional().WithTxnId(txnId).And()
+ .Respond()
+
.WithState().Transactional().WithTxnId(txnId).WithAccepted().And()
+ .WithSettled(true);
+
secondPeer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Accept();
+ secondPeer.ExpectEnd().Respond();
+ secondPeer.ExpectClose().Respond();
+
+ try
+ {
+ session.CommitTransaction();
+ Assert.Fail("Should have failed to declare transaction");
+ }
+ catch (ClientTransactionRolledBackException cliEx)
+ {
+ logger.LogInformation("Caught expected error from test", cliEx);
+ }
+
+ session.BeginTransaction();
+
+ ITracker tracker =
sender.Send(IMessage<string>.Create("test-message"));
+
+ Assert.IsNotNull(tracker);
+ Assert.IsNotNull(tracker.SettlementTask.Result);
+ Assert.AreEqual(tracker.RemoteState.Type,
DeliveryStateType.Transactional);
+ Assert.IsNotNull(tracker.State);
+ Assert.AreEqual(tracker.State.Type,
DeliveryStateType.Transactional,
+ "Delivery inside transaction should have Transactional state:
" + tracker.State.Type);
+ Wait.AssertTrue("Delivery in transaction should be locally settled
after response", () => tracker.Settled);
+
+ try
+ {
+ session.CommitTransaction();
+ }
+ catch (ClientException cliEx)
+ {
+ logger.LogInformation("Caught unexpected error from test",
cliEx);
+ Assert.Fail("Should not have failed to declare transaction");
+ }
+
+ session.Close();
+ connection.Close();
+
+ secondPeer.WaitForScriptToComplete();
+ }
+ }
+
}
}
\ No newline at end of file
diff --git
a/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs
b/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs
index bdf8749..2e2f3bd 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs
@@ -1725,5 +1725,63 @@ namespace Apache.Qpid.Proton.Client.Implementation
peer.WaitForScriptToComplete();
}
}
+
+ [Test]
+ public void TestAwaitSettlementWorksForMessageSentInTransaction()
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectAttach().OfSender().Respond();
+ peer.RemoteFlow().WithLinkCredit(1).Queue();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(2).Queue();
+ peer.ExpectDeclare().Accept(txnId);
+ peer.ExpectTransfer().WithHandle(0)
+ .WithNonNullPayload()
+
.WithState().Transactional().WithTxnId(txnId).And()
+ .Respond()
+
.WithState().Transactional().WithTxnId(txnId).WithAccepted().And()
+ .WithSettled(true);
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Accept();
+ peer.ExpectEnd().Respond();
+ peer.ExpectClose().Respond();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}",
remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress,
remotePort);
+ ISession session = connection.OpenSession();
+ ISender sender = session.OpenSender("address").OpenTask.Result;
+
+ session.BeginTransaction();
+
+ ITracker tracker =
sender.Send(IMessage<string>.Create("test-message"));
+
+ Assert.IsNotNull(tracker);
+ Assert.IsNotNull(tracker.AwaitAccepted());
+ Assert.IsTrue(tracker.RemoteState.IsAccepted);
+ Assert.AreEqual(tracker.RemoteState.Type,
DeliveryStateType.Transactional,
+ "Delivery inside transaction should have
Transactional state");
+ Assert.AreEqual(tracker.State.Type,
DeliveryStateType.Transactional,
+ "Delivery inside transaction should have
Transactional state: " + tracker.State.Type);
+ Wait.AssertTrue("Delivery in transaction should be locally settled
after response", () => tracker.Settled);
+
+ session.CommitTransaction();
+
+ session.CloseAsync();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]