This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git
The following commit(s) were added to refs/heads/main by this push:
new ab43220 PROTON-2649 Update sender sendable state on check to reflect
state
ab43220 is described below
commit ab43220acf92d14d3a41fb21e1f668edd7c4c43a
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Nov 7 15:35:39 2022 -0500
PROTON-2649 Update sender sendable state on check to reflect state
When a sender is asked if it is sendable update the state if the session
capacity reports that it is not available while the sender link does
still have credit. This allows the engine to notifity all senders in
the not sendable state that they have recovered.
---
src/Proton/Engine/Implementation/ProtonSender.cs | 2 +-
.../Engine/Implementation/ProtonSessionTest.cs | 184 +++++++++++++++++++++
2 files changed, 185 insertions(+), 1 deletion(-)
diff --git a/src/Proton/Engine/Implementation/ProtonSender.cs
b/src/Proton/Engine/Implementation/ProtonSender.cs
index 9e251b9..ecef356 100644
--- a/src/Proton/Engine/Implementation/ProtonSender.cs
+++ b/src/Proton/Engine/Implementation/ProtonSender.cs
@@ -54,7 +54,7 @@ namespace Apache.Qpid.Proton.Engine.Implementation
public override uint Credit => CreditState.Credit;
- public bool IsSendable => sendable && sessionWindow.IsSendable;
+ public bool IsSendable => sendable = sendable &&
sessionWindow.IsSendable;
public override bool IsDraining => CreditState.IsDrain;
diff --git a/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
b/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
index 805814d..edac887 100644
--- a/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
+++ b/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
@@ -2779,6 +2779,190 @@ namespace Apache.Qpid.Proton.Engine.Implementation
Assert.IsNull(failure);
}
+ [Test]
+ public void
TestBothSendersNotifiedAfterSessionOutgoingWindowOpenedWhenBothRequestedSendableState()
+ {
+ IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
+ engine.ErrorHandler((error) => failure = error.FailureCause);
+ Queue<Action> asyncIOCallbacks = new Queue<Action>();
+ ProtonTestConnector peer = CreateTestPeer(engine, asyncIOCallbacks);
+
+ byte[] payload = new byte[] { 0, 1, 2, 3, 4 };
+ IDeliveryTagGenerator generator =
ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
+
+ peer.ExpectAMQPHeader().RespondWithAMQPHeader();
+ peer.ExpectOpen().WithMaxFrameSize(1024).Respond();
+
peer.ExpectBegin().WithNextOutgoingId(0).Respond().WithNextOutgoingId(0);
+ peer.ExpectAttach().Respond();
+
peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue();
+ peer.ExpectAttach().Respond();
+
peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue();
+
+ IConnection connection = engine.Start();
+ connection.MaxFrameSize = 1024;
+ connection.Open();
+ ISession session = connection.Session();
+ session.OutgoingCapacity = 1024;
+ session.Open();
+ ISender sender1 = session.Sender("test1");
+ sender1.DeliveryTagGenerator = generator;
+ sender1.Open();
+ ISender sender2 = session.Sender("test2");
+ sender2.DeliveryTagGenerator = generator;
+ sender2.Open();
+
+ peer.WaitForScriptToComplete();
+ peer.ExpectTransfer().WithPayload(payload);
+
+ int sender1CreditStateUpdated = 0;
+ sender1.CreditStateUpdateHandler((self) =>
+ {
+ sender1CreditStateUpdated++;
+ });
+
+ int sender2CreditStateUpdated = 0;
+ sender2.CreditStateUpdateHandler((self) =>
+ {
+ sender2CreditStateUpdated++;
+ });
+
+ Assert.IsTrue(sender1.IsSendable);
+ Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+ Assert.IsTrue(sender2.IsSendable);
+ Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+
+ // Open, Begin, Attach, Attach
+ Assert.AreEqual(4, asyncIOCallbacks.Count);
+ foreach (Action action in asyncIOCallbacks)
+ {
+ action.Invoke();
+ }
+ asyncIOCallbacks.Clear();
+
+ IOutgoingDelivery delivery = sender1.Next();
+ delivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
+
+ peer.WaitForScriptToComplete();
+
+ Assert.AreEqual(1, asyncIOCallbacks.Count);
+
+ Assert.IsFalse(sender1.IsSendable);
+ Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+ // Sender 2 shouldn't be able to send since sender 1 consumed the
outgoing window
+ Assert.IsFalse(sender2.IsSendable);
+ Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+
+ // Free a frame's worth of window which should trigger both senders
sendable update event
+ asyncIOCallbacks.Dequeue().Invoke();
+ Assert.AreEqual(0, asyncIOCallbacks.Count);
+
+ Assert.IsTrue(sender1.IsSendable);
+ Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+ Assert.AreEqual(1, sender1CreditStateUpdated);
+ Assert.IsTrue(sender2.IsSendable);
+ Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+ Assert.AreEqual(1, sender2CreditStateUpdated);
+
+ peer.WaitForScriptToComplete();
+ Assert.IsNull(failure);
+ }
+
+ [Test]
+ public void
TestSingleSenderUpdatedWhenOutgoingWindowOpenedForTwoIfFirstConsumesSessionOutgoingWindow()
+ {
+ IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
+ engine.ErrorHandler((error) => failure = error.FailureCause);
+ Queue<Action> asyncIOCallbacks = new Queue<Action>();
+ ProtonTestConnector peer = CreateTestPeer(engine, asyncIOCallbacks);
+
+ byte[] payload = new byte[] { 0, 1, 2, 3, 4 };
+ IDeliveryTagGenerator generator =
ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
+
+ peer.ExpectAMQPHeader().RespondWithAMQPHeader();
+ peer.ExpectOpen().WithMaxFrameSize(1024).Respond();
+
peer.ExpectBegin().WithNextOutgoingId(0).Respond().WithNextOutgoingId(0);
+ peer.ExpectAttach().Respond();
+
peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue();
+ peer.ExpectAttach().Respond();
+
peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue();
+
+ IConnection connection = engine.Start();
+ connection.MaxFrameSize = 1024;
+ connection.Open();
+ ISession session = connection.Session();
+ session.OutgoingCapacity = 1024;
+ session.Open();
+ ISender sender1 = session.Sender("test1");
+ sender1.DeliveryTagGenerator = generator;
+ sender1.Open();
+ ISender sender2 = session.Sender("test2");
+ sender2.DeliveryTagGenerator = generator;
+ sender2.Open();
+
+ peer.WaitForScriptToComplete();
+ peer.ExpectTransfer().WithPayload(payload);
+
+ int sender1CreditStateUpdated = 0;
+ sender1.CreditStateUpdateHandler((self) =>
+ {
+ sender1CreditStateUpdated++;
+ if (self.IsSendable)
+ {
+ IOutgoingDelivery delivery = self.Next();
+
delivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
+ }
+ });
+
+ int sender2CreditStateUpdated = 0;
+ sender2.CreditStateUpdateHandler((self) =>
+ {
+ sender2CreditStateUpdated++;
+ });
+
+ Assert.IsTrue(sender1.IsSendable);
+ Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+ Assert.IsTrue(sender2.IsSendable);
+ Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+
+ // Open, Begin, Attach, Attach
+ Assert.AreEqual(4, asyncIOCallbacks.Count);
+ foreach (Action action in asyncIOCallbacks)
+ {
+ action.Invoke();
+ }
+ asyncIOCallbacks.Clear();
+
+ IOutgoingDelivery delivery = sender1.Next();
+ delivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
+
+ peer.WaitForScriptToComplete();
+ peer.ExpectTransfer().WithPayload(payload);
+
+ Assert.AreEqual(1, asyncIOCallbacks.Count);
+
+ Assert.IsFalse(sender1.IsSendable);
+ Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+ // Sender 2 shouldn't be able to send since sender 1 consumed the
outgoing window
+ Assert.IsFalse(sender2.IsSendable);
+ Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+
+ // Should trigger sender 1 to send which should exhaust the outgoing
credit
+ asyncIOCallbacks.Dequeue().Invoke();
+ Assert.AreEqual(1, asyncIOCallbacks.Count);
+
+ Assert.IsFalse(sender1.IsSendable);
+ Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+ Assert.AreEqual(1, sender1CreditStateUpdated);
+ Assert.IsFalse(sender2.IsSendable);
+ Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+ // Should not have triggered an event for sender 2 being able to send
since
+ // sender one consumed the outgoing window already.
+ Assert.AreEqual(0, sender2CreditStateUpdated);
+
+ peer.WaitForScriptToComplete();
+ Assert.IsNull(failure);
+ }
+
[Test]
public void
TestHandleInUseErrorReturnedIfAttachWithAlreadyBoundHandleArrives()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]