This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a512cc8 Feature/producer properties (#222)
a512cc8 is described below
commit a512cc8482377972b4932d84bb281d1398e006e1
Author: Kristian Andersen <[email protected]>
AuthorDate: Fri Jun 7 10:33:45 2024 +0200
Feature/producer properties (#222)
* Add option to setup producer properties
* Add changelog entry
---
CHANGELOG.md | 6 ++++++
src/DotPulsar/Abstractions/IProducerBuilder.cs | 5 +++++
src/DotPulsar/Internal/Producer.cs | 3 ++-
src/DotPulsar/Internal/ProducerBuilder.cs | 11 ++++++++++-
src/DotPulsar/Internal/ProducerChannelFactory.cs | 6 +++++-
src/DotPulsar/ProducerOptions.cs | 6 ++++++
6 files changed, 34 insertions(+), 3 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index aae57f0..0186af3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this
file.
The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [3.3.0] - ?
+
+### Added
+
+- Producer properties can be added when creating a producer
+
## [3.2.1] - 2024-04-24
### Fixed
diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs
b/src/DotPulsar/Abstractions/IProducerBuilder.cs
index 74c08a7..01c99aa 100644
--- a/src/DotPulsar/Abstractions/IProducerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -65,6 +65,11 @@ public interface IProducerBuilder<TMessage>
/// </summary>
IProducerBuilder<TMessage> MaxPendingMessages(uint maxPendingMessages);
+ /// <summary>
+ /// Add/Set a property key/value on the producer. This is optional.
+ /// </summary>
+ IProducerBuilder<TMessage> ProducerProperty(string key, string value);
+
/// <summary>
/// Create the producer.
/// </summary>
diff --git a/src/DotPulsar/Internal/Producer.cs
b/src/DotPulsar/Internal/Producer.cs
index c5fd667..45db4b5 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -159,7 +159,8 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
var producerName = _options.ProducerName;
var schema = _options.Schema;
var producerAccessMode = (PulsarApi.ProducerAccessMode)
_options.ProducerAccessMode;
- var factory = new ProducerChannelFactory(correlationId,
_processManager, _connectionPool, topic, producerName, producerAccessMode,
schema.SchemaInfo, _compressorFactory);
+ var producerProperties = _options.ProducerProperties;
+ var factory = new ProducerChannelFactory(correlationId,
_processManager, _connectionPool, topic, producerName, producerAccessMode,
schema.SchemaInfo, _compressorFactory, producerProperties);
var stateManager = CreateStateManager();
var initialChannel = new NotReadyChannel<TMessage>();
var executor = new Executor(correlationId, _processManager,
_exceptionHandler);
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs
b/src/DotPulsar/Internal/ProducerBuilder.cs
index dbdac4e..2a5f025 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -21,6 +21,7 @@ public sealed class ProducerBuilder<TMessage> :
IProducerBuilder<TMessage>
{
private readonly IPulsarClient _pulsarClient;
private readonly ISchema<TMessage> _schema;
+ private readonly Dictionary<string, string> _producerProperties;
private string? _producerName;
private ProducerAccessMode _producerAccessMode;
private bool _attachTraceInfoToMessages;
@@ -40,6 +41,7 @@ public sealed class ProducerBuilder<TMessage> :
IProducerBuilder<TMessage>
_initialSequenceId =
ProducerOptions<TMessage>.DefaultInitialSequenceId;
_maxPendingMessages = 500;
_producerAccessMode =
ProducerOptions<TMessage>.DefaultProducerAccessMode;
+ _producerProperties = [];
}
public IProducerBuilder<TMessage> AttachTraceInfoToMessages(bool
attachTraceInfoToMessages)
@@ -96,6 +98,12 @@ public sealed class ProducerBuilder<TMessage> :
IProducerBuilder<TMessage>
return this;
}
+ public IProducerBuilder<TMessage> ProducerProperty(string key, string
value)
+ {
+ _producerProperties[key] = value;
+ return this;
+ }
+
public IProducer<TMessage> Create()
{
if (string.IsNullOrEmpty(_topic))
@@ -112,7 +120,8 @@ public sealed class ProducerBuilder<TMessage> :
IProducerBuilder<TMessage>
InitialSequenceId = _initialSequenceId,
ProducerName = _producerName,
StateChangedHandler = _stateChangedHandler,
- MaxPendingMessages = _maxPendingMessages
+ MaxPendingMessages = _maxPendingMessages,
+ ProducerProperties = _producerProperties
};
if (_messageRouter is not null)
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs
b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index 7dab27c..ce75cf4 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -36,7 +36,8 @@ public sealed class ProducerChannelFactory :
IProducerChannelFactory
string? producerName,
ProducerAccessMode producerAccessMode,
SchemaInfo schemaInfo,
- ICompressorFactory? compressorFactory)
+ ICompressorFactory? compressorFactory,
+ Dictionary<string,string>? properties)
{
_correlationId = correlationId;
_eventRegister = eventRegister;
@@ -49,6 +50,9 @@ public sealed class ProducerChannelFactory :
IProducerChannelFactory
Topic = topic
};
+ if (properties is not null)
+ _commandProducer.Metadatas.AddRange(properties.Select(x => new
KeyValue { Key = x.Key, Value = x.Value }));
+
_compressorFactory = compressorFactory;
_schema = schemaInfo.PulsarSchema;
}
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
index 127162f..aa6c4d4 100644
--- a/src/DotPulsar/ProducerOptions.cs
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -48,6 +48,7 @@ public sealed class ProducerOptions<TMessage>
Topic = topic;
Schema = schema;
MessageRouter = new RoundRobinPartitionRouter();
+ ProducerProperties = [];
}
/// <summary>
@@ -99,4 +100,9 @@ public sealed class ProducerOptions<TMessage>
/// Set the max size of the queue holding the messages pending to receive
an acknowledgment from the broker.
/// </summary>
public uint MaxPendingMessages { get; set; }
+
+ /// <summary>
+ /// Add/Set the producers's properties. This is optional.
+ /// </summary>
+ public Dictionary<string, string> ProducerProperties { get; set; }
}