This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 15007046eae023bf6320ec1605fbb3f02845d589 Author: Aaron Ai <[email protected]> AuthorDate: Wed Feb 22 18:44:31 2023 +0800 Apply state machine in transactional message --- csharp/examples/ProducerTransactionMessageExample.cs | 2 +- csharp/rocketmq-client-csharp/Client.cs | 2 +- csharp/rocketmq-client-csharp/ITransaction.cs | 4 ++-- csharp/rocketmq-client-csharp/Producer.cs | 13 +++++++++++-- csharp/rocketmq-client-csharp/SimpleConsumer.cs | 15 +++++++++++++++ csharp/rocketmq-client-csharp/Transaction.cs | 14 ++++++++++++-- 6 files changed, 42 insertions(+), 8 deletions(-) diff --git a/csharp/examples/ProducerTransactionMessageExample.cs b/csharp/examples/ProducerTransactionMessageExample.cs index edc4d41f..8b331722 100644 --- a/csharp/examples/ProducerTransactionMessageExample.cs +++ b/csharp/examples/ProducerTransactionMessageExample.cs @@ -73,7 +73,7 @@ namespace examples var sendReceipt = await producer.Send(message, transaction); Logger.Info("Send transaction message successfully, messageId={}", sendReceipt.MessageId); // Commit the transaction. - transaction.commit(); + transaction.Commit(); // Or rollback the transaction. // transaction.rollback(); // Close the producer if you don't need it anymore. diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index a9cd093a..305383e2 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -58,7 +58,7 @@ namespace Org.Apache.Rocketmq private readonly Dictionary<Endpoints, Session> _sessionsTable; private readonly ReaderWriterLockSlim _sessionLock; - protected volatile State State; + internal volatile State State; protected Client(ClientConfig clientConfig) { diff --git a/csharp/rocketmq-client-csharp/ITransaction.cs b/csharp/rocketmq-client-csharp/ITransaction.cs index b9898de0..27c770b1 100644 --- a/csharp/rocketmq-client-csharp/ITransaction.cs +++ b/csharp/rocketmq-client-csharp/ITransaction.cs @@ -19,8 +19,8 @@ namespace Org.Apache.Rocketmq { public interface ITransaction { - void commit(); + void Commit(); - void rollback(); + void Rollback(); } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index d376d14c..838263a2 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -179,12 +179,20 @@ namespace Org.Apache.Rocketmq public async Task<SendReceipt> Send(Message message) { + if (State.Running != State) + { + throw new InvalidOperationException("Producer is not running"); + } var sendReceipt = await Send(message, false); return sendReceipt; } public async Task<SendReceipt> Send(Message message, ITransaction transaction) { + if (State.Running != State) + { + throw new InvalidOperationException("Producer is not running"); + } var tx = (Transaction)transaction; var publishingMessage = tx.TryAddMessage(message); var sendReceipt = await Send(message, true); @@ -223,8 +231,9 @@ namespace Org.Apache.Rocketmq var sendReceipt = sendReceipts.First(); if (attempt > 1) { - Logger.Info($"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," + - $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}"); + Logger.Info( + $"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," + + $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}"); } return sendReceipt; diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs index bf9614e1..d25dceab 100644 --- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs +++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs @@ -157,6 +157,11 @@ namespace Org.Apache.Rocketmq public async Task<List<MessageView>> Receive(int maxMessageNum, TimeSpan invisibleDuration) { + if (State.Running != State) + { + throw new InvalidOperationException("Simple consumer is not running"); + } + if (maxMessageNum <= 0) { throw new InternalErrorException("maxMessageNum must be greater than 0"); @@ -182,6 +187,11 @@ namespace Org.Apache.Rocketmq public async void ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration) { + if (State.Running != State) + { + throw new InvalidOperationException("Simple consumer is not running"); + } + var request = WrapChangeInvisibleDuration(messageView, invisibleDuration); var response = await ClientManager.ChangeInvisibleDuration(messageView.MessageQueue.Broker.Endpoints, request, ClientConfig.RequestTimeout); @@ -191,6 +201,11 @@ namespace Org.Apache.Rocketmq public async Task Ack(MessageView messageView) { + if (State.Running != State) + { + throw new InvalidOperationException("Simple consumer is not running"); + } + var request = WrapAckMessageRequest(messageView); var response = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request, ClientConfig.RequestTimeout); diff --git a/csharp/rocketmq-client-csharp/Transaction.cs b/csharp/rocketmq-client-csharp/Transaction.cs index e44c0675..5084ae4e 100644 --- a/csharp/rocketmq-client-csharp/Transaction.cs +++ b/csharp/rocketmq-client-csharp/Transaction.cs @@ -90,8 +90,13 @@ namespace Org.Apache.Rocketmq } } - public async void commit() + public async void Commit() { + if (State.Running != _producer.State) + { + throw new InvalidOperationException("Producer is not running"); + } + if (_messageSendReceiptDict.IsEmpty) { throw new ArgumentException("Transactional message has not been sent yet"); @@ -104,8 +109,13 @@ namespace Org.Apache.Rocketmq } } - public async void rollback() + public async void Rollback() { + if (State.Running != _producer.State) + { + throw new InvalidOperationException("Producer is not running"); + } + if (_messageSendReceiptDict.IsEmpty) { throw new ArgumentException("Transaction message has not been sent yet");
