This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f5a67db Moving SequenceId from the ProducerChannel to the Producer.
f5a67db is described below
commit f5a67dbca2a1ac7b116f6bae6d1d4524d7563e46
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Mon Jul 6 13:46:01 2020 +0200
Moving SequenceId from the ProducerChannel to the Producer.
---
samples/Producing/Program.cs | 6 ++-
.../Internal/Abstractions/IProducerChannel.cs | 2 +-
src/DotPulsar/Internal/ConsumerChannelFactory.cs | 1 -
src/DotPulsar/Internal/NotReadyChannel.cs | 2 +-
src/DotPulsar/Internal/Producer.cs | 23 ++++++++--
src/DotPulsar/Internal/ProducerChannel.cs | 52 ++++++++++------------
src/DotPulsar/Internal/ProducerChannelFactory.cs | 5 +--
src/DotPulsar/Internal/ReaderChannelFactory.cs | 1 -
.../IProducerChannel.cs => RequestId.cs} | 22 ++++-----
src/DotPulsar/Internal/RequestResponseHandler.cs | 4 +-
src/DotPulsar/Internal/SequenceId.cs | 18 ++------
src/DotPulsar/PulsarClient.cs | 2 +-
12 files changed, 70 insertions(+), 68 deletions(-)
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 134d406..8db5b3d 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -61,8 +61,10 @@ namespace Producing
{
while (!cancellationToken.IsCancellationRequested)
{
- var data = Encoding.UTF8.GetBytes("Sent " +
DateTime.UtcNow);
- _ = await producer.Send(data,
cancellationToken).ConfigureAwait(false);
+ var data = DateTime.UtcNow.ToLongTimeString();
+ var bytes = Encoding.UTF8.GetBytes(data);
+ _ = await producer.Send(bytes,
cancellationToken).ConfigureAwait(false);
+ Console.WriteLine("Sent: " + data);
await Task.Delay(delay).ConfigureAwait(false);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index 8375447..9fe2eb1 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -22,7 +22,7 @@ namespace DotPulsar.Internal.Abstractions
public interface IProducerChannel : IAsyncDisposable
{
- Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload,
CancellationToken cancellationToken);
+ Task<CommandSendReceipt> Send(ulong sequenceId, ReadOnlySequence<byte>
payload, CancellationToken cancellationToken);
Task<CommandSendReceipt> Send(MessageMetadata metadata,
ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
}
}
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index 94a68b9..f98d9fd 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -66,7 +66,6 @@ namespace DotPulsar.Internal
var messageQueue = new AsyncQueue<MessagePackage>();
var channel = new Channel(_correlationId, _eventRegister,
messageQueue);
var response = await connection.Send(_subscribe, channel,
cancellationToken).ConfigureAwait(false);
-
return new ConsumerChannel(response.ConsumerId,
_messagePrefetchCount, messageQueue, connection, _batchHandler);
}
}
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs
b/src/DotPulsar/Internal/NotReadyChannel.cs
index 93f572a..fee9bb4 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -30,7 +30,7 @@ namespace DotPulsar.Internal
public ValueTask<Message> Receive(CancellationToken cancellationToken
= default)
=> throw GetException();
- public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload,
CancellationToken cancellationToken)
+ public Task<CommandSendReceipt> Send(ulong sequenceId,
ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
=> throw GetException();
public Task<CommandSendReceipt> Send(MessageMetadata metadata,
ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/Producer.cs
b/src/DotPulsar/Internal/Producer.cs
index efe5bc0..da2aacd 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -30,6 +30,7 @@ namespace DotPulsar.Internal
private IProducerChannel _channel;
private readonly IExecute _executor;
private readonly IStateChanged<ProducerState> _state;
+ private readonly SequenceId _sequenceId;
private int _isDisposed;
public string Topic { get; }
@@ -37,6 +38,7 @@ namespace DotPulsar.Internal
public Producer(
Guid correlationId,
string topic,
+ ulong initialSequenceId,
IRegisterEvent registerEvent,
IProducerChannel initialChannel,
IExecute executor,
@@ -44,6 +46,7 @@ namespace DotPulsar.Internal
{
_correlationId = correlationId;
Topic = topic;
+ _sequenceId = new SequenceId(initialSequenceId);
_eventRegister = registerEvent;
_channel = initialChannel;
_executor = executor;
@@ -90,7 +93,8 @@ namespace DotPulsar.Internal
public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var response = await _executor.Execute(() => _channel.Send(data,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ var sequenceId = _sequenceId.FetchNext();
+ var response = await _executor.Execute(() =>
_channel.Send(sequenceId, data, cancellationToken),
cancellationToken).ConfigureAwait(false);
return new MessageId(response.MessageId);
}
@@ -103,8 +107,21 @@ namespace DotPulsar.Internal
public async ValueTask<MessageId> Send(MessageMetadata metadata,
ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var response = await _executor.Execute(() =>
_channel.Send(metadata.Metadata, data, cancellationToken),
cancellationToken).ConfigureAwait(false);
- return new MessageId(response.MessageId);
+
+ var autoAssignSequenceId = metadata.SequenceId == 0;
+ if (autoAssignSequenceId)
+ metadata.SequenceId = _sequenceId.FetchNext();
+
+ try
+ {
+ var response = await _executor.Execute(() =>
_channel.Send(metadata.Metadata, data, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ return new MessageId(response.MessageId);
+ }
+ finally
+ {
+ if (autoAssignSequenceId)
+ metadata.SequenceId = 0;
+ }
}
internal async ValueTask SetChannel(IProducerChannel channel)
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs
b/src/DotPulsar/Internal/ProducerChannel.cs
index 0ba8cd1..e7e062b 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -25,20 +25,20 @@ namespace DotPulsar.Internal
public sealed class ProducerChannel : IProducerChannel
{
- private readonly MessageMetadata _cachedMetadata;
+ private readonly ObjectPool<MessageMetadata> _messageMetadataPool;
private readonly ObjectPool<SendPackage> _sendPackagePool;
private readonly ulong _id;
private readonly string _name;
- private readonly SequenceId _sequenceId;
private readonly IConnection _connection;
- public ProducerChannel(ulong id, string name, SequenceId sequenceId,
IConnection connection)
+ public ProducerChannel(ulong id, string name, IConnection connection)
{
- _cachedMetadata = new MessageMetadata { ProducerName = name };
- _sendPackagePool = new DefaultObjectPool<SendPackage>(new
DefaultPooledObjectPolicy<SendPackage>());
+ var messageMetadataPolicy = new
DefaultPooledObjectPolicy<MessageMetadata>();
+ _messageMetadataPool = new
DefaultObjectPool<MessageMetadata>(messageMetadataPolicy);
+ var sendPackagePolicy = new
DefaultPooledObjectPolicy<SendPackage>();
+ _sendPackagePool = new
DefaultObjectPool<SendPackage>(sendPackagePolicy);
_id = id;
_name = name;
- _sequenceId = sequenceId;
_connection = connection;
}
@@ -55,19 +55,30 @@ namespace DotPulsar.Internal
}
}
- public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload,
CancellationToken cancellationToken)
- => SendPackage(_cachedMetadata, payload, true, cancellationToken);
+ public Task<CommandSendReceipt> Send(ulong sequenceId,
ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+ {
+ var metadata = _messageMetadataPool.Get();
+ metadata.ProducerName = _name;
+ metadata.SequenceId = sequenceId;
+ try
+ {
+ return SendPackage(metadata, payload, cancellationToken);
+ }
+ finally
+ {
+ _messageMetadataPool.Return(metadata);
+ }
+ }
public Task<CommandSendReceipt> Send(MessageMetadata metadata,
ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
{
metadata.ProducerName = _name;
- return SendPackage(metadata, payload, metadata.SequenceId == 0,
cancellationToken);
+ return SendPackage(metadata, payload, cancellationToken);
}
private async Task<CommandSendReceipt> SendPackage(
MessageMetadata metadata,
ReadOnlySequence<byte> payload,
- bool autoAssignSequenceId,
CancellationToken cancellationToken)
{
var sendPackage = _sendPackagePool.Get();
@@ -81,36 +92,21 @@ namespace DotPulsar.Internal
};
}
+ sendPackage.Command.SequenceId = metadata.SequenceId;
sendPackage.Metadata = metadata;
sendPackage.Payload = payload;
+ metadata.PublishTime = (ulong)
DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); // TODO Benchmark against
StopWatch
try
{
- metadata.PublishTime = (ulong)
DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
-
- if (autoAssignSequenceId)
- {
- var newSequenceId = _sequenceId.FetchNext();
- sendPackage.Command.SequenceId = newSequenceId;
- sendPackage.Metadata.SequenceId = newSequenceId;
- }
- else
- sendPackage.Command.SequenceId =
sendPackage.Metadata.SequenceId;
-
var response = await _connection.Send(sendPackage,
cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.SendReceipt);
-
return response.SendReceipt;
}
finally
{
- // Reset in case the user reuse the MessageMetadata, but is
not explicitly setting the sequenceId
- if (autoAssignSequenceId)
- sendPackage.Metadata.SequenceId = 0;
-
_sendPackagePool.Return(sendPackage);
}
}
}
}
-
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs
b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index e7d8327..87c76de 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -26,7 +26,6 @@ namespace DotPulsar.Internal
private readonly IRegisterEvent _eventRegister;
private readonly IConnectionPool _connectionPool;
private readonly IExecute _executor;
- private readonly SequenceId _sequenceId;
private readonly CommandProducer _commandProducer;
public ProducerChannelFactory(
@@ -40,7 +39,6 @@ namespace DotPulsar.Internal
_eventRegister = eventRegister;
_connectionPool = connectionPool;
_executor = executor;
- _sequenceId = new SequenceId(options.InitialSequenceId);
_commandProducer = new CommandProducer
{
@@ -57,8 +55,7 @@ namespace DotPulsar.Internal
var connection = await
_connectionPool.FindConnectionForTopic(_commandProducer.Topic,
cancellationToken).ConfigureAwait(false);
var channel = new Channel(_correlationId, _eventRegister, new
AsyncQueue<MessagePackage>());
var response = await connection.Send(_commandProducer, channel,
cancellationToken).ConfigureAwait(false);
-
- return new ProducerChannel(response.ProducerId,
response.ProducerName, _sequenceId, connection);
+ return new ProducerChannel(response.ProducerId,
response.ProducerName, connection);
}
}
}
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs
b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index 03140ad..9efe033 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -65,7 +65,6 @@ namespace DotPulsar.Internal
var messageQueue = new AsyncQueue<MessagePackage>();
var channel = new Channel(_correlationId, _eventRegister,
messageQueue);
var response = await connection.Send(_subscribe, channel,
cancellationToken).ConfigureAwait(false);
-
return new ConsumerChannel(response.ConsumerId,
_messagePrefetchCount, messageQueue, connection, _batchHandler);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
b/src/DotPulsar/Internal/RequestId.cs
similarity index 55%
copy from src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
copy to src/DotPulsar/Internal/RequestId.cs
index 8375447..128ac5e 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/RequestId.cs
@@ -12,17 +12,19 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Abstractions
+namespace DotPulsar.Internal
{
- using PulsarApi;
- using System;
- using System.Buffers;
- using System.Threading;
- using System.Threading.Tasks;
-
- public interface IProducerChannel : IAsyncDisposable
+ public sealed class RequestId
{
- Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload,
CancellationToken cancellationToken);
- Task<CommandSendReceipt> Send(MessageMetadata metadata,
ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
+ private ulong _current;
+
+ public RequestId()
+ => _current = 0;
+
+ public bool IsPastInitialId()
+ => _current != 0;
+
+ public ulong FetchNext()
+ => _current++;
}
}
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs
b/src/DotPulsar/Internal/RequestResponseHandler.cs
index c855337..1885aa4 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -23,12 +23,12 @@ namespace DotPulsar.Internal
private const string ConnectResponseIdentifier = "Connected";
private readonly Awaiter<string, BaseCommand> _responses;
- private SequenceId _requestId;
+ private RequestId _requestId;
public RequestResponseHandler()
{
_responses = new Awaiter<string, BaseCommand>();
- _requestId = new SequenceId(1);
+ _requestId = new RequestId();
}
public void Dispose()
diff --git a/src/DotPulsar/Internal/SequenceId.cs
b/src/DotPulsar/Internal/SequenceId.cs
index 55ba94a..aba1c25 100644
--- a/src/DotPulsar/Internal/SequenceId.cs
+++ b/src/DotPulsar/Internal/SequenceId.cs
@@ -12,32 +12,22 @@
* limitations under the License.
*/
-using System.Threading;
-
namespace DotPulsar.Internal
{
+ using System.Threading;
+
public sealed class SequenceId
{
private long _current;
- private ulong _initial;
public SequenceId(ulong initialSequenceId)
{
// Subtracting one because Interlocked.Increment will return the
post-incremented value
// which is expected to be the initialSequenceId for the first call
- _current = unchecked((long)initialSequenceId - 1);
- _initial = initialSequenceId - 1;
- }
-
- // Returns false if FetchNext has not been called on this object
before (or if it somehow wrapped around 2^64)
- public bool IsPastInitialId()
- {
- return unchecked((ulong)_current != _initial);
+ _current = unchecked((long) initialSequenceId - 1);
}
public ulong FetchNext()
- {
- return unchecked((ulong)Interlocked.Increment(ref _current));
- }
+ => unchecked((ulong) Interlocked.Increment(ref _current));
}
}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 260f3ce..982ee3b 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -57,7 +57,7 @@ namespace DotPulsar
var executor = new Executor(correlationId, _processManager,
_exceptionHandler);
var factory = new ProducerChannelFactory(correlationId,
_processManager, _connectionPool, executor, options);
var stateManager = new
StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed,
ProducerState.Faulted);
- var producer = new Producer(correlationId, options.Topic,
_processManager, new NotReadyChannel(), executor, stateManager);
+ var producer = new Producer(correlationId, options.Topic,
options.InitialSequenceId, _processManager, new NotReadyChannel(), executor,
stateManager);
var process = new ProducerProcess(correlationId, stateManager,
factory, producer);
_processManager.Add(process);
process.Start();