This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new a2011a3  PROTON-2808 Account for zero copy buffers on ingest activity tracking
a2011a3 is described below

commit a2011a3e0f5decf2814212ec514c1ecac3ae2854
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Mar 15 14:22:47 2024 -0400

    PROTON-2808 Account for zero copy buffers on ingest activity tracking
    
    Update data ingest in the engine to account for the zero copy mechanics
    in proton buffers, which resulted in missed updates to the incoming
    sequence tracker and premature read check idle timeout errors.
---
 src/Proton/Engine/Implementation/ProtonEngine.cs   | 18 +++---
 .../Engine/Implementation/ProtonEngineTest.cs      | 70 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 8 deletions(-)

diff --git a/src/Proton/Engine/Implementation/ProtonEngine.cs b/src/Proton/Engine/Implementation/ProtonEngine.cs
index 88ecd39..953dcf0 100644
--- a/src/Proton/Engine/Implementation/ProtonEngine.cs
+++ b/src/Proton/Engine/Implementation/ProtonEngine.cs
@@ -231,19 +231,21 @@ namespace Apache.Qpid.Proton.Engine.Implementation
             throw new EngineNotWritableException("Engine is currently not accepting new input");
          }
 
-         try
+         if (input.IsReadable)
          {
-            long startIndex = input.ReadOffset;
-            pipeline.FireRead(input);
-            if (input.ReadOffset != startIndex)
+            try
+            {
+               pipeline.FireRead(input);
+            }
+            catch (Exception error)
+            {
+               throw EngineFailed(error);
+            }
+            finally
             {
                inputSequence++;
             }
          }
-         catch (Exception error)
-         {
-            throw EngineFailed(error);
-         }
 
          return this;
       }
diff --git a/test/Proton.Tests/Engine/Implementation/ProtonEngineTest.cs b/test/Proton.Tests/Engine/Implementation/ProtonEngineTest.cs
index a24192e..3f2d0b6 100644
--- a/test/Proton.Tests/Engine/Implementation/ProtonEngineTest.cs
+++ b/test/Proton.Tests/Engine/Implementation/ProtonEngineTest.cs
@@ -1452,5 +1452,75 @@ namespace Apache.Qpid.Proton.Engine.Implementation
 
          peer.WaitForScriptToComplete();
       }
+
+      [Test]
+      public void TestSlowFrameCoalesceDoesNotTriggerReadIdleTimeout()
+      {
+         // Frame data for: Transfer
+         //   Transfer{handle=0, deliveryId=1, deliveryTag=\x00\x01, messageFormat=null, settled=true, more=false,
+         //            rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false}
+         //   payload of size: 169
+         byte[] completedTransfer1 = new byte[] {
+            0, 0, 0, 193, 2, 0, 0, 0, 0, 83, 20, 192, 11, 5, 82, 0, 82, 1, 160, 2, 0, 1, 64, 65, 0, 83, 115,
+            208, 0, 0, 0, 28, 0, 0, 0, 3, 152, 149, 181, 19, 123, 103, 50, 77, 43, 183, 93, 29, 105, 64};
+         byte[] completedTransfer2 = new byte[] {
+            172, 45, 110, 64, 161, 4, 116, 101, 115, 116, 0, 83, 116, 193, 23, 2, 161, 9, 116, 105, 109, 101,
+            115, 116, 97, 109, 112, 161, 9, 49, 50, 51, 52, 53, 54, 55, 56, 57, 0, 83, 117, 160, 100, 65, 65};
+         byte[] completedTransfer3 = new byte[] {
+            65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
+            65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
+            65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
+            65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65};
+
+         IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
+         engine.ErrorHandler((error) => failure = error.FailureCause);
+         ProtonTestConnector peer = CreateTestPeer(engine);
+
+         peer.ExpectAMQPHeader().RespondWithAMQPHeader();
+         peer.ExpectOpen().WithIdleTimeOut(1000).Respond().WithIdleTimeOut(0);
+         peer.ExpectBegin().Respond();
+         peer.ExpectAttach().Respond();
+         peer.ExpectFlow();
+
+         IConnection connection = engine.Start();
+         connection.IdleTimeout = 1000;
+         connection.Open();
+         ISession session = connection.Session().Open();
+         IReceiver receiver = session.Receiver("test").Open().AddCredit(10);
+
+         bool deliveryArrived = false;
+         IIncomingDelivery receivedDelivery = null;
+         receiver.DeliveryReadHandler((delivery) =>
+         {
+            deliveryArrived = true;
+            receivedDelivery = delivery;
+         });
+
+         // Initial tick sets first deadline
+         Assert.AreEqual(2000, connection.Tick(1000));
+
+         peer.RemoteBytes().WithBytes(completedTransfer1).Now();
+         Assert.AreEqual(2500, connection.Tick(1500));
+
+         peer.RemoteBytes().WithBytes(completedTransfer2).Now();
+         Assert.AreEqual(3000, connection.Tick(2000));
+
+         peer.RemoteBytes().WithBytes(completedTransfer3).Now();
+         Assert.AreEqual(3500, connection.Tick(2500));
+
+         peer.WaitForScriptToComplete();
+         peer.ExpectDetach().Respond();
+         peer.ExpectEnd().Respond();
+
+         Assert.IsTrue(deliveryArrived);
+         Assert.IsNotNull(receivedDelivery);
+
+         receiver.Detach();
+         session.Close();
+
+         peer.WaitForScriptToComplete();
+
+         Assert.IsNull(failure);
+      }
    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to