This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit c9e3d21fe927c87358c15226534e2a6102723c8f
Author: Aaron Ai <[email protected]>
AuthorDate: Thu Feb 16 16:42:50 2023 +0800

    Implement Producer#send with transaction
---
 csharp/rocketmq-client-csharp/Producer.cs          | 43 ++++++++++++++--------
 csharp/rocketmq-client-csharp/PublishingMessage.cs |  2 +-
 2 files changed, 29 insertions(+), 16 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Producer.cs 
b/csharp/rocketmq-client-csharp/Producer.cs
index 23041e4c..f9825d32 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -123,12 +123,17 @@ namespace Org.Apache.Rocketmq
             return PublishingSettings.GetRetryPolicy();
         }
 
-        public async Task<SendReceipt> Send(Message message)
+        private async Task<SendReceipt> Send(Message message, bool txEnabled)
         {
             var publishingLoadBalancer = await 
GetPublishingLoadBalancer(message.Topic);
-            var publishingMessage = new PublishingMessage(message, 
PublishingSettings, false);
+            var publishingMessage = new PublishingMessage(message, 
PublishingSettings, txEnabled);
             var retryPolicy = GetRetryPolicy();
             var maxAttempts = retryPolicy.GetMaxAttempts();
+            if (MessageType.Transaction == publishingMessage.MessageType)
+            {
+                // No more retries for transactional message.
+                maxAttempts = 1;
+            }
 
             // Prepare the candidate message queue(s) for retry-sending in 
advance.
             var candidates = null == publishingMessage.MessageGroup
@@ -152,10 +157,19 @@ namespace Org.Apache.Rocketmq
             throw exception!;
         }
 
-        public async Task<SendReceipt> Send(Message message, Transaction 
transaction)
+        public async Task<SendReceipt> Send(Message message)
+        {
+            var sendReceipt = await Send(message, false);
+            return sendReceipt;
+        }
+
+        public async Task<SendReceipt> Send(Message message, ITransaction 
transaction)
         {
-            // TODO
-            return null;
+            var tx = (Transaction) transaction;
+            var publishingMessage = tx.TryAddMessage(message);
+            var sendReceipt = await Send(message, true);
+            tx.TryAddReceipt(publishingMessage, sendReceipt);
+            return sendReceipt;
         }
 
         private static Proto.SendMessageRequest 
WrapSendMessageRequest(PublishingMessage message, MessageQueue mq)
@@ -181,8 +195,7 @@ namespace Org.Apache.Rocketmq
 
             var sendMessageRequest = WrapSendMessageRequest(message, mq);
             var endpoints = mq.Broker.Endpoints;
-            var response =
-                await ClientManager.SendMessage(endpoints, sendMessageRequest, 
ClientConfig.RequestTimeout);
+            var response = await ClientManager.SendMessage(endpoints, 
sendMessageRequest, ClientConfig.RequestTimeout);
             try
             {
                 var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, 
response);
@@ -190,9 +203,8 @@ namespace Org.Apache.Rocketmq
                 var sendReceipt = sendReceipts.First();
                 if (attempt > 1)
                 {
-                    Logger.Info(
-                        $"Re-send message successfully, topic={message.Topic}, 
messageId={sendReceipt.MessageId}," +
-                        $" maxAttempts={maxAttempts}, endpoints={endpoints}, 
clientId={ClientId}");
+                    Logger.Info($"Re-send message successfully, 
topic={message.Topic}, messageId={sendReceipt.MessageId}," +
+                                $" maxAttempts={maxAttempts}, 
endpoints={endpoints}, clientId={ClientId}");
                 }
 
                 return sendReceipt;
@@ -203,14 +215,15 @@ namespace Org.Apache.Rocketmq
                 Isolated[endpoints] = true;
                 if (attempt >= maxAttempts)
                 {
-                    Logger.Error(
-                        $"Failed to send message finally, run out of attempt 
times, topic={message.Topic}, " +
-                        $"maxAttempt={maxAttempts}, attempt={attempt}, 
endpoints={endpoints}, clientId={ClientId}");
+                    Logger.Error("Failed to send message finally, run out of 
attempt times, " +
+                                 $"topic={message.Topic}, 
maxAttempt={maxAttempts}, attempt={attempt}, " +
+                                 $"endpoints={endpoints}, 
messageId={message.MessageId}, clientId={ClientId}");
                     throw;
                 }
 
                 Logger.Warn(e, $"Failed to send message, 
topic={message.Topic}, maxAttempts={maxAttempts}, " +
-                               $"attempt={attempt}, endpoints={endpoints}, 
clientId={ClientId}");
+                               $"attempt={attempt}, endpoints={endpoints}, 
messageId={message.MessageId}," +
+                               $" clientId={ClientId}");
                 throw;
             }
         }
@@ -246,7 +259,7 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        public Transaction BeginTransaction()
+        public ITransaction BeginTransaction()
         {
             return new Transaction(this);
         }
diff --git a/csharp/rocketmq-client-csharp/PublishingMessage.cs 
b/csharp/rocketmq-client-csharp/PublishingMessage.cs
index 7839edaa..c5b4b22c 100644
--- a/csharp/rocketmq-client-csharp/PublishingMessage.cs
+++ b/csharp/rocketmq-client-csharp/PublishingMessage.cs
@@ -31,7 +31,7 @@ namespace Org.Apache.Rocketmq
     {
         public MessageType MessageType { set; get; }
 
-        private string MessageId { get; }
+        internal string MessageId { get; }
 
         public PublishingMessage(Message message, PublishingSettings 
publishingSettings, bool txEnabled) : base(
             message.Topic, message.Body)

Reply via email to