This is an automated email from the ASF dual-hosted git repository.
RongtongJin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 2ac0bdde The C# client supports producers in recalling timed and
delayed messages, as well as sending priority messages. It also supports the
Lite Push Consumer. (#1229)
2ac0bdde is described below
commit 2ac0bdde1cbd5c1a83d5cd4153f158a31b2d8f48
Author: zhaohai <[email protected]>
AuthorDate: Wed May 6 14:52:29 2026 +0800
The C# client supports producers in recalling timed and delayed messages,
as well as sending priority messages. It also supports the Lite Push Consumer.
(#1229)
* C# Client supports Recalling timed/delay messages for Producer and
Priority Message.
* Update protos submodule to latest version
* update md
* add lite
* fix LitePushConsumerExample.cs
* fix lite push consumer
---
README-CN.md | 6 +-
README.md | 6 +-
csharp/README-CN.md | 17 +-
csharp/examples/LitePushConsumerExample.cs | 116 ++++++++
csharp/examples/ProducerLiteMessageExample.cs | 97 +++++++
csharp/examples/ProducerPriorityMessageExample.cs | 99 +++++++
.../ProducerWithRecallingTimedMessageExample.cs | 149 ++++++++++
csharp/rocketmq-client-csharp/Client.cs | 8 +-
csharp/rocketmq-client-csharp/ClientManager.cs | 9 +
csharp/rocketmq-client-csharp/ClientType.cs | 4 +-
csharp/rocketmq-client-csharp/Consumer.cs | 24 ++
.../LiteSubscriptionQuotaExceededException.cs} | 22 +-
csharp/rocketmq-client-csharp/FilterExpression.cs | 5 +
csharp/rocketmq-client-csharp/IClientManager.cs | 10 +
csharp/rocketmq-client-csharp/ILitePushConsumer.cs | 68 +++++
csharp/rocketmq-client-csharp/IRpcClient.cs | 5 +-
csharp/rocketmq-client-csharp/LitePushConsumer.cs | 323 +++++++++++++++++++++
.../LitePushSubscriptionSettings.cs | 48 +++
.../LiteSubscriptionManager.cs | 272 +++++++++++++++++
csharp/rocketmq-client-csharp/Message.cs | 80 ++++-
csharp/rocketmq-client-csharp/MessageType.cs | 8 +
csharp/rocketmq-client-csharp/MessageView.cs | 14 +-
csharp/rocketmq-client-csharp/OffsetOption.cs | 126 ++++++++
.../rocketmq-client-csharp/OffsetOptionHelper.cs | 78 +++++
csharp/rocketmq-client-csharp/Producer.cs | 19 ++
csharp/rocketmq-client-csharp/PublishingMessage.cs | 48 ++-
csharp/rocketmq-client-csharp/PushConsumer.cs | 67 ++++-
.../PushSubscriptionSettings.cs | 15 +-
csharp/rocketmq-client-csharp/RpcClient.cs | 10 +
csharp/rocketmq-client-csharp/Session.cs | 7 +
protos | 2 +-
31 files changed, 1726 insertions(+), 36 deletions(-)
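
Note on the delay levels mentioned in ProducerWithRecallingTimedMessageExample.cs: the comments there list 18 delay levels (1s through 2h). As a rough, self-contained sketch of how that list maps a delay in seconds to a 1-based level, something like the following could be used. `DelayLevels` and `LevelOf` are hypothetical names chosen for illustration and are not part of RocketMQ.Client; the commit itself only passes absolute timestamps to `SetDeliveryTimestamp`.

```csharp
using System;

// Hypothetical helper for illustration only; not part of RocketMQ.Client.
public static class DelayLevels
{
    // The 18 delay levels quoted in the example's comments, expressed in seconds:
    // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    private static readonly int[] LevelSeconds =
    {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    // Returns the 1-based level whose delay equals `seconds`, or -1 if no level matches.
    public static int LevelOf(int seconds)
    {
        var index = Array.IndexOf(LevelSeconds, seconds);
        return index < 0 ? -1 : index + 1;
    }
}
```

With this table, a 60-second delay corresponds to level 5 and a 120-second delay to level 6. A delay such as 45 seconds matches no level, and the sketch reports it as -1.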
diff --git a/README-CN.md b/README-CN.md
index b5b715a2..8947e01a 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -23,12 +23,12 @@
| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Producer with recalling timed/delay messages | ✅ | ✅ | 🚧 | ✅ | 🚧 | ✅ | ✅ | 🚧 |
+| Producer with recalling timed/delay messages | ✅ | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 |
| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Push consumer with FIFO consume accelerator | ✅ | ✅ | 🚧 | ✅ | 🚧 | ✅ | ✅ | 🚧 |
-| Priority Message | ✅ | 🚧 | 🚧 | ✅ | 🚧 | ✅ | ✅ | 🚧 |
+| Push consumer with FIFO consume accelerator | ✅ | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 |
+| Priority Message | ✅ | 🚧 | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 |
## Prerequisite and Build
diff --git a/README.md b/README.md
index 0d064685..e416805f 100644
--- a/README.md
+++ b/README.md
@@ -23,12 +23,12 @@ Provide cloud-native and robust solutions for Java, C++, C#, Golang, Rust and al
| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Producer with recalling timed/delay messages | ✅ | ✅ | 🚧 | ✅ | 🚧 | ✅ | ✅ | 🚧 |
+| Producer with recalling timed/delay messages | ✅ | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 |
| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
-| Push consumer with FIFO consume accelerator | ✅ | ✅ | 🚧 | ✅ | 🚧 | ✅ | ✅ | 🚧 |
-| Priority Message | ✅ | 🚧 | 🚧 | ✅ | 🚧 | ✅ | ✅ | 🚧 |
+| Push consumer with FIFO consume accelerator | ✅ | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 |
+| Priority Message | ✅ | 🚧 | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 |
## Prerequisite and Build
diff --git a/csharp/README-CN.md b/csharp/README-CN.md
index dacf7a2a..e8896177 100644
--- a/csharp/README-CN.md
+++ b/csharp/README-CN.md
@@ -37,6 +37,17 @@ dotnet add package RocketMQ.Client
You can get the latest `RocketMQ.Client` version from [Nuget Gallery](https://www.nuget.org/packages/RocketMQ.Client). We provide [code examples](./examples) to help you get started quickly.
+### Available Examples
+
+- **ProducerNormalMessageExample**: sending normal messages
+- **ProducerFifoMessageExample**: sending FIFO (ordered) messages
+- **ProducerDelayMessageExample**: sending delay/timed messages
+- **ProducerPriorityMessageExample**: sending priority messages ✨ NEW
+- **ProducerWithRecallingTimedMessageExample**: timed messages with recall support ✨ NEW
+- **ProducerTransactionMessageExample**: sending transactional messages
+- **PushConsumerExample**: push consumer (supports the FIFO consume accelerator)
+- **SimpleConsumerExample**: simple consumer
+
## Build
The layout of this project roughly follows [the guide here](https://docs.microsoft.com/en-us/dotnet/core/tutorials/library-with-visual-studio-code?pivots=dotnet-5-0)
@@ -55,10 +66,10 @@ dotnet test -l "console;verbosity=detailed"
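The default `LoggerFactory` is [NLog](https://nlog-project.org/). As with the Java client, logging can be customized through environment variables:
-* `rocketmq_log_level`: log output level; the default is INFO.
-* `rocketmq_log_root`
+- `rocketmq_log_level`: log output level; the default is INFO.
+- `rocketmq_log_root`
: root directory for log output. The default path is `$HOME/logs/rocketmq`, so the full path is `$HOME/logs/rocketmq/rocketmq-client.log`.
-* `rocketmq_log_file_maxIndex`: maximum number of log files to keep. The default is 10, and each log file is limited to 64 MB. Adjusting this is not yet supported.
+- `rocketmq_log_file_maxIndex`: maximum number of log files to keep. The default is 10, and each log file is limited to 64 MB. Adjusting this is not yet supported.
To use a custom `LoggerFactory`, configure it with the `MqLogManager.UseLoggerFactory` method.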
diff --git a/csharp/examples/LitePushConsumerExample.cs b/csharp/examples/LitePushConsumerExample.cs
new file mode 100644
index 00000000..ec42456e
--- /dev/null
+++ b/csharp/examples/LitePushConsumerExample.cs
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Org.Apache.Rocketmq;
+
+namespace Org.Apache.Rocketmq.Examples
+{
+ /// <summary>
+ /// Example demonstrating how to use LitePushConsumer for consuming messages from lite topics.
+ /// Lite topics enable dynamic topic routing without pre-defining all topics.
+ ///
+ /// Key Points:
+ /// - SubscribeLite() adds lite topics dynamically
+ /// - UnsubscribeLite() removes lite topics
+ /// - GetLiteTopicSet() returns current subscriptions
+ /// - BindTopic is the parent topic for all lite topics
+ /// </summary>
+ internal static class LitePushConsumerExample
+ {
+ private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(LitePushConsumerExample).FullName);
+
+ internal static async Task QuickStart()
+ {
+ // Configure client for local testing (no authentication required)
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints("127.0.0.1:8081")
+ .Build();
+
+ // Define bind topic (parent topic) and consumer group
+ const string bindTopic = "topic-lite";
+ const string consumerGroup = "GID-lite-consumer";
+
+ // Build lite push consumer
+ Logger.LogInformation($"Creating LitePushConsumer, bindTopic={bindTopic}, consumerGroup={consumerGroup}");
+ var litePushConsumer = await new LitePushConsumer.Builder()
+ .SetClientConfig(clientConfig)
+ .SetConsumerGroup(consumerGroup)
+ .SetBindTopic(bindTopic)
+ .SetMessageListener(new CustomMessageListener())
+ .SetMaxCacheMessageCount(1024)
+ .SetMaxCacheMessageSizeInBytes(64 * 1024 * 1024)
+ .SetConsumptionThreadCount(20)
+ .Build();
+
+ Logger.LogInformation($"LitePushConsumer started successfully, bindTopic={bindTopic}, consumerGroup={consumerGroup}");
+
+ try
+ {
+ // Subscribe to lite topics dynamically
+ var liteTopics = new[] { "order-created", "order-updated", "order-completed" };
+
+ Logger.LogInformation($"Subscribing to {liteTopics.Length} lite topics...");
+ foreach (var liteTopic in liteTopics)
+ {
+ await litePushConsumer.SubscribeLite(liteTopic);
+ Logger.LogInformation($"Subscribed to lite topic: {liteTopic}");
+ }
+
+ // Optionally subscribe with offset option
+ // await litePushConsumer.SubscribeLite("new-topic", OffsetOption.LastOffset);
+
+ // Get current lite topic set
+ var subscribedTopics = litePushConsumer.GetLiteTopicSet();
+ Logger.LogInformation($"Currently subscribed to {subscribedTopics.Count} lite topics");
+
+ // Keep the consumer running
+ Logger.LogInformation("LitePushConsumer is running. Press any key to exit...");
+ Console.ReadKey();
+
+ // Unsubscribe from a lite topic
+ await litePushConsumer.UnsubscribeLite("order-completed");
+ Logger.LogInformation("Unsubscribed from lite topic: order-completed");
+ }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, "Error occurred while running LitePushConsumer");
+ }
+ finally
+ {
+ // Shutdown the consumer
+ await litePushConsumer.DisposeAsync();
+ Logger.LogInformation("LitePushConsumer shutdown completed");
+ }
+ }
+
+ private class CustomMessageListener : IMessageListener
+ {
+ public ConsumeResult Consume(MessageView messageView)
+ {
+ var body = Encoding.UTF8.GetString(messageView.Body);
+ Logger.LogInformation($"Received lite message: messageId={messageView.MessageId}, " +
+ $"topic={messageView.Topic}, body={body}");
+ return ConsumeResult.SUCCESS;
+ }
+ }
+ }
+}
diff --git a/csharp/examples/ProducerLiteMessageExample.cs b/csharp/examples/ProducerLiteMessageExample.cs
new file mode 100644
index 00000000..6408ec62
--- /dev/null
+++ b/csharp/examples/ProducerLiteMessageExample.cs
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Org.Apache.Rocketmq;
+
+namespace examples
+{
+ /// <summary>
+ /// Demonstrates how to send lite messages using Apache RocketMQ C# client.
+ /// Lite messages use dynamic topic routing without pre-defining all topics.
+ ///
+ /// Key Points:
+ /// - LiteTopic enables flexible message routing
+ /// - Cannot be used with: messageGroup, deliveryTimestamp, or priority
+ /// - Parent topic must be configured for lite messaging
+ /// </summary>
+ internal static class ProducerLiteMessageExample
+ {
+ private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerLiteMessageExample).FullName);
+
+ private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+ private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+ private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
+ internal static async Task QuickStart()
+ {
+ // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
+ // AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
+
+ // Configure client for local testing (no authentication required)
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints("127.0.0.1:8081")
+ .Build();
+
+ const string parentTopic = "topic-lite";
+
+ // Create producer with singleton pattern (recommended)
+ var producer = await new Producer.Builder()
+ .SetTopics(parentTopic) // Prefetch topic route for better performance
+ .SetClientConfig(clientConfig)
+ .Build();
+
+ // Define message content
+ var body = Encoding.UTF8.GetBytes("This is a lite message for testing");
+ const string tag = "LiteTest";
+
+ Logger.LogInformation("Sending lite messages with different lite topics...");
+
+ // Send messages with different lite topics for dynamic routing
+ var liteTopics = new[] { "order-created", "order-updated", "order-completed" };
+
+ foreach (var liteTopic in liteTopics)
+ {
+ var message = new Message.Builder()
+ .SetTopic(parentTopic)
+ .SetBody(body)
+ .SetTag(tag)
+ .SetKeys($"lite-{liteTopic}")
+ .SetLiteTopic(liteTopic) // Dynamic topic routing
+ .Build();
+
+ try
+ {
+ var sendReceipt = await producer.Send(message);
+ Logger.LogInformation($"Sent lite message successfully, messageId={sendReceipt.MessageId}, liteTopic={liteTopic}");
+ }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, $"Failed to send lite message, liteTopic={liteTopic}");
+ }
+ }
+
+ Logger.LogInformation("\nAll lite messages sent. Messages will be routed based on their lite topics.");
+
+ // Close the producer if you don't need it anymore.
+ await producer.DisposeAsync();
+ }
+ }
+}
diff --git a/csharp/examples/ProducerPriorityMessageExample.cs b/csharp/examples/ProducerPriorityMessageExample.cs
new file mode 100644
index 00000000..036d0f06
--- /dev/null
+++ b/csharp/examples/ProducerPriorityMessageExample.cs
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Org.Apache.Rocketmq;
+
+namespace examples
+{
+ /// <summary>
+ /// Demonstrates how to send priority messages using Apache RocketMQ C# client.
+ /// Priority messages allow you to assign different priority levels to messages,
+ /// where higher priority messages are consumed before lower priority ones.
+ ///
+ /// Key Points:
+ /// - Priority must be >= 0 (higher value = higher priority)
+ /// - Cannot be used with: messageGroup, deliveryTimestamp, or liteTopic
+ /// - Consumer Group for testing: GID-priority-consumer
+ /// </summary>
+ internal static class ProducerPriorityMessageExample
+ {
+ private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerPriorityMessageExample).FullName);
+
+ private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+ private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+ private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
+ internal static async Task QuickStart()
+ {
+ // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
+ // AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
+
+ // Configure client for local testing (no authentication required)
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints("127.0.0.1:8081")
+ .Build();
+
+ const string topic = "topic-priority";
+
+ // Create producer with singleton pattern (recommended)
+ var producer = await new Producer.Builder()
+ .SetTopics(topic) // Prefetch topic route for better performance
+ .SetClientConfig(clientConfig)
+ .Build();
+
+ // Define message content
+ var body = Encoding.UTF8.GetBytes("This is a priority message for testing");
+ const string tag = "PriorityTest";
+
+ Logger.LogInformation($"Consumer Group: GID-priority-consumer");
+ Logger.LogInformation("Sending messages with different priority levels (higher value = higher priority)...");
+
+ // Send messages with priority levels from low to high
+ var priorities = new[] { 1, 3, 5, 8, 10 };
+
+ foreach (var priority in priorities)
+ {
+ var message = new Message.Builder()
+ .SetTopic(topic)
+ .SetBody(body)
+ .SetTag(tag)
+ .SetKeys($"priority-{priority}")
+ .SetPriority(priority) // Higher priority messages are consumed first
+ .Build();
+
+ try
+ {
+ var sendReceipt = await producer.Send(message);
+ Logger.LogInformation($"Sent priority message successfully, messageId={sendReceipt.MessageId}, priority={priority}");
+ }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, $"Failed to send priority message, priority={priority}");
+ }
+ }
+
+ Logger.LogInformation("\nAll priority messages sent. Messages will be consumed in priority order (highest first).");
+
+ // Close the producer if you don't need it anymore.
+ await producer.DisposeAsync();
+ }
+ }
+}
diff --git a/csharp/examples/ProducerWithRecallingTimedMessageExample.cs b/csharp/examples/ProducerWithRecallingTimedMessageExample.cs
new file mode 100644
index 00000000..7cee8352
--- /dev/null
+++ b/csharp/examples/ProducerWithRecallingTimedMessageExample.cs
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Org.Apache.Rocketmq;
+
+namespace examples
+{
+ /// <summary>
+ /// Demonstrates how to send timed/delay messages and recall them before delivery
+ /// using Apache RocketMQ C# client.
+ ///
+ /// This example shows:
+ /// 1. How to send a delay message with a future delivery timestamp
+ /// 2. How to recall (cancel) a scheduled message before it's delivered
+ /// 3. Use cases: order cancellation, appointment reminders, scheduled notifications
+ /// </summary>
+ internal static class ProducerWithRecallingTimedMessageExample
+ {
+ private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerWithRecallingTimedMessageExample).FullName);
+
+ private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
+ private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
+ private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
+
+ internal static async Task QuickStart()
+ {
+ // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
+ // AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
+
+ // No credentials needed for local testing
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints("127.0.0.1:8081")
+ .Build();
+
+ const string topic = "topic-delay-new";
+ // In most cases you don't need to create many producers; the singleton pattern is recommended.
+ var producer = await new Producer.Builder()
+ .SetTopics(topic)
+ .SetClientConfig(clientConfig)
+ .Build();
+
+ try
+ {
+ // Example 1: Send a delay message using RocketMQ delay levels
+ // RocketMQ supports 18 delay levels: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
+ Logger.LogInformation("=== Example 1: Send delay message with level 5 (1 minute) ===");
+ var delayMessageBytes = Encoding.UTF8.GetBytes("This is a delay message for testing recall functionality");
+ var delayMessage = new Message.Builder()
+ .SetTopic(topic)
+ .SetBody(delayMessageBytes)
+ .SetTag("DelayTest")
+ .SetKeys("test-delay-recall-001")
+ // Set delivery timestamp to 60 seconds from now (delay level 5)
+ .SetDeliveryTimestamp(DateTime.Now.AddSeconds(60))
+ .Build();
+
+ var sendReceipt = await producer.Send(delayMessage);
+ Logger.LogInformation($"Delay message sent successfully, messageId={sendReceipt.MessageId}");
+ Logger.LogInformation($"Consumer Group: GID-normal-consumer_topic-normal");
+
+ // Note: In a real scenario, you would store the recallHandle from sendReceipt
+ // to use for recalling the message later if needed.
+ // For this example, we'll demonstrate the recall API structure.
+
+ // Example 2: Send a delay message and demonstrate recall
+ Logger.LogInformation("\n=== Example 2: Send delay message for recall test ===");
+ var recallableMessageBytes = Encoding.UTF8.GetBytes("This message will be recalled before delivery - Test Case");
+ var recallableMessage = new Message.Builder()
+ .SetTopic(topic)
+ .SetBody(recallableMessageBytes)
+ .SetTag("RecallTest")
+ .SetKeys("test-delay-recall-002")
+ // Set delivery timestamp to 120 seconds from now (delay level 6)
+ .SetDeliveryTimestamp(DateTime.Now.AddSeconds(120))
+ .Build();
+
+ var recallableReceipt = await producer.Send(recallableMessage);
+ Logger.LogInformation($"Recallable message sent, messageId={recallableReceipt.MessageId}");
+ Logger.LogInformation($"To recall this message, use the recallHandle from SendReceipt");
+
+ // Simulate a scenario where you need to recall the message
+ // For example: user cancelled the order, so we don't need to send the reminder
+ Logger.LogInformation("Simulating message recall (e.g., order cancelled)...");
+
+ // In production, you would get the recallHandle from the send receipt
+ // and store it in your database along with the business data
+ // var recallHandle = recallableReceipt.RecallHandle; // This would be available in the receipt
+
+ // When you need to recall the message:
+ // var recallReceipt = await producer.RecallMessage(topic, recallHandle);
+ // Logger.LogInformation($"Message recalled successfully, recallReceipt={recallReceipt}");
+
+ // For demonstration purposes, we show the API call structure:
+ Logger.LogInformation("To recall a message, use: await producer.RecallMessage(topic, recallHandle)");
+ Logger.LogInformation("The recallHandle should be obtained from the SendReceipt when sending the message");
+
+ // Example 3: Send multiple delay messages with different RocketMQ delay levels
+ Logger.LogInformation("\n=== Example 3: Multiple delay messages with different levels ===");
+ // RocketMQ 18 delay levels: 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
+ var delayLevels = new[] { 1, 5, 10, 30, 60 }; // seconds
+
+ foreach (var delaySeconds in delayLevels)
+ {
+ var messageBytes = Encoding.UTF8.GetBytes($"Delay message - Level {delaySeconds}s");
+ var message = new Message.Builder()
+ .SetTopic(topic)
+ .SetBody(messageBytes)
+ .SetTag($"Delay-{delaySeconds}s")
+ .SetKeys($"test-delay-{delaySeconds}")
+ .SetDeliveryTimestamp(DateTime.Now.AddSeconds(delaySeconds))
+ .Build();
+
+ var receipt = await producer.Send(message);
+ Logger.LogInformation($"Delay message sent, level={delaySeconds}s, messageId={receipt.MessageId}");
+ }
+
+ Logger.LogInformation("\nAll delay messages sent successfully!");
+ Logger.LogInformation("In production, store recallHandle for each message to enable recall functionality.");
+ }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, "Failed to send/recall delay messages");
+ }
+ finally
+ {
+ // Close the producer if you don't need it anymore.
+ await producer.DisposeAsync();
+ }
+ }
+ }
+}
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 491ef4cd..5c2960fa 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -502,11 +502,17 @@ namespace Org.Apache.Rocketmq
await session.WriteAsync(telemetryCommand);
}
- internal void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
+ internal virtual void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
{
var metric = new Metric(settings.Metric ?? new Proto.Metric());
ClientMeterManager.Reset(metric);
GetSettings().Sync(settings);
}
+
+ internal virtual void OnNotifyUnsubscribeLiteCommand(Endpoints endpoints, Proto.NotifyUnsubscribeLiteCommand command)
+ {
+ // Default implementation does nothing
+ // LitePushConsumer will override this to handle lite topic unsubscription
+ }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs
index b061b5c9..eca4881c 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -194,5 +194,14 @@ namespace Org.Apache.Rocketmq
return new RpcInvocation<Proto.EndTransactionRequest, Proto.EndTransactionResponse>(
request, response, metadata);
}
+
+ public async Task<RpcInvocation<Proto.SyncLiteSubscriptionRequest, Proto.SyncLiteSubscriptionResponse>> SyncLiteSubscription(
+ Endpoints endpoints, Proto.SyncLiteSubscriptionRequest request, TimeSpan timeout)
+ {
+ var metadata = _client.Sign();
+ var response = await GetRpcClient(endpoints).SyncLiteSubscription(metadata, request, timeout);
+ return new RpcInvocation<Proto.SyncLiteSubscriptionRequest, Proto.SyncLiteSubscriptionResponse>(
+ request, response, metadata);
+ }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ClientType.cs b/csharp/rocketmq-client-csharp/ClientType.cs
index 487b3bec..a2c7cf78 100644
--- a/csharp/rocketmq-client-csharp/ClientType.cs
+++ b/csharp/rocketmq-client-csharp/ClientType.cs
@@ -23,7 +23,8 @@ namespace Org.Apache.Rocketmq
{
Producer,
SimpleConsumer,
- PushConsumer
+ PushConsumer,
+ LitePushConsumer
}
public static class ClientTypeHelper
@@ -35,6 +36,7 @@ namespace Org.Apache.Rocketmq
ClientType.Producer => Proto.ClientType.Producer,
ClientType.SimpleConsumer => Proto.ClientType.SimpleConsumer,
ClientType.PushConsumer => Proto.ClientType.PushConsumer,
+ ClientType.LitePushConsumer => Proto.ClientType.LitePushConsumer,
_ => Proto.ClientType.Unspecified
};
}
diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs
index 2ad135b6..ad3e3551 100644
--- a/csharp/rocketmq-client-csharp/Consumer.cs
+++ b/csharp/rocketmq-client-csharp/Consumer.cs
@@ -39,6 +39,30 @@ namespace Org.Apache.Rocketmq
ConsumerGroup = consumerGroup;
}
+ /// <summary>
+ /// Check if this is a lite consumer (LitePushConsumer or LiteSimpleConsumer).
+ /// Note: This method checks the settings ClientType to determine if it's a lite consumer.
+ /// </summary>
+ /// <returns>True if this is a lite consumer, false otherwise.</returns>
+ public bool IsLiteConsumer()
+ {
+ // For now, we check if GetSettings returns LitePushSubscriptionSettings
+ // This can be extended when LiteSimpleConsumer is implemented
+ var settings = GetSettings();
+ return settings is LitePushSubscriptionSettings;
+ }
+
+ /// <summary>
+ /// Get client type for this consumer instance.
+ /// Subclasses should override this to return their specific client type.
+ /// </summary>
+ /// <returns>The client type.</returns>
+ protected virtual ClientType GetClientType()
+ {
+ // Default implementation, subclasses should override
+ return ClientType.PushConsumer;
+ }
+
internal async Task<ReceiveMessageResult> ReceiveMessage(Proto.ReceiveMessageRequest request, MessageQueue mq,
TimeSpan awaitDuration)
{
diff --git a/csharp/rocketmq-client-csharp/FilterExpression.cs b/csharp/rocketmq-client-csharp/Error/LiteSubscriptionQuotaExceededException.cs
similarity index 63%
copy from csharp/rocketmq-client-csharp/FilterExpression.cs
copy to csharp/rocketmq-client-csharp/Error/LiteSubscriptionQuotaExceededException.cs
index beef7863..52d1fe07 100644
--- a/csharp/rocketmq-client-csharp/FilterExpression.cs
+++ b/csharp/rocketmq-client-csharp/Error/LiteSubscriptionQuotaExceededException.cs
@@ -15,17 +15,23 @@
* limitations under the License.
*/
-namespace Org.Apache.Rocketmq
+using System;
+
+namespace Org.Apache.Rocketmq.Error
{
- public class FilterExpression
+ /// <summary>
+ /// Exception thrown when lite subscription quota is exceeded.
+ /// </summary>
+ public class LiteSubscriptionQuotaExceededException : ClientException
{
- public FilterExpression(string expression, ExpressionType type = ExpressionType.Tag)
+ public LiteSubscriptionQuotaExceededException(string message)
+ : base(message)
{
- Expression = expression;
- Type = type;
}
- public ExpressionType Type { get; }
- public string Expression { get; }
+ public LiteSubscriptionQuotaExceededException(string message, Exception innerException)
+ : base(message, innerException)
+ {
+ }
}
-}
\ No newline at end of file
+}
diff --git a/csharp/rocketmq-client-csharp/FilterExpression.cs b/csharp/rocketmq-client-csharp/FilterExpression.cs
index beef7863..22585e3c 100644
--- a/csharp/rocketmq-client-csharp/FilterExpression.cs
+++ b/csharp/rocketmq-client-csharp/FilterExpression.cs
@@ -19,6 +19,11 @@ namespace Org.Apache.Rocketmq
{
public class FilterExpression
{
+ /// <summary>
+ /// Subscribe to all messages (wildcard expression).
+ /// </summary>
+ public static readonly FilterExpression SubAll = new FilterExpression("*");
+
public FilterExpression(string expression, ExpressionType type = ExpressionType.Tag)
{
Expression = expression;
diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs
index 62d733e9..60177eb6 100644
--- a/csharp/rocketmq-client-csharp/IClientManager.cs
+++ b/csharp/rocketmq-client-csharp/IClientManager.cs
@@ -142,6 +142,16 @@ namespace Org.Apache.Rocketmq
Task<RpcInvocation<EndTransactionRequest, EndTransactionResponse>> EndTransaction(Endpoints endpoints,
EndTransactionRequest request, TimeSpan timeout);
+ /// <summary>
+ /// Sync lite subscription info for lite push consumer.
+ /// </summary>
+ /// <param name="endpoints">The target endpoints.</param>
+ /// <param name="request">gRPC request for syncing lite subscription.</param>
+ /// <param name="timeout">Request max duration.</param>
+ /// <returns>Task of response.</returns>
+ Task<RpcInvocation<SyncLiteSubscriptionRequest, SyncLiteSubscriptionResponse>> SyncLiteSubscription(
+ Endpoints endpoints, SyncLiteSubscriptionRequest request, TimeSpan timeout);
+
Task Shutdown();
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ILitePushConsumer.cs b/csharp/rocketmq-client-csharp/ILitePushConsumer.cs
new file mode 100644
index 00000000..39c09c79
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/ILitePushConsumer.cs
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Rocketmq
+{
+ /// <summary>
+ /// Lite push consumer interface for consuming messages from lite topics.
+ /// Lite topics allow dynamic topic routing without pre-defining all
topics.
+ /// </summary>
+ public interface ILitePushConsumer : IAsyncDisposable, IDisposable
+ {
+ /// <summary>
+ /// Subscribe to a lite topic.
+ /// The SubscribeLite() method initiates network requests and performs quota verification, so it may fail.
+ /// It's important to check the result of this call to ensure that the subscription was successfully added.
+ /// Possible failure scenarios include:
+ /// 1. Network request errors, which can be retried.
+ /// 2. Quota verification failures, indicated by
LiteSubscriptionQuotaExceededException. In this case,
+ /// evaluate whether the quota is insufficient and promptly
unsubscribe from unused subscriptions
+ /// using UnsubscribeLite() to free up resources.
+ /// </summary>
+ /// <param name="liteTopic">The name of the lite topic to
subscribe</param>
+ Task SubscribeLite(string liteTopic);
+
+ /// <summary>
+ /// Subscribe to a lite topic, using offsetOption to specify the offset to start consuming from.
+ /// </summary>
+ /// <param name="liteTopic">The name of the lite topic to
subscribe</param>
+ /// <param name="offsetOption">The consume from offset option</param>
+ Task SubscribeLite(string liteTopic, OffsetOption offsetOption);
+
+ /// <summary>
+ /// Unsubscribe from a lite topic.
+ /// </summary>
+ /// <param name="liteTopic">The name of the lite topic to unsubscribe
from</param>
+ Task UnsubscribeLite(string liteTopic);
+
+ /// <summary>
+ /// Get the lite topic immutable set.
+ /// </summary>
+ /// <returns>Lite topic immutable set</returns>
+ ISet<string> GetLiteTopicSet();
+
+ /// <summary>
+ /// Get the load balancing group for the consumer.
+ /// </summary>
+ /// <returns>Consumer load balancing group</returns>
+ string GetConsumerGroup();
+ }
+}
diff --git a/csharp/rocketmq-client-csharp/IRpcClient.cs b/csharp/rocketmq-client-csharp/IRpcClient.cs
index eb369c2d..e4206395 100644
--- a/csharp/rocketmq-client-csharp/IRpcClient.cs
+++ b/csharp/rocketmq-client-csharp/IRpcClient.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -54,6 +54,9 @@ namespace Org.Apache.Rocketmq
Task<RecallMessageResponse> RecallMessage(Metadata metadata,
RecallMessageRequest request, TimeSpan timeout);
+ Task<SyncLiteSubscriptionResponse> SyncLiteSubscription(Metadata
metadata,
+ SyncLiteSubscriptionRequest request, TimeSpan timeout);
+
Task Shutdown();
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/LitePushConsumer.cs b/csharp/rocketmq-client-csharp/LitePushConsumer.cs
new file mode 100644
index 00000000..527f88ca
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/LitePushConsumer.cs
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+ /// <summary>
+ /// Lite push consumer extends standard push consumer with lite topic
support.
+ /// Lite topics allow dynamic topic routing without pre-defining all
topics.
+ /// </summary>
+ public class LitePushConsumer : PushConsumer, ILitePushConsumer
+ {
+ private static readonly ILogger Logger =
MqLogManager.CreateLogger<LitePushConsumer>();
+
+ private readonly LiteSubscriptionManager _liteSubscriptionManager;
+ private readonly string _bindTopic;
+
+ /// <summary>
+ /// Creates a new instance of LitePushConsumer.
+ /// </summary>
+ /// <param name="clientConfig">Client configuration</param>
+ /// <param name="consumerGroup">Consumer group name</param>
+ /// <param name="bindTopic">The bind topic for lite
subscriptions</param>
+ /// <param name="subscriptionExpressions">Initial subscription
expressions (can be empty for lite consumer)</param>
+ /// <param name="messageListener">Message listener callback</param>
+ /// <param name="maxCacheMessageCount">Maximum cache message
count</param>
+ /// <param name="maxCacheMessageSizeInBytes">Maximum cache message
size in bytes</param>
+ /// <param name="consumptionThreadCount">Number of consumption
threads</param>
+ /// <param name="enableFifoConsumeAccelerator">Enable FIFO consume
accelerator</param>
+ public LitePushConsumer(
+ ClientConfig clientConfig,
+ string consumerGroup,
+ string bindTopic,
+ ConcurrentDictionary<string, FilterExpression>
subscriptionExpressions,
+ IMessageListener messageListener,
+ int maxCacheMessageCount,
+ int maxCacheMessageSizeInBytes,
+ int consumptionThreadCount,
+ bool enableFifoConsumeAccelerator = false)
+ : base(clientConfig, consumerGroup, subscriptionExpressions,
messageListener,
+ maxCacheMessageCount, maxCacheMessageSizeInBytes,
consumptionThreadCount,
+ enableFifoConsumeAccelerator)
+ {
+ if (string.IsNullOrWhiteSpace(bindTopic))
+ {
+ throw new ArgumentException("bindTopic cannot be null or
empty", nameof(bindTopic));
+ }
+
+ _bindTopic = bindTopic;
+ _liteSubscriptionManager = new LiteSubscriptionManager(this,
bindTopic, consumerGroup);
+ }
+
+ protected override async Task Start()
+ {
+ await base.Start();
+
+ // Manually fetch route for bind topic to ensure it's available
before lite subscription sync
+ // This triggers infrastructure initialization (Telemetry Session,
routing, etc.)
+ try
+ {
+ Logger.LogInformation($"Fetching route for bind topic:
{_bindTopic}, clientId={ClientId}");
+ var routeData = await GetRouteData(_bindTopic);
+ Logger.LogInformation($"Successfully fetched route for bind topic: {_bindTopic}, " +
+ $"messageQueueCount={routeData.MessageQueues.Count}, clientId={ClientId}");
+ }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, $"Failed to fetch route for bind topic:
{_bindTopic}, clientId={ClientId}. " +
+ $"Lite subscription sync may be affected.");
+ // Don't throw here, allow the consumer to start anyway
+ }
+
+ // Start LiteSubscriptionManager which will:
+ // 1. Perform initial sync of all lite subscriptions (even if
empty)
+ // 2. Schedule periodic sync every 30 seconds
+ _liteSubscriptionManager.Start();
+
+ Logger.LogInformation($"LitePushConsumer started successfully,
clientId={ClientId}, bindTopic={_bindTopic}, consumerGroup={ConsumerGroup}");
+ }
+
+ /// <summary>
+ /// Subscribe to a lite topic.
+ /// </summary>
+ /// <param name="liteTopic">The name of the lite topic to
subscribe</param>
+ public async Task SubscribeLite(string liteTopic)
+ {
+ await _liteSubscriptionManager.SubscribeLite(liteTopic, null);
+ }
+
+ /// <summary>
+ /// Subscribe to a lite topic, using offsetOption to specify the offset to start consuming from.
+ /// </summary>
+ /// <param name="liteTopic">The name of the lite topic to
subscribe</param>
+ /// <param name="offsetOption">The consume from offset option. If
null, uses default offset policy.</param>
+ public async Task SubscribeLite(string liteTopic, OffsetOption
offsetOption)
+ {
+ await _liteSubscriptionManager.SubscribeLite(liteTopic,
offsetOption);
+ }
+
+ /// <summary>
+ /// Unsubscribe from a lite topic.
+ /// </summary>
+ /// <param name="liteTopic">The name of the lite topic to unsubscribe
from</param>
+ public async Task UnsubscribeLite(string liteTopic)
+ {
+ await _liteSubscriptionManager.UnsubscribeLite(liteTopic);
+ }
+
+ /// <summary>
+ /// Get the lite topic immutable set.
+ /// </summary>
+ /// <returns>Lite topic immutable set</returns>
+ public ISet<string> GetLiteTopicSet()
+ {
+ return _liteSubscriptionManager.GetLiteTopicSet();
+ }
+
+ /// <summary>
+ /// Get the load balancing group for the consumer.
+ /// </summary>
+ /// <returns>Consumer load balancing group</returns>
+ public new string GetConsumerGroup()
+ {
+ return ConsumerGroup;
+ }
+
+ /// <summary>
+ /// Handle settings command from server, sync lite subscription quota.
+ /// </summary>
+ internal override void OnSettingsCommand(Endpoints endpoints,
Proto.Settings settings)
+ {
+ base.OnSettingsCommand(endpoints, settings);
+ _liteSubscriptionManager.Sync(settings);
+ }
+
+ /// <summary>
+ /// Handle notify unsubscribe lite command from server.
+ /// </summary>
+ internal override void OnNotifyUnsubscribeLiteCommand(Endpoints
endpoints, Proto.NotifyUnsubscribeLiteCommand command)
+ {
+ _liteSubscriptionManager.OnNotifyUnsubscribeLiteCommand(command);
+ }
+
+ /// <summary>
+ /// Wrap heartbeat request with LITE_PUSH_CONSUMER client type.
+ /// </summary>
+ internal override Proto.HeartbeatRequest WrapHeartbeatRequest()
+ {
+ return new Proto.HeartbeatRequest
+ {
+ ClientType = Proto.ClientType.LitePushConsumer,
+ Group = GetProtobufGroup()
+ };
+ }
+
+ /// <summary>
+ /// Get settings with LITE_PUSH_CONSUMER client type.
+ /// </summary>
+ internal override Settings GetSettings()
+ {
+ // Create LitePushSubscriptionSettings with correct ClientType
+ var clientConfig = GetClientConfig();
+ var liteSettings = new LitePushSubscriptionSettings(
+ clientConfig.Namespace,
+ ClientId,
+ Endpoints,
+ ConsumerGroup,
+ clientConfig.RequestTimeout,
+ GetSubscriptionExpressions());
+ return liteSettings;
+ }
+
+ /// <summary>
+ /// Get client type for this lite push consumer.
+ /// </summary>
+ /// <returns>The client type (LITE_PUSH_CONSUMER).</returns>
+ protected override ClientType GetClientType()
+ {
+ return ClientType.LitePushConsumer;
+ }
+
+ /// <summary>
+ /// Get the bind topic for lite subscriptions.
+ /// </summary>
+ public string GetBindTopic()
+ {
+ return _bindTopic;
+ }
+
+ /// <summary>
+ /// Builder for creating LitePushConsumer instances.
+ /// </summary>
+ public new class Builder
+ {
+ private ClientConfig _clientConfig;
+ private string _consumerGroup;
+ private string _bindTopic;
+ private ConcurrentDictionary<string, FilterExpression>
_subscriptionExpressions = new ConcurrentDictionary<string, FilterExpression>();
+ private IMessageListener _messageListener;
+ private int _maxCacheMessageCount = 1024;
+ private int _maxCacheMessageSizeInBytes = 64 * 1024 * 1024;
+ private int _consumptionThreadCount = 20;
+ private bool _enableFifoConsumeAccelerator;
+
+ public Builder SetClientConfig(ClientConfig clientConfig)
+ {
+ Preconditions.CheckArgument(clientConfig != null,
"clientConfig should not be null");
+ _clientConfig = clientConfig;
+ return this;
+ }
+
+ public Builder SetConsumerGroup(string consumerGroup)
+ {
+ Preconditions.CheckArgument(!string.IsNullOrWhiteSpace(consumerGroup), "consumerGroup should not be null or empty");
+ Preconditions.CheckArgument(PushConsumer.ConsumerGroupRegex.Match(consumerGroup).Success,
+ $"consumerGroup does not match the regex {PushConsumer.ConsumerGroupRegex}");
+ _consumerGroup = consumerGroup;
+ return this;
+ }
+
+ public Builder SetBindTopic(string bindTopic)
+ {
+ Preconditions.CheckArgument(!string.IsNullOrWhiteSpace(bindTopic), "bindTopic should not be null or empty");
+ _bindTopic = bindTopic;
+ // Default subscription: (bindTopic, *) for code reuse.
+ _subscriptionExpressions = new ConcurrentDictionary<string,
FilterExpression>
+ {
+ [bindTopic] = FilterExpression.SubAll
+ };
+ return this;
+ }
+
+ public Builder SetSubscriptionExpressions(Dictionary<string,
FilterExpression> subscriptionExpressions)
+ {
+ if (subscriptionExpressions != null)
+ {
+ _subscriptionExpressions = new
ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions);
+ }
+ else
+ {
+ _subscriptionExpressions = new
ConcurrentDictionary<string, FilterExpression>();
+ }
+ return this;
+ }
+
+ public Builder SetMessageListener(IMessageListener messageListener)
+ {
+ Preconditions.CheckArgument(messageListener != null,
"messageListener should not be null");
+ _messageListener = messageListener;
+ return this;
+ }
+
+ public Builder SetMaxCacheMessageCount(int maxCacheMessageCount)
+ {
+ Preconditions.CheckArgument(maxCacheMessageCount > 0,
"maxCacheMessageCount should be positive");
+ _maxCacheMessageCount = maxCacheMessageCount;
+ return this;
+ }
+
+ public Builder SetMaxCacheMessageSizeInBytes(int
maxCacheMessageSizeInBytes)
+ {
+ Preconditions.CheckArgument(maxCacheMessageSizeInBytes > 0,
"maxCacheMessageSizeInBytes should be positive");
+ _maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
+ return this;
+ }
+
+ public Builder SetConsumptionThreadCount(int
consumptionThreadCount)
+ {
+ Preconditions.CheckArgument(consumptionThreadCount > 0,
"consumptionThreadCount should be positive");
+ _consumptionThreadCount = consumptionThreadCount;
+ return this;
+ }
+
+ public Builder SetEnableFifoConsumeAccelerator(bool
enableFifoConsumeAccelerator)
+ {
+ _enableFifoConsumeAccelerator = enableFifoConsumeAccelerator;
+ return this;
+ }
+
+ public async Task<LitePushConsumer> Build()
+ {
+ Preconditions.CheckArgument(_clientConfig != null,
"clientConfig has not been set yet");
+ Preconditions.CheckArgument(!string.IsNullOrWhiteSpace(_consumerGroup), "consumerGroup has not been set yet");
+ Preconditions.CheckArgument(!string.IsNullOrWhiteSpace(_bindTopic), "bindTopic has not been set yet");
+ Preconditions.CheckArgument(_messageListener != null,
"messageListener has not been set yet");
+
+ var litePushConsumer = new LitePushConsumer(
+ _clientConfig,
+ _consumerGroup,
+ _bindTopic,
+ _subscriptionExpressions,
+ _messageListener,
+ _maxCacheMessageCount,
+ _maxCacheMessageSizeInBytes,
+ _consumptionThreadCount,
+ _enableFifoConsumeAccelerator);
+
+ await litePushConsumer.Start();
+ return litePushConsumer;
+ }
+ }
+ }
+}
diff --git a/csharp/rocketmq-client-csharp/LitePushSubscriptionSettings.cs b/csharp/rocketmq-client-csharp/LitePushSubscriptionSettings.cs
new file mode 100644
index 00000000..d332202e
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/LitePushSubscriptionSettings.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using Microsoft.Extensions.Logging;
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+ /// <summary>
+ /// Settings for LitePushConsumer, extends PushSubscriptionSettings with
LITE_PUSH_CONSUMER client type.
+ /// </summary>
+ public class LitePushSubscriptionSettings : PushSubscriptionSettings
+ {
+ private static readonly ILogger Logger =
MqLogManager.CreateLogger<LitePushSubscriptionSettings>();
+
+ public LitePushSubscriptionSettings(string namespaceName, string
clientId, Endpoints endpoints,
+ string consumerGroup, TimeSpan requestTimeout,
+ ConcurrentDictionary<string, FilterExpression>
subscriptionExpressions)
+ : base(namespaceName, clientId, ClientType.LitePushConsumer,
endpoints, consumerGroup, requestTimeout, subscriptionExpressions)
+ {
+ // LitePushConsumer uses LITE_PUSH_CONSUMER client type instead of
PUSH_CONSUMER
+ }
+
+ public override Proto.Settings ToProtobuf()
+ {
+ var settings = base.ToProtobuf();
+ // Ensure the ClientType is set to LITE_PUSH_CONSUMER
+ settings.ClientType =
ClientTypeHelper.ToProtobuf(ClientType.LitePushConsumer);
+ return settings;
+ }
+ }
+}
diff --git a/csharp/rocketmq-client-csharp/LiteSubscriptionManager.cs b/csharp/rocketmq-client-csharp/LiteSubscriptionManager.cs
new file mode 100644
index 00000000..8501c633
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/LiteSubscriptionManager.cs
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Org.Apache.Rocketmq.Error;
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+ /// <summary>
+ /// Manages lite topic subscriptions for lite push consumer.
+ /// Handles subscription synchronization, quota management, and server
notifications.
+ /// </summary>
+ internal class LiteSubscriptionManager
+ {
+ private static readonly ILogger Logger =
MqLogManager.CreateLogger<LiteSubscriptionManager>();
+
+ private readonly PushConsumer _consumer;
+ private readonly string _bindTopic;
+ private readonly string _consumerGroup;
+ private readonly ConcurrentDictionary<string, byte> _liteTopicSet;
+
+ // Client-side lite subscription quota limit
+ private volatile int _liteSubscriptionQuota;
+ private volatile int _maxLiteTopicSize = 64;
+
+ public LiteSubscriptionManager(PushConsumer consumer, string
bindTopic, string consumerGroup)
+ {
+ _consumer = consumer;
+ _bindTopic = bindTopic;
+ _consumerGroup = consumerGroup;
+ _liteTopicSet = new ConcurrentDictionary<string, byte>();
+ }
+
+ public void Start()
+ {
+ // Sync all after startup to initialize lite subscription
infrastructure
+ // This triggers the initial sync even if liteTopicSet is empty
+ Logger.LogInformation($"Starting LiteSubscriptionManager,
bindTopic={GetBindTopicName()}, consumerGroup={GetConsumerGroupName()}");
+
+ try
+ {
+ SyncAllLiteSubscription();
+ Logger.LogInformation($"Initial lite subscription sync
completed successfully");
+ }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, $"Failed to perform initial lite
subscription sync");
+ }
+
+ // Schedule periodic sync every 30 seconds using Timer
+ var timer = new System.Threading.Timer(
+ callback: state =>
+ {
+ try
+ {
+ SyncAllLiteSubscription();
+ }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, $"Error in periodic
syncAllLiteSubscription, clientId={_consumer.GetClientId()}");
+ }
+ },
+ state: null,
+ dueTime: TimeSpan.FromSeconds(30),
+ period: TimeSpan.FromSeconds(30)
+ );
+
+ Logger.LogInformation($"LiteSubscriptionManager started
successfully, scheduled periodic sync every 30 seconds");
+ }
+
+ public string GetBindTopicName()
+ {
+ return _bindTopic;
+ }
+
+ public string GetConsumerGroupName()
+ {
+ return _consumerGroup;
+ }
+
+ public ISet<string> GetLiteTopicSet()
+ {
+ // Return a snapshot copy to avoid concurrent modification
+ lock (_liteTopicSet)
+ {
+ return new HashSet<string>(_liteTopicSet.Keys);
+ }
+ }
+
+ public void Sync(Proto.Settings settings)
+ {
+ // If subscription doesn't exist, return early
+ if (settings.Subscription == null)
+ {
+ return;
+ }
+
+ var subscription = settings.Subscription;
+ if (subscription.HasLiteSubscriptionQuota)
+ {
+ _liteSubscriptionQuota = subscription.LiteSubscriptionQuota;
+ }
+ if (subscription.HasMaxLiteTopicSize)
+ {
+ _maxLiteTopicSize = subscription.MaxLiteTopicSize;
+ }
+ }
+
+ public async Task SubscribeLite(string liteTopic, OffsetOption
offsetOption = null)
+ {
+ _consumer.CheckRunning();
+
+ if (_liteTopicSet.ContainsKey(liteTopic))
+ {
+ return;
+ }
+
+ ValidateLiteTopic(liteTopic, _maxLiteTopicSize);
+ CheckLiteSubscriptionQuota(1);
+
+ try
+ {
+ await SyncLiteSubscription(
+ Proto.LiteSubscriptionAction.PartialAdd,
+ new[] { liteTopic },
+ offsetOption);
+ }
+ catch (Exception)
+ {
+ Logger.LogError($"Failed to subscribeLite {liteTopic},
topic={GetBindTopicName()}, group={GetConsumerGroupName()},
clientId={_consumer.GetClientId()}");
+ throw;
+ }
+
+ _liteTopicSet.TryAdd(liteTopic, 0);
+ Logger.LogInformation($"SubscribeLite success,
liteTopic={liteTopic}, topic={GetBindTopicName()},
group={GetConsumerGroupName()}, clientId={_consumer.GetClientId()}");
+ }
+
+ public async Task UnsubscribeLite(string liteTopic)
+ {
+ _consumer.CheckRunning();
+
+ if (!_liteTopicSet.ContainsKey(liteTopic))
+ {
+ return;
+ }
+
+ try
+ {
+ await SyncLiteSubscription(
+ Proto.LiteSubscriptionAction.PartialRemove,
+ new[] { liteTopic },
+ null);
+ }
+ catch (Exception)
+ {
+ Logger.LogError($"Failed to unsubscribeLite {liteTopic},
topic={GetBindTopicName()}, group={GetConsumerGroupName()},
clientId={_consumer.GetClientId()}");
+ throw;
+ }
+
+ _liteTopicSet.TryRemove(liteTopic, out _);
+ Logger.LogInformation($"UnsubscribeLite success,
liteTopic={liteTopic}, topic={GetBindTopicName()},
group={GetConsumerGroupName()}, clientId={_consumer.GetClientId()}");
+ }
+
+ public void SyncAllLiteSubscription()
+ {
+ try
+ {
+ CheckLiteSubscriptionQuota(0);
+ var liteTopics = _liteTopicSet.Keys.ToList();
+ Logger.LogDebug($"Syncing all lite subscriptions,
count={liteTopics.Count}, topics=[{string.Join(", ", liteTopics)}]");
+
+ SyncLiteSubscription(
+ Proto.LiteSubscriptionAction.CompleteAdd,
+ liteTopics,
+ null).Wait();
+
+ Logger.LogDebug($"SyncAllLiteSubscription completed
successfully, clientId={_consumer.GetClientId()}");
+ }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, $"Schedule syncAllLiteSubscription error,
clientId={_consumer.GetClientId()}, bindTopic={GetBindTopicName()},
consumerGroup={GetConsumerGroupName()}");
+ }
+ }
+
+ private async Task SyncLiteSubscription(
+ Proto.LiteSubscriptionAction action,
+ IEnumerable<string> diff,
+ OffsetOption offsetOption)
+ {
+ var builder = new Proto.SyncLiteSubscriptionRequest
+ {
+ Action = action,
+ Topic = new Proto.Resource
+ {
+ ResourceNamespace = _consumer.Namespace,
+ Name = _bindTopic
+ },
+ Group = new Proto.Resource
+ {
+ ResourceNamespace = _consumer.Namespace,
+ Name = _consumerGroup
+ }
+ };
+
+ builder.LiteTopicSet.AddRange(diff);
+
+ if (offsetOption != null)
+ {
+ builder.OffsetOption =
OffsetOptionHelper.ToProtobuf(offsetOption);
+ }
+
+ var requestTimeout = _consumer.GetRequestTimeout();
+ var response = await _consumer.SyncLiteSubscription(builder,
requestTimeout);
+
+ StatusChecker.Check(response.Status, builder, "");
+ }
+
+ public void OnNotifyUnsubscribeLiteCommand(Proto.NotifyUnsubscribeLiteCommand command)
+ {
+ var liteTopic = command.LiteTopic;
+ Logger.LogInformation($"Notify unsubscribe lite command received,
liteTopic={liteTopic}, group={GetConsumerGroupName()},
bindTopic={GetBindTopicName()}");
+
+ if (!string.IsNullOrWhiteSpace(liteTopic))
+ {
+ _liteTopicSet.TryRemove(liteTopic, out _);
+ }
+ }
+
+ private void ValidateLiteTopic(string liteTopic, int maxLength)
+ {
+ if (string.IsNullOrWhiteSpace(liteTopic))
+ {
+ throw new ArgumentException("liteTopic is blank",
nameof(liteTopic));
+ }
+
+ if (liteTopic.Length > maxLength)
+ {
+ var errorMessage = $"liteTopic length exceeded max length
{maxLength}, liteTopic: {liteTopic}";
+ throw new ArgumentException(errorMessage, nameof(liteTopic));
+ }
+ }
+
+ private void CheckLiteSubscriptionQuota(int delta)
+ {
+ if (_liteTopicSet.Count + delta > _liteSubscriptionQuota)
+ {
+ throw new LiteSubscriptionQuotaExceededException(
+ $"Lite subscription quota exceeded
{_liteSubscriptionQuota}, current size: {_liteTopicSet.Count}, delta: {delta}");
+ }
+ }
+ }
+}
diff --git a/csharp/rocketmq-client-csharp/Message.cs b/csharp/rocketmq-client-csharp/Message.cs
index c1e2094e..85b129e6 100644
--- a/csharp/rocketmq-client-csharp/Message.cs
+++ b/csharp/rocketmq-client-csharp/Message.cs
@@ -27,7 +27,8 @@ namespace Org.Apache.Rocketmq
internal static readonly Regex TopicRegex = new
Regex("^[%a-zA-Z0-9_-]+$");
private Message(string topic, byte[] body, string tag, List<string>
keys,
- Dictionary<string, string> properties, DateTime?
deliveryTimestamp, string messageGroup)
+ Dictionary<string, string> properties, DateTime?
deliveryTimestamp, string messageGroup,
+ int? priority, string liteTopic)
{
Topic = topic;
Tag = tag;
@@ -36,6 +37,8 @@ namespace Org.Apache.Rocketmq
Properties = properties;
DeliveryTimestamp = deliveryTimestamp;
MessageGroup = messageGroup;
+ Priority = priority;
+ LiteTopic = liteTopic;
}
internal Message(Message message)
@@ -47,6 +50,8 @@ namespace Org.Apache.Rocketmq
Properties = message.Properties;
MessageGroup = message.MessageGroup;
DeliveryTimestamp = message.DeliveryTimestamp;
+ Priority = message.Priority;
+ LiteTopic = message.LiteTopic;
}
public string Topic { get; }
@@ -62,12 +67,26 @@ namespace Org.Apache.Rocketmq
public string MessageGroup { get; }
+ /// <summary>
+ /// Gets the lite topic for dynamic topic routing.
+ /// Only applicable for LITE message type. Enables flexible message
routing without pre-defining all topics.
+ /// Cannot be used with: messageGroup, deliveryTimestamp, or priority.
+ /// </summary>
+ public string LiteTopic { get; }
+
+ /// <summary>
+ /// Gets the priority of the message.
+ /// Only applicable for PRIORITY topic type. Higher values indicate
higher priority.
+ /// Priority must be >= 0 and cannot be used with: messageGroup,
deliveryTimestamp, or liteTopic.
+ /// </summary>
+ public int? Priority { get; }
+
public override string ToString()
{
return
$"{nameof(Topic)}: {Topic}, {nameof(Tag)}: {Tag},
{nameof(Keys)}: {string.Join(", ", Keys)}, {nameof(Properties)}: " +
$"{string.Join(", ", Properties.Select(kvp =>
kvp.ToString()))}, {nameof(DeliveryTimestamp)}: {DeliveryTimestamp},
{nameof(MessageGroup)}: " +
- $"{MessageGroup}";
+ $"{MessageGroup}, {nameof(Priority)}: {Priority},
{nameof(LiteTopic)}: {LiteTopic}";
}
public class Builder
@@ -79,6 +98,8 @@ namespace Org.Apache.Rocketmq
private readonly Dictionary<string, string> _properties = new
Dictionary<string, string>();
private DateTime? _deliveryTimestamp;
private string _messageGroup;
+ private int? _priority;
+ private string _liteTopic;
public Builder SetTopic(string topic)
{
@@ -131,6 +152,10 @@ namespace Org.Apache.Rocketmq
{
Preconditions.CheckArgument(null == _messageGroup,
"deliveryTimestamp and messageGroup should not be set at
same time");
+ Preconditions.CheckArgument(!_priority.HasValue,
+ "deliveryTimestamp and priority should not be set at same
time");
+ Preconditions.CheckArgument(null == _liteTopic,
+ "deliveryTimestamp and liteTopic should not be set at same
time");
_deliveryTimestamp = DateTimeKind.Utc == deliveryTimestamp.Kind
? TimeZoneInfo.ConvertTimeFromUtc(deliveryTimestamp,
TimeZoneInfo.Local)
: deliveryTimestamp;
@@ -143,15 +168,64 @@ namespace Org.Apache.Rocketmq
"messageGroup should not be null or white space");
Preconditions.CheckArgument(null == _deliveryTimestamp,
"messageGroup and deliveryTimestamp should not be set at
same time");
+ Preconditions.CheckArgument(!_priority.HasValue,
+ "messageGroup and priority should not be set at same
time");
+ Preconditions.CheckArgument(null == _liteTopic,
+ "messageGroup and liteTopic should not be set at same
time");
_messageGroup = messageGroup;
return this;
}
+ /// <summary>
+ /// Sets the lite topic for lite message.
+ /// LiteTopic enables dynamic topic routing without pre-defining
all topics.
+ /// Cannot be used together with: messageGroup, deliveryTimestamp,
or priority.
+ /// </summary>
+ /// <param name="liteTopic">The lite topic name for dynamic
routing.</param>
+ /// <returns>The builder instance for method chaining.</returns>
+ /// <exception cref="ArgumentException">Thrown when liteTopic is
empty or conflicts with other message properties.</exception>
+ public Builder SetLiteTopic(string liteTopic)
+ {
+ Preconditions.CheckArgument(null == _deliveryTimestamp,
+ "liteTopic and deliveryTimestamp should not be set at same
time");
+ Preconditions.CheckArgument(null == _messageGroup,
+ "liteTopic and messageGroup should not be set at same
time");
+ Preconditions.CheckArgument(null == _priority,
+ "liteTopic and priority should not be set at same time");
+ Preconditions.CheckArgument(!string.IsNullOrWhiteSpace(liteTopic),
+ "liteTopic should not be null or white space");
+ _liteTopic = liteTopic;
+ return this;
+ }
+
+ /// <summary>
+ /// Sets the priority for priority message.
+ /// Priority must be >= 0 (higher value = higher priority).
+ /// Cannot be used together with: messageGroup, deliveryTimestamp,
or liteTopic.
+ /// </summary>
+ /// <param name="priority">The priority level. Higher values
indicate higher priority.</param>
+ /// <returns>The builder instance for method chaining.</returns>
+ /// <exception cref="ArgumentException">Thrown when priority is
negative or conflicts with other message properties.</exception>
+ public Builder SetPriority(int priority)
+ {
+ Preconditions.CheckArgument(priority >= 0,
+ "priority must be greater than or equal to 0");
+ Preconditions.CheckArgument(null == _deliveryTimestamp,
+ "priority and deliveryTimestamp should not be set at same
time");
+ Preconditions.CheckArgument(null == _messageGroup,
+ "priority and messageGroup should not be set at same
time");
+ Preconditions.CheckArgument(null == _liteTopic,
+ "priority and liteTopic should not be set at same time");
+ _priority = priority;
+ return this;
+ }
+
public Message Build()
{
Preconditions.CheckArgument(null != _topic, "topic has not been set yet");
Preconditions.CheckArgument(null != _body, "body has not been set yet");
- return new Message(_topic, _body, _tag, _keys, _properties, _deliveryTimestamp, _messageGroup);
+ return new Message(_topic, _body, _tag, _keys, _properties, _deliveryTimestamp, _messageGroup,
+ _priority, _liteTopic);
}
}
}
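The exclusivity checks in `SetLiteTopic` and `SetPriority` above can be illustrated with a small self-contained sketch. `SketchBuilder` and `BuilderSketchDemo` are hypothetical stand-ins written for this note, not the client's `Message.Builder`; only the two setters shown in this hunk are modelled, and the `Check` helper is assumed to behave like `Preconditions.CheckArgument` (throwing `ArgumentException`, as the doc comments state).

```csharp
using System;

// Hypothetical stand-in for Message.Builder; models only the checks shown in the hunk.
public sealed class SketchBuilder
{
    private string _messageGroup;          // never set in this sketch
    private DateTime? _deliveryTimestamp;  // never set in this sketch
    private string _liteTopic;
    private int? _priority;

    // Assumed equivalent of Preconditions.CheckArgument.
    private static void Check(bool condition, string message)
    {
        if (!condition) throw new ArgumentException(message);
    }

    public SketchBuilder SetLiteTopic(string liteTopic)
    {
        Check(null == _deliveryTimestamp, "liteTopic and deliveryTimestamp should not be set at same time");
        Check(null == _messageGroup, "liteTopic and messageGroup should not be set at same time");
        Check(null == _priority, "liteTopic and priority should not be set at same time");
        Check(!string.IsNullOrWhiteSpace(liteTopic), "liteTopic should not be null or white space");
        _liteTopic = liteTopic;
        return this;
    }

    public SketchBuilder SetPriority(int priority)
    {
        Check(priority >= 0, "priority must be greater than or equal to 0");
        Check(null == _deliveryTimestamp, "priority and deliveryTimestamp should not be set at same time");
        Check(null == _messageGroup, "priority and messageGroup should not be set at same time");
        Check(null == _liteTopic, "priority and liteTopic should not be set at same time");
        _priority = priority;
        return this;
    }
}

public static class BuilderSketchDemo
{
    // Returns true when the action is rejected with ArgumentException.
    public static bool Rejected(Action action)
    {
        try { action(); return false; }
        catch (ArgumentException) { return true; }
    }
}
```

Because each setter inspects the fields written by the other, the conflict is caught at whichever call comes second, regardless of call order.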
diff --git a/csharp/rocketmq-client-csharp/MessageType.cs b/csharp/rocketmq-client-csharp/MessageType.cs
index 09896a0b..11681dcc 100644
--- a/csharp/rocketmq-client-csharp/MessageType.cs
+++ b/csharp/rocketmq-client-csharp/MessageType.cs
@@ -24,7 +24,9 @@ namespace Org.Apache.Rocketmq
{
Normal,
Fifo,
+ Lite,
Delay,
+ Priority,
Transaction
}
@@ -38,8 +40,12 @@ namespace Org.Apache.Rocketmq
return MessageType.Normal;
case Proto.MessageType.Fifo:
return MessageType.Fifo;
+ case Proto.MessageType.Lite:
+ return MessageType.Lite;
case Proto.MessageType.Delay:
return MessageType.Delay;
+ case Proto.MessageType.Priority:
+ return MessageType.Priority;
case Proto.MessageType.Transaction:
return MessageType.Transaction;
case Proto.MessageType.Unspecified:
@@ -54,7 +60,9 @@ namespace Org.Apache.Rocketmq
{
MessageType.Normal => Proto.MessageType.Normal,
MessageType.Fifo => Proto.MessageType.Fifo,
+ MessageType.Lite => Proto.MessageType.Lite,
MessageType.Delay => Proto.MessageType.Delay,
+ MessageType.Priority => Proto.MessageType.Priority,
MessageType.Transaction => Proto.MessageType.Transaction,
_ => Proto.MessageType.Unspecified
};
diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs
index d38df9e6..d20015a8 100644
--- a/csharp/rocketmq-client-csharp/MessageView.cs
+++ b/csharp/rocketmq-client-csharp/MessageView.cs
@@ -38,7 +38,7 @@ namespace Org.Apache.Rocketmq
private MessageView(string messageId, string topic, byte[] body, string tag, string messageGroup,
DateTime? deliveryTimestamp, List<string> keys, Dictionary<string, string> properties, string bornHost,
DateTime bornTime, int deliveryAttempt, MessageQueue messageQueue, string receiptHandle, long offset,
- bool corrupted)
+ bool corrupted, int? priority = null)
{
MessageId = messageId;
Topic = topic;
@@ -55,6 +55,7 @@ namespace Org.Apache.Rocketmq
ReceiptHandle = receiptHandle;
_offset = offset;
_corrupted = corrupted;
+ Priority = priority;
}
public string MessageId { get; }
@@ -79,6 +80,11 @@ namespace Org.Apache.Rocketmq
public int DeliveryAttempt { get; set; }
+ /// <summary>
+ /// Gets the priority of the message, which makes sense only when the topic type is PRIORITY.
+ /// </summary>
+ public int? Priority { get; }
+
public int IncrementAndGetDeliveryAttempt()
{
return ++DeliveryAttempt;
@@ -184,6 +190,7 @@ namespace Org.Apache.Rocketmq
TimeZoneInfo.ConvertTimeFromUtc(systemProperties.BornTimestamp.ToDateTime(), TimeZoneInfo.Local);
var deliveryAttempt = systemProperties.DeliveryAttempt;
var queueOffset = systemProperties.QueueOffset;
+ var priority = systemProperties.HasPriority ? (int?)systemProperties.Priority : null;
var properties = new Dictionary<string, string>();
foreach (var (key, value) in message.UserProperties)
{
@@ -193,7 +200,7 @@ namespace Org.Apache.Rocketmq
var receiptHandle = systemProperties.ReceiptHandle;
return new MessageView(messageId, topic, body, tag, messageGroup, deliveryTime, keys, properties, bornHost,
- bornTime, deliveryAttempt, messageQueue, receiptHandle, queueOffset, corrupted);
+ bornTime, deliveryAttempt, messageQueue, receiptHandle, queueOffset, corrupted, priority);
}
public override string ToString()
@@ -201,7 +208,8 @@ namespace Org.Apache.Rocketmq
return $"{nameof(MessageId)}: {MessageId}, {nameof(Topic)}: {Topic}, {nameof(Tag)}: {Tag}," +
$" {nameof(MessageGroup)}: {MessageGroup}, {nameof(DeliveryTimestamp)}: {DeliveryTimestamp}," +
$" {nameof(Keys)}: {string.Join(", ", Keys)}, {nameof(Properties)}: {string.Join(", ", Properties.Select(kvp => kvp.ToString()))}, {nameof(BornHost)}: {BornHost}, " +
- $"{nameof(BornTime)}: {BornTime}, {nameof(DeliveryAttempt)}: {DeliveryAttempt}";
+ $"{nameof(BornTime)}: {BornTime}, {nameof(DeliveryAttempt)}: {DeliveryAttempt}, " +
+ $"{nameof(Priority)}: {Priority}";
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/OffsetOption.cs b/csharp/rocketmq-client-csharp/OffsetOption.cs
new file mode 100644
index 00000000..8909cafc
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/OffsetOption.cs
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Rocketmq
+{
+ /// <summary>
+ /// Represents the offset option for consuming messages from a specific position.
+ /// </summary>
+ public class OffsetOption
+ {
+ public const long PolicyLastValue = 0L;
+ public const long PolicyMinValue = 1L;
+ public const long PolicyMaxValue = 2L;
+
+ public static readonly OffsetOption LastOffset = new OffsetOption(OffsetType.Policy, PolicyLastValue);
+ public static readonly OffsetOption MinOffset = new OffsetOption(OffsetType.Policy, PolicyMinValue);
+ public static readonly OffsetOption MaxOffset = new OffsetOption(OffsetType.Policy, PolicyMaxValue);
+
+ private readonly OffsetType _type;
+ private readonly long _value;
+
+ private OffsetOption(OffsetType type, long value)
+ {
+ _type = type;
+ _value = value;
+ }
+
+ /// <summary>
+ /// Creates an offset option from a specific offset value.
+ /// </summary>
+ /// <param name="offset">The offset value (must be >= 0)</param>
+ /// <returns>OffsetOption instance</returns>
+ public static OffsetOption OfOffset(long offset)
+ {
+ if (offset < 0)
+ {
+ throw new ArgumentException("offset must be greater than or equal to 0");
+ }
+ return new OffsetOption(OffsetType.Offset, offset);
+ }
+
+ /// <summary>
+ /// Creates an offset option from tail N messages.
+ /// </summary>
+ /// <param name="tailN">Number of messages from tail (must be >= 0)</param>
+ /// <returns>OffsetOption instance</returns>
+ public static OffsetOption OfTailN(long tailN)
+ {
+ if (tailN < 0)
+ {
+ throw new ArgumentException("tailN must be greater than or equal to 0");
+ }
+ return new OffsetOption(OffsetType.TailN, tailN);
+ }
+
+ /// <summary>
+ /// Creates an offset option from a timestamp.
+ /// </summary>
+ /// <param name="timestamp">Unix timestamp in milliseconds (must be >= 0)</param>
+ /// <returns>OffsetOption instance</returns>
+ public static OffsetOption OfTimestamp(long timestamp)
+ {
+ if (timestamp < 0)
+ {
+ throw new ArgumentException("timestamp must be greater than or equal to 0");
+ }
+ return new OffsetOption(OffsetType.Timestamp, timestamp);
+ }
+
+ public OffsetType Type => _type;
+ public long Value => _value;
+
+ public override bool Equals(object obj)
+ {
+ if (obj == null || GetType() != obj.GetType())
+ {
+ return false;
+ }
+
+ var other = (OffsetOption)obj;
+ return _value == other._value && _type == other._type;
+ }
+
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ int hashCode = (int)_type;
+ hashCode = (hashCode * 397) ^ _value.GetHashCode();
+ return hashCode;
+ }
+ }
+
+ public override string ToString()
+ {
+ return $"OffsetOption(type={_type}, value={_value})";
+ }
+ }
+
+ /// <summary>
+ /// Type of offset option.
+ /// </summary>
+ public enum OffsetType
+ {
+ Policy,
+ Offset,
+ TailN,
+ Timestamp
+ }
+}
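`OfTimestamp` above expects a Unix timestamp in milliseconds and rejects negative values. A minimal sketch of producing such a value with the standard `DateTimeOffset` API follows; `OffsetTimestampSketch` and its members are hypothetical names used only for illustration, and the sketch does not reference the client library itself.

```csharp
using System;

public static class OffsetTimestampSketch
{
    // Converts a point in time to Unix epoch milliseconds (UTC based).
    public static long ToUnixMillis(DateTimeOffset instant)
    {
        return instant.ToUnixTimeMilliseconds();
    }

    // Mirrors the ">= 0" precondition documented on OfTimestamp in the diff.
    public static bool IsAcceptable(long unixMillis)
    {
        return unixMillis >= 0;
    }

    // Example: a timestamp one hour in the past.
    public static long OneHourAgo()
    {
        return ToUnixMillis(DateTimeOffset.UtcNow.AddHours(-1));
    }
}
```

In client code the resulting value would presumably be passed as `OffsetOption.OfTimestamp(millis)`. Instants before 1970-01-01 UTC convert to negative values, which the precondition in `OfTimestamp` would reject.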
diff --git a/csharp/rocketmq-client-csharp/OffsetOptionHelper.cs b/csharp/rocketmq-client-csharp/OffsetOptionHelper.cs
new file mode 100644
index 00000000..79540dad
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/OffsetOptionHelper.cs
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+ /// <summary>
+ /// Helper class for converting OffsetOption to Protobuf format.
+ /// </summary>
+ internal static class OffsetOptionHelper
+ {
+ public static Proto.OffsetOption ToProtobuf(OffsetOption offsetOption)
+ {
+ if (offsetOption == null)
+ {
+ return null;
+ }
+
+ var protoBuilder = new Proto.OffsetOption();
+
+ switch (offsetOption.Type)
+ {
+ case OffsetType.Policy:
+ protoBuilder.Policy = ToProtobufPolicy(offsetOption.Value);
+ break;
+ case OffsetType.Offset:
+ protoBuilder.Offset = offsetOption.Value;
+ break;
+ case OffsetType.TailN:
+ protoBuilder.TailN = offsetOption.Value;
+ break;
+ case OffsetType.Timestamp:
+ protoBuilder.Timestamp = offsetOption.Value;
+ break;
+ default:
+ throw new ArgumentException($"Unknown OffsetOption type: {offsetOption.Type}");
+ }
+
+ return protoBuilder;
+ }
+
+ private static Proto.OffsetOption.Types.Policy ToProtobufPolicy(long policyValue)
+ {
+ if (policyValue == OffsetOption.PolicyLastValue)
+ {
+ return Proto.OffsetOption.Types.Policy.Last;
+ }
+ else if (policyValue == OffsetOption.PolicyMinValue)
+ {
+ return Proto.OffsetOption.Types.Policy.Min;
+ }
+ else if (policyValue == OffsetOption.PolicyMaxValue)
+ {
+ return Proto.OffsetOption.Types.Policy.Max;
+ }
+ else
+ {
+ throw new ArgumentException($"Unknown policy value: {policyValue}");
+ }
+ }
+ }
+}
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 0d4be45c..12513a08 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -348,6 +348,25 @@ namespace Org.Apache.Rocketmq
return recallReceipt;
}
+ /// <summary>
+ /// Recalls a timed/delay message asynchronously using the recall handle.
+ /// This is useful for recalling messages that were scheduled for future delivery.
+ /// </summary>
+ /// <param name="topic">The topic of the message to recall.</param>
+ /// <param name="recallhandle">The recall handle obtained when sending the delay message.</param>
+ /// <returns>A task representing the asynchronous operation, containing the recall receipt.</returns>
+ public Task<IRecallReceipt> RecallMessageAsync(string topic, string recallhandle)
+ {
+ return RecallMessage0(topic, recallhandle).ContinueWith(t =>
+ {
+ if (t.IsFaulted)
+ {
+ throw t.Exception;
+ }
+ return (IRecallReceipt)t.Result;
+ }, TaskContinuationOptions.ExecuteSynchronously);
+ }
+
private async Task<RecallReceipt> RecallMessage0(string topic, string recallhandle)
{
if (State.Running != State)
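A note on the `ContinueWith` shape used by `RecallMessageAsync` above: rethrowing `t.Exception` rethrows an `AggregateException`, so a caller that awaits the returned task would likely observe the wrapper rather than the original exception type. The sketch below compares that shape with a plain `async`/`await` wrapper. `FailingCallAsync` is a hypothetical stand-in for the private `RecallMessage0`, and none of the names here belong to the client library.

```csharp
using System;
using System.Threading.Tasks;

public static class RecallWrappingSketch
{
    // Hypothetical stand-in for RecallMessage0: always fails asynchronously.
    private static async Task<string> FailingCallAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("producer is not running");
    }

    // Same shape as the continuation in the diff: rethrows the antecedent's AggregateException.
    public static Task<object> ViaContinueWith()
    {
        return FailingCallAsync().ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                throw t.Exception;
            }
            return (object)t.Result;
        }, TaskContinuationOptions.ExecuteSynchronously);
    }

    // Alternative shape: await propagates the original exception directly.
    public static async Task<object> ViaAwait()
    {
        return await FailingCallAsync();
    }

    // Awaits the call and reports the type name of the exception that surfaced.
    public static async Task<string> CaughtTypeName(Func<Task<object>> call)
    {
        try
        {
            await call();
            return "none";
        }
        catch (Exception e)
        {
            return e.GetType().Name;
        }
    }
}
```

Under these assumptions, `CaughtTypeName(ViaContinueWith)` yields `AggregateException` while `CaughtTypeName(ViaAwait)` yields `InvalidOperationException`. Whether that difference matters depends on how callers of `RecallMessageAsync` catch exceptions.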
diff --git a/csharp/rocketmq-client-csharp/PublishingMessage.cs b/csharp/rocketmq-client-csharp/PublishingMessage.cs
index d214598d..15ba773d 100644
--- a/csharp/rocketmq-client-csharp/PublishingMessage.cs
+++ b/csharp/rocketmq-client-csharp/PublishingMessage.cs
@@ -44,8 +44,12 @@ namespace Org.Apache.Rocketmq
// Generate message id.
MessageId = MessageIdGenerator.GetInstance().Next();
+
// For NORMAL message.
- if (string.IsNullOrEmpty(message.MessageGroup) && !message.DeliveryTimestamp.HasValue &&
+ if (string.IsNullOrEmpty(message.MessageGroup) &&
+ string.IsNullOrEmpty(message.LiteTopic) &&
+ !message.Priority.HasValue &&
+ !message.DeliveryTimestamp.HasValue &&
!txEnabled)
{
MessageType = MessageType.Normal;
@@ -59,6 +63,13 @@ namespace Org.Apache.Rocketmq
return;
}
+ // For LITE message.
+ if (!string.IsNullOrEmpty(message.LiteTopic) && !txEnabled)
+ {
+ MessageType = MessageType.Lite;
+ return;
+ }
+
// For DELAY message.
if (message.DeliveryTimestamp.HasValue && !txEnabled)
{
@@ -66,14 +77,31 @@ namespace Org.Apache.Rocketmq
return;
}
+ // For PRIORITY message.
+ if (message.Priority.HasValue && !txEnabled)
+ {
+ MessageType = MessageType.Priority;
+ return;
+ }
+
// For TRANSACTION message.
- if (!string.IsNullOrEmpty(message.MessageGroup) || message.DeliveryTimestamp.HasValue || !txEnabled)
+ if (txEnabled)
{
- throw new InternalErrorException(
- "Transactional message should not set messageGroup or deliveryTimestamp");
+ // Transaction semantics conflict with fifo/lite/delay/priority
+ if (!string.IsNullOrEmpty(message.MessageGroup) ||
+ !string.IsNullOrEmpty(message.LiteTopic) ||
+ message.Priority.HasValue ||
+ message.DeliveryTimestamp.HasValue)
+ {
+ throw new InternalErrorException(
+ "Transactional message should not set messageGroup, liteTopic, priority or deliveryTimestamp");
+ }
+ MessageType = MessageType.Transaction;
+ return;
}
- MessageType = MessageType.Transaction;
+ // Should not reach here
+ throw new InternalErrorException("Failed to determine message type");
}
public Proto::Message ToProtobuf(string namespaceName, int queueId)
@@ -103,6 +131,16 @@ namespace Org.Apache.Rocketmq
systemProperties.MessageGroup = MessageGroup;
}
+ if (!string.IsNullOrEmpty(LiteTopic))
+ {
+ systemProperties.LiteTopic = LiteTopic;
+ }
+
+ if (Priority.HasValue)
+ {
+ systemProperties.Priority = Priority.Value;
+ }
+
var topicResource = new Proto.Resource
{
ResourceNamespace = namespaceName,
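The message-type resolution order in the `PublishingMessage` constructor above can be summarised as a standalone function. This is a sketch under stated assumptions: `SketchMessageType` and `MessageTypeSketch.Resolve` are hypothetical names, `InvalidOperationException` stands in for the library's `InternalErrorException`, and the FIFO condition is assumed because that branch is only partly visible in the hunk.

```csharp
using System;

public enum SketchMessageType { Normal, Fifo, Lite, Delay, Priority, Transaction }

public static class MessageTypeSketch
{
    public static SketchMessageType Resolve(string messageGroup, string liteTopic,
        DateTime? deliveryTimestamp, int? priority, bool txEnabled)
    {
        bool hasGroup = !string.IsNullOrEmpty(messageGroup);
        bool hasLite = !string.IsNullOrEmpty(liteTopic);
        bool hasDelay = deliveryTimestamp.HasValue;
        bool hasPriority = priority.HasValue;

        // NORMAL: nothing special is set and no transaction is in progress.
        if (!hasGroup && !hasLite && !hasPriority && !hasDelay && !txEnabled)
        {
            return SketchMessageType.Normal;
        }
        // FIFO: assumed condition, the full branch is not shown in the hunk.
        if (hasGroup && !txEnabled) return SketchMessageType.Fifo;
        if (hasLite && !txEnabled) return SketchMessageType.Lite;
        if (hasDelay && !txEnabled) return SketchMessageType.Delay;
        if (hasPriority && !txEnabled) return SketchMessageType.Priority;

        if (txEnabled)
        {
            // Transactions cannot be combined with fifo/lite/delay/priority.
            if (hasGroup || hasLite || hasPriority || hasDelay)
            {
                throw new InvalidOperationException(
                    "Transactional message should not set messageGroup, liteTopic, priority or deliveryTimestamp");
            }
            return SketchMessageType.Transaction;
        }

        throw new InvalidOperationException("Failed to determine message type");
    }
}
```

For example, `Resolve(null, "orders-lite", null, null, false)` returns `Lite`, while passing `txEnabled: true` together with a priority throws.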
diff --git a/csharp/rocketmq-client-csharp/PushConsumer.cs b/csharp/rocketmq-client-csharp/PushConsumer.cs
index c0cd1dbb..1f5364f4 100644
--- a/csharp/rocketmq-client-csharp/PushConsumer.cs
+++ b/csharp/rocketmq-client-csharp/PushConsumer.cs
@@ -623,6 +623,12 @@ namespace Org.Apache.Rocketmq
}
}
+ internal override void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
+ {
+ base.OnSettingsCommand(endpoints, settings);
+ // LitePushConsumer will override this to handle lite subscription quota sync
+ }
+
internal override NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
{
return new NotifyClientTerminationRequest()
@@ -660,6 +666,15 @@ namespace Org.Apache.Rocketmq
return _pushSubscriptionSettings;
}
+ /// <summary>
+ /// Get client type for this push consumer.
+ /// </summary>
+ /// <returns>The client type (PUSH_CONSUMER).</returns>
+ protected override ClientType GetClientType()
+ {
+ return ClientType.PushConsumer;
+ }
+
/// <summary>
/// Gets the load balancing group for the consumer.
/// </summary>
@@ -693,7 +708,7 @@ namespace Org.Apache.Rocketmq
return _consumeService;
}
- private Proto.Resource GetProtobufGroup()
+ protected Proto.Resource GetProtobufGroup()
{
return new Proto.Resource()
{
@@ -702,6 +717,56 @@ namespace Org.Apache.Rocketmq
};
}
+ /// <summary>
+ /// Check if the consumer is running.
+ /// </summary>
+ internal void CheckRunning()
+ {
+ if (State != State.Running)
+ {
+ throw new InvalidOperationException("Push consumer is not running");
+ }
+ }
+
+ /// <summary>
+ /// Get the client ID.
+ /// </summary>
+ internal string GetClientId()
+ {
+ return ClientId;
+ }
+
+ /// <summary>
+ /// Check if the consumer is disposed.
+ /// </summary>
+ internal bool IsDisposed()
+ {
+ return State == State.Terminated || State == State.Failed;
+ }
+
+ /// <summary>
+ /// Get the request timeout from client config.
+ /// </summary>
+ internal TimeSpan GetRequestTimeout()
+ {
+ return _clientConfig.RequestTimeout;
+ }
+
+ /// <summary>
+ /// Get the namespace from client config.
+ /// </summary>
+ internal string Namespace => _clientConfig.Namespace;
+
+ /// <summary>
+ /// Sync lite subscription for lite push consumer.
+ /// </summary>
+ internal async Task<Proto.SyncLiteSubscriptionResponse> SyncLiteSubscription(
+ Proto.SyncLiteSubscriptionRequest request, TimeSpan timeout)
+ {
+ var invocation = await ClientManager.SyncLiteSubscription(Endpoints, request, timeout);
+ return invocation.Response;
+ }
+
public class Builder
{
private ClientConfig _clientConfig;
diff --git a/csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs b/csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs
index b2ff5193..ec64044e 100644
--- a/csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs
+++ b/csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs
@@ -28,15 +28,24 @@ namespace Org.Apache.Rocketmq
{
private static readonly ILogger Logger = MqLogManager.CreateLogger<PushSubscriptionSettings>();
- private readonly Resource _group;
- private readonly ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions;
+ protected readonly Resource _group;
+ protected readonly ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions;
private volatile bool _fifo = false;
private volatile int _receiveBatchSize = 32;
private TimeSpan _longPollingTimeout = TimeSpan.FromSeconds(30);
public PushSubscriptionSettings(string namespaceName, string clientId, Endpoints endpoints, string consumerGroup,
TimeSpan requestTimeout, ConcurrentDictionary<string, FilterExpression> subscriptionExpressions)
- : base(namespaceName, clientId, ClientType.PushConsumer, endpoints, requestTimeout)
+ : this(namespaceName, clientId, ClientType.PushConsumer, endpoints, consumerGroup, requestTimeout, subscriptionExpressions)
+ {
+ }
+
+ /// <summary>
+ /// Protected constructor for derived classes to specify custom ClientType.
+ /// </summary>
+ protected PushSubscriptionSettings(string namespaceName, string clientId, ClientType clientType, Endpoints endpoints, string consumerGroup,
+ TimeSpan requestTimeout, ConcurrentDictionary<string, FilterExpression> subscriptionExpressions)
+ : base(namespaceName, clientId, clientType, endpoints, requestTimeout)
{
_group = new Resource(namespaceName, consumerGroup);
_subscriptionExpressions = subscriptionExpressions;
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs
index c6540c8e..0001ad7f 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -198,5 +198,15 @@ namespace Org.Apache.Rocketmq
var call = _stub.RecallMessageAsync(request, callOptions);
return await call.ResponseAsync;
}
+
+ public async Task<Proto::SyncLiteSubscriptionResponse> SyncLiteSubscription(Metadata metadata,
+ Proto.SyncLiteSubscriptionRequest request, TimeSpan timeout)
+ {
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.SyncLiteSubscriptionAsync(request, callOptions);
+ return await call.ResponseAsync;
+ }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index d47b6060..2d898ad4 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -119,6 +119,13 @@ namespace Org.Apache.Rocketmq
_client.OnPrintThreadStackTraceCommand(_endpoints, response.PrintThreadStackTraceCommand);
break;
}
+ case Proto.TelemetryCommand.CommandOneofCase.NotifyUnsubscribeLiteCommand:
+ {
+ Logger.LogInformation(
+ $"Receive notify unsubscribe lite command from remote, endpoints={_endpoints}, clientId={_client.GetClientId()}");
+ _client.OnNotifyUnsubscribeLiteCommand(_endpoints, response.NotifyUnsubscribeLiteCommand);
+ break;
+ }
default:
{
Logger.LogWarning(
diff --git a/protos b/protos
index 5c9f8419..68c2cc94 160000
--- a/protos
+++ b/protos
@@ -1 +1 @@
-Subproject commit 5c9f84199bffa79b2ed73beb37774ca92e749c19
+Subproject commit 68c2cc9442928f769f8938515a05af6fa05c9993