This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d420ac  PROTON-2584 Better manage prefetch and credit replenish
0d420ac is described below

commit 0d420ac3a738bbbee390eaebe39d7353a00410e6
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Jul 27 13:41:38 2022 -0400

    PROTON-2584 Better manage prefetch and credit replenish
    
    Better manage the handling of prefetched messages and how the
    credit replenish check determines when to refill to improve
    the time it takes to provide a message from prefetch to a
    receive call.
---
 .../Client/Implementation/ClientReceiver.cs        | 55 +++++++++++++++-------
 .../Implementation/ClientReceiverLinkType.cs       |  2 +-
 2 files changed, 38 insertions(+), 19 deletions(-)

diff --git a/src/Proton.Client/Client/Implementation/ClientReceiver.cs 
b/src/Proton.Client/Client/Implementation/ClientReceiver.cs
index 221fa21..af49833 100644
--- a/src/Proton.Client/Client/Implementation/ClientReceiver.cs
+++ b/src/Proton.Client/Client/Implementation/ClientReceiver.cs
@@ -37,6 +37,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       private readonly IDeque<TaskCompletionSource<IDelivery>> receiveRequests 
=
          new ArrayDeque<TaskCompletionSource<IDelivery>>();
+      private readonly IDeque<IIncomingDelivery> prefetch = new 
ArrayDeque<IIncomingDelivery>();
 
       internal ClientReceiver(ClientSession session, ReceiverOptions options, 
string receiverId, Engine.IReceiver receiver)
        : base(session, options, receiverId, receiver)
@@ -77,21 +78,8 @@ namespace Apache.Qpid.Proton.Client.Implementation
          {
             if (NotClosedOrFailed(receive))
             {
-               IIncomingDelivery delivery = null;
-
-               // Scan all unsettled deliveries in the link and check if any 
are still
-               // lacking a linked stream delivery instance which indicates 
they have
-               // not been made part of a receive yet.
-               foreach (IIncomingDelivery candidate in protonLink.Unsettled)
-               {
-                  if (candidate.LinkedResource == null && !candidate.IsPartial)
-                  {
-                     delivery = candidate;
-                     break;
-                  }
-               }
-
-               if (delivery == null)
+               // Check prefetch for an available message
+               if (!prefetch.TryDequeue(out IIncomingDelivery delivery))
                {
                   if (timeout == TimeSpan.Zero)
                   {
@@ -162,6 +150,34 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
       protected override IReceiver Self => this;
 
+      protected override void ReplenishCreditIfNeeded()
+      {
+         uint creditWindow = options.CreditWindow;
+         if (creditWindow > 0)
+         {
+            uint currentCredit = protonLink.Credit;
+            if (currentCredit <= creditWindow * 0.5)
+            {
+               uint potentialPrefetch = currentCredit + (uint)prefetch.Count;
+
+               if (potentialPrefetch <= creditWindow * 0.7)
+               {
+                  uint additionalCredit = creditWindow - potentialPrefetch;
+
+                  LOG.Trace("Receiver granting additional credit: {0}", 
additionalCredit);
+                  try
+                  {
+                     protonLink.AddCredit(additionalCredit);
+                  }
+                  catch (Exception ex)
+                  {
+                     LOG.Debug("Error caught during credit top-up", ex);
+                  }
+               }
+            }
+         }
+      }
+
       #endregion
 
       #region Private Receiver Implementation
@@ -388,6 +404,9 @@ namespace Apache.Qpid.Proton.Client.Implementation
 
          if (!delivery.IsPartial)
          {
+            // Either there is a waiter or we enqueue this into the prefetch 
buffer far
+            // later receive calls. If there was a waiter we must either auto 
accept or
+            // check for credit window expansion to complete the async 
operation.
             LOG.Trace("{0} has incoming Message(s).", this);
             if (receiveRequests.TryDequeue(out TaskCompletionSource<IDelivery> 
entry))
             {
@@ -403,9 +422,9 @@ namespace Apache.Qpid.Proton.Client.Implementation
             }
             else
             {
-               // Ensure the credit window is expanded to allow the prefetch
-               // to fill since we are delivering fully read messages from this
-               // receiver
+               prefetch.Enqueue(delivery);
+               // Allow the prefetch to fill even in the case there is a 
session window
+               // that would otherwise prevent new incoming bytes.
                delivery.ClaimAvailableBytes();
             }
          }
diff --git a/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs 
b/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
index f619258..72f41d0 100644
--- a/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
+++ b/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
@@ -158,7 +158,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
          }
       }
 
-      protected void ReplenishCreditIfNeeded()
+      protected virtual void ReplenishCreditIfNeeded()
       {
          uint creditWindow = options.CreditWindow;
          if (creditWindow > 0)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to