This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit dd411737e1503b090db9d7b64e271a10ae2e690c Author: Aaron Ai <[email protected]> AuthorDate: Tue Feb 28 10:10:35 2023 +0800 Apply builder pattern --- csharp/examples/ProducerBenchmark.cs | 80 +++++++++--------- csharp/examples/ProducerDelayMessageExample.cs | 36 ++++---- csharp/examples/ProducerFifoMessageExample.cs | 37 ++++---- csharp/examples/ProducerNormalMessageExample.cs | 37 ++++---- .../examples/ProducerTransactionMessageExample.cs | 32 +++---- csharp/examples/QuickStart.cs | 2 +- csharp/examples/SimpleConsumerExample.cs | 18 ++-- csharp/rocketmq-client-csharp/ClientConfig.cs | 40 +++++++-- csharp/rocketmq-client-csharp/Message.cs | 93 ++++++++++++++------ .../{ClientConfig.cs => Preconditions.cs} | 17 ++-- csharp/rocketmq-client-csharp/Producer.cs | 83 ++++++++++++------ csharp/rocketmq-client-csharp/SimpleConsumer.cs | 51 ++++++++++- csharp/tests/MessageTest.cs | 98 ++-------------------- 13 files changed, 348 insertions(+), 276 deletions(-) diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs index a114f879..dc5d372f 100644 --- a/csharp/examples/ProducerBenchmark.cs +++ b/csharp/examples/ProducerBenchmark.cs @@ -30,51 +30,16 @@ namespace examples private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); private static readonly SemaphoreSlim Semaphore = new(0); + private const int TpsLimit = 1; private static long _counter = 0; - internal static void QuickStart() + private static void DoStats() { - const string accessKey = "amKhwEM40L61znSz"; - const string secretKey = "bT6c3gpF3EFB10F3"; - - // Credential provider is optional for client configuration. - var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); - const string endpoints = "rmq-cn-nwy337bf81g.cn-hangzhou.rmq.aliyuncs.com:8080"; - var clientConfig = new ClientConfig(endpoints) - { - CredentialsProvider = credentialsProvider - }; - // In most case, you don't need to create too many producers, single pattern is recommended. - var producer = new Producer(clientConfig); - - const string topic = "lingchu_normal_topic"; - producer.SetTopics(topic); - // Set the topic name(s), which is optional but recommended. It makes producer could prefetch - // the topic route before message publishing. - producer.Start().Wait(); - // Define your message body. - var bytes = Encoding.UTF8.GetBytes("foobar"); - const string tag = "yourMessageTagA"; - // You could set multiple keys for the single message. - var keys = new List<string> - { - "yourMessageKey-7044358f98fc", - "yourMessageKey-f72539fbc246" - }; - // Set topic for current message. - var message = new Message(topic, bytes) - { - Tag = tag, - Keys = keys - }; - - const int tpsLimit = 1; - Task.Run(async () => { while (true) { - Semaphore.Release(tpsLimit); + Semaphore.Release(TpsLimit); await Task.Delay(TimeSpan.FromSeconds(1)); } }); @@ -87,7 +52,46 @@ namespace examples await Task.Delay(TimeSpan.FromSeconds(1)); } }); + } + + internal static async Task QuickStart() + { + const string accessKey = "yourAccessKey"; + const string secretKey = "yourSecretKey"; + + // Credential provider is optional for client configuration. + var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); + const string endpoints = "foobar.com:8080"; + var clientConfig = new ClientConfig.Builder() + .SetEndpoints(endpoints) + .SetCredentialsProvider(credentialsProvider) + .Build(); + const string topic = "yourNormalTopic"; + // In most case, you don't need to create too many producers, single pattern is recommended. + await using var producer = await new Producer.Builder() + // Set the topic name(s), which is optional but recommended. + // It makes producer could prefetch the topic route before message publishing. + .SetTopics(topic) + .SetClientConfig(clientConfig) + .Build(); + + // Define your message body. + var bytes = Encoding.UTF8.GetBytes("foobar"); + const string tag = "yourMessageTagA"; + // You could set multiple keys for the single message. + var keys = new List<string> + { + "yourMessageKey-7044358f98fc", + "yourMessageKey-f72539fbc246" + }; + var message = new Message.Builder() + .SetTopic(topic) + .SetBody(bytes) + .SetTag(tag) + .SetKeys(keys) + .Build(); + DoStats(); var tasks = new List<Task>(); while (true) { diff --git a/csharp/examples/ProducerDelayMessageExample.cs b/csharp/examples/ProducerDelayMessageExample.cs index edb356a5..31a40be7 100644 --- a/csharp/examples/ProducerDelayMessageExample.cs +++ b/csharp/examples/ProducerDelayMessageExample.cs @@ -35,18 +35,18 @@ namespace examples // Credential provider is optional for client configuration. var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); const string endpoints = "foobar.com:8080"; - var clientConfig = new ClientConfig(endpoints) - { - CredentialsProvider = credentialsProvider - }; - // In most case, you don't need to create too many producers, single pattern is recommended. - var producer = new Producer(clientConfig); + var clientConfig = new ClientConfig.Builder() + .SetEndpoints(endpoints) + .SetCredentialsProvider(credentialsProvider) + .Build(); const string topic = "yourDelayTopic"; - // Set the topic name(s), which is optional but recommended. It makes producer could prefetch - // the topic route before message publishing. - producer.SetTopics(topic); - - await producer.Start(); + // In most case, you don't need to create too many producers, single pattern is recommended. + await using var producer = await new Producer.Builder() + // Set the topic name(s), which is optional but recommended. + // It makes producer could prefetch the topic route before message publishing. + .SetTopics(topic) + .SetClientConfig(clientConfig) + .Build(); // Define your message body. var bytes = Encoding.UTF8.GetBytes("foobar"); const string tag = "yourMessageTagA"; @@ -56,14 +56,12 @@ namespace examples "yourMessageKey-2f00df144e48", "yourMessageKey-49df1dd332b7" }; - // Set topic for current message. - var message = new Message(topic, bytes) - { - Tag = tag, - Keys = keys, - // Essential for DELAY message. - DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(30) - }; + var message = new Message.Builder() + .SetTopic(topic) + .SetBody(bytes) + .SetTag(tag) + .SetKeys(keys) + .SetDeliveryTimestamp(DateTime.UtcNow + TimeSpan.FromSeconds(30)).Build(); var sendReceipt = await producer.Send(message); Logger.Info($"Send message successfully, sendReceipt={sendReceipt}"); // Close the producer if you don't need it anymore. diff --git a/csharp/examples/ProducerFifoMessageExample.cs b/csharp/examples/ProducerFifoMessageExample.cs index bfca32f5..9a1bdf76 100644 --- a/csharp/examples/ProducerFifoMessageExample.cs +++ b/csharp/examples/ProducerFifoMessageExample.cs @@ -36,18 +36,18 @@ namespace examples // Credential provider is optional for client configuration. var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); const string endpoints = "foobar.com:8080"; - var clientConfig = new ClientConfig(endpoints) - { - CredentialsProvider = credentialsProvider - }; - // In most case, you don't need to create too many producers, single pattern is recommended. - var producer = new Producer(clientConfig); - + var clientConfig = new ClientConfig.Builder() + .SetEndpoints(endpoints) + .SetCredentialsProvider(credentialsProvider) + .Build(); const string topic = "yourFifoTopic"; - producer.SetTopics(topic); - // Set the topic name(s), which is optional but recommended. It makes producer could prefetch - // the topic route before message publishing. - await producer.Start(); + // In most case, you don't need to create too many producers, single pattern is recommended. + await using var producer = await new Producer.Builder() + // Set the topic name(s), which is optional but recommended. + // It makes producer could prefetch the topic route before message publishing. + .SetTopics(topic) + .SetClientConfig(clientConfig) + .Build(); // Define your message body. var bytes = Encoding.UTF8.GetBytes("foobar"); const string tag = "yourMessageTagA"; @@ -58,14 +58,13 @@ namespace examples "yourMessageKey-f72539fbc246" }; const string messageGroup = "yourMessageGroup"; - // Set topic for current message. - var message = new Message(topic, bytes) - { - Tag = tag, - Keys = keys, - // Set message group for FIFO message. - MessageGroup = messageGroup - }; + var message = new Message.Builder() + .SetTopic(topic) + .SetBody(bytes) + .SetTag(tag) + .SetKeys(keys) + .SetMessageGroup(messageGroup) + .Build(); var sendReceipt = await producer.Send(message); Logger.Info($"Send message successfully, sendReceipt={sendReceipt}"); Thread.Sleep(9999999); diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs index 09e4dff1..258886e2 100644 --- a/csharp/examples/ProducerNormalMessageExample.cs +++ b/csharp/examples/ProducerNormalMessageExample.cs @@ -17,7 +17,6 @@ using System.Collections.Generic; using System.Text; -using System.Threading; using System.Threading.Tasks; using NLog; using Org.Apache.Rocketmq; @@ -36,18 +35,18 @@ namespace examples // Credential provider is optional for client configuration. var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); const string endpoints = "foobar.com:8080"; - var clientConfig = new ClientConfig(endpoints) - { - CredentialsProvider = credentialsProvider - }; - // In most case, you don't need to create too many producers, single pattern is recommended. - var producer = new Producer(clientConfig); - + var clientConfig = new ClientConfig.Builder() + .SetEndpoints(endpoints) + .SetCredentialsProvider(credentialsProvider) + .Build(); const string topic = "yourNormalTopic"; - producer.SetTopics(topic); - // Set the topic name(s), which is optional but recommended. It makes producer could prefetch - // the topic route before message publishing. - await producer.Start(); + // In most case, you don't need to create too many producers, single pattern is recommended. + await using var producer = await new Producer.Builder() + // Set the topic name(s), which is optional but recommended. + // It makes producer could prefetch the topic route before message publishing. + .SetTopics(topic) + .SetClientConfig(clientConfig) + .Build(); // Define your message body. var bytes = Encoding.UTF8.GetBytes("foobar"); const string tag = "yourMessageTagA"; @@ -58,16 +57,14 @@ namespace examples "yourMessageKey-f72539fbc246" }; // Set topic for current message. - var message = new Message(topic, bytes) - { - Tag = tag, - Keys = keys - }; + var message = new Message.Builder() + .SetTopic(topic) + .SetBody(bytes) + .SetTag(tag) + .SetKeys(keys) + .Build(); var sendReceipt = await producer.Send(message); Logger.Info($"Send message successfully, sendReceipt={sendReceipt}"); - Thread.Sleep(9999999); - // Close the producer if you don't need it anymore. - await producer.Shutdown(); } } } \ No newline at end of file diff --git a/csharp/examples/ProducerTransactionMessageExample.cs b/csharp/examples/ProducerTransactionMessageExample.cs index 10b61142..06cb3995 100644 --- a/csharp/examples/ProducerTransactionMessageExample.cs +++ b/csharp/examples/ProducerTransactionMessageExample.cs @@ -42,16 +42,20 @@ namespace examples // Credential provider is optional for client configuration. var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); const string endpoints = "foobar.com:8080"; - var clientConfig = new ClientConfig(endpoints) - { - CredentialsProvider = credentialsProvider - }; - // In most case, you don't need to create too many producers, single pattern is recommended. - var producer = new Producer(clientConfig); + var clientConfig = new ClientConfig.Builder() + .SetEndpoints(endpoints) + .SetCredentialsProvider(credentialsProvider) + .Build(); const string topic = "yourTransactionTopic"; - producer.SetTopics(topic); - producer.SetTransactionChecker(new TransactionChecker()); + // In most case, you don't need to create too many producers, single pattern is recommended. + await using var producer = await new Producer.Builder() + // Set the topic name(s), which is optional but recommended. + // It makes producer could prefetch the topic route before message publishing. + .SetTopics(topic) + .SetClientConfig(clientConfig) + .SetTransactionChecker(new TransactionChecker()) + .Build(); await producer.Start(); var transaction = producer.BeginTransaction(); @@ -64,12 +68,12 @@ namespace examples "yourMessageKey-7044358f98fc", "yourMessageKey-f72539fbc246" }; - // Set topic for current message. - var message = new Message(topic, bytes) - { - Tag = tag, - Keys = keys - }; + var message = new Message.Builder() + .SetTopic(topic) + .SetBody(bytes) + .SetTag(tag) + .SetKeys(keys) + .Build(); var sendReceipt = await producer.Send(message, transaction); Logger.Info("Send transaction message successfully, messageId={}", sendReceipt.MessageId); // Commit the transaction. diff --git a/csharp/examples/QuickStart.cs b/csharp/examples/QuickStart.cs index 8323218f..c1915035 100644 --- a/csharp/examples/QuickStart.cs +++ b/csharp/examples/QuickStart.cs @@ -31,7 +31,7 @@ namespace examples // await ProducerFifoMessageExample.QuickStart(); // await ProducerDelayMessageExample.QuickStart(); // await SimpleConsumerExample.QuickStart(); - ProducerBenchmark.QuickStart(); + ProducerBenchmark.QuickStart().Wait(); } } } \ No newline at end of file diff --git a/csharp/examples/SimpleConsumerExample.cs b/csharp/examples/SimpleConsumerExample.cs index fa78e845..c153c544 100644 --- a/csharp/examples/SimpleConsumerExample.cs +++ b/csharp/examples/SimpleConsumerExample.cs @@ -35,18 +35,21 @@ namespace examples // Credential provider is optional for client configuration. var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); const string endpoints = "foobar.com:8080"; - var clientConfig = new ClientConfig(endpoints) - { - CredentialsProvider = credentialsProvider - }; + var clientConfig = new ClientConfig.Builder() + .SetEndpoints(endpoints) + .SetCredentialsProvider(credentialsProvider) + .Build(); // Add your subscriptions. const string consumerGroup = "yourConsumerGroup"; const string topic = "yourTopic"; var subscription = new Dictionary<string, FilterExpression> { { topic, new FilterExpression("*") } }; // In most case, you don't need to create too many consumers, single pattern is recommended. - var simpleConsumer = - new SimpleConsumer(clientConfig, consumerGroup, TimeSpan.FromSeconds(15), subscription); + await using var simpleConsumer = new SimpleConsumer.Builder() + .SetClientConfig(clientConfig).SetConsumerGroup(consumerGroup) + .SetAwaitDuration(TimeSpan.FromSeconds(15)) + .SetSubscriptionExpression(subscription) + .Build(); await simpleConsumer.Start(); var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15)); @@ -58,9 +61,6 @@ namespace examples // await simpleConsumer.ChangeInvisibleDuration(message, TimeSpan.FromSeconds(15)); // Logger.Info($"Changing message invisible duration successfully, message=id={message.MessageId}"); } - - // Close the consumer if you don't need it anymore. - await simpleConsumer.Shutdown(); } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs b/csharp/rocketmq-client-csharp/ClientConfig.cs index 82ee8403..f6fa4de0 100644 --- a/csharp/rocketmq-client-csharp/ClientConfig.cs +++ b/csharp/rocketmq-client-csharp/ClientConfig.cs @@ -16,22 +16,52 @@ */ using System; -using System.Threading; namespace Org.Apache.Rocketmq { public class ClientConfig : IClientConfig { - public ClientConfig(string endpoints) + private ClientConfig(ICredentialsProvider credentialsProvider, TimeSpan requestTimeout, string endpoints) { - RequestTimeout = TimeSpan.FromSeconds(3); + CredentialsProvider = credentialsProvider; + RequestTimeout = requestTimeout; Endpoints = endpoints; } - public ICredentialsProvider CredentialsProvider { get; set; } + public ICredentialsProvider CredentialsProvider { get; } - public TimeSpan RequestTimeout { get; set; } + public TimeSpan RequestTimeout { get; } public string Endpoints { get; } + + public class Builder + { + private ICredentialsProvider _credentialsProvider; + private TimeSpan _requestTimeout = TimeSpan.FromSeconds(3); + private string _endpoints; + + public Builder SetCredentialsProvider(ICredentialsProvider credentialsProvider) + { + _credentialsProvider = credentialsProvider; + return this; + } + + public Builder SetRequestTimeout(TimeSpan requestTimeout) + { + _requestTimeout = requestTimeout; + return this; + } + + public Builder SetEndpoints(string endpoints) + { + _endpoints = endpoints; + return this; + } + + public ClientConfig Build() + { + return new ClientConfig(_credentialsProvider, _requestTimeout, _endpoints); + } + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Message.cs b/csharp/rocketmq-client-csharp/Message.cs index 19e868fc..fe2dee79 100644 --- a/csharp/rocketmq-client-csharp/Message.cs +++ b/csharp/rocketmq-client-csharp/Message.cs @@ -22,26 +22,16 @@ namespace Org.Apache.Rocketmq { public class Message { - public Message() : this(null, null) - { - } - - public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) - { - } - - public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body) - { - } - - public Message(string topic, string tag, List<string> keys, byte[] body) + private Message(string topic, byte[] body, string tag, List<string> keys, + Dictionary<string, string> userProperties, DateTime? deliveryTimestamp, string messageGroup) { Topic = topic; Tag = tag; Keys = keys; Body = body; - UserProperties = new Dictionary<string, string>(); - DeliveryTimestamp = null; + UserProperties = userProperties; + DeliveryTimestamp = deliveryTimestamp; + MessageGroup = messageGroup; } internal Message(Message message) @@ -50,25 +40,80 @@ namespace Org.Apache.Rocketmq Tag = message.Tag; Keys = message.Keys; Body = message.Body; - MessageGroup = message.MessageGroup; UserProperties = message.UserProperties; + MessageGroup = message.MessageGroup; DeliveryTimestamp = message.DeliveryTimestamp; } - public string Topic { get; set; } + public string Topic { get; } + + public byte[] Body { get; } + + public string Tag { get; } + + public List<string> Keys { get; } + public Dictionary<string, string> UserProperties { get; } + + public DateTime? DeliveryTimestamp { get; } - public byte[] Body { get; set; } + public string MessageGroup { get; } - public string Tag { get; set; } + public class Builder + { + private string _topic; + private byte[] _body; + private string _tag; + private List<string> _keys = new(); + private Dictionary<string, string> _userProperties = new(); + private DateTime? _deliveryTimestamp; + private string _messageGroup; + + public Builder SetTopic(string topic) + { + _topic = topic; + return this; + } + + public Builder SetBody(byte[] body) + { + _body = body; + return this; + } - public List<string> Keys { get; set; } - public Dictionary<string, string> UserProperties { get; set; } + public Builder SetTag(string tag) + { + _tag = tag; + return this; + } + public Builder SetKeys(List<string> keys) + { + _keys = keys; + return this; + } - public DateTime? DeliveryTimestamp { get; set; } + public Builder SetUserProperties(Dictionary<string, string> userProperties) + { + _userProperties = userProperties; + return this; + } - public int DeliveryAttempt { get; internal set; } + public Builder SetDeliveryTimestamp(DateTime deliveryTimestamp) + { + _deliveryTimestamp = deliveryTimestamp; + return this; + } - public string MessageGroup { get; set; } + public Builder SetMessageGroup(string messageGroup) + { + _messageGroup = messageGroup; + return this; + } + + public Message Build() + { + return new Message(_topic, _body, _tag, _keys, _userProperties, _deliveryTimestamp, _messageGroup); + } + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs b/csharp/rocketmq-client-csharp/Preconditions.cs similarity index 70% copy from csharp/rocketmq-client-csharp/ClientConfig.cs copy to csharp/rocketmq-client-csharp/Preconditions.cs index 82ee8403..235f81b6 100644 --- a/csharp/rocketmq-client-csharp/ClientConfig.cs +++ b/csharp/rocketmq-client-csharp/Preconditions.cs @@ -16,22 +16,17 @@ */ using System; -using System.Threading; namespace Org.Apache.Rocketmq { - public class ClientConfig : IClientConfig + public static class Preconditions { - public ClientConfig(string endpoints) + public static void CheckArgument(bool condition, string message) { - RequestTimeout = TimeSpan.FromSeconds(3); - Endpoints = endpoints; + if (!condition) + { + throw new ArgumentException(message); + } } - - public ICredentialsProvider CredentialsProvider { get; set; } - - public TimeSpan RequestTimeout { get; set; } - - public string Endpoints { get; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 81d2b8d7..c1e411a8 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -27,28 +27,18 @@ using NLog; namespace Org.Apache.Rocketmq { - public class Producer : Client + public class Producer : Client, IAsyncDisposable, IDisposable { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache; internal readonly PublishingSettings PublishingSettings; private readonly ConcurrentDictionary<string, bool> _publishingTopics; - private ITransactionChecker _checker = null; + private readonly ITransactionChecker _checker; private readonly Histogram<double> _sendCostTimeHistogram; - public Producer(ClientConfig clientConfig) : this(clientConfig, new ConcurrentDictionary<string, bool>(), 3) - { - } - - public Producer(ClientConfig clientConfig, int maxAttempts) : this(clientConfig, - new ConcurrentDictionary<string, bool>(), maxAttempts) - { - } - private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics, - int maxAttempts) : - base(clientConfig) + int maxAttempts, ITransactionChecker checker) : base(clientConfig) { var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts); PublishingSettings = new PublishingSettings(ClientId, Endpoints, retryPolicy, @@ -57,18 +47,6 @@ namespace Org.Apache.Rocketmq _publishingTopics = publishingTopics; _sendCostTimeHistogram = ClientMeterManager.Meter.CreateHistogram<double>(MetricConstant.SendCostTimeMetricName, "milliseconds"); - } - - public void SetTopics(params string[] topics) - { - foreach (var topic in topics) - { - _publishingTopics.TryAdd(topic, true); - } - } - - public void SetTransactionChecker(ITransactionChecker checker) - { _checker = checker; } @@ -94,6 +72,18 @@ namespace Org.Apache.Rocketmq } } + public async ValueTask DisposeAsync() + { + await Shutdown().ConfigureAwait(false); + GC.SuppressFinalize(this); + } + + public void Dispose() + { + Shutdown().Wait(); + GC.SuppressFinalize(this); + } + public override async Task Shutdown() { try @@ -330,5 +320,48 @@ namespace Org.Apache.Rocketmq var response = await ClientManager.EndTransaction(endpoints, request, ClientConfig.RequestTimeout); StatusChecker.Check(response.Status, request); } + + public class Builder + { + private ClientConfig _clientConfig; + private readonly ConcurrentDictionary<string, bool> _publishingTopics = new(); + private int _maxAttempts = 3; + private ITransactionChecker _checker; + + public Builder SetClientConfig(ClientConfig clientConfig) + { + _clientConfig = clientConfig; + return this; + } + + public Builder SetTopics(params string[] topics) + { + foreach (var topic in topics) + { + _publishingTopics[topic] = true; + } + + return this; + } + + public Builder SetMaxAttempts(int maxAttempts) + { + _maxAttempts = maxAttempts; + return this; + } + + public Builder SetTransactionChecker(ITransactionChecker checker) + { + _checker = checker; + return this; + } + + public async Task<Producer> Build() + { + var producer = new Producer(_clientConfig, _publishingTopics, _maxAttempts, _checker); + await producer.Start(); + return producer; + } + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs index dc5b61c7..97fe407c 100644 --- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs +++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs @@ -27,7 +27,7 @@ using Org.Apache.Rocketmq.Error; namespace Org.Apache.Rocketmq { - public class SimpleConsumer : Consumer + public class SimpleConsumer : Consumer, IAsyncDisposable, IDisposable { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); private readonly ConcurrentDictionary<string /* topic */, SubscriptionLoadBalancer> _subscriptionRouteDataCache; @@ -91,6 +91,18 @@ namespace Org.Apache.Rocketmq } } + public async ValueTask DisposeAsync() + { + await Shutdown().ConfigureAwait(false); + GC.SuppressFinalize(this); + } + + public void Dispose() + { + Shutdown().Wait(); + GC.SuppressFinalize(this); + } + public override async Task Shutdown() { try @@ -256,5 +268,42 @@ namespace Org.Apache.Rocketmq Name = ConsumerGroup }; } + + public class Builder + { + private ClientConfig _clientConfig; + private string _consumerGroup; + private TimeSpan _awaitDuration; + private ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions; + + public Builder SetClientConfig(ClientConfig clientConfig) + { + _clientConfig = clientConfig; + return this; + } + + public Builder SetConsumerGroup(string consumerGroup) + { + _consumerGroup = consumerGroup; + return this; + } + + public Builder SetAwaitDuration(TimeSpan awaitDuration) + { + _awaitDuration = awaitDuration; + return this; + } + + public Builder SetSubscriptionExpression(Dictionary<string, FilterExpression> subscriptionExpressions) + { + _subscriptionExpressions = new ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions); + return this; + } + + public SimpleConsumer Build() + { + return new SimpleConsumer(_clientConfig, _consumerGroup, _awaitDuration, _subscriptionExpressions); + } + } } } \ No newline at end of file diff --git a/csharp/tests/MessageTest.cs b/csharp/tests/MessageTest.cs index 9bb0f66c..6ba00b8e 100644 --- a/csharp/tests/MessageTest.cs +++ b/csharp/tests/MessageTest.cs @@ -14,106 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -using Microsoft.VisualStudio.TestTools.UnitTesting; + using System; +using Microsoft.VisualStudio.TestTools.UnitTesting; using System.Text; -using System.Collections.Generic; namespace Org.Apache.Rocketmq { [TestClass] public class MessageTest { - - [TestMethod] - public void testCtor() - { - var msg1 = new Message(); - Assert.IsNull(msg1.Topic); - Assert.IsNull(msg1.Body); - Assert.IsNull(msg1.Tag); - Assert.AreEqual(msg1.Keys.Count, 0); - Assert.AreEqual(msg1.UserProperties.Count, 0); - } - - [TestMethod] - public void testCtor2() - { - string topic = "T1"; - string bodyString = "body"; - byte[] body = Encoding.ASCII.GetBytes(bodyString); - var msg1 = new Message(topic, body); - Assert.AreEqual(msg1.Topic, topic); - Assert.AreEqual(msg1.Body, body); - Assert.IsNull(msg1.Tag); - Assert.AreEqual(msg1.Keys.Count, 0); - Assert.AreEqual(msg1.UserProperties.Count, 0); - } - - [TestMethod] - public void testCtor3() - { - string topic = "T1"; - string bodyString = "body"; - byte[] body = Encoding.ASCII.GetBytes(bodyString); - string tag = "TagA"; - var msg1 = new Message(topic, tag, body); - Assert.AreEqual(msg1.Topic, topic); - Assert.AreEqual(msg1.Body, body); - Assert.AreEqual(msg1.Tag, tag); - Assert.AreEqual(msg1.Keys.Count, 0); - Assert.AreEqual(msg1.UserProperties.Count, 0); - } - - [TestMethod] - public void testCtor4() - { - string topic = "T1"; - string bodyString = "body"; - byte[] body = Encoding.ASCII.GetBytes(bodyString); - string tag = "TagA"; - List<string> keys = new List<string>(); - keys.Add("Key1"); - keys.Add("Key2"); - - var msg1 = new Message(topic, tag, keys, body); - Assert.AreEqual(msg1.Topic, topic); - Assert.AreEqual(msg1.Body, body); - Assert.AreEqual(msg1.Tag, tag); - Assert.AreEqual(msg1.Keys, keys); - Assert.AreEqual(msg1.UserProperties.Count, 0); - } - [TestMethod] - public void testCtor5() + [ExpectedException(typeof(ArgumentException))] + public void TestIllegalTopic0() { - string topic = "T1"; - string bodyString = "body"; - byte[] body = Encoding.ASCII.GetBytes(bodyString); - string tag = "TagA"; - List<string> keys = new List<string>(); - keys.Add("Key1"); - keys.Add("Key2"); - - var msg1 = new Message(topic, tag, keys, body); - - msg1.UserProperties.Add("k", "v"); - msg1.UserProperties.Add("k2", "v2"); - - Assert.AreEqual(msg1.Topic, topic); - Assert.AreEqual(msg1.Body, body); - Assert.AreEqual(msg1.Tag, tag); - Assert.AreEqual(msg1.Keys, keys); - Assert.AreEqual(msg1.UserProperties.Count, 2); - - string value; - msg1.UserProperties.TryGetValue("k", out value); - Assert.AreEqual(value, "v"); - - msg1.UserProperties.TryGetValue("k2", out value); - Assert.AreEqual(value, "v2"); - + // const string topic = "\t\n"; + // const string bodyString = "body"; + // var body = Encoding.ASCII.GetBytes(bodyString); + // var _ = new Message(topic, body); } - } } \ No newline at end of file
