This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


The following commit(s) were added to refs/heads/develop by this push:
     new 83b8bc6  Refactor ClientManager and RpcClient (#9)
83b8bc6 is described below

commit 83b8bc64273b47549642a54f3e47214c57d2570c
Author: aaron ai <[email protected]>
AuthorDate: Mon Feb 21 14:35:14 2022 +0800

    Refactor ClientManager and RpcClient (#9)
---
 rocketmq-client-csharp/ClientManager.cs | 92 ++++++++++++++-------------------
 rocketmq-client-csharp/IRpcClient.cs    | 14 ++---
 rocketmq-client-csharp/RpcClient.cs     | 91 +++++++++++++++++++++++---------
 tests/RpcClientTest.cs                  | 29 +++--------
 tests/UnitTest1.cs                      |  4 +-
 5 files changed, 122 insertions(+), 108 deletions(-)

diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 59fec83..87697d0 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -15,71 +15,63 @@
  * limitations under the License.
  */
 
-using System.Collections.Concurrent;
 
-using rmq = global::apache.rocketmq.v1;
-using Grpc.Net.Client;
+using rmq = apache.rocketmq.v1;
 using System;
 using System.Threading;
 using System.Threading.Tasks;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
 using System.Collections.Generic;
-using Grpc.Core.Interceptors;
-using System.Net.Http;
 
 namespace org.apache.rocketmq {
     public class ClientManager : IClientManager {
 
         public ClientManager() {
-            rpcClients = new ConcurrentDictionary<string, RpcClient>();
+            _rpcClients = new Dictionary<string, RpcClient>();
+            _clientLock = new ReaderWriterLockSlim();
         }
 
-        public IRpcClient getRpcClient(string target) {
-            if (!rpcClients.ContainsKey(target)) {
-                var channel = GrpcChannel.ForAddress(target, new 
GrpcChannelOptions {
-                    HttpHandler = createHttpHandler()
-                });
-                var invoker = channel.Intercept(new ClientLoggerInterceptor());
-                var client = new 
rmq::MessagingService.MessagingServiceClient(invoker);
-                var rpcClient = new RpcClient(client);
-                if(rpcClients.TryAdd(target, rpcClient)) {
-                    return rpcClient;
+        public IRpcClient getRpcClient(string target)
+        {
+            _clientLock.EnterReadLock();
+            try
+            {
+                if (_rpcClients.ContainsKey(target))
+                {
+                    return _rpcClients[target];
                 }
             }
-            return rpcClients[target];
-        }
+            finally
+            {
+                _clientLock.ExitReadLock();
+            }
 
-        /**
-         * See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for performance consideration and
-         * why parameters are configured this way.
-         */
-        public static HttpMessageHandler createHttpHandler()
-        {
-            var sslOptions = new 
System.Net.Security.SslClientAuthenticationOptions();
-            // Disable server certificate validation during development phase.
-            // Comment out the following line if server certificate validation 
is required. 
-            sslOptions.RemoteCertificateValidationCallback = (sender, cert, 
chain, sslPolicyErrors) => { return true; };
-            var handler = new SocketsHttpHandler
+            _clientLock.EnterWriteLock();
+            try
+            {
+                if (_rpcClients.ContainsKey(target))
+                {
+                    return _rpcClients[target];
+                }
+
+                var client = new RpcClient(target);
+                _rpcClients.Add(target, client);
+                return client;
+            }
+            finally
             {
-                PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
-                KeepAlivePingDelay = TimeSpan.FromSeconds(60),
-                KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
-                EnableMultipleHttp2Connections = true,
-                SslOptions = sslOptions,
-            };
-            return handler;
+                _clientLock.ExitWriteLock();
+            }
         }
 
         public async Task<TopicRouteData> resolveRoute(string target, 
grpc::Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout)
         {
             var rpcClient = getRpcClient(target);
-            var deadline = DateTime.UtcNow.Add(timeout);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var queryRouteResponse = await rpcClient.queryRoute(request, 
callOptions);
+            var queryRouteResponse = await rpcClient.QueryRoute(metadata, 
request, timeout);
 
-            if (queryRouteResponse.Common.Status.Code != 
((int)Google.Rpc.Code.Ok)) {
+            if (queryRouteResponse.Common.Status.Code != 
((int)Google.Rpc.Code.Ok))
+            {
                 // Raise an application layer exception
-
             }
 
             var partitions = new List<Partition>();
@@ -147,9 +139,7 @@ namespace org.apache.rocketmq {
         public async Task<Boolean> heartbeat(string target, grpc::Metadata 
metadata, rmq::HeartbeatRequest request, TimeSpan timeout)
         {
             var rpcClient = getRpcClient(target);
-            var deadline = DateTime.UtcNow.Add(timeout);
-            var callOptions = new grpc.CallOptions(metadata, deadline);
-            var response = await rpcClient.heartbeat(request, callOptions);
+            var response = await rpcClient.Heartbeat(metadata, request, 
timeout);
             if (null == response)
             {
                 return false;
@@ -161,22 +151,18 @@ namespace org.apache.rocketmq {
         public async Task<rmq::SendMessageResponse> sendMessage(string target, 
grpc::Metadata metadata, rmq::SendMessageRequest request, TimeSpan timeout)
         {
             var rpcClient = getRpcClient(target);
-            var deadline = DateTime.UtcNow.Add(timeout);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var response = await rpcClient.sendMessage(request, callOptions);
+            var response = await rpcClient.SendMessage(metadata, request, 
timeout);
             return response;
         }
 
         public async Task<Boolean> notifyClientTermination(string target, 
grpc::Metadata metadata, rmq::NotifyClientTerminationRequest request, TimeSpan 
timeout)
         {
             var rpcClient = getRpcClient(target);
-            var deadline = DateTime.UtcNow.Add(timeout);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            rmq::NotifyClientTerminationResponse response = await 
rpcClient.notifyClientTermination(request, callOptions);
+            rmq::NotifyClientTerminationResponse response = await 
rpcClient.NotifyClientTermination(metadata, request, timeout);
             return response.Common.Status.Code == ((int)Google.Rpc.Code.Ok);
         }
 
-        private ConcurrentDictionary<string, RpcClient> rpcClients;
-
+        private readonly Dictionary<string, RpcClient> _rpcClients;
+        private readonly ReaderWriterLockSlim _clientLock;
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/IRpcClient.cs
index 0590bb0..f46afae 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/IRpcClient.cs
@@ -14,21 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 using System;
 using System.Threading.Tasks;
 using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
+using Grpc.Core;
 
 namespace org.apache.rocketmq
 {
     public interface IRpcClient
     {
-        Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, 
grpc::CallOptions callOptions);
+        Task<QueryRouteResponse> QueryRoute(Metadata metadata, 
QueryRouteRequest request, TimeSpan timeout);
 
-        Task<HeartbeatResponse> heartbeat(HeartbeatRequest request, 
grpc::CallOptions callOptions);
+        Task<HeartbeatResponse> Heartbeat(Metadata metadata, HeartbeatRequest 
request, TimeSpan timeout);
 
-        Task<NotifyClientTerminationResponse> 
notifyClientTermination(NotifyClientTerminationRequest request, 
grpc::CallOptions callOptions);
+        Task<NotifyClientTerminationResponse> NotifyClientTermination(Metadata 
metadata,
+            NotifyClientTerminationRequest request, TimeSpan timeout);
 
-        Task<SendMessageResponse> sendMessage(SendMessageRequest request, 
grpc::CallOptions callOptions);
+        Task<SendMessageResponse> SendMessage(Metadata metadata, 
SendMessageRequest request, TimeSpan timeout);
     }
-}
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs
index 0191e91..8279953 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -15,47 +15,90 @@
  * limitations under the License.
  */
 
+using System;
+using System.Net.Http;
+using System.Net.Security;
+using System.Threading;
 using System.Threading.Tasks;
 using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
+using Grpc.Core;
+using Grpc.Core.Interceptors;
+using Grpc.Net.Client;
 
-namespace org.apache.rocketmq {
-    public class RpcClient : IRpcClient {
-        public RpcClient(MessagingService.MessagingServiceClient client) {
-            stub = client;
+namespace org.apache.rocketmq
+{
+    public class RpcClient : IRpcClient
+    {
+        private readonly MessagingService.MessagingServiceClient _stub;
+
+        public RpcClient(string target)
+        {
+            var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
+            {
+                HttpHandler = CreateHttpHandler()
+            });
+            var invoker = channel.Intercept(new ClientLoggerInterceptor());
+            _stub = new MessagingService.MessagingServiceClient(invoker);
         }
 
-        public async Task<QueryRouteResponse> queryRoute(QueryRouteRequest 
request, grpc::CallOptions callOptions)
+        /**
+         * See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for performance consideration and
+         * why parameters are configured this way.
+         */
+        private HttpMessageHandler CreateHttpHandler()
         {
-            var call = stub.QueryRouteAsync(request, callOptions);
-            var response = await call.ResponseAsync;
-            var status = call.GetStatus();
-            if (status.StatusCode != grpc.StatusCode.OK) {
-                //TODO: Something is wrong, raise an exception here.
-            }
-            return response;
+            var sslOptions = new SslClientAuthenticationOptions();
+            // Disable server certificate validation during development phase.
+            // Comment out the following line if server certificate validation 
is required. 
+            sslOptions.RemoteCertificateValidationCallback = (sender, cert, 
chain, sslPolicyErrors) => { return true; };
+            var handler = new SocketsHttpHandler
+            {
+                PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
+                KeepAlivePingDelay = TimeSpan.FromSeconds(60),
+                KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
+                EnableMultipleHttp2Connections = true,
+                SslOptions = sslOptions,
+            };
+            return handler;
         }
 
-        public async Task<HeartbeatResponse> heartbeat(HeartbeatRequest 
request, grpc::CallOptions callOptions)
+        public async Task<QueryRouteResponse> QueryRoute(Metadata metadata, 
QueryRouteRequest request, TimeSpan timeout)
         {
-            var call = stub.HeartbeatAsync(request, callOptions);
-            var response = await call.ResponseAsync;
-            return response;
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.QueryRouteAsync(request, callOptions);
+            return await call.ResponseAsync;
         }
 
-        public async Task<NotifyClientTerminationResponse> 
notifyClientTermination(NotifyClientTerminationRequest request, 
grpc::CallOptions callOptions)
+
+        public async Task<HeartbeatResponse> Heartbeat(Metadata metadata, 
HeartbeatRequest request, TimeSpan timeout)
         {
-            var call = stub.NotifyClientTerminationAsync(request, callOptions);
-            var response = await call.ResponseAsync;
-            return response;
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.HeartbeatAsync(request, callOptions);
+            return await call.ResponseAsync;
         }
 
-        public async Task<SendMessageResponse> sendMessage(SendMessageRequest 
request, grpc::CallOptions callOptions)
+        public async Task<SendMessageResponse> SendMessage(Metadata metadata, 
SendMessageRequest request,
+            TimeSpan timeout)
         {
-            var call = stub.SendMessageAsync(request, callOptions);
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.SendMessageAsync(request, callOptions);
             return await call.ResponseAsync;
         }
 
-        private MessagingService.MessagingServiceClient stub;
+        public async Task<NotifyClientTerminationResponse> 
NotifyClientTermination(Metadata metadata,
+            NotifyClientTerminationRequest request, TimeSpan timeout)
+        {
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.NotifyClientTerminationAsync(request, 
callOptions);
+            return await call.ResponseAsync;
+        }
     }
 }
\ No newline at end of file
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
index 5425973..f06603f 100644
--- a/tests/RpcClientTest.cs
+++ b/tests/RpcClientTest.cs
@@ -35,13 +35,7 @@ namespace org.apache.rocketmq
         public static void SetUp(TestContext context)
         {
             string target = string.Format("https://{0}:{1}", host, port);
-            var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
-            {
-                HttpHandler = ClientManager.createHttpHandler()
-            });
-            var invoker = channel.Intercept(new ClientLoggerInterceptor());
-            var client = new 
rmq::MessagingService.MessagingServiceClient(invoker);
-            rpcClient = new RpcClient(client);
+            rpcClient = new RpcClient(target);
 
             clientConfig = new ClientConfig();
             var credentialsProvider = new ConfigFileCredentialsProvider();
@@ -72,10 +66,8 @@ namespace org.apache.rocketmq
 
             var metadata = new grpc::Metadata();
             Signature.sign(clientConfig, metadata);
-
-            var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var response = rpcClient.queryRoute(request, 
callOptions).GetAwaiter().GetResult();
+            
+            var response = rpcClient.QueryRoute(metadata, request, 
TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
         }
 
 
@@ -92,10 +84,8 @@ namespace org.apache.rocketmq
 
             var metadata = new grpc::Metadata();
             Signature.sign(clientConfig, metadata);
-
-            var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var response = rpcClient.heartbeat(request, 
callOptions).GetAwaiter().GetResult();
+            
+            var response = rpcClient.Heartbeat(metadata, request, 
TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
         }
 
         [TestMethod]
@@ -122,10 +112,7 @@ namespace org.apache.rocketmq
             var metadata = new grpc::Metadata();
             Signature.sign(clientConfig, metadata);
 
-            var deadline = DateTime.UtcNow.AddSeconds(3);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-
-            var response = rpcClient.sendMessage(request, 
callOptions).GetAwaiter().GetResult();
+            var response = rpcClient.SendMessage(metadata, request, 
TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
         }
 
         // Remove the Ignore annotation if server has fixed
@@ -142,9 +129,7 @@ namespace org.apache.rocketmq
             var metadata = new grpc::Metadata();
             Signature.sign(clientConfig, metadata);
 
-            var deadline = DateTime.UtcNow.AddSeconds(3);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var response = rpcClient.notifyClientTermination(request, 
callOptions).GetAwaiter().GetResult();
+            var response = rpcClient.NotifyClientTermination(metadata, 
request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
         }
 
         private static string resourceNamespace = 
"MQ_INST_1080056302921134_BXuIbML7";
diff --git a/tests/UnitTest1.cs b/tests/UnitTest1.cs
index 4689e52..6ea8edf 100644
--- a/tests/UnitTest1.cs
+++ b/tests/UnitTest1.cs
@@ -42,9 +42,7 @@ namespace tests
 
         [TestMethod]
         public void TestRpcClientImplCtor() {
-            using var channel = 
GrpcChannel.ForAddress("https://localhost:5001");
-            var client = new MessagingService.MessagingServiceClient(channel);
-            RpcClient impl = new RpcClient(client);
+            RpcClient impl = new RpcClient("https://localhost:5001");
         }
     }
 }

Reply via email to