This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 9f0ba427afc407e52b3c9afb7fa210c050aa9ee3 Author: Aaron Ai <[email protected]> AuthorDate: Mon Feb 27 14:27:58 2023 +0800 Make it adapt with the newest IDL --- csharp/rocketmq-client-csharp/Consumer.cs | 3 ++- .../Protos/apache/rocketmq/v2/definition.proto | 10 ++++++++++ .../Protos/apache/rocketmq/v2/service.proto | 1 + csharp/rocketmq-client-csharp/SimpleConsumer.cs | 3 ++- 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs index d5a1d9a5..925f1852 100644 --- a/csharp/rocketmq-client-csharp/Consumer.cs +++ b/csharp/rocketmq-client-csharp/Consumer.cs @@ -84,7 +84,7 @@ namespace Org.Apache.Rocketmq } protected Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq, - FilterExpression filterExpression, TimeSpan invisibleDuration) + FilterExpression filterExpression, TimeSpan awaitDuration, TimeSpan invisibleDuration) { var group = new Proto.Resource { @@ -95,6 +95,7 @@ namespace Org.Apache.Rocketmq Group = group, MessageQueue = mq.ToProtobuf(), FilterExpression = WrapFilterExpression(filterExpression), + LongPollingTimeout = Duration.FromTimeSpan(awaitDuration), BatchSize = batchSize, AutoRenew = false, InvisibleDuration = Duration.FromTimeSpan(invisibleDuration) diff --git a/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/definition.proto b/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/definition.proto index f0a637d7..d10418df 100644 --- a/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/definition.proto +++ b/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/definition.proto @@ -266,6 +266,16 @@ message SystemProperties { // orphan. Servers that manages orphan messages would pick up // a capable publisher to resolve optional google.protobuf.Duration orphaned_transaction_recovery_duration = 19; + + // Information to identify whether this message is from dead letter queue. + optional DeadLetterQueue dead_letter_queue = 20; +} + +message DeadLetterQueue { + // Original topic for this DLQ message. + string topic = 1; + // Original message id for this DLQ message. + string message_id = 2; } message Message { diff --git a/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto b/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto index 8e880e29..6d203d4d 100644 --- a/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto +++ b/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto @@ -96,6 +96,7 @@ message ReceiveMessageRequest { optional google.protobuf.Duration invisible_duration = 5; // For message auto renew and clean bool auto_renew = 6; + optional google.protobuf.Duration long_polling_timeout = 7; } message ReceiveMessageResponse { diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs index d25dceab..dc5b61c7 100644 --- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs +++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs @@ -180,7 +180,8 @@ namespace Org.Apache.Rocketmq var subscriptionLoadBalancer = await GetSubscriptionLoadBalancer(topic); var mq = subscriptionLoadBalancer.TakeMessageQueue(); - var request = WrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, invisibleDuration); + var request = + WrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, _awaitDuration, invisibleDuration); var receiveMessageResult = await ReceiveMessage(request, mq, _awaitDuration); return receiveMessageResult.Messages; }
