This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f5a67db  Moving SequenceId from the ProducerChannel to the Producer.
f5a67db is described below

commit f5a67dbca2a1ac7b116f6bae6d1d4524d7563e46
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Mon Jul 6 13:46:01 2020 +0200

    Moving SequenceId from the ProducerChannel to the Producer.
---
 samples/Producing/Program.cs                       |  6 ++-
 .../Internal/Abstractions/IProducerChannel.cs      |  2 +-
 src/DotPulsar/Internal/ConsumerChannelFactory.cs   |  1 -
 src/DotPulsar/Internal/NotReadyChannel.cs          |  2 +-
 src/DotPulsar/Internal/Producer.cs                 | 23 ++++++++--
 src/DotPulsar/Internal/ProducerChannel.cs          | 52 ++++++++++------------
 src/DotPulsar/Internal/ProducerChannelFactory.cs   |  5 +--
 src/DotPulsar/Internal/ReaderChannelFactory.cs     |  1 -
 .../IProducerChannel.cs => RequestId.cs}           | 22 ++++-----
 src/DotPulsar/Internal/RequestResponseHandler.cs   |  4 +-
 src/DotPulsar/Internal/SequenceId.cs               | 18 ++------
 src/DotPulsar/PulsarClient.cs                      |  2 +-
 12 files changed, 70 insertions(+), 68 deletions(-)

diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 134d406..8db5b3d 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -61,8 +61,10 @@ namespace Producing
             {
                 while (!cancellationToken.IsCancellationRequested)
                 {
-                    var data = Encoding.UTF8.GetBytes("Sent " + DateTime.UtcNow);
-                    _ = await producer.Send(data, cancellationToken).ConfigureAwait(false);
+                    var data = DateTime.UtcNow.ToLongTimeString();
+                    var bytes = Encoding.UTF8.GetBytes(data);
+                    _ = await producer.Send(bytes, cancellationToken).ConfigureAwait(false);
+                    Console.WriteLine("Sent: " + data);
                     await Task.Delay(delay).ConfigureAwait(false);
                 }
             }
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index 8375447..9fe2eb1 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -22,7 +22,7 @@ namespace DotPulsar.Internal.Abstractions
 
     public interface IProducerChannel : IAsyncDisposable
     {
-        Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
+        Task<CommandSendReceipt> Send(ulong sequenceId, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
         Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index 94a68b9..f98d9fd 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -66,7 +66,6 @@ namespace DotPulsar.Internal
             var messageQueue = new AsyncQueue<MessagePackage>();
             var channel = new Channel(_correlationId, _eventRegister, messageQueue);
             var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
-
             return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
         }
     }
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index 93f572a..fee9bb4 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -30,7 +30,7 @@ namespace DotPulsar.Internal
         public ValueTask<Message> Receive(CancellationToken cancellationToken = default)
             => throw GetException();
 
-        public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+        public Task<CommandSendReceipt> Send(ulong sequenceId, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
             => throw GetException();
 
         public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index efe5bc0..da2aacd 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -30,6 +30,7 @@ namespace DotPulsar.Internal
         private IProducerChannel _channel;
         private readonly IExecute _executor;
         private readonly IStateChanged<ProducerState> _state;
+        private readonly SequenceId _sequenceId;
         private int _isDisposed;
 
         public string Topic { get; }
@@ -37,6 +38,7 @@ namespace DotPulsar.Internal
         public Producer(
             Guid correlationId,
             string topic,
+            ulong initialSequenceId,
             IRegisterEvent registerEvent,
             IProducerChannel initialChannel,
             IExecute executor,
@@ -44,6 +46,7 @@ namespace DotPulsar.Internal
         {
             _correlationId = correlationId;
             Topic = topic;
+            _sequenceId = new SequenceId(initialSequenceId);
             _eventRegister = registerEvent;
             _channel = initialChannel;
             _executor = executor;
@@ -90,7 +93,8 @@ namespace DotPulsar.Internal
         public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
-            var response = await _executor.Execute(() => _channel.Send(data, cancellationToken), cancellationToken).ConfigureAwait(false);
+            var sequenceId = _sequenceId.FetchNext();
+            var response = await _executor.Execute(() => _channel.Send(sequenceId, data, cancellationToken), cancellationToken).ConfigureAwait(false);
             return new MessageId(response.MessageId);
         }
 
@@ -103,8 +107,21 @@ namespace DotPulsar.Internal
         public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
-            var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
-            return new MessageId(response.MessageId);
+
+            var autoAssignSequenceId = metadata.SequenceId == 0;
+            if (autoAssignSequenceId)
+                metadata.SequenceId = _sequenceId.FetchNext();
+
+            try
+            {
+                var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
+                return new MessageId(response.MessageId);
+            }
+            finally
+            {
+                if (autoAssignSequenceId)
+                    metadata.SequenceId = 0;
+            }
         }
 
         internal async ValueTask SetChannel(IProducerChannel channel)
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 0ba8cd1..e7e062b 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -25,20 +25,20 @@ namespace DotPulsar.Internal
 
     public sealed class ProducerChannel : IProducerChannel
     {
-        private readonly MessageMetadata _cachedMetadata;
+        private readonly ObjectPool<MessageMetadata> _messageMetadataPool;
         private readonly ObjectPool<SendPackage> _sendPackagePool;
         private readonly ulong _id;
         private readonly string _name;
-        private readonly SequenceId _sequenceId;
         private readonly IConnection _connection;
 
-        public ProducerChannel(ulong id, string name, SequenceId sequenceId, IConnection connection)
+        public ProducerChannel(ulong id, string name, IConnection connection)
         {
-            _cachedMetadata = new MessageMetadata { ProducerName = name };
-            _sendPackagePool = new DefaultObjectPool<SendPackage>(new DefaultPooledObjectPolicy<SendPackage>());
+            var messageMetadataPolicy = new DefaultPooledObjectPolicy<MessageMetadata>();
+            _messageMetadataPool = new DefaultObjectPool<MessageMetadata>(messageMetadataPolicy);
+            var sendPackagePolicy = new DefaultPooledObjectPolicy<SendPackage>();
+            _sendPackagePool = new DefaultObjectPool<SendPackage>(sendPackagePolicy);
             _id = id;
             _name = name;
-            _sequenceId = sequenceId;
             _connection = connection;
         }
 
@@ -55,19 +55,30 @@ namespace DotPulsar.Internal
             }
         }
 
-        public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
-            => SendPackage(_cachedMetadata, payload, true, cancellationToken);
+        public Task<CommandSendReceipt> Send(ulong sequenceId, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+        {
+            var metadata = _messageMetadataPool.Get();
+            metadata.ProducerName = _name;
+            metadata.SequenceId = sequenceId;
+            try
+            {
+                return SendPackage(metadata, payload, cancellationToken);
+            }
+            finally
+            {
+                _messageMetadataPool.Return(metadata);
+            }
+        }
 
         public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
         {
             metadata.ProducerName = _name;
-            return SendPackage(metadata, payload, metadata.SequenceId == 0, cancellationToken);
+            return SendPackage(metadata, payload, cancellationToken);
         }
 
         private async Task<CommandSendReceipt> SendPackage(
             MessageMetadata metadata,
             ReadOnlySequence<byte> payload,
-            bool autoAssignSequenceId,
             CancellationToken cancellationToken)
         {
             var sendPackage = _sendPackagePool.Get();
@@ -81,36 +92,21 @@ namespace DotPulsar.Internal
                 };
             }
 
+            sendPackage.Command.SequenceId = metadata.SequenceId;
             sendPackage.Metadata = metadata;
             sendPackage.Payload = payload;
+            metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); // TODO Benchmark against StopWatch
 
             try
             {
-                metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
-
-                if (autoAssignSequenceId)
-                {
-                    var newSequenceId = _sequenceId.FetchNext();
-                    sendPackage.Command.SequenceId = newSequenceId;
-                    sendPackage.Metadata.SequenceId = newSequenceId;
-                }
-                else
-                    sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId;
-
                 var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
                 response.Expect(BaseCommand.Type.SendReceipt);
-
                 return response.SendReceipt;
             }
             finally
             {
-                // Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId
-                if (autoAssignSequenceId)
-                    sendPackage.Metadata.SequenceId = 0;
-
                 _sendPackagePool.Return(sendPackage);
             }
         }
     }
 }
-
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index e7d8327..87c76de 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -26,7 +26,6 @@ namespace DotPulsar.Internal
         private readonly IRegisterEvent _eventRegister;
         private readonly IConnectionPool _connectionPool;
         private readonly IExecute _executor;
-        private readonly SequenceId _sequenceId;
         private readonly CommandProducer _commandProducer;
 
         public ProducerChannelFactory(
@@ -40,7 +39,6 @@ namespace DotPulsar.Internal
             _eventRegister = eventRegister;
             _connectionPool = connectionPool;
             _executor = executor;
-            _sequenceId = new SequenceId(options.InitialSequenceId);
 
             _commandProducer = new CommandProducer
             {
@@ -57,8 +55,7 @@ namespace DotPulsar.Internal
             var connection = await _connectionPool.FindConnectionForTopic(_commandProducer.Topic, cancellationToken).ConfigureAwait(false);
             var channel = new Channel(_correlationId, _eventRegister, new AsyncQueue<MessagePackage>());
             var response = await connection.Send(_commandProducer, channel, cancellationToken).ConfigureAwait(false);
-
-            return new ProducerChannel(response.ProducerId, response.ProducerName, _sequenceId, connection);
+            return new ProducerChannel(response.ProducerId, response.ProducerName, connection);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index 03140ad..9efe033 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -65,7 +65,6 @@ namespace DotPulsar.Internal
             var messageQueue = new AsyncQueue<MessagePackage>();
             var channel = new Channel(_correlationId, _eventRegister, messageQueue);
             var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
-
             return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
         }
     }
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs b/src/DotPulsar/Internal/RequestId.cs
similarity index 55%
copy from src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
copy to src/DotPulsar/Internal/RequestId.cs
index 8375447..128ac5e 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/RequestId.cs
@@ -12,17 +12,19 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal.Abstractions
+namespace DotPulsar.Internal
 {
-    using PulsarApi;
-    using System;
-    using System.Buffers;
-    using System.Threading;
-    using System.Threading.Tasks;
-
-    public interface IProducerChannel : IAsyncDisposable
+    public sealed class RequestId
     {
-        Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
-        Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
+        private ulong _current;
+
+        public RequestId()
+            => _current = 0;
+
+        public bool IsPastInitialId()
+            => _current != 0;
+
+        public ulong FetchNext()
+            => _current++;
     }
 }
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index c855337..1885aa4 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -23,12 +23,12 @@ namespace DotPulsar.Internal
         private const string ConnectResponseIdentifier = "Connected";
 
         private readonly Awaiter<string, BaseCommand> _responses;
-        private SequenceId _requestId;
+        private RequestId _requestId;
 
         public RequestResponseHandler()
         {
             _responses = new Awaiter<string, BaseCommand>();
-            _requestId = new SequenceId(1);
+            _requestId = new RequestId();
         }
 
         public void Dispose()
diff --git a/src/DotPulsar/Internal/SequenceId.cs b/src/DotPulsar/Internal/SequenceId.cs
index 55ba94a..aba1c25 100644
--- a/src/DotPulsar/Internal/SequenceId.cs
+++ b/src/DotPulsar/Internal/SequenceId.cs
@@ -12,32 +12,22 @@
  * limitations under the License.
  */
 
-using System.Threading;
-
 namespace DotPulsar.Internal
 {
+    using System.Threading;
+
     public sealed class SequenceId
     {
         private long _current;
-        private ulong _initial;
 
         public SequenceId(ulong initialSequenceId)
         {
             // Subtracting one because Interlocked.Increment will return the post-incremented value
             // which is expected to be the initialSequenceId for the first call
-            _current = unchecked((long)initialSequenceId - 1);
-            _initial = initialSequenceId - 1;
-        }
-
-        // Returns false if FetchNext has not been called on this object before (or if it somehow wrapped around 2^64)
-        public bool IsPastInitialId()
-        {
-            return unchecked((ulong)_current != _initial);
+            _current = unchecked((long) initialSequenceId - 1);
         }
 
         public ulong FetchNext()
-        {
-            return unchecked((ulong)Interlocked.Increment(ref _current));
-        }
+            => unchecked((ulong) Interlocked.Increment(ref _current));
     }
 }
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 260f3ce..982ee3b 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -57,7 +57,7 @@ namespace DotPulsar
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
             var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
             var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
-            var producer = new Producer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+            var producer = new Producer(correlationId, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
             var process = new ProducerProcess(correlationId, stateManager, factory, producer);
             _processManager.Add(process);
             process.Start();

Reply via email to