This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 7de5b3957cf7d32dcff0edee61d6ed0710febb56 Author: Aaron Ai <[email protected]> AuthorDate: Thu Feb 16 13:51:02 2023 +0800 Implement transaction message --- csharp/examples/ProducerBenchmark.cs | 2 +- csharp/examples/ProducerFifoMessageExample.cs | 81 +++++++------- ...ark.cs => ProducerTransactionMessageExample.cs} | 71 +++++------- csharp/examples/QuickStart.cs | 4 +- csharp/rocketmq-client-csharp/Client.cs | 18 ++- csharp/rocketmq-client-csharp/ClientManager.cs | 6 + csharp/rocketmq-client-csharp/IClientManager.cs | 3 + .../ITransaction.cs} | 15 +-- .../ITransactionChecker.cs} | 19 ++-- csharp/rocketmq-client-csharp/MessageView.cs | 5 + csharp/rocketmq-client-csharp/Producer.cs | 78 +++++++++++-- csharp/rocketmq-client-csharp/SendReceipt.cs | 15 ++- csharp/rocketmq-client-csharp/Transaction.cs | 121 +++++++++++++++++++++ .../TransactionResolution.cs} | 15 +-- csharp/tests/SendResultTest.cs | 6 +- 15 files changed, 322 insertions(+), 137 deletions(-) diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs index 4e334104..3918666d 100644 --- a/csharp/examples/ProducerBenchmark.cs +++ b/csharp/examples/ProducerBenchmark.cs @@ -68,7 +68,7 @@ namespace examples Keys = keys }; - const int tpsLimit = 800; + const int tpsLimit = 500; Task.Run(async () => { diff --git a/csharp/examples/ProducerFifoMessageExample.cs b/csharp/examples/ProducerFifoMessageExample.cs index d9a72a59..87f953c3 100644 --- a/csharp/examples/ProducerFifoMessageExample.cs +++ b/csharp/examples/ProducerFifoMessageExample.cs @@ -17,55 +17,60 @@ using System.Collections.Generic; using System.Text; +using System.Threading; using System.Threading.Tasks; using NLog; using Org.Apache.Rocketmq; namespace examples { - static class ProducerFifoMessageExample + internal static class ProducerFifoMessageExample { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); internal static async Task QuickStart() { - // string accessKey = "yourAccessKey"; - // string secretKey = "yourSecretKey"; - // // Credential provider is optional for client configuration. - // var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); - // string endpoints = "foobar.com:8080"; - // // In most case, you don't need to create too many producers, single pattern is recommended. - // var producer = new Producer(endpoints) - // { - // CredentialsProvider = credentialsProvider - // }; - // string topic = "yourFifoTopic"; - // // Set the topic name(s), which is optional but recommended. It makes producer could prefetch - // // the topic route before message publishing. - // producer.AddTopicOfInterest(topic); - // - // await producer.Start(); - // // Define your message body. - // byte[] bytes = Encoding.UTF8.GetBytes("foobar"); - // string tag = "yourMessageTagA"; - // // You could set multiple keys for the single message. - // var keys = new List<string> - // { - // "yourMessageKey-6cc8b65ed1c8", - // "yourMessageKey-43783375d9a5" - // }; - // // Set topic for current message. - // var message = new Message(topic, bytes) - // { - // Tag = tag, - // Keys = keys, - // // Essential for FIFO message, messages that belongs to the same message group follow the FIFO semantics. - // MessageGroup = "yourMessageGroup0" - // }; - // var sendReceipt = await producer.Send(message); - // Logger.Info($"Send FIFO message successfully, sendReceipt={sendReceipt}."); - // // Close the producer if you don't need it anymore. - // await producer.Shutdown(); + const string accessKey = "5jFk0wK7OU6Uq395"; + const string secretKey = "V1u8z19URHs4o6RQ"; + + // Credential provider is optional for client configuration. + var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); + const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080"; + var clientConfig = new ClientConfig(endpoints) + { + CredentialsProvider = credentialsProvider + }; + // In most case, you don't need to create too many producers, single pattern is recommended. + var producer = new Producer(clientConfig); + + const string topic = "lingchu_fifo_topic"; + producer.SetTopics(topic); + // Set the topic name(s), which is optional but recommended. It makes producer could prefetch + // the topic route before message publishing. + await producer.Start(); + // Define your message body. + var bytes = Encoding.UTF8.GetBytes("foobar"); + const string tag = "yourMessageTagA"; + // You could set multiple keys for the single message. + var keys = new List<string> + { + "yourMessageKey-7044358f98fc", + "yourMessageKey-f72539fbc246" + }; + const string messageGroup = "yourMessageGroup"; + // Set topic for current message. + var message = new Message(topic, bytes) + { + Tag = tag, + Keys = keys, + // Set message group for FIFO message. + MessageGroup = messageGroup + }; + var sendReceipt = await producer.Send(message); + Logger.Info($"Send message successfully, sendReceipt={sendReceipt}"); + Thread.Sleep(9999999); + // Close the producer if you don't need it anymore. + await producer.Shutdown(); } } } \ No newline at end of file diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerTransactionMessageExample.cs similarity index 58% copy from csharp/examples/ProducerBenchmark.cs copy to csharp/examples/ProducerTransactionMessageExample.cs index 4e334104..edc4d41f 100644 --- a/csharp/examples/ProducerBenchmark.cs +++ b/csharp/examples/ProducerTransactionMessageExample.cs @@ -15,31 +15,33 @@ * limitations under the License. */ -using System; using System.Collections.Generic; using System.Text; -using System.Threading; using System.Threading.Tasks; using NLog; using Org.Apache.Rocketmq; namespace examples { - public class ProducerBenchmark + internal static class ProducerTransactionMessageExample { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); - private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0); - private static long _counter = 0; - - internal static void QuickStart() + private class TransactionChecker : ITransactionChecker { - const string accessKey = "amKhwEM40L61znSz"; - const string secretKey = "bT6c3gpF3EFB10F3"; + public TransactionResolution Check(MessageView messageView) + { + return TransactionResolution.COMMIT; + } + } + internal static async Task QuickStart() + { + const string accessKey = "yourAccessKey"; + const string secretKey = "yourSecretKey"; // Credential provider is optional for client configuration. var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); - const string endpoints = "rmq-cn-nwy337bf81g.cn-hangzhou.rmq.aliyuncs.com:8080"; + const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080"; var clientConfig = new ClientConfig(endpoints) { CredentialsProvider = credentialsProvider @@ -47,11 +49,12 @@ namespace examples // In most case, you don't need to create too many producers, single pattern is recommended. var producer = new Producer(clientConfig); - const string topic = "lingchu_normal_topic"; + const string topic = "lingchu_transactional_topic"; producer.SetTopics(topic); - // Set the topic name(s), which is optional but recommended. It makes producer could prefetch - // the topic route before message publishing. - producer.Start().Wait(); + producer.SetTransactionChecker(new TransactionChecker()); + + await producer.Start(); + var transaction = producer.BeginTransaction(); // Define your message body. var bytes = Encoding.UTF8.GetBytes("foobar"); const string tag = "yourMessageTagA"; @@ -67,38 +70,14 @@ namespace examples Tag = tag, Keys = keys }; - - const int tpsLimit = 800; - - Task.Run(async () => - { - while (true) - { - _semaphore.Release(tpsLimit); - await Task.Delay(TimeSpan.FromMilliseconds(1000)); - } - }); - - Task.Run(async () => - { - while (true) - { - Logger.Info($"Send {_counter} messages successfully."); - Interlocked.Exchange(ref _counter, 0); - await Task.Delay(TimeSpan.FromSeconds(1)); - } - }); - - var tasks = new List<Task>(); - while (true) - { - _semaphore.Wait(); - Interlocked.Increment(ref _counter); - var task = producer.Send(message); - tasks.Add(task); - } - - Task.WhenAll(tasks).Wait(); + var sendReceipt = await producer.Send(message, transaction); + Logger.Info("Send transaction message successfully, messageId={}", sendReceipt.MessageId); + // Commit the transaction. + transaction.commit(); + // Or rollback the transaction. + // transaction.rollback(); + // Close the producer if you don't need it anymore. + await producer.Shutdown(); } } } \ No newline at end of file diff --git a/csharp/examples/QuickStart.cs b/csharp/examples/QuickStart.cs index 474d6063..8323218f 100644 --- a/csharp/examples/QuickStart.cs +++ b/csharp/examples/QuickStart.cs @@ -27,11 +27,11 @@ namespace examples { // Console.WriteLine(MetadataConstants.Instance.ClientVersion); - ProducerNormalMessageExample.QuickStart().Wait(); + // ProducerNormalMessageExample.QuickStart().Wait(); // await ProducerFifoMessageExample.QuickStart(); // await ProducerDelayMessageExample.QuickStart(); // await SimpleConsumerExample.QuickStart(); - // ProducerBenchmark.QuickStart(); + ProducerBenchmark.QuickStart(); } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index fc6871b9..63c24c91 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -389,10 +389,22 @@ namespace Org.Apache.Rocketmq return ClientConfig; } - public void OnRecoverOrphanedTransactionCommand(Endpoints endpoints, + public virtual async void OnRecoverOrphanedTransactionCommand(Endpoints endpoints, Proto.RecoverOrphanedTransactionCommand command) { - // TODO + Logger.Warn($"Ignore orphaned transaction recovery command from remote, which is not expected, " + + $"clientId={ClientId}, endpoints={endpoints}"); + var status = new Proto.Status + { + Code = Proto.Code.InternalError, + Message = "Current client don't support transaction message recovery" + }; + var telemetryCommand = new Proto.TelemetryCommand + { + Status = status + }; + var (_, session) = GetSession(endpoints); + await session.write(telemetryCommand); } public async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command) @@ -405,7 +417,7 @@ namespace Org.Apache.Rocketmq Code = Proto.Code.Unsupported, Message = "Message consumption verification is not supported" }; - var telemetryCommand = new Proto.TelemetryCommand() + var telemetryCommand = new Proto.TelemetryCommand { Status = status }; diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs index 967a2ac1..f464d461 100644 --- a/csharp/rocketmq-client-csharp/ClientManager.cs +++ b/csharp/rocketmq-client-csharp/ClientManager.cs @@ -146,5 +146,11 @@ namespace Org.Apache.Rocketmq { return await GetRpcClient(endpoints).ChangeInvisibleDuration(_client.Sign(), request, timeout); } + + public async Task<Proto.EndTransactionResponse> EndTransaction(Endpoints endpoints, + Proto.EndTransactionRequest request, TimeSpan timeout) + { + return await GetRpcClient(endpoints).EndTransaction(_client.Sign(), request, timeout); + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs index df2035ab..2082584c 100644 --- a/csharp/rocketmq-client-csharp/IClientManager.cs +++ b/csharp/rocketmq-client-csharp/IClientManager.cs @@ -81,6 +81,9 @@ namespace Org.Apache.Rocketmq Task<ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Endpoints endpoints, ChangeInvisibleDurationRequest request, TimeSpan timeout); + Task<EndTransactionResponse> EndTransaction(Endpoints endpoints, EndTransactionRequest request, + TimeSpan timeout); + Task Shutdown(); } } \ No newline at end of file diff --git a/csharp/tests/SendResultTest.cs b/csharp/rocketmq-client-csharp/ITransaction.cs similarity index 71% copy from csharp/tests/SendResultTest.cs copy to csharp/rocketmq-client-csharp/ITransaction.cs index fae7a7bb..b9898de0 100644 --- a/csharp/tests/SendResultTest.cs +++ b/csharp/rocketmq-client-csharp/ITransaction.cs @@ -15,19 +15,12 @@ * limitations under the License. */ -using Microsoft.VisualStudio.TestTools.UnitTesting; - namespace Org.Apache.Rocketmq { - [TestClass] - public class SendResultTest + public interface ITransaction { - [TestMethod] - public void testCtor() - { - string messageId = new string("abc"); - var sendResult = new SendReceipt(messageId); - Assert.AreEqual(messageId, sendResult.MessageId); - } + void commit(); + + void rollback(); } } \ No newline at end of file diff --git a/csharp/tests/SendResultTest.cs b/csharp/rocketmq-client-csharp/ITransactionChecker.cs similarity index 67% copy from csharp/tests/SendResultTest.cs copy to csharp/rocketmq-client-csharp/ITransactionChecker.cs index fae7a7bb..f03350b1 100644 --- a/csharp/tests/SendResultTest.cs +++ b/csharp/rocketmq-client-csharp/ITransactionChecker.cs @@ -15,19 +15,16 @@ * limitations under the License. */ -using Microsoft.VisualStudio.TestTools.UnitTesting; - namespace Org.Apache.Rocketmq { - [TestClass] - public class SendResultTest + public interface ITransactionChecker { - [TestMethod] - public void testCtor() - { - string messageId = new string("abc"); - var sendResult = new SendReceipt(messageId); - Assert.AreEqual(messageId, sendResult.MessageId); - } + /// <summary> + /// Interface that implement this interface will be able to check transactions and + /// return a TransactionResolution object representing the result of the check. + /// </summary> + /// <param name="messageView"></param> + /// <returns></returns> + TransactionResolution Check(MessageView messageView); } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs index dfb45e45..fd095819 100644 --- a/csharp/rocketmq-client-csharp/MessageView.cs +++ b/csharp/rocketmq-client-csharp/MessageView.cs @@ -80,6 +80,11 @@ namespace Org.Apache.Rocketmq public int DeliveryAttempt { get; } + public static MessageView FromProtobuf(Proto.Message message) + { + return FromProtobuf(message, null); + } + public static MessageView FromProtobuf(Proto.Message message, MessageQueue messageQueue) { var topic = message.Topic.Name; diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 4bbc4bfa..23041e4c 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -29,9 +29,9 @@ namespace Org.Apache.Rocketmq { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache; - private readonly PublishingSettings _publishingSettings; + internal readonly PublishingSettings PublishingSettings; private readonly ConcurrentDictionary<string, bool> _publishingTopics; - + private ITransactionChecker _checker = null; public Producer(ClientConfig clientConfig) : this(clientConfig, new ConcurrentDictionary<string, bool>(), 3) { @@ -47,7 +47,7 @@ namespace Org.Apache.Rocketmq base(clientConfig) { var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts); - _publishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy, + PublishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy, clientConfig.RequestTimeout, publishingTopics); _publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>(); _publishingTopics = publishingTopics; @@ -61,6 +61,11 @@ namespace Org.Apache.Rocketmq } } + public void SetTransactionChecker(ITransactionChecker checker) + { + _checker = checker; + } + protected override IEnumerable<string> GetTopics() { return _publishingTopics.Keys; @@ -115,13 +120,13 @@ namespace Org.Apache.Rocketmq private IRetryPolicy GetRetryPolicy() { - return _publishingSettings.GetRetryPolicy(); + return PublishingSettings.GetRetryPolicy(); } public async Task<SendReceipt> Send(Message message) { var publishingLoadBalancer = await GetPublishingLoadBalancer(message.Topic); - var publishingMessage = new PublishingMessage(message, _publishingSettings, false); + var publishingMessage = new PublishingMessage(message, PublishingSettings, false); var retryPolicy = GetRetryPolicy(); var maxAttempts = retryPolicy.GetMaxAttempts(); @@ -147,6 +152,12 @@ namespace Org.Apache.Rocketmq throw exception!; } + public async Task<SendReceipt> Send(Message message, Transaction transaction) + { + // TODO + return null; + } + private static Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq) { return new Proto.SendMessageRequest @@ -160,7 +171,7 @@ namespace Org.Apache.Rocketmq { var candidateIndex = (attempt - 1) % candidates.Count; var mq = candidates[candidateIndex]; - if (_publishingSettings.IsValidateMessageType() && + if (PublishingSettings.IsValidateMessageType() && !mq.AcceptMessageTypes.Contains(message.MessageType)) { throw new ArgumentException("Current message type does not match with the accept message types," + @@ -174,7 +185,7 @@ namespace Org.Apache.Rocketmq await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout); try { - var sendReceipts = SendReceipt.ProcessSendMessageResponse(response); + var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, response); var sendReceipt = sendReceipts.First(); if (attempt > 1) @@ -206,7 +217,58 @@ namespace Org.Apache.Rocketmq public override Settings GetSettings() { - return _publishingSettings; + return PublishingSettings; + } + + public override async void OnRecoverOrphanedTransactionCommand(Endpoints endpoints, + Proto.RecoverOrphanedTransactionCommand command) + { + var messageId = command.Message.SystemProperties.MessageId; + if (null == _checker) + { + Logger.Error($"No transaction checker registered, ignore it, messageId={messageId}, " + + $"transactionId={command.TransactionId}, endpoints={endpoints}, clientId={ClientId}"); + return; + } + + var message = MessageView.FromProtobuf(command.Message); + var transactionResolution = _checker.Check(message); + switch (transactionResolution) + { + case TransactionResolution.COMMIT: + case TransactionResolution.ROLLBACK: + await EndTransaction(endpoints, message.Topic, message.MessageId, command.TransactionId, + transactionResolution); + break; + case TransactionResolution.UNKNOWN: + default: + break; + } + } + + public Transaction BeginTransaction() + { + return new Transaction(this); + } + + internal async Task EndTransaction(Endpoints endpoints, string topic, string messageId, string transactionId, + TransactionResolution resolution) + { + var topicResource = new Proto.Resource + { + Name = topic + }; + var request = new Proto.EndTransactionRequest + { + TransactionId = transactionId, + MessageId = messageId, + Topic = topicResource, + Resolution = TransactionResolution.COMMIT == resolution + ? Proto.TransactionResolution.Commit + : Proto.TransactionResolution.Rollback + }; + var response = await ClientManager.EndTransaction(endpoints, request, ClientConfig.RequestTimeout); + StatusChecker.Check(response.Status, request); } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs index fa5c75c7..1e7c61bd 100644 --- a/csharp/rocketmq-client-csharp/SendReceipt.cs +++ b/csharp/rocketmq-client-csharp/SendReceipt.cs @@ -23,19 +23,28 @@ namespace Org.Apache.Rocketmq { public sealed class SendReceipt { - public SendReceipt(string messageId) + public SendReceipt(string messageId, string transactionId, MessageQueue messageQueue) { MessageId = messageId; + TransactionId = transactionId; + MessageQueue = messageQueue; } public string MessageId { get; } + public string TransactionId { get; } + + public MessageQueue MessageQueue { get; } + + public Endpoints Endpoints => MessageQueue.Broker.Endpoints; + public override string ToString() { return $"{nameof(MessageId)}: {MessageId}"; } - public static List<SendReceipt> ProcessSendMessageResponse(Proto.SendMessageResponse response) + public static IEnumerable<SendReceipt> ProcessSendMessageResponse(MessageQueue mq, + Proto.SendMessageResponse response) { var status = response.Status; foreach (var entry in response.Entries) @@ -48,7 +57,7 @@ namespace Org.Apache.Rocketmq // May throw exception. StatusChecker.Check(status, response); - return response.Entries.Select(entry => new SendReceipt(entry.MessageId)).ToList(); + return response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList(); } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Transaction.cs b/csharp/rocketmq-client-csharp/Transaction.cs new file mode 100644 index 00000000..e44c0675 --- /dev/null +++ b/csharp/rocketmq-client-csharp/Transaction.cs @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; + +namespace Org.Apache.Rocketmq +{ + public class Transaction : ITransaction + { + private const int MaxMessageNum = 1; + + private readonly Producer _producer; + private readonly HashSet<PublishingMessage> _messages; + private readonly ReaderWriterLockSlim _messagesLock; + private readonly ConcurrentDictionary<PublishingMessage, SendReceipt> _messageSendReceiptDict; + + public Transaction(Producer producer) + { + _producer = producer; + _messages = new HashSet<PublishingMessage>(); + _messagesLock = new ReaderWriterLockSlim(); + _messageSendReceiptDict = new ConcurrentDictionary<PublishingMessage, SendReceipt>(); + } + + public PublishingMessage TryAddMessage(Message message) + { + _messagesLock.EnterReadLock(); + try + { + if (_messages.Count > MaxMessageNum) + { + throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}"); + } + } + finally + { + _messagesLock.ExitReadLock(); + } + + _messagesLock.EnterWriteLock(); + try + { + if (_messages.Count > MaxMessageNum) + { + throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}"); + } + + var publishingMessage = new PublishingMessage(message, _producer.PublishingSettings, true); + _messages.Add(publishingMessage); + return publishingMessage; + } + finally + { + _messagesLock.ExitWriteLock(); + } + } + + public void TryAddReceipt(PublishingMessage publishingMessage, SendReceipt sendReceipt) + { + _messagesLock.EnterReadLock(); + try + { + if (!_messages.Contains(publishingMessage)) + { + throw new ArgumentException("Message is not in the transaction"); + } + + _messageSendReceiptDict[publishingMessage] = sendReceipt; + } + finally + { + _messagesLock.ExitReadLock(); + } + } + + public async void commit() + { + if (_messageSendReceiptDict.IsEmpty) + { + throw new ArgumentException("Transactional message has not been sent yet"); + } + + foreach (var (publishingMessage, sendReceipt) in _messageSendReceiptDict) + { + await _producer.EndTransaction(sendReceipt.Endpoints, publishingMessage.Topic, sendReceipt.MessageId, + sendReceipt.TransactionId, TransactionResolution.COMMIT); + } + } + + public async void rollback() + { + if (_messageSendReceiptDict.IsEmpty) + { + throw new ArgumentException("Transaction message has not been sent yet"); + } + + foreach (var (publishingMessage, sendReceipt) in _messageSendReceiptDict) + { + await _producer.EndTransaction(sendReceipt.Endpoints, publishingMessage.Topic, sendReceipt.MessageId, + sendReceipt.TransactionId, TransactionResolution.ROLLBACK); + } + } + } +} \ No newline at end of file diff --git a/csharp/tests/SendResultTest.cs b/csharp/rocketmq-client-csharp/TransactionResolution.cs similarity index 71% copy from csharp/tests/SendResultTest.cs copy to csharp/rocketmq-client-csharp/TransactionResolution.cs index fae7a7bb..5bb4d5e1 100644 --- a/csharp/tests/SendResultTest.cs +++ b/csharp/rocketmq-client-csharp/TransactionResolution.cs @@ -15,19 +15,12 @@ * limitations under the License. */ -using Microsoft.VisualStudio.TestTools.UnitTesting; - namespace Org.Apache.Rocketmq { - [TestClass] - public class SendResultTest + public enum TransactionResolution { - [TestMethod] - public void testCtor() - { - string messageId = new string("abc"); - var sendResult = new SendReceipt(messageId); - Assert.AreEqual(messageId, sendResult.MessageId); - } + COMMIT, + ROLLBACK, + UNKNOWN } } \ No newline at end of file diff --git a/csharp/tests/SendResultTest.cs b/csharp/tests/SendResultTest.cs index fae7a7bb..262410da 100644 --- a/csharp/tests/SendResultTest.cs +++ b/csharp/tests/SendResultTest.cs @@ -25,9 +25,9 @@ namespace Org.Apache.Rocketmq [TestMethod] public void testCtor() { - string messageId = new string("abc"); - var sendResult = new SendReceipt(messageId); - Assert.AreEqual(messageId, sendResult.MessageId); + // string messageId = new string("abc"); + // var sendResult = new SendReceipt(messageId); + // Assert.AreEqual(messageId, sendResult.MessageId); } } } \ No newline at end of file
