This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git
The following commit(s) were added to refs/heads/main by this push:
new 0d420ac PROTON-2584 Better manage prefetch and credit replenish
0d420ac is described below
commit 0d420ac3a738bbbee390eaebe39d7353a00410e6
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Jul 27 13:41:38 2022 -0400
PROTON-2584 Better manage prefetch and credit replenish
Improve how prefetched messages are tracked and how the credit
replenish check decides when to refill, in order to reduce the
time it takes to hand a message from the prefetch buffer to a
receive call.
---
.../Client/Implementation/ClientReceiver.cs | 55 +++++++++++++++-------
.../Implementation/ClientReceiverLinkType.cs | 2 +-
2 files changed, 38 insertions(+), 19 deletions(-)
diff --git a/src/Proton.Client/Client/Implementation/ClientReceiver.cs
b/src/Proton.Client/Client/Implementation/ClientReceiver.cs
index 221fa21..af49833 100644
--- a/src/Proton.Client/Client/Implementation/ClientReceiver.cs
+++ b/src/Proton.Client/Client/Implementation/ClientReceiver.cs
@@ -37,6 +37,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
private readonly IDeque<TaskCompletionSource<IDelivery>> receiveRequests
=
new ArrayDeque<TaskCompletionSource<IDelivery>>();
+ private readonly IDeque<IIncomingDelivery> prefetch = new
ArrayDeque<IIncomingDelivery>();
internal ClientReceiver(ClientSession session, ReceiverOptions options,
string receiverId, Engine.IReceiver receiver)
: base(session, options, receiverId, receiver)
@@ -77,21 +78,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
{
if (NotClosedOrFailed(receive))
{
- IIncomingDelivery delivery = null;
-
- // Scan all unsettled deliveries in the link and check if any
are still
- // lacking a linked stream delivery instance which indicates
they have
- // not been made part of a receive yet.
- foreach (IIncomingDelivery candidate in protonLink.Unsettled)
- {
- if (candidate.LinkedResource == null && !candidate.IsPartial)
- {
- delivery = candidate;
- break;
- }
- }
-
- if (delivery == null)
+ // Check prefetch for an available message
+ if (!prefetch.TryDequeue(out IIncomingDelivery delivery))
{
if (timeout == TimeSpan.Zero)
{
@@ -162,6 +150,34 @@ namespace Apache.Qpid.Proton.Client.Implementation
protected override IReceiver Self => this;
+ protected override void ReplenishCreditIfNeeded()
+ {
+ uint creditWindow = options.CreditWindow;
+ if (creditWindow > 0)
+ {
+ uint currentCredit = protonLink.Credit;
+ if (currentCredit <= creditWindow * 0.5)
+ {
+ uint potentialPrefetch = currentCredit + (uint)prefetch.Count;
+
+ if (potentialPrefetch <= creditWindow * 0.7)
+ {
+ uint additionalCredit = creditWindow - potentialPrefetch;
+
+ LOG.Trace("Receiver granting additional credit: {0}",
additionalCredit);
+ try
+ {
+ protonLink.AddCredit(additionalCredit);
+ }
+ catch (Exception ex)
+ {
+ LOG.Debug("Error caught during credit top-up", ex);
+ }
+ }
+ }
+ }
+ }
+
#endregion
#region Private Receiver Implementation
@@ -388,6 +404,9 @@ namespace Apache.Qpid.Proton.Client.Implementation
if (!delivery.IsPartial)
{
+ // Either there is a waiter or we enqueue this into the prefetch
buffer for
+ // later receive calls. If there was a waiter we must either auto
accept or
+ // check for credit window expansion to complete the async
operation.
LOG.Trace("{0} has incoming Message(s).", this);
if (receiveRequests.TryDequeue(out TaskCompletionSource<IDelivery>
entry))
{
@@ -403,9 +422,9 @@ namespace Apache.Qpid.Proton.Client.Implementation
}
else
{
- // Ensure the credit window is expanded to allow the prefetch
- // to fill since we are delivering fully read messages from this
- // receiver
+ prefetch.Enqueue(delivery);
+ // Allow the prefetch to fill even in the case there is a
session window
+ // that would otherwise prevent new incoming bytes.
delivery.ClaimAvailableBytes();
}
}
diff --git a/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
b/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
index f619258..72f41d0 100644
--- a/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
+++ b/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
@@ -158,7 +158,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
}
}
- protected void ReplenishCreditIfNeeded()
+ protected virtual void ReplenishCreditIfNeeded()
{
uint creditWindow = options.CreditWindow;
if (creditWindow > 0)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]