This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 02ad325 Performance tuning PulsarStream (#103)
02ad325 is described below
commit 02ad325b9a8f932e08ad96c2f40448b7bf0ace0a
Author: Kristian Andersen <[email protected]>
AuthorDate: Wed Apr 20 12:50:11 2022 +0200
Performance tuning PulsarStream (#103)
Use ReadAtLeast to wait until a usable amount of data is ready.
---
src/DotPulsar/Internal/PulsarStream.cs | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index cbf3104..7c6fbe0 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -117,9 +117,11 @@ public sealed class PulsarStream : IPulsarStream
try
{
+ uint? frameSize = null;
while (true)
{
- var result = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
+ var minimumSize = frameSize.HasValue ? (int) frameSize.Value + 4 : 4;
+ var result = await _reader.ReadAtLeastAsync(minimumSize, cancellationToken).ConfigureAwait(false);
var buffer = result.Buffer;
while (true)
@@ -127,15 +129,16 @@ public sealed class PulsarStream : IPulsarStream
if (buffer.Length < 4)
break;
- var frameSize = buffer.ReadUInt32(0, true);
- var totalSize = frameSize + 4;
+ frameSize ??= buffer.ReadUInt32(0, true);
+ var totalSize = frameSize.Value + 4;
if (buffer.Length < totalSize)
break;
- yield return buffer.Slice(4, frameSize);
+ yield return buffer.Slice(4, frameSize.Value);
buffer = buffer.Slice(totalSize);
+ frameSize = null;
}
if (result.IsCompleted)