This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git
The following commit(s) were added to refs/heads/main by this push:
new a2011a3 PROTON-2808 Account for zero copy buffers on ingest activity
tracking
a2011a3 is described below
commit a2011a3e0f5decf2814212ec514c1ecac3ae2854
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Mar 15 14:22:47 2024 -0400
PROTON-2808 Account for zero copy buffers on ingest activity tracking
Update data ingest in the engine to account for the zero copy mechanics
in proton buffers resulting in missing the update to the incoming
sequence tracker and early read check idle timeout errors.
---
src/Proton/Engine/Implementation/ProtonEngine.cs | 18 +++---
.../Engine/Implementation/ProtonEngineTest.cs | 70 ++++++++++++++++++++++
2 files changed, 80 insertions(+), 8 deletions(-)
diff --git a/src/Proton/Engine/Implementation/ProtonEngine.cs
b/src/Proton/Engine/Implementation/ProtonEngine.cs
index 88ecd39..953dcf0 100644
--- a/src/Proton/Engine/Implementation/ProtonEngine.cs
+++ b/src/Proton/Engine/Implementation/ProtonEngine.cs
@@ -231,19 +231,21 @@ namespace Apache.Qpid.Proton.Engine.Implementation
throw new EngineNotWritableException("Engine is currently not
accepting new input");
}
- try
+ if (input.IsReadable)
{
- long startIndex = input.ReadOffset;
- pipeline.FireRead(input);
- if (input.ReadOffset != startIndex)
+ try
+ {
+ pipeline.FireRead(input);
+ }
+ catch (Exception error)
+ {
+ throw EngineFailed(error);
+ }
+ finally
{
inputSequence++;
}
}
- catch (Exception error)
- {
- throw EngineFailed(error);
- }
return this;
}
diff --git a/test/Proton.Tests/Engine/Implementation/ProtonEngineTest.cs
b/test/Proton.Tests/Engine/Implementation/ProtonEngineTest.cs
index a24192e..3f2d0b6 100644
--- a/test/Proton.Tests/Engine/Implementation/ProtonEngineTest.cs
+++ b/test/Proton.Tests/Engine/Implementation/ProtonEngineTest.cs
@@ -1452,5 +1452,75 @@ namespace Apache.Qpid.Proton.Engine.Implementation
peer.WaitForScriptToComplete();
}
+
+ [Test]
+ public void TestSlowFrameCoalesceDoesNotTriggerReadIdleTimeout()
+ {
+ // Frame data for: Transfer
+ // Transfer{handle=0, deliveryId=1, deliveryTag=\x00\x01,
messageFormat=null, settled=true, more=false,
+ // rcvSettleMode=null, state=null, resume=false,
aborted=false, batchable=false}
+ // payload of size: 169
+ byte[] completedTransfer1 = new byte[] {
+ 0, 0, 0, 193, 2, 0, 0, 0, 0, 83, 20, 192, 11, 5, 82, 0, 82, 1,
160, 2, 0, 1, 64, 65, 0, 83, 115,
+ 208, 0, 0, 0, 28, 0, 0, 0, 3, 152, 149, 181, 19, 123, 103, 50, 77,
43, 183, 93, 29, 105, 64};
+ byte[] completedTransfer2 = new byte[] {
+ 172, 45, 110, 64, 161, 4, 116, 101, 115, 116, 0, 83, 116, 193, 23,
2, 161, 9, 116, 105, 109, 101,
+ 115, 116, 97, 109, 112, 161, 9, 49, 50, 51, 52, 53, 54, 55, 56,
57, 0, 83, 117, 160, 100, 65, 65};
+ byte[] completedTransfer3 = new byte[] {
+ 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
65, 65, 65, 65, 65, 65, 65, 65, 65,
+ 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
65, 65, 65, 65, 65, 65, 65, 65, 65,
+ 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
65, 65, 65, 65, 65, 65, 65, 65, 65,
+ 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
65, 65, 65, 65, 65, 65, 65};
+
+ IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
+ engine.ErrorHandler((error) => failure = error.FailureCause);
+ ProtonTestConnector peer = CreateTestPeer(engine);
+
+ peer.ExpectAMQPHeader().RespondWithAMQPHeader();
+ peer.ExpectOpen().WithIdleTimeOut(1000).Respond().WithIdleTimeOut(0);
+ peer.ExpectBegin().Respond();
+ peer.ExpectAttach().Respond();
+ peer.ExpectFlow();
+
+ IConnection connection = engine.Start();
+ connection.IdleTimeout = 1000;
+ connection.Open();
+ ISession session = connection.Session().Open();
+ IReceiver receiver = session.Receiver("test").Open().AddCredit(10);
+
+ bool deliveryArrived = false;
+ IIncomingDelivery receivedDelivery = null;
+ receiver.DeliveryReadHandler((delivery) =>
+ {
+ deliveryArrived = true;
+ receivedDelivery = delivery;
+ });
+
+ // Initial tick sets first deadline
+ Assert.AreEqual(2000, connection.Tick(1000));
+
+ peer.RemoteBytes().WithBytes(completedTransfer1).Now();
+ Assert.AreEqual(2500, connection.Tick(1500));
+
+ peer.RemoteBytes().WithBytes(completedTransfer2).Now();
+ Assert.AreEqual(3000, connection.Tick(2000));
+
+ peer.RemoteBytes().WithBytes(completedTransfer3).Now();
+ Assert.AreEqual(3500, connection.Tick(2500));
+
+ peer.WaitForScriptToComplete();
+ peer.ExpectDetach().Respond();
+ peer.ExpectEnd().Respond();
+
+ Assert.IsTrue(deliveryArrived);
+ Assert.IsNotNull(receivedDelivery);
+
+ receiver.Detach();
+ session.Close();
+
+ peer.WaitForScriptToComplete();
+
+ Assert.IsNull(failure);
+ }
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]