This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/develop by this push:
new 83b8bc6 Refactor ClientManager and RpcClient (#9)
83b8bc6 is described below
commit 83b8bc64273b47549642a54f3e47214c57d2570c
Author: aaron ai <[email protected]>
AuthorDate: Mon Feb 21 14:35:14 2022 +0800
Refactor ClientManager and RpcClient (#9)
---
rocketmq-client-csharp/ClientManager.cs | 92 ++++++++++++++-------------------
rocketmq-client-csharp/IRpcClient.cs | 14 ++---
rocketmq-client-csharp/RpcClient.cs | 91 +++++++++++++++++++++++---------
tests/RpcClientTest.cs | 29 +++--------
tests/UnitTest1.cs | 4 +-
5 files changed, 122 insertions(+), 108 deletions(-)
diff --git a/rocketmq-client-csharp/ClientManager.cs
b/rocketmq-client-csharp/ClientManager.cs
index 59fec83..87697d0 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -15,71 +15,63 @@
* limitations under the License.
*/
-using System.Collections.Concurrent;
-using rmq = global::apache.rocketmq.v1;
-using Grpc.Net.Client;
+using rmq = apache.rocketmq.v1;
using System;
using System.Threading;
using System.Threading.Tasks;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
using System.Collections.Generic;
-using Grpc.Core.Interceptors;
-using System.Net.Http;
namespace org.apache.rocketmq {
public class ClientManager : IClientManager {
public ClientManager() {
- rpcClients = new ConcurrentDictionary<string, RpcClient>();
+ _rpcClients = new Dictionary<string, RpcClient>();
+ _clientLock = new ReaderWriterLockSlim();
}
- public IRpcClient getRpcClient(string target) {
- if (!rpcClients.ContainsKey(target)) {
- var channel = GrpcChannel.ForAddress(target, new
GrpcChannelOptions {
- HttpHandler = createHttpHandler()
- });
- var invoker = channel.Intercept(new ClientLoggerInterceptor());
- var client = new
rmq::MessagingService.MessagingServiceClient(invoker);
- var rpcClient = new RpcClient(client);
- if(rpcClients.TryAdd(target, rpcClient)) {
- return rpcClient;
+ public IRpcClient getRpcClient(string target)
+ {
+ _clientLock.EnterReadLock();
+ try
+ {
+ if (_rpcClients.ContainsKey(target))
+ {
+ return _rpcClients[target];
}
}
- return rpcClients[target];
- }
+ finally
+ {
+ _clientLock.ExitReadLock();
+ }
- /**
- * See
https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0
for performance consideration and
- * why parameters are configured this way.
- */
- public static HttpMessageHandler createHttpHandler()
- {
- var sslOptions = new
System.Net.Security.SslClientAuthenticationOptions();
- // Disable server certificate validation during development phase.
- // Comment out the following line if server certificate validation
is required.
- sslOptions.RemoteCertificateValidationCallback = (sender, cert,
chain, sslPolicyErrors) => { return true; };
- var handler = new SocketsHttpHandler
+ _clientLock.EnterWriteLock();
+ try
+ {
+ if (_rpcClients.ContainsKey(target))
+ {
+ return _rpcClients[target];
+ }
+
+ var client = new RpcClient(target);
+ _rpcClients.Add(target, client);
+ return client;
+ }
+ finally
{
- PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
- KeepAlivePingDelay = TimeSpan.FromSeconds(60),
- KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
- EnableMultipleHttp2Connections = true,
- SslOptions = sslOptions,
- };
- return handler;
+ _clientLock.ExitWriteLock();
+ }
}
public async Task<TopicRouteData> resolveRoute(string target,
grpc::Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout)
{
var rpcClient = getRpcClient(target);
- var deadline = DateTime.UtcNow.Add(timeout);
- var callOptions = new grpc::CallOptions(metadata, deadline);
- var queryRouteResponse = await rpcClient.queryRoute(request,
callOptions);
+ var queryRouteResponse = await rpcClient.QueryRoute(metadata,
request, timeout);
- if (queryRouteResponse.Common.Status.Code !=
((int)Google.Rpc.Code.Ok)) {
+ if (queryRouteResponse.Common.Status.Code !=
((int)Google.Rpc.Code.Ok))
+ {
// Raise an application layer exception
-
}
var partitions = new List<Partition>();
@@ -147,9 +139,7 @@ namespace org.apache.rocketmq {
public async Task<Boolean> heartbeat(string target, grpc::Metadata
metadata, rmq::HeartbeatRequest request, TimeSpan timeout)
{
var rpcClient = getRpcClient(target);
- var deadline = DateTime.UtcNow.Add(timeout);
- var callOptions = new grpc.CallOptions(metadata, deadline);
- var response = await rpcClient.heartbeat(request, callOptions);
+ var response = await rpcClient.Heartbeat(metadata, request,
timeout);
if (null == response)
{
return false;
@@ -161,22 +151,18 @@ namespace org.apache.rocketmq {
public async Task<rmq::SendMessageResponse> sendMessage(string target,
grpc::Metadata metadata, rmq::SendMessageRequest request, TimeSpan timeout)
{
var rpcClient = getRpcClient(target);
- var deadline = DateTime.UtcNow.Add(timeout);
- var callOptions = new grpc::CallOptions(metadata, deadline);
- var response = await rpcClient.sendMessage(request, callOptions);
+ var response = await rpcClient.SendMessage(metadata, request,
timeout);
return response;
}
public async Task<Boolean> notifyClientTermination(string target,
grpc::Metadata metadata, rmq::NotifyClientTerminationRequest request, TimeSpan
timeout)
{
var rpcClient = getRpcClient(target);
- var deadline = DateTime.UtcNow.Add(timeout);
- var callOptions = new grpc::CallOptions(metadata, deadline);
- rmq::NotifyClientTerminationResponse response = await
rpcClient.notifyClientTermination(request, callOptions);
+ rmq::NotifyClientTerminationResponse response = await
rpcClient.NotifyClientTermination(metadata, request, timeout);
return response.Common.Status.Code == ((int)Google.Rpc.Code.Ok);
}
- private ConcurrentDictionary<string, RpcClient> rpcClients;
-
+ private readonly Dictionary<string, RpcClient> _rpcClients;
+ private readonly ReaderWriterLockSlim _clientLock;
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs
b/rocketmq-client-csharp/IRpcClient.cs
index 0590bb0..f46afae 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/IRpcClient.cs
@@ -14,21 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
using System.Threading.Tasks;
using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
+using Grpc.Core;
namespace org.apache.rocketmq
{
public interface IRpcClient
{
- Task<QueryRouteResponse> queryRoute(QueryRouteRequest request,
grpc::CallOptions callOptions);
+ Task<QueryRouteResponse> QueryRoute(Metadata metadata,
QueryRouteRequest request, TimeSpan timeout);
- Task<HeartbeatResponse> heartbeat(HeartbeatRequest request,
grpc::CallOptions callOptions);
+ Task<HeartbeatResponse> Heartbeat(Metadata metadata, HeartbeatRequest
request, TimeSpan timeout);
- Task<NotifyClientTerminationResponse>
notifyClientTermination(NotifyClientTerminationRequest request,
grpc::CallOptions callOptions);
+ Task<NotifyClientTerminationResponse> NotifyClientTermination(Metadata
metadata,
+ NotifyClientTerminationRequest request, TimeSpan timeout);
- Task<SendMessageResponse> sendMessage(SendMessageRequest request,
grpc::CallOptions callOptions);
+ Task<SendMessageResponse> SendMessage(Metadata metadata,
SendMessageRequest request, TimeSpan timeout);
}
-}
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/RpcClient.cs
b/rocketmq-client-csharp/RpcClient.cs
index 0191e91..8279953 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -15,47 +15,90 @@
* limitations under the License.
*/
+using System;
+using System.Net.Http;
+using System.Net.Security;
+using System.Threading;
using System.Threading.Tasks;
using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
+using Grpc.Core;
+using Grpc.Core.Interceptors;
+using Grpc.Net.Client;
-namespace org.apache.rocketmq {
- public class RpcClient : IRpcClient {
- public RpcClient(MessagingService.MessagingServiceClient client) {
- stub = client;
+namespace org.apache.rocketmq
+{
+ public class RpcClient : IRpcClient
+ {
+ private readonly MessagingService.MessagingServiceClient _stub;
+
+ public RpcClient(string target)
+ {
+ var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
+ {
+ HttpHandler = CreateHttpHandler()
+ });
+ var invoker = channel.Intercept(new ClientLoggerInterceptor());
+ _stub = new MessagingService.MessagingServiceClient(invoker);
}
- public async Task<QueryRouteResponse> queryRoute(QueryRouteRequest
request, grpc::CallOptions callOptions)
+ /**
+ * See
https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0
for performance consideration and
+ * why parameters are configured this way.
+ */
+ private HttpMessageHandler CreateHttpHandler()
{
- var call = stub.QueryRouteAsync(request, callOptions);
- var response = await call.ResponseAsync;
- var status = call.GetStatus();
- if (status.StatusCode != grpc.StatusCode.OK) {
- //TODO: Something is wrong, raise an exception here.
- }
- return response;
+ var sslOptions = new SslClientAuthenticationOptions();
+ // Disable server certificate validation during development phase.
+ // Comment out the following line if server certificate validation
is required.
+ sslOptions.RemoteCertificateValidationCallback = (sender, cert,
chain, sslPolicyErrors) => { return true; };
+ var handler = new SocketsHttpHandler
+ {
+ PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
+ KeepAlivePingDelay = TimeSpan.FromSeconds(60),
+ KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
+ EnableMultipleHttp2Connections = true,
+ SslOptions = sslOptions,
+ };
+ return handler;
}
- public async Task<HeartbeatResponse> heartbeat(HeartbeatRequest
request, grpc::CallOptions callOptions)
+ public async Task<QueryRouteResponse> QueryRoute(Metadata metadata,
QueryRouteRequest request, TimeSpan timeout)
{
- var call = stub.HeartbeatAsync(request, callOptions);
- var response = await call.ResponseAsync;
- return response;
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.QueryRouteAsync(request, callOptions);
+ return await call.ResponseAsync;
}
- public async Task<NotifyClientTerminationResponse>
notifyClientTermination(NotifyClientTerminationRequest request,
grpc::CallOptions callOptions)
+
+ public async Task<HeartbeatResponse> Heartbeat(Metadata metadata,
HeartbeatRequest request, TimeSpan timeout)
{
- var call = stub.NotifyClientTerminationAsync(request, callOptions);
- var response = await call.ResponseAsync;
- return response;
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.HeartbeatAsync(request, callOptions);
+ return await call.ResponseAsync;
}
- public async Task<SendMessageResponse> sendMessage(SendMessageRequest
request, grpc::CallOptions callOptions)
+ public async Task<SendMessageResponse> SendMessage(Metadata metadata,
SendMessageRequest request,
+ TimeSpan timeout)
{
- var call = stub.SendMessageAsync(request, callOptions);
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.SendMessageAsync(request, callOptions);
return await call.ResponseAsync;
}
- private MessagingService.MessagingServiceClient stub;
+ public async Task<NotifyClientTerminationResponse>
NotifyClientTermination(Metadata metadata,
+ NotifyClientTerminationRequest request, TimeSpan timeout)
+ {
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new CallOptions(metadata, deadline);
+
+ var call = _stub.NotifyClientTerminationAsync(request,
callOptions);
+ return await call.ResponseAsync;
+ }
}
}
\ No newline at end of file
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
index 5425973..f06603f 100644
--- a/tests/RpcClientTest.cs
+++ b/tests/RpcClientTest.cs
@@ -35,13 +35,7 @@ namespace org.apache.rocketmq
public static void SetUp(TestContext context)
{
string target = string.Format("https://{0}:{1}", host, port);
- var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
- {
- HttpHandler = ClientManager.createHttpHandler()
- });
- var invoker = channel.Intercept(new ClientLoggerInterceptor());
- var client = new
rmq::MessagingService.MessagingServiceClient(invoker);
- rpcClient = new RpcClient(client);
+ rpcClient = new RpcClient(target);
clientConfig = new ClientConfig();
var credentialsProvider = new ConfigFileCredentialsProvider();
@@ -72,10 +66,8 @@ namespace org.apache.rocketmq
var metadata = new grpc::Metadata();
Signature.sign(clientConfig, metadata);
-
- var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
- var callOptions = new grpc::CallOptions(metadata, deadline);
- var response = rpcClient.queryRoute(request,
callOptions).GetAwaiter().GetResult();
+
+ var response = rpcClient.QueryRoute(metadata, request,
TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
}
@@ -92,10 +84,8 @@ namespace org.apache.rocketmq
var metadata = new grpc::Metadata();
Signature.sign(clientConfig, metadata);
-
- var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
- var callOptions = new grpc::CallOptions(metadata, deadline);
- var response = rpcClient.heartbeat(request,
callOptions).GetAwaiter().GetResult();
+
+ var response = rpcClient.Heartbeat(metadata, request,
TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
}
[TestMethod]
@@ -122,10 +112,7 @@ namespace org.apache.rocketmq
var metadata = new grpc::Metadata();
Signature.sign(clientConfig, metadata);
- var deadline = DateTime.UtcNow.AddSeconds(3);
- var callOptions = new grpc::CallOptions(metadata, deadline);
-
- var response = rpcClient.sendMessage(request,
callOptions).GetAwaiter().GetResult();
+ var response = rpcClient.SendMessage(metadata, request,
TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
}
// Remove the Ignore annotation if server has fixed
@@ -142,9 +129,7 @@ namespace org.apache.rocketmq
var metadata = new grpc::Metadata();
Signature.sign(clientConfig, metadata);
- var deadline = DateTime.UtcNow.AddSeconds(3);
- var callOptions = new grpc::CallOptions(metadata, deadline);
- var response = rpcClient.notifyClientTermination(request,
callOptions).GetAwaiter().GetResult();
+ var response = rpcClient.NotifyClientTermination(metadata,
request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
}
private static string resourceNamespace =
"MQ_INST_1080056302921134_BXuIbML7";
diff --git a/tests/UnitTest1.cs b/tests/UnitTest1.cs
index 4689e52..6ea8edf 100644
--- a/tests/UnitTest1.cs
+++ b/tests/UnitTest1.cs
@@ -42,9 +42,7 @@ namespace tests
[TestMethod]
public void TestRpcClientImplCtor() {
- using var channel =
GrpcChannel.ForAddress("https://localhost:5001");
- var client = new MessagingService.MessagingServiceClient(channel);
- RpcClient impl = new RpcClient(client);
+ RpcClient impl = new RpcClient("https://localhost:5001");
}
}
}