blankensteiner commented on a change in pull request #23:
URL: https://github.com/apache/pulsar-dotpulsar/pull/23#discussion_r449505264
##########
File path: src/DotPulsar/Internal/ProducerChannel.cs
##########
@@ -90,18 +90,16 @@ public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<
if (autoAssignSequenceId)
{
- sendPackage.Command.SequenceId = _sequenceId.Current;
- sendPackage.Metadata.SequenceId = _sequenceId.Current;
+ var newSequenceId = _sequenceId.FetchNext();
+ sendPackage.Command.SequenceId = newSequenceId;
+ sendPackage.Metadata.SequenceId = newSequenceId;
}
else
sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId;
var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.SendReceipt);
- if (autoAssignSequenceId)
- _sequenceId.Increment();
Review comment:
The problem with removing this is that the sequenceId will be moved
forward even if sending the package fails (either the broker rejects it or the
connection is lost).
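
A minimal sketch of the concern, assuming the `SequenceId` class as proposed in this pull request. `FailedSendDemo` and `SendThatFails` are hypothetical stand-ins for the producer channel and `_connection.Send`; they are not part of DotPulsar.

```csharp
using System;
using System.Threading;

// SequenceId as proposed in this pull request.
public sealed class SequenceId
{
    private long _current;

    public SequenceId(ulong initialSequenceId)
    {
        // Start one below the initial value so the first FetchNext returns it.
        _current = unchecked((long)initialSequenceId - 1);
    }

    public ulong FetchNext()
        => unchecked((ulong)Interlocked.Increment(ref _current));
}

public static class FailedSendDemo
{
    // Hypothetical stand-in for _connection.Send: always fails.
    private static void SendThatFails(ulong sequenceId)
        => throw new InvalidOperationException("connection lost");

    // Returns the sequence id handed out after one failed send.
    public static ulong NextIdAfterFailedSend()
    {
        var sequenceId = new SequenceId(0);

        try
        {
            // Sequence id 0 is consumed here even though the send fails.
            SendThatFails(sequenceId.FetchNext());
        }
        catch (InvalidOperationException)
        {
            // The id is not handed back, so it is lost.
        }

        return sequenceId.FetchNext();
    }
}
```

In this sketch `FailedSendDemo.NextIdAfterFailedSend()` returns 1, so sequence id 0 is never delivered to the broker. With the previous code the id only advanced after `response.Expect(...)` succeeded.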
##########
File path: src/DotPulsar/DotPulsar.csproj
##########
@@ -23,7 +23,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="3.1.5" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
- <PackageReference Include="protobuf-net" Version="2.4.6" />
+ <PackageReference Include="protobuf-net" Version="2.3.*" />
Review comment:
Hi Tim :-)
Could you elaborate a bit on the need for this downgrade?
I'm asking because version 3.0.13 is out now and version 3.1 (which I hope
will be coming soon) will bring support for [ReadOnly]Memory<T> and
ReadOnlySequence<T>, which will give a significant performance boost.
Supporting older versions of protobuf-net is therefore not something I aim
for, but if it's a problem, then we could look into including protobuf-net in
DotPulsar the same way Newtonsoft.Json is included in Elasticsearch.Net (or
something like that).
##########
File path: src/DotPulsar/Internal/SequenceId.cs
##########
@@ -12,21 +12,24 @@
* limitations under the License.
*/
+using System.Threading;
+
namespace DotPulsar.Internal
{
public sealed class SequenceId
{
+ private long _current;
+
public SequenceId(ulong initialSequenceId)
{
- Current = initialSequenceId;
-
- if (initialSequenceId > 0)
- Increment();
+ // Subtracting one because Interlocked.Increment will return the post-incremented value
+ // which is expected to be the initialSequenceId for the first call
+ _current = unchecked((long)initialSequenceId - 1);
}
- public ulong Current { get; private set; }
-
- public void Increment()
- => ++Current;
+ public ulong FetchNext()
+ {
+ return unchecked((ulong)Interlocked.Increment(ref _current));
Review comment:
I wonder if it would be more performant to have _current be a ulong and
just return Interlocked.Increment(ref _current) - 1?
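
A rough sketch of that idea, assuming a target framework that ships the `Interlocked.Increment(ref ulong)` overload (to my knowledge .NET 5 and later; it is not available on .NET Standard 2.0):

```csharp
using System.Threading;

public sealed class SequenceId
{
    private ulong _current;

    public SequenceId(ulong initialSequenceId)
    {
        // No adjustment needed here; FetchNext compensates instead.
        _current = initialSequenceId;
    }

    public ulong FetchNext()
    {
        // Interlocked.Increment returns the incremented value, so subtract
        // one to get the value before the increment. unchecked keeps the
        // wrap-around at ulong.MaxValue from throwing in checked builds.
        return unchecked(Interlocked.Increment(ref _current) - 1);
    }
}
```

This removes the two casts between `long` and `ulong`; whether it is measurably faster would have to be benchmarked.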
##########
File path: src/DotPulsar/Internal/RequestResponseHandler.cs
##########
@@ -53,31 +54,40 @@ private void SetRequestId(BaseCommand cmd)
switch (cmd.CommandType)
{
case BaseCommand.Type.Seek:
- cmd.Seek.RequestId = _requestId++;
+ cmd.Seek.RequestId = _requestId.FetchNext();
+ _pastInitialRequestId = true;
Review comment:
An alternative implementation would be to store the initial value on the
SequenceId object and then have an IsPastInitialRequestId comparing that to the
current number. Let me know what you think.
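
Something along these lines, as a sketch only. It builds on the `SequenceId` from this pull request; the `_beforeFirst` field is a name I made up for illustration:

```csharp
using System.Threading;

public sealed class SequenceId
{
    // Value of _current before any id has been handed out (hypothetical name).
    private readonly long _beforeFirst;
    private long _current;

    public SequenceId(ulong initialSequenceId)
    {
        _beforeFirst = unchecked((long)initialSequenceId - 1);
        _current = _beforeFirst;
    }

    public ulong FetchNext()
        => unchecked((ulong)Interlocked.Increment(ref _current));

    // True once FetchNext has been called at least once. A full wrap-around
    // after 2^64 calls is ignored in this sketch.
    public bool IsPastInitialRequestId
        => Interlocked.Read(ref _current) != _beforeFirst;
}
```

RequestResponseHandler could then ask the object directly instead of setting `_pastInitialRequestId = true` in every case of the switch.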