This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 513de8f3eaa802f445f10928312614016f866b3b Author: Aaron Ai <[email protected]> AuthorDate: Wed Mar 15 11:50:43 2023 +0800 Log TooManyRequest --- csharp/rocketmq-client-csharp/Producer.cs | 149 ++++++++++++++++-------------- diff_sec.zip | Bin 0 -> 174 bytes 2 files changed, 79 insertions(+), 70 deletions(-) diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index da7e8d03..392c50d4 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -168,39 +168,8 @@ namespace Org.Apache.Rocketmq ? publishingLoadBalancer.TakeMessageQueues(new HashSet<Endpoints>(Isolated.Keys), maxAttempts) : new List<MessageQueue> { publishingLoadBalancer.TakeMessageQueueByMessageGroup(publishingMessage.MessageGroup) }; - Exception exception = null; - for (var attempt = 1; attempt <= maxAttempts; attempt++) - { - var stopwatch = Stopwatch.StartNew(); - try - { - var sendReceipt = await Send0(publishingMessage, candidates, attempt, maxAttempts); - return sendReceipt; - } - catch (Exception e) - { - exception = e; - } - finally - { - var elapsed = stopwatch.Elapsed.Milliseconds; - _sendCostTimeHistogram.Record(elapsed, - new KeyValuePair<string, object>(MetricConstant.Topic, message.Topic), - new KeyValuePair<string, object>(MetricConstant.ClientId, ClientId), - new KeyValuePair<string, object>(MetricConstant.InvocationStatus, - null == exception ? MetricConstant.Success : MetricConstant.Failure)); - // Retry immediately if the request is not throttled. - if (exception is TooManyRequestsException) - { - var nextAttempt = 1 + attempt; - var delay = retryPolicy.GetNextAttemptDelay(nextAttempt); - await Task.Delay(delay); - } - } - } - - throw new Exception($"Failed to send message finally, topic={message.Topic}, clientId={ClientId}", - exception); + var sendReceipt = await Send0(publishingMessage, candidates); + return sendReceipt; } public async Task<ISendReceipt> Send(Message message) @@ -236,53 +205,93 @@ namespace Org.Apache.Rocketmq }; } - private async Task<SendReceipt> Send0(PublishingMessage message, List<MessageQueue> candidates, int attempt, - int maxAttempts) + private async Task<SendReceipt> Send0(PublishingMessage message, List<MessageQueue> candidates) { - var candidateIndex = (attempt - 1) % candidates.Count; - var mq = candidates[candidateIndex]; - if (PublishingSettings.IsValidateMessageType() && - !mq.AcceptMessageTypes.Contains(message.MessageType)) - { - throw new ArgumentException("Current message type does not match with the accept message types," + - $" topic={message.Topic}, actualMessageType={message.MessageType}" + - $" acceptMessageType={string.Join(",", mq.AcceptMessageTypes)}"); - } - - var sendMessageRequest = WrapSendMessageRequest(message, mq); - var endpoints = mq.Broker.Endpoints; - var invocation = await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout); - try + var retryPolicy = GetRetryPolicy(); + var maxAttempts = retryPolicy.GetMaxAttempts(); + Exception exception = null; + for (var attempt = 1; attempt <= maxAttempts; attempt++) { - var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, invocation); + var stopwatch = Stopwatch.StartNew(); - var sendReceipt = sendReceipts.First(); - if (attempt > 1) + var candidateIndex = (attempt - 1) % candidates.Count; + var mq = candidates[candidateIndex]; + if (PublishingSettings.IsValidateMessageType() && !mq.AcceptMessageTypes.Contains(message.MessageType)) { - Logger.Info( - $"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," + - $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}"); + throw new ArgumentException( + "Current message type does not match with the accept message types," + + $" topic={message.Topic}, actualMessageType={message.MessageType}" + + $" acceptMessageType={string.Join(",", mq.AcceptMessageTypes)}"); } - return sendReceipt; - } - catch (Exception e) - { - // Isolate current endpoints. - Isolated[endpoints] = true; - if (attempt >= maxAttempts) + var sendMessageRequest = WrapSendMessageRequest(message, mq); + var endpoints = mq.Broker.Endpoints; + try { - Logger.Error(e, "Failed to send message finally, run out of attempt times, " + - $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " + - $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}"); - throw; + var invocation = + await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout); + var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, invocation); + + var sendReceipt = sendReceipts.First(); + if (attempt > 1) + { + Logger.Info( + $"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," + + $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}"); + } + + return sendReceipt; } + catch (Exception e) + { + // Isolate current endpoints. + Isolated[endpoints] = true; + if (attempt >= maxAttempts) + { + Logger.Error(e, "Failed to send message finally, run out of attempt times, " + + $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " + + $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}"); + throw; + } - Logger.Warn(e, $"Failed to send message, topic={message.Topic}, maxAttempts={maxAttempts}, " + - $"attempt={attempt}, endpoints={endpoints}, messageId={message.MessageId}," + - $" clientId={ClientId}"); - throw; + if (MessageType.Transaction == message.MessageType) + { + Logger.Error(e, "Failed to send transaction message, run out of attempt times, " + + $"topic={message.Topic}, maxAttempt=1, attempt={attempt}, " + + $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}"); + throw; + } + + exception = e; + if (exception is not TooManyRequestsException) + { + // Retry immediately if the request is not throttled. + Logger.Warn(e, $"Failed to send message, topic={message.Topic}, maxAttempts={maxAttempts}, " + + $"attempt={attempt}, endpoints={endpoints}, messageId={message.MessageId}," + + $" clientId={ClientId}"); + continue; + } + + var nextAttempt = 1 + attempt; + var delay = retryPolicy.GetNextAttemptDelay(nextAttempt); + await Task.Delay(delay); + Logger.Warn(e, "Failed to send message due to too many request, would attempt to resend " + + $"after {delay}, topic={message.Topic}, maxAttempts={maxAttempts}, attempt={attempt}, " + + $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}"); + } + finally + { + var elapsed = stopwatch.Elapsed.Milliseconds; + _sendCostTimeHistogram.Record(elapsed, + new KeyValuePair<string, object>(MetricConstant.Topic, message.Topic), + new KeyValuePair<string, object>(MetricConstant.ClientId, ClientId), + new KeyValuePair<string, object>(MetricConstant.InvocationStatus, + null == exception ? MetricConstant.Success : MetricConstant.Failure)); + } } + + throw new Exception($"Failed to send message finally, topic={message.Topic}, clientId={ClientId}", + exception); } internal override Settings GetSettings() diff --git a/diff_sec.zip b/diff_sec.zip new file mode 100644 index 00000000..89278e56 Binary files /dev/null and b/diff_sec.zip differ
