blankensteiner commented on code in PR #109:
URL: https://github.com/apache/pulsar-dotpulsar/pull/109#discussion_r1009409230


##########
src/DotPulsar/Abstractions/IProducerBuilder.cs:
##########
@@ -54,6 +54,11 @@ public interface IProducerBuilder<TMessage>
     /// </summary>
     IProducerBuilder<TMessage> MessageRouter(IMessageRouter messageRouter);
 
+    /// <summary>
+    /// Set the max size of the queue holding the messages pending to receive 
an acknowledgment from the broker. The default is 500.

Review Comment:
   Would it make sense to set the default to 1000 (prefetch is also 1000)? What 
does the java client set it to?



##########
src/DotPulsar/Internal/Producer.cs:
##########
@@ -40,17 +40,20 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
     private readonly ICompressorFactory? _compressorFactory;
     private readonly ProducerOptions<TMessage> _options;
     private readonly ProcessManager _processManager;
-    private readonly ConcurrentDictionary<int, SubProducer<TMessage>> 
_producers;
+    private readonly ConcurrentDictionary<int, SubProducer> _producers;
     private readonly IMessageRouter _messageRouter;
     private readonly CancellationTokenSource _cts;
     private readonly IExecute _executor;
     private int _isDisposed;
     private int _producerCount;
+    private ISendChannel<TMessage>? _sendChannel;
     private Exception? _throw;
 
     public Uri ServiceUrl { get; }
     public string Topic { get; }
 
+    public ISendChannel<TMessage> SendChannel { get => _sendChannel ??= new 
SendChannel<TMessage>(this); }

Review Comment:
   I don't think this is thread-safe, so better just to create it in the 
constructor.



##########
src/DotPulsar/Internal/SendChannel.cs:
##########
@@ -0,0 +1,35 @@
+namespace DotPulsar.Internal;
+
+using DotPulsar.Abstractions;
+using Exceptions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+public class SendChannel<TMessage> : ISendChannel<TMessage>

Review Comment:
   Let's keep it sealed



##########
src/DotPulsar/Internal/Executor.cs:
##########
@@ -35,114 +35,147 @@ public Executor(Guid correlationId, IRegisterEvent 
eventRegister, IHandleExcepti
     }
 
     public async ValueTask Execute(Action action, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(action, cancellationToken)) { }
+    }
+
+    public async ValueTask Execute(Func<Task> func, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(func, cancellationToken)) { }
+    }
+
+    public async ValueTask Execute(Func<ValueTask> func, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(func, cancellationToken)) { }
+    }
+
+    public async ValueTask<TResult> Execute<TResult>(Func<TResult> func, 
CancellationToken cancellationToken)
     {
         while (true)
         {
-            try
-            {
-                action();
-                return;
-            }
-            catch (Exception ex)
-            {
-                if (await Handle(ex, cancellationToken).ConfigureAwait(false))
-                    throw;
-            }
-
-            cancellationToken.ThrowIfCancellationRequested();
+            var (success, result) = await TryExecuteOnce(func, 
cancellationToken);

Review Comment:
   .ConfigureAwait(false);



##########
src/DotPulsar/Internal/Executor.cs:
##########
@@ -35,114 +35,147 @@ public Executor(Guid correlationId, IRegisterEvent 
eventRegister, IHandleExcepti
     }
 
     public async ValueTask Execute(Action action, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(action, cancellationToken)) { }
+    }
+
+    public async ValueTask Execute(Func<Task> func, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(func, cancellationToken)) { }
+    }
+
+    public async ValueTask Execute(Func<ValueTask> func, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(func, cancellationToken)) { }
+    }
+
+    public async ValueTask<TResult> Execute<TResult>(Func<TResult> func, 
CancellationToken cancellationToken)
     {
         while (true)
         {
-            try
-            {
-                action();
-                return;
-            }
-            catch (Exception ex)
-            {
-                if (await Handle(ex, cancellationToken).ConfigureAwait(false))
-                    throw;
-            }
-
-            cancellationToken.ThrowIfCancellationRequested();
+            var (success, result) = await TryExecuteOnce(func, 
cancellationToken);
+            if (success) return result!;
         }
     }
 
-    public async ValueTask Execute(Func<Task> func, CancellationToken 
cancellationToken)
+    public async ValueTask<TResult> Execute<TResult>(Func<Task<TResult>> func, 
CancellationToken cancellationToken)
     {
         while (true)
         {
-            try
-            {
-                await func().ConfigureAwait(false);
-                return;
-            }
-            catch (Exception ex)
-            {
-                if (await Handle(ex, cancellationToken).ConfigureAwait(false))
-                    throw;
-            }
-
-            cancellationToken.ThrowIfCancellationRequested();
+            var (success, result) = await TryExecuteOnce(func, 
cancellationToken);

Review Comment:
   .ConfigureAwait(false);



##########
src/DotPulsar/Internal/Abstractions/IProducerChannel.cs:
##########
@@ -22,6 +22,8 @@ namespace DotPulsar.Internal.Abstractions;
 
 public interface IProducerChannel : IAsyncDisposable
 {
-    Task<CommandSendReceipt> Send(MessageMetadata metadata, 
ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
+    // TODO: Why does one return ValueTask and the other Task?

Review Comment:
   Good question. I would think it should just return ValueTask



##########
src/DotPulsar/Internal/Abstractions/IExecute.cs:
##########
@@ -31,4 +31,16 @@ public interface IExecute
     ValueTask<TResult> Execute<TResult>(Func<Task<TResult>> func, 
CancellationToken cancellationToken = default);
 
     ValueTask<TResult> Execute<TResult>(Func<ValueTask<TResult>> func, 
CancellationToken cancellationToken = default);
+
+    ValueTask<bool> TryExecuteOnce(Action action, CancellationToken 
cancellationToken = default);
+
+    ValueTask<bool> TryExecuteOnce(Func<Task> func, CancellationToken 
cancellationToken = default);
+
+    ValueTask<bool> TryExecuteOnce(Func<ValueTask> func, CancellationToken 
cancellationToken = default);
+
+    ValueTask<(bool success, TResult? result)> 
TryExecuteOnce<TResult>(Func<TResult> func, CancellationToken cancellationToken 
= default);

Review Comment:
   Maybe "out TResult result" as a parameter instead? and then only returning 
bool?



##########
src/DotPulsar/Internal/SendChannel.cs:
##########
@@ -0,0 +1,35 @@
+namespace DotPulsar.Internal;
+
+using DotPulsar.Abstractions;
+using Exceptions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+public class SendChannel<TMessage> : ISendChannel<TMessage>
+{
+    private readonly Producer<TMessage> _producer;
+    private int _isCompleted;
+
+    public SendChannel(Producer<TMessage> producer)
+    {
+        _producer = producer;
+    }
+
+    public async ValueTask Send(MessageMetadata metadata, TMessage message, 
Func<MessageId, ValueTask>? onMessageSent = default, CancellationToken 
cancellationToken = default)
+    {
+        if (_isCompleted != 0) throw new SendChannelCompletedException();
+        await _producer.Enqueue(metadata, message, onMessageSent, 
cancellationToken).ConfigureAwait(false);
+    }
+
+    public void Complete()
+    {
+        _isCompleted = 1;
+        //Interlocked.Exchange(ref _isCompleted, 1);

Review Comment:
   Remove if not needed :)



##########
src/DotPulsar/Internal/AsyncQueueWithCursor.cs:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+using Exceptions;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+public sealed class AsyncQueueWithCursor<T> : IAsyncDisposable
+{
+    private readonly AsyncLock _pendingLock;
+    private readonly SemaphoreSlim _cursorSemaphore;
+    private readonly LinkedList<T> _queue;
+    private readonly IList<TaskCompletionSource<object>> _queueEmptyTcs;
+    private readonly uint _maxItems;
+    private IDisposable? _pendingLockGrant;
+    private LinkedListNode<T>? _currentNode;
+    private TaskCompletionSource<LinkedListNode<T>>? _cursorNextItemTcs;
+    private int _isDisposed;
+
+    public AsyncQueueWithCursor(uint maxItems)
+    {
+        _pendingLock = new AsyncLock();
+        _cursorSemaphore = new SemaphoreSlim(1, 1);
+        // TODO: Figure out if we can use 
https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskcompletionsource?view=net-6.0

Review Comment:
   Can we? :-)



##########
src/DotPulsar/Internal/SendOp.cs:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+using PulsarApi;
+using System.Buffers;
+using System.Threading;
+using System.Threading.Tasks;
+
+public class SendOp

Review Comment:
   sealed



##########
src/DotPulsar/Internal/AsyncQueueWithCursor.cs:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal;
+
+using Exceptions;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+public sealed class AsyncQueueWithCursor<T> : IAsyncDisposable
+{
+    private readonly AsyncLock _pendingLock;
+    private readonly SemaphoreSlim _cursorSemaphore;
+    private readonly LinkedList<T> _queue;
+    private readonly IList<TaskCompletionSource<object>> _queueEmptyTcs;
+    private readonly uint _maxItems;
+    private IDisposable? _pendingLockGrant;
+    private LinkedListNode<T>? _currentNode;
+    private TaskCompletionSource<LinkedListNode<T>>? _cursorNextItemTcs;
+    private int _isDisposed;
+
+    public AsyncQueueWithCursor(uint maxItems)
+    {
+        _pendingLock = new AsyncLock();
+        _cursorSemaphore = new SemaphoreSlim(1, 1);
+        // TODO: Figure out if we can use 
https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskcompletionsource?view=net-6.0
+        _queueEmptyTcs = new List<TaskCompletionSource<object>>();
+        _queue = new LinkedList<T>();
+        _maxItems = maxItems;
+    }
+
+    /// <summary>
+    /// Enqueue item
+    /// </summary>
+    public async ValueTask Enqueue(T item, CancellationToken cancellationToken)
+    {
+        ThrowIfDisposed();
+        try
+        {
+            var grant = await 
_pendingLock.Lock(cancellationToken).ConfigureAwait(false);
+            lock (_pendingLock)
+            {
+                _pendingLockGrant = grant;
+            }
+        }
+        catch (Exception)
+        {
+            ReleasePendingLockGrant();
+            throw;
+        }
+
+        lock (_queue)
+        {

Review Comment:
   Since we check the queue count I guess it can be full? If yes, then we would 
exit without adding the item to the queue.



##########
src/DotPulsar/Internal/SendChannel.cs:
##########
@@ -0,0 +1,35 @@
+namespace DotPulsar.Internal;

Review Comment:
   Need apache header



##########
src/DotPulsar/Internal/Exceptions/ProducerSendReceiptOrderingException.cs:
##########
@@ -0,0 +1,11 @@
+namespace DotPulsar.Internal.Exceptions;

Review Comment:
   Needs apache header



##########
src/DotPulsar/Internal/Executor.cs:
##########
@@ -35,114 +35,147 @@ public Executor(Guid correlationId, IRegisterEvent 
eventRegister, IHandleExcepti
     }
 
     public async ValueTask Execute(Action action, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(action, cancellationToken)) { }

Review Comment:
   .ConfigureAwait(false);



##########
src/DotPulsar/Internal/Producer.cs:
##########
@@ -259,37 +283,74 @@ public async ValueTask<MessageId> Send(MessageMetadata 
metadata, TMessage messag
         try
         {
             var partition = await ChoosePartitions(metadata, 
cancellationToken).ConfigureAwait(false);
-            var producer = _producers[partition];
+            var subProducer = _producers[partition];
             var data = _options.Schema.Encode(message);
 
-            var messageId = await producer.Send(metadata.Metadata, data, 
cancellationToken).ConfigureAwait(false);
-
-            if (startTimestamp != 0)
-                DotPulsarMeter.MessageSent(startTimestamp, _meterTags);
+            var tcs = new TaskCompletionSource<MessageId>();
+            await subProducer.Send(new SendOp(metadata.Metadata, data, tcs, 
sendOpCancelable ? cancellationToken : CancellationToken.None), 
cancellationToken).ConfigureAwait(false);
 
-            if (activity is not null && activity.IsAllDataRequested)
+            _ = tcs.Task.ContinueWith(async task =>
             {
-                activity.SetMessageId(messageId);
-                activity.SetPayloadSize(data.Length);
-                activity.SetStatus(ActivityStatusCode.Ok);
-            }
+                if (startTimestamp != 0)
+                    DotPulsarMeter.MessageSent(startTimestamp, _meterTags);
+
+                if (task.IsFaulted || task.IsCanceled)
+                {
+                    FailActivity(task.IsCanceled ? new 
OperationCanceledException() : task.Exception!, activity);
+
+                    if (autoAssignSequenceId)
+                        metadata.SequenceId = 0;
+                }
 
-            return messageId;
+                CompleteActivity(task.Result, data.Length, activity);
+                try
+                {
+                    if (onMessageSent is not null)
+                        await 
onMessageSent.Invoke(task.Result).ConfigureAwait(false);

Review Comment:
   Consider creating a task for executing the user's callback. They might 
highjack the task.



##########
src/DotPulsar/Internal/Executor.cs:
##########
@@ -35,114 +35,147 @@ public Executor(Guid correlationId, IRegisterEvent 
eventRegister, IHandleExcepti
     }
 
     public async ValueTask Execute(Action action, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(action, cancellationToken)) { }
+    }
+
+    public async ValueTask Execute(Func<Task> func, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(func, cancellationToken)) { }

Review Comment:
   .ConfigureAwait(false);



##########
src/DotPulsar/Internal/Executor.cs:
##########
@@ -35,114 +35,147 @@ public Executor(Guid correlationId, IRegisterEvent 
eventRegister, IHandleExcepti
     }
 
     public async ValueTask Execute(Action action, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(action, cancellationToken)) { }
+    }
+
+    public async ValueTask Execute(Func<Task> func, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(func, cancellationToken)) { }
+    }
+
+    public async ValueTask Execute(Func<ValueTask> func, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(func, cancellationToken)) { }

Review Comment:
   .ConfigureAwait(false);



##########
src/DotPulsar/Internal/SubProducer.cs:
##########
@@ -80,30 +80,133 @@ public async ValueTask DisposeAsync()
             return;
 
         _eventRegister.Register(new ProducerDisposed(_correlationId));
+        _newChannelLock.Dispose();
+
+        try
+        {
+            _dispatcherCts?.Cancel();
+            _dispatcherCts?.Dispose();
+            await (_dispatcherTask ?? 
Task.CompletedTask).ConfigureAwait(false);
+        }
+        catch
+        {
+            // Ignored
+        }
+        await _sendQueue.DisposeAsync().ConfigureAwait(false);
         await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
         await _channel.DisposeAsync().ConfigureAwait(false);
     }
 
-    public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage 
message, CancellationToken cancellationToken)
-        => await _executor.Execute(() => InternalSend(metadata.Metadata, 
_schema.Encode(message), cancellationToken), 
cancellationToken).ConfigureAwait(false);
+    public async ValueTask Send(SendOp sendOp, CancellationToken 
cancellationToken)
+    {
+        if (IsFinalState()) throw new ProducerClosedException(); // TODO: This 
exception might be intended for other purposes.
+        await _sendQueue.Enqueue(sendOp, 
cancellationToken).ConfigureAwait(false);
+    }
+
+    public async ValueTask WaitForSendQueueEmpty(CancellationToken 
cancellationToken)
+    {
+        await _sendQueue.WaitForEmpty(cancellationToken).ConfigureAwait(false);
+    }
+
+    private async Task MessageDispatcher(IProducerChannel channel, 
CancellationToken cancellationToken)
+    {
+        var responseQueue = new AsyncQueue<Task<BaseCommand>>();
+        var responseProcessorTask = ResponseProcessor(responseQueue, 
cancellationToken);
+
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            SendOp sendOp = await 
_sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
+
+            if (sendOp.CancellationToken.IsCancellationRequested)
+            {
+                _sendQueue.RemoveCurrentItem();
+                continue;
+            }
+
+            var tcs = new TaskCompletionSource<BaseCommand>();
+            _ = tcs.Task.ContinueWith(task => responseQueue.Enqueue(task),
+                TaskContinuationOptions.NotOnCanceled | 
TaskContinuationOptions.ExecuteSynchronously);
 
-    public async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, 
ReadOnlySequence<byte> data, CancellationToken cancellationToken)
-        => await _executor.Execute(() => InternalSend(metadata, data, 
cancellationToken), cancellationToken).ConfigureAwait(false);
+            // Use CancellationToken.None here because otherwise it will throw 
exceptions on all fault actions even retry.
+            bool success = await _executor.TryExecuteOnce(() => 
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken), 
CancellationToken.None).ConfigureAwait(false);
 
-    private async ValueTask<MessageId> InternalSend(PulsarApi.MessageMetadata 
metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+            if (success) continue;
+            _eventRegister.Register(new ChannelDisconnected(_correlationId));
+            break;
+        }
+
+        await responseProcessorTask.ConfigureAwait(false);
+    }
+
+    private async ValueTask ResponseProcessor(IDequeue<Task<BaseCommand>> 
responseQueue, CancellationToken cancellationToken)
     {
-        var response = await _channel.Send(metadata, data, 
cancellationToken).ConfigureAwait(false);
-        return response.MessageId.ToMessageId();
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            var responseTask = await 
responseQueue.Dequeue(cancellationToken).ConfigureAwait(false);
+
+            bool success = await _executor.TryExecuteOnce(() =>
+            {
+                if (responseTask.IsFaulted) throw responseTask.Exception!;
+                responseTask.Result.Expect(BaseCommand.Type.SendReceipt);
+                ProcessReceipt(responseTask.Result.SendReceipt);
+            }, CancellationToken.None).ConfigureAwait(false); // Use 
CancellationToken.None here because otherwise it will throw exceptions on all 
fault actions even retry.
+
+            // TODO: Should we crate a new event instead of channel 
disconnected?
+            if (success) continue;
+            _eventRegister.Register(new ChannelDisconnected(_correlationId));
+            break;
+        }
     }
 
-    public async Task EstablishNewChannel(CancellationToken cancellationToken)
+    private void ProcessReceipt(CommandSendReceipt sendReceipt)
     {
-        var channel = await _executor.Execute(() => 
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+        ulong receiptSequenceId = sendReceipt.SequenceId;
+
+        if (!_sendQueue.TryPeek(out SendOp? sendOp) || sendOp is null)
+            throw new ProducerSendReceiptOrderingException($"Received 
sequenceId {receiptSequenceId} but send queue is empty");
+
+        ulong expectedSequenceId = sendOp.Metadata.SequenceId;
 
-        var oldChannel = _channel;
-        if (oldChannel is not null)
+        if (receiptSequenceId != expectedSequenceId)
+            throw new ProducerSendReceiptOrderingException($"Received 
sequenceId {receiptSequenceId}. Expected {expectedSequenceId}");
+
+        _sendQueue.Dequeue();
+        sendOp.ReceiptTcs.TrySetResult(sendReceipt.MessageId.ToMessageId());
+    }
+
+    public async Task EstablishNewChannel(CancellationToken cancellationToken)
+    {
+        try
+        {
+            await 
_newChannelLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+
+            if (_dispatcherCts is not null && 
!_dispatcherCts.IsCancellationRequested)
+            {
+                _dispatcherCts.Cancel();
+                _dispatcherCts.Dispose();
+            }
+
+            await _executor.TryExecuteOnce(() => _dispatcherTask ?? 
Task.CompletedTask, cancellationToken).ConfigureAwait(false);
+
+            var oldChannel = _channel;
+            // TODO: Not sure we need to actually close the channel?
+            await 
oldChannel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
+            // TODO: Why does IProducerChannel.DisposeAsync not do anything?
             await oldChannel.DisposeAsync().ConfigureAwait(false);
 
-        _channel = channel;
+            _channel = await _executor.Execute(() => 
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+
+            _sendQueue.ResetCursor();
+            _dispatcherCts = new CancellationTokenSource();
+            _dispatcherTask = MessageDispatcher(_channel, 
_dispatcherCts.Token);
+        }
+        catch (Exception)
+        {
+            // Ignored

Review Comment:
   If something outside _executor.Execute fails, the channel with never be 
established and the state of the producer will not change?



##########
src/DotPulsar/Internal/Executor.cs:
##########
@@ -35,114 +35,147 @@ public Executor(Guid correlationId, IRegisterEvent 
eventRegister, IHandleExcepti
     }
 
     public async ValueTask Execute(Action action, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(action, cancellationToken)) { }
+    }
+
+    public async ValueTask Execute(Func<Task> func, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(func, cancellationToken)) { }
+    }
+
+    public async ValueTask Execute(Func<ValueTask> func, CancellationToken 
cancellationToken)
+    {
+        while (!await TryExecuteOnce(func, cancellationToken)) { }
+    }
+
+    public async ValueTask<TResult> Execute<TResult>(Func<TResult> func, 
CancellationToken cancellationToken)
     {
         while (true)
         {
-            try
-            {
-                action();
-                return;
-            }
-            catch (Exception ex)
-            {
-                if (await Handle(ex, cancellationToken).ConfigureAwait(false))
-                    throw;
-            }
-
-            cancellationToken.ThrowIfCancellationRequested();
+            var (success, result) = await TryExecuteOnce(func, 
cancellationToken);
+            if (success) return result!;
         }
     }
 
-    public async ValueTask Execute(Func<Task> func, CancellationToken 
cancellationToken)
+    public async ValueTask<TResult> Execute<TResult>(Func<Task<TResult>> func, 
CancellationToken cancellationToken)
     {
         while (true)
         {
-            try
-            {
-                await func().ConfigureAwait(false);
-                return;
-            }
-            catch (Exception ex)
-            {
-                if (await Handle(ex, cancellationToken).ConfigureAwait(false))
-                    throw;
-            }
-
-            cancellationToken.ThrowIfCancellationRequested();
+            var (success, result) = await TryExecuteOnce(func, 
cancellationToken);
+            if (success) return result!;
         }
     }
 
-    public async ValueTask Execute(Func<ValueTask> func, CancellationToken 
cancellationToken)
+    public async ValueTask<TResult> Execute<TResult>(Func<ValueTask<TResult>> 
func, CancellationToken cancellationToken)
     {
         while (true)
         {
-            try
-            {
-                await func().ConfigureAwait(false);
-                return;
-            }
-            catch (Exception ex)
-            {
-                if (await Handle(ex, cancellationToken).ConfigureAwait(false))
-                    throw;
-            }
-
-            cancellationToken.ThrowIfCancellationRequested();
+            var (success, result) = await TryExecuteOnce(func, 
cancellationToken);

Review Comment:
   .ConfigureAwait(false);



##########
src/DotPulsar/Internal/SubProducer.cs:
##########
@@ -80,30 +80,133 @@ public async ValueTask DisposeAsync()
             return;
 
         _eventRegister.Register(new ProducerDisposed(_correlationId));
+        _newChannelLock.Dispose();
+
+        try
+        {
+            _dispatcherCts?.Cancel();
+            _dispatcherCts?.Dispose();
+            await (_dispatcherTask ?? 
Task.CompletedTask).ConfigureAwait(false);
+        }
+        catch
+        {
+            // Ignored
+        }
+        await _sendQueue.DisposeAsync().ConfigureAwait(false);
         await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
         await _channel.DisposeAsync().ConfigureAwait(false);
     }
 
-    public async ValueTask<MessageId> Send(MessageMetadata metadata, TMessage 
message, CancellationToken cancellationToken)
-        => await _executor.Execute(() => InternalSend(metadata.Metadata, 
_schema.Encode(message), cancellationToken), 
cancellationToken).ConfigureAwait(false);
+    public async ValueTask Send(SendOp sendOp, CancellationToken 
cancellationToken)
+    {
+        if (IsFinalState()) throw new ProducerClosedException(); // TODO: This 
exception might be intended for other purposes.
+        await _sendQueue.Enqueue(sendOp, 
cancellationToken).ConfigureAwait(false);
+    }
+
+    public async ValueTask WaitForSendQueueEmpty(CancellationToken 
cancellationToken)
+    {
+        await _sendQueue.WaitForEmpty(cancellationToken).ConfigureAwait(false);
+    }
+
+    private async Task MessageDispatcher(IProducerChannel channel, 
CancellationToken cancellationToken)
+    {
+        var responseQueue = new AsyncQueue<Task<BaseCommand>>();
+        var responseProcessorTask = ResponseProcessor(responseQueue, 
cancellationToken);
+
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            SendOp sendOp = await 
_sendQueue.NextItem(cancellationToken).ConfigureAwait(false);
+
+            if (sendOp.CancellationToken.IsCancellationRequested)
+            {
+                _sendQueue.RemoveCurrentItem();
+                continue;
+            }
+
+            var tcs = new TaskCompletionSource<BaseCommand>();
+            _ = tcs.Task.ContinueWith(task => responseQueue.Enqueue(task),
+                TaskContinuationOptions.NotOnCanceled | 
TaskContinuationOptions.ExecuteSynchronously);
 
-    public async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, 
ReadOnlySequence<byte> data, CancellationToken cancellationToken)
-        => await _executor.Execute(() => InternalSend(metadata, data, 
cancellationToken), cancellationToken).ConfigureAwait(false);
+            // Use CancellationToken.None here because otherwise it will throw 
exceptions on all fault actions even retry.
+            bool success = await _executor.TryExecuteOnce(() => 
channel.Send(sendOp.Metadata, sendOp.Data, tcs, cancellationToken), 
CancellationToken.None).ConfigureAwait(false);
 
-    private async ValueTask<MessageId> InternalSend(PulsarApi.MessageMetadata 
metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+            if (success) continue;
+            _eventRegister.Register(new ChannelDisconnected(_correlationId));
+            break;
+        }
+
+        await responseProcessorTask.ConfigureAwait(false);
+    }
+
+    private async ValueTask ResponseProcessor(IDequeue<Task<BaseCommand>> 
responseQueue, CancellationToken cancellationToken)
     {
-        var response = await _channel.Send(metadata, data, 
cancellationToken).ConfigureAwait(false);
-        return response.MessageId.ToMessageId();
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            var responseTask = await 
responseQueue.Dequeue(cancellationToken).ConfigureAwait(false);
+
+            bool success = await _executor.TryExecuteOnce(() =>
+            {
+                if (responseTask.IsFaulted) throw responseTask.Exception!;
+                responseTask.Result.Expect(BaseCommand.Type.SendReceipt);
+                ProcessReceipt(responseTask.Result.SendReceipt);
+            }, CancellationToken.None).ConfigureAwait(false); // Use 
CancellationToken.None here because otherwise it will throw exceptions on all 
fault actions even retry.
+
+            // TODO: Should we crate a new event instead of channel 
disconnected?
+            if (success) continue;
+            _eventRegister.Register(new ChannelDisconnected(_correlationId));
+            break;
+        }
     }
 
-    public async Task EstablishNewChannel(CancellationToken cancellationToken)
+    private void ProcessReceipt(CommandSendReceipt sendReceipt)
     {
-        var channel = await _executor.Execute(() => 
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+        ulong receiptSequenceId = sendReceipt.SequenceId;
+
+        if (!_sendQueue.TryPeek(out SendOp? sendOp) || sendOp is null)
+            throw new ProducerSendReceiptOrderingException($"Received 
sequenceId {receiptSequenceId} but send queue is empty");
+
+        ulong expectedSequenceId = sendOp.Metadata.SequenceId;
 
-        var oldChannel = _channel;
-        if (oldChannel is not null)
+        if (receiptSequenceId != expectedSequenceId)
+            throw new ProducerSendReceiptOrderingException($"Received 
sequenceId {receiptSequenceId}. Expected {expectedSequenceId}");
+
+        _sendQueue.Dequeue();
+        sendOp.ReceiptTcs.TrySetResult(sendReceipt.MessageId.ToMessageId());
+    }
+
+    public async Task EstablishNewChannel(CancellationToken cancellationToken)
+    {
+        try
+        {
+            await 
_newChannelLock.WaitAsync(cancellationToken).ConfigureAwait(false);
+
+            if (_dispatcherCts is not null && 
!_dispatcherCts.IsCancellationRequested)
+            {
+                _dispatcherCts.Cancel();
+                _dispatcherCts.Dispose();
+            }
+
+            await _executor.TryExecuteOnce(() => _dispatcherTask ?? 
Task.CompletedTask, cancellationToken).ConfigureAwait(false);
+
+            var oldChannel = _channel;
+            // TODO: Not sure we need to actually close the channel?

Review Comment:
   I see many TODO in the files. We need to talk about every one of them. Maybe 
on Teams? :-)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to