This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit c9e3d21fe927c87358c15226534e2a6102723c8f Author: Aaron Ai <[email protected]> AuthorDate: Thu Feb 16 16:42:50 2023 +0800 Implement Producer#send with transaction --- csharp/rocketmq-client-csharp/Producer.cs | 43 ++++++++++++++-------- csharp/rocketmq-client-csharp/PublishingMessage.cs | 2 +- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 23041e4c..f9825d32 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -123,12 +123,17 @@ namespace Org.Apache.Rocketmq return PublishingSettings.GetRetryPolicy(); } - public async Task<SendReceipt> Send(Message message) + private async Task<SendReceipt> Send(Message message, bool txEnabled) { var publishingLoadBalancer = await GetPublishingLoadBalancer(message.Topic); - var publishingMessage = new PublishingMessage(message, PublishingSettings, false); + var publishingMessage = new PublishingMessage(message, PublishingSettings, txEnabled); var retryPolicy = GetRetryPolicy(); var maxAttempts = retryPolicy.GetMaxAttempts(); + if (MessageType.Transaction == publishingMessage.MessageType) + { + // No more retries for transactional message. + maxAttempts = 1; + } // Prepare the candidate message queue(s) for retry-sending in advance. var candidates = null == publishingMessage.MessageGroup @@ -152,10 +157,19 @@ namespace Org.Apache.Rocketmq throw exception!; } - public async Task<SendReceipt> Send(Message message, Transaction transaction) + public async Task<SendReceipt> Send(Message message) + { + var sendReceipt = await Send(message, false); + return sendReceipt; + } + + public async Task<SendReceipt> Send(Message message, ITransaction transaction) { - // TODO - return null; + var tx = (Transaction) transaction; + var publishingMessage = tx.TryAddMessage(message); + var sendReceipt = await Send(message, true); + tx.TryAddReceipt(publishingMessage, sendReceipt); + return sendReceipt; } private static Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq) @@ -181,8 +195,7 @@ namespace Org.Apache.Rocketmq var sendMessageRequest = WrapSendMessageRequest(message, mq); var endpoints = mq.Broker.Endpoints; - var response = - await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout); + var response = await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout); try { var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, response); @@ -190,9 +203,8 @@ namespace Org.Apache.Rocketmq var sendReceipt = sendReceipts.First(); if (attempt > 1) { - Logger.Info( - $"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," + - $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}"); + Logger.Info($"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," + + $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}"); } return sendReceipt; @@ -203,14 +215,15 @@ namespace Org.Apache.Rocketmq Isolated[endpoints] = true; if (attempt >= maxAttempts) { - Logger.Error( - $"Failed to send message finally, run out of attempt times, topic={message.Topic}, " + - $"maxAttempt={maxAttempts}, attempt={attempt}, endpoints={endpoints}, clientId={ClientId}"); + Logger.Error("Failed to send message finally, run out of attempt times, " + + $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " + + $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}"); throw; } Logger.Warn(e, $"Failed to send message, topic={message.Topic}, maxAttempts={maxAttempts}, " + - $"attempt={attempt}, endpoints={endpoints}, clientId={ClientId}"); + $"attempt={attempt}, endpoints={endpoints}, messageId={message.MessageId}," + + $" clientId={ClientId}"); throw; } } @@ -246,7 +259,7 @@ namespace Org.Apache.Rocketmq } } - public Transaction BeginTransaction() + public ITransaction BeginTransaction() { return new Transaction(this); } diff --git a/csharp/rocketmq-client-csharp/PublishingMessage.cs b/csharp/rocketmq-client-csharp/PublishingMessage.cs index 7839edaa..c5b4b22c 100644 --- a/csharp/rocketmq-client-csharp/PublishingMessage.cs +++ b/csharp/rocketmq-client-csharp/PublishingMessage.cs @@ -31,7 +31,7 @@ namespace Org.Apache.Rocketmq { public MessageType MessageType { set; get; } - private string MessageId { get; } + internal string MessageId { get; } public PublishingMessage(Message message, PublishingSettings publishingSettings, bool txEnabled) : base( message.Topic, message.Body)
