This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new eecb3a67 [ISSUE #891] Implement message recalling API in C# SDK
eecb3a67 is described below
commit eecb3a678abf2acc38d741fe6f23c0bc33d1d925
Author: Jack Tsai <[email protected]>
AuthorDate: Wed Jan 22 09:56:15 2025 +0800
[ISSUE #891] Implement message recalling API in C# SDK
---
csharp/rocketmq-client-csharp/ClientManager.cs | 9 +++++
csharp/rocketmq-client-csharp/IClientManager.cs | 10 ++++++
.../{ISendReceipt.cs => IRecallReceipt.cs} | 2 +-
csharp/rocketmq-client-csharp/IRpcClient.cs | 2 ++
csharp/rocketmq-client-csharp/ISendReceipt.cs | 1 +
csharp/rocketmq-client-csharp/Producer.cs | 27 ++++++++++++++
.../{ISendReceipt.cs => RecallReceipt.cs} | 14 ++++++--
csharp/rocketmq-client-csharp/RpcClient.cs | 9 +++++
csharp/rocketmq-client-csharp/SendReceipt.cs | 12 +++++--
csharp/tests/ClientManagerTest.cs | 9 +++++
csharp/tests/ProducerTest.cs | 42 ++++++++++++++++++++++
11 files changed, 131 insertions(+), 6 deletions(-)
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs
index e42a29da..b061b5c9 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -120,6 +120,15 @@ namespace Org.Apache.Rocketmq
request, response, metadata);
}
+ public async Task<RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>>
+ RecallMessage(Endpoints endpoints, Proto.RecallMessageRequest request, TimeSpan timeout)
+ {
+ var metadata = _client.Sign();
+ var response = await GetRpcClient(endpoints).RecallMessage(metadata, request, timeout);
+ return new RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>(
+ request, response, metadata);
+ }
+
public async Task<RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>> SendMessage(
Endpoints endpoints, Proto::SendMessageRequest request, TimeSpan timeout)
{
diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs
index 743df9fe..62d733e9 100644
--- a/csharp/rocketmq-client-csharp/IClientManager.cs
+++ b/csharp/rocketmq-client-csharp/IClientManager.cs
@@ -62,6 +62,16 @@ namespace Org.Apache.Rocketmq
Task<RpcInvocation<NotifyClientTerminationRequest, NotifyClientTerminationResponse>> NotifyClientTermination(
Endpoints endpoints, NotifyClientTerminationRequest request, TimeSpan timeout);
+ /// <summary>
+ /// Recall messages.
+ /// </summary>
+ /// <param name="endpoints">The target endpoints.</param>
+ /// <param name="request">gRPC request of recalling messages.</param>
+ /// <param name="timeout">Request max duration.</param>
+ /// <returns>Task of response.</returns>
+ Task<RpcInvocation<RecallMessageRequest, RecallMessageResponse>> RecallMessage(
+ Endpoints endpoints, RecallMessageRequest request, TimeSpan timeout);
+
/// <summary>
/// Send message to remote endpoints.
/// </summary>
diff --git a/csharp/rocketmq-client-csharp/ISendReceipt.cs b/csharp/rocketmq-client-csharp/IRecallReceipt.cs
similarity index 95%
copy from csharp/rocketmq-client-csharp/ISendReceipt.cs
copy to csharp/rocketmq-client-csharp/IRecallReceipt.cs
index f1004b5b..8291cd66 100644
--- a/csharp/rocketmq-client-csharp/ISendReceipt.cs
+++ b/csharp/rocketmq-client-csharp/IRecallReceipt.cs
@@ -17,7 +17,7 @@
namespace Org.Apache.Rocketmq
{
- public interface ISendReceipt
+ public interface IRecallReceipt
{
string MessageId { get; }
}
diff --git a/csharp/rocketmq-client-csharp/IRpcClient.cs b/csharp/rocketmq-client-csharp/IRpcClient.cs
index 8145ea18..eb369c2d 100644
--- a/csharp/rocketmq-client-csharp/IRpcClient.cs
+++ b/csharp/rocketmq-client-csharp/IRpcClient.cs
@@ -52,6 +52,8 @@ namespace Org.Apache.Rocketmq
Task<NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata,
NotifyClientTerminationRequest request, TimeSpan timeout);
+ Task<RecallMessageResponse> RecallMessage(Metadata metadata, RecallMessageRequest request, TimeSpan timeout);
+
Task Shutdown();
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ISendReceipt.cs b/csharp/rocketmq-client-csharp/ISendReceipt.cs
index f1004b5b..eeba4e03 100644
--- a/csharp/rocketmq-client-csharp/ISendReceipt.cs
+++ b/csharp/rocketmq-client-csharp/ISendReceipt.cs
@@ -20,5 +20,6 @@ namespace Org.Apache.Rocketmq
public interface ISendReceipt
{
string MessageId { get; }
+ string RecallHandle { get; }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 1986182f..0d4be45c 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -342,6 +342,33 @@ namespace Org.Apache.Rocketmq
StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
}
+ public async Task<IRecallReceipt> RecallMessage(string topic, string recallhandle)
+ {
+ var recallReceipt = await RecallMessage0(topic, recallhandle);
+ return recallReceipt;
+ }
+
+ private async Task<RecallReceipt> RecallMessage0(string topic, string recallhandle)
+ {
+ if (State.Running != State)
+ {
+ throw new InvalidOperationException("Producer is not running");
+ }
+ if (recallhandle == null)
+ {
+ throw new InvalidOperationException("Recall handle is invalid");
+ }
+ var request = new Proto.RecallMessageRequest
+ {
+ Topic = new Proto.Resource { ResourceNamespace = ClientConfig.Namespace, Name = topic },
+ RecallHandle = recallhandle
+ };
+ var invocation =
+ await ClientManager.RecallMessage(new Endpoints(ClientConfig.Endpoints), request, ClientConfig.RequestTimeout);
+ StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
+ return new RecallReceipt(invocation.Response.MessageId);
+ }
+
public class Builder
{
private ClientConfig _clientConfig;
diff --git a/csharp/rocketmq-client-csharp/ISendReceipt.cs b/csharp/rocketmq-client-csharp/RecallReceipt.cs
similarity index 72%
copy from csharp/rocketmq-client-csharp/ISendReceipt.cs
copy to csharp/rocketmq-client-csharp/RecallReceipt.cs
index f1004b5b..80cf120c 100644
--- a/csharp/rocketmq-client-csharp/ISendReceipt.cs
+++ b/csharp/rocketmq-client-csharp/RecallReceipt.cs
@@ -17,8 +17,18 @@
namespace Org.Apache.Rocketmq
{
- public interface ISendReceipt
+ public sealed class RecallReceipt : IRecallReceipt
{
- string MessageId { get; }
+ public RecallReceipt(string messageId)
+ {
+ MessageId = messageId;
+ }
+
+ public string MessageId { get; }
+
+ public override string ToString()
+ {
+ return $"{nameof(MessageId)}: {MessageId}";
+ }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs
index eeff96e5..c6540c8e 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -189,5 +189,14 @@ namespace Org.Apache.Rocketmq
var call = _stub.NotifyClientTerminationAsync(request, callOptions);
return await call.ResponseAsync;
}
+
+ public async Task<Proto::RecallMessageResponse> RecallMessage(Metadata metadata, Proto.RecallMessageRequest request, TimeSpan timeout)
+ {
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.RecallMessageAsync(request, callOptions);
+ return await call.ResponseAsync;
+ }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs
index c9fe8014..2a49a817 100644
--- a/csharp/rocketmq-client-csharp/SendReceipt.cs
+++ b/csharp/rocketmq-client-csharp/SendReceipt.cs
@@ -23,15 +23,21 @@ namespace Org.Apache.Rocketmq
{
public sealed class SendReceipt : ISendReceipt
{
- private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue)
+ private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue, long offset, string recallHandle)
{
MessageId = messageId;
TransactionId = transactionId;
MessageQueue = messageQueue;
+ Offset = offset;
+ RecallHandle = recallHandle;
}
public string MessageId { get; }
+ public string RecallHandle { get; }
+
+ public long Offset { get; }
+
public string TransactionId { get; }
private MessageQueue MessageQueue { get; }
@@ -40,7 +46,7 @@ namespace Org.Apache.Rocketmq
public override string ToString()
{
- return $"{nameof(MessageId)}: {MessageId}";
+ return $"{nameof(MessageId)}: {MessageId}, {nameof(RecallHandle)}: {RecallHandle}";
}
public static IEnumerable<SendReceipt> ProcessSendMessageResponse(MessageQueue mq,
@@ -58,7 +64,7 @@ namespace Org.Apache.Rocketmq
// May throw exception.
StatusChecker.Check(status, invocation.Request, invocation.RequestId);
- return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList();
+ return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq, entry.Offset, entry.RecallHandle)).ToList();
}
}
}
\ No newline at end of file
diff --git a/csharp/tests/ClientManagerTest.cs b/csharp/tests/ClientManagerTest.cs
index 5e4e7eef..be3697d6 100644
--- a/csharp/tests/ClientManagerTest.cs
+++ b/csharp/tests/ClientManagerTest.cs
@@ -121,6 +121,15 @@ namespace tests
// Expect no exception thrown.
}
+ [TestMethod]
+ public void TestRecallMessage()
+ {
+ var request = new RecallMessageRequest();
+ _clientManager.RecallMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1));
+ _clientManager.RecallMessage(null, request, TimeSpan.FromSeconds(1));
+ // Expect no exception thrown.
+ }
+
private Client CreateTestClient()
{
return new Producer(_clientConfig, new ConcurrentDictionary<string, bool>(), 1, null);
diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs
index ce0cca15..8fe53000 100644
--- a/csharp/tests/ProducerTest.cs
+++ b/csharp/tests/ProducerTest.cs
@@ -96,6 +96,48 @@ namespace tests
It.IsAny<Proto.SendMessageRequest>(), It.IsAny<TimeSpan>()), Times.Exactly(maxAttempts));
}
+ [TestMethod]
+ public async Task TestRecall()
+ {
+ var producer = CreateTestClient();
+ producer.State = State.Running;
+ var metadata = producer.Sign();
+ var recallReceipt = new RecallReceipt(MessageIdGenerator.GetInstance().Next());
+ var recallMessageResponse = new Proto.RecallMessageResponse
+ {
+ Status = new Proto.Status
+ {
+ Code = Proto.Code.Ok
+ },
+ MessageId = recallReceipt.MessageId
+ };
+ var recallMessageInvocation = new RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>(null,
+ recallMessageResponse, metadata);
+ var mockClientManager = new Mock<IClientManager>();
+ producer.SetClientManager(mockClientManager.Object);
+ mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny<Endpoints>(),
+ It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>())).Returns(Task.FromResult(recallMessageInvocation));
+ await producer.RecallMessage("testTopic", "handle");
+ mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny<Endpoints>(),
+ It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>()), Times.Once);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentException))]
+ public async Task TestRecallFailure()
+ {
+ var producer = CreateTestClient();
+ producer.State = State.Running;
+ var mockClientManager = new Mock<IClientManager>();
+ producer.SetClientManager(mockClientManager.Object);
+ var exception = new ArgumentException();
+ mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny<Endpoints>(),
+ It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>())).Throws(exception);
+ await producer.RecallMessage("testTopic", "handle");
+ mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny<Endpoints>(),
+ It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>()), Times.Once);
+ }
+
private Producer CreateTestClient()
{
const string host0 = "127.0.0.1";