This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 02ad325  Performance tuning PulsarStream (#103)
02ad325 is described below

commit 02ad325b9a8f932e08ad96c2f40448b7bf0ace0a
Author: Kristian Andersen <[email protected]>
AuthorDate: Wed Apr 20 12:50:11 2022 +0200

    Performance tuning PulsarStream (#103)
    
    Use ReadAtLeast to wait until a usable amount of data is ready.
---
 src/DotPulsar/Internal/PulsarStream.cs | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/src/DotPulsar/Internal/PulsarStream.cs 
b/src/DotPulsar/Internal/PulsarStream.cs
index cbf3104..7c6fbe0 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -117,9 +117,11 @@ public sealed class PulsarStream : IPulsarStream
 
         try
         {
+            uint? frameSize = null;
             while (true)
             {
-                var result = await 
_reader.ReadAsync(cancellationToken).ConfigureAwait(false);
+                var minimumSize = frameSize.HasValue ? (int) frameSize.Value + 
4 : 4;
+                var result = await _reader.ReadAtLeastAsync(minimumSize, 
cancellationToken).ConfigureAwait(false);
                 var buffer = result.Buffer;
 
                 while (true)
@@ -127,15 +129,16 @@ public sealed class PulsarStream : IPulsarStream
                     if (buffer.Length < 4)
                         break;
 
-                    var frameSize = buffer.ReadUInt32(0, true);
-                    var totalSize = frameSize + 4;
+                    frameSize ??= buffer.ReadUInt32(0, true);
+                    var totalSize = frameSize.Value + 4;
 
                     if (buffer.Length < totalSize)
                         break;
 
-                    yield return buffer.Slice(4, frameSize);
+                    yield return buffer.Slice(4, frameSize.Value);
 
                     buffer = buffer.Slice(totalSize);
+                    frameSize = null;
                 }
 
                 if (result.IsCompleted)

Reply via email to