This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch observability in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
commit 4433efa4b1710dc0c3116d89f4f957c02c31bdc3 Author: Li Zhanhui <[email protected]> AuthorDate: Tue Jun 21 11:15:30 2022 +0800 WIP: write unit tests --- examples/Program.cs | 15 ++-- rocketmq-client-csharp/AccessPoint.cs | 5 ++ rocketmq-client-csharp/Client.cs | 28 ++++++- rocketmq-client-csharp/ClientConfig.cs | 66 ++++++++++------ rocketmq-client-csharp/ClientManager.cs | 2 +- .../ConfigFileCredentialsProvider.cs | 30 ++++--- rocketmq-client-csharp/Credentials.cs | 32 +++++--- rocketmq-client-csharp/IClient.cs | 4 +- rocketmq-client-csharp/IClientConfig.cs | 6 +- rocketmq-client-csharp/IClientManager.cs | 6 +- rocketmq-client-csharp/ICredentialsProvider.cs | 6 +- rocketmq-client-csharp/IProducer.cs | 6 +- rocketmq-client-csharp/Message.cs | 32 +++++--- rocketmq-client-csharp/MessageType.cs | 3 +- rocketmq-client-csharp/MetadataConstants.cs | 6 +- rocketmq-client-csharp/Producer.cs | 2 +- rocketmq-client-csharp/PushConsumer.cs | 2 +- rocketmq-client-csharp/RpcClient.cs | 3 +- rocketmq-client-csharp/SendStatus.cs | 6 +- rocketmq-client-csharp/Session.cs | 39 +++++---- rocketmq-client-csharp/Signature.cs | 26 +++--- rocketmq-client-csharp/SimpleConsumer.cs | 92 ++++++++++++++++++++++ .../StaticCredentialsProvider.cs | 12 ++- tests/ClientConfigTest.cs | 9 ++- tests/ClientManagerTest.cs | 11 ++- tests/ConfigFileCredentialsProviderTest.cs | 9 ++- tests/DateTimeTest.cs | 17 ++-- tests/MessageTest.cs | 21 +++-- tests/PushConsumerTest.cs | 10 ++- tests/SendResultTest.cs | 14 ++-- tests/SignatureTest.cs | 8 +- tests/{SendResultTest.cs => SimpleConsumerTest.cs} | 39 +++++---- tests/StaticCredentialsProviderTest.cs | 9 ++- tests/TopicTest.cs | 23 +++--- tests/UnitTest1.cs | 46 +++++------ 35 files changed, 447 insertions(+), 198 deletions(-) diff --git a/examples/Program.cs b/examples/Program.cs index d96f41e..09a1674 100644 --- a/examples/Program.cs +++ b/examples/Program.cs @@ -5,20 +5,24 @@ using System.Threading; namespace examples { - class Foo { + class Foo + { public int bar = 1; } class Program { - static void RT(Action action, int seconds, CancellationToken token) { - if (null == action) { + static void RT(Action action, int seconds, CancellationToken token) + { + if (null == action) + { return; } Task.Run(async () => { - while(!token.IsCancellationRequested) { + while (!token.IsCancellationRequested) + { action(); await Task.Delay(TimeSpan.FromSeconds(seconds), token); } @@ -44,7 +48,8 @@ namespace examples ThreadPool.QueueUserWorkItem((Object stateInfo) => { Console.WriteLine("From ThreadPool"); - if (stateInfo is Foo) { + if (stateInfo is Foo) + { Console.WriteLine("Foo: bar=" + (stateInfo as Foo).bar); } }, new Foo()); diff --git a/rocketmq-client-csharp/AccessPoint.cs b/rocketmq-client-csharp/AccessPoint.cs index f97d216..cf4e1f4 100644 --- a/rocketmq-client-csharp/AccessPoint.cs +++ b/rocketmq-client-csharp/AccessPoint.cs @@ -33,5 +33,10 @@ namespace Org.Apache.Rocketmq get { return _port; } set { _port = value; } } + + public string TargetUrl() + { + return $"https://{_host}:{_port}"; + } } } diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs index 1d32095..1eb368e 100644 --- a/rocketmq-client-csharp/Client.cs +++ b/rocketmq-client-csharp/Client.cs @@ -33,15 +33,31 @@ namespace Org.Apache.Rocketmq protected Client(AccessPoint accessPoint, string resourceNamespace) { + _accessPoint = accessPoint; + // Support IPv4 for now AccessPointScheme = rmq::AddressScheme.Ipv4; - var serviceEndpoint = new rmq::Address(); serviceEndpoint.Host = accessPoint.Host; serviceEndpoint.Port = accessPoint.Port; AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint }; _resourceNamespace = resourceNamespace; + + _clientSettings = new rmq::Settings(); + + _clientSettings.AccessPoint = new rmq::Endpoints(); + _clientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4; + _clientSettings.AccessPoint.Addresses.Add(serviceEndpoint); + + _clientSettings.RequestTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3)); + + _clientSettings.UserAgent = new rmq.UA(); + _clientSettings.UserAgent.Language = rmq::Language.DotNet; + _clientSettings.UserAgent.Version = "5.0.0"; + _clientSettings.UserAgent.Platform = Environment.OSVersion.ToString(); + _clientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName(); + Manager = ClientManagerFactory.getClientManager(resourceNamespace); _topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>(); @@ -283,9 +299,9 @@ namespace Org.Apache.Rocketmq return $"https://{address.Host}:{address.Port}"; } - public void buildClientSetting(rmq::Settings settings) + public virtual void BuildClientSetting(rmq::Settings settings) { - + settings.MergeFrom(_clientSettings); } public void createSession(string url) @@ -388,6 +404,12 @@ namespace Org.Apache.Rocketmq private readonly CancellationTokenSource _updateTopicRouteCts; private readonly CancellationTokenSource _healthCheckCts; + + protected readonly AccessPoint _accessPoint; + + // This field is subject changes from servers. + protected rmq::Settings _clientSettings; + private Random random = new Random(); } } \ No newline at end of file diff --git a/rocketmq-client-csharp/ClientConfig.cs b/rocketmq-client-csharp/ClientConfig.cs index 6dc3eba..0d99cb1 100644 --- a/rocketmq-client-csharp/ClientConfig.cs +++ b/rocketmq-client-csharp/ClientConfig.cs @@ -18,11 +18,14 @@ using System; using System.Collections.Generic; using rmq = Apache.Rocketmq.V2; -namespace Org.Apache.Rocketmq { +namespace Org.Apache.Rocketmq +{ - public class ClientConfig : IClientConfig { + public class ClientConfig : IClientConfig + { - public ClientConfig() { + public ClientConfig() + { var hostName = System.Net.Dns.GetHostName(); var pid = System.Diagnostics.Process.GetCurrentProcess().Id; this.clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_); @@ -34,39 +37,50 @@ namespace Org.Apache.Rocketmq { this._publishing = new Publishing(); } - public string region() { + public string region() + { return _region; } - public string Region { + public string Region + { set { _region = value; } } - public string serviceName() { + public string serviceName() + { return _serviceName; } - public string ServiceName { + public string ServiceName + { set { _serviceName = value; } } - public string resourceNamespace() { + public string resourceNamespace() + { return _resourceNamespace; } - public string ResourceNamespace { + public string ResourceNamespace + { + get { return _resourceNamespace; } set { _resourceNamespace = value; } } - public ICredentialsProvider credentialsProvider() { + public ICredentialsProvider credentialsProvider() + { return credentialsProvider_; } - - public ICredentialsProvider CredentialsProvider { + + public ICredentialsProvider CredentialsProvider + { set { credentialsProvider_ = value; } } - public string tenantId() { + public string tenantId() + { return _tenantId; } - public string TenantId { + public string TenantId + { set { _tenantId = value; } } @@ -82,32 +96,40 @@ namespace Org.Apache.Rocketmq { } } - public TimeSpan getLongPollingTimeout() { + public TimeSpan getLongPollingTimeout() + { return longPollingIoTimeout_; } - public TimeSpan LongPollingTimeout { + public TimeSpan LongPollingTimeout + { set { longPollingIoTimeout_ = value; } } - public string getGroupName() { + public string getGroupName() + { return groupName_; } - public string GroupName { + public string GroupName + { set { groupName_ = value; } } - public string clientId() { + public string clientId() + { return clientId_; } - public bool isTracingEnabled() { + public bool isTracingEnabled() + { return tracingEnabled_; } - public bool TracingEnabled { + public bool TracingEnabled + { set { tracingEnabled_ = value; } } - public void setInstanceName(string instanceName) { + public void setInstanceName(string instanceName) + { this.instanceName_ = instanceName; } diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs index a0b377b..54ceff2 100644 --- a/rocketmq-client-csharp/ClientManager.cs +++ b/rocketmq-client-csharp/ClientManager.cs @@ -152,7 +152,7 @@ namespace Org.Apache.Rocketmq // TODO: List<Message> messages = new List<Message>(); - + return messages; } diff --git a/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs index 9a3baa3..39dfd7e 100644 --- a/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs +++ b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs @@ -19,36 +19,46 @@ using System; using System.Text.Json; using System.Collections.Generic; -namespace Org.Apache.Rocketmq { +namespace Org.Apache.Rocketmq +{ /** * File-based credentials provider that reads JSON configurations from ${HOME}/.rocketmq/config * A sample config content is as follows: * {"AccessKey": "key", "AccessSecret": "secret"} */ - public class ConfigFileCredentialsProvider : ICredentialsProvider { + public class ConfigFileCredentialsProvider : ICredentialsProvider + { - public ConfigFileCredentialsProvider() { + public ConfigFileCredentialsProvider() + { var home = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile); string configFileRelativePath = "/.rocketmq/config"; - if (!File.Exists(home + configFileRelativePath)) { + if (!File.Exists(home + configFileRelativePath)) + { return; } - try { - using (var reader = new StreamReader(home + configFileRelativePath)) { + try + { + using (var reader = new StreamReader(home + configFileRelativePath)) + { string json = reader.ReadToEnd(); var kv = JsonSerializer.Deserialize<Dictionary<string, string>>(json); accessKey = kv["AccessKey"]; accessSecret = kv["AccessSecret"]; valid = true; - } - } catch (IOException e) { + } + } + catch (IOException) + { } } - public Credentials getCredentials() { - if (!valid) { + public Credentials getCredentials() + { + if (!valid) + { return null; } diff --git a/rocketmq-client-csharp/Credentials.cs b/rocketmq-client-csharp/Credentials.cs index 2ccafc8..a73b000 100644 --- a/rocketmq-client-csharp/Credentials.cs +++ b/rocketmq-client-csharp/Credentials.cs @@ -17,27 +17,34 @@ using System; -namespace Org.Apache.Rocketmq { - public class Credentials { +namespace Org.Apache.Rocketmq +{ + public class Credentials + { - public Credentials(string accessKey, string accessSecret) { + public Credentials(string accessKey, string accessSecret) + { this.accessKey = accessKey; this.accessSecret = accessSecret; } - public Credentials(string accessKey, string accessSecret, string sessionToken, DateTime expirationInstant) { + public Credentials(string accessKey, string accessSecret, string sessionToken, DateTime expirationInstant) + { this.accessKey = accessKey; this.accessSecret = accessSecret; this.sessionToken = sessionToken; this.expirationInstant = expirationInstant; } - public bool empty() { + public bool empty() + { return String.IsNullOrEmpty(accessKey) || String.IsNullOrEmpty(accessSecret); } - public bool expired() { - if (DateTime.MinValue == expirationInstant) { + public bool expired() + { + if (DateTime.MinValue == expirationInstant) + { return false; } @@ -45,17 +52,20 @@ namespace Org.Apache.Rocketmq { } private string accessKey; - public string AccessKey { + public string AccessKey + { get { return accessKey; } } - + private string accessSecret; - public string AccessSecret { + public string AccessSecret + { get { return accessSecret; } } private string sessionToken; - public string SessionToken { + public string SessionToken + { get { return sessionToken; } } diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs index f4115a2..abdcc21 100644 --- a/rocketmq-client-csharp/IClient.cs +++ b/rocketmq-client-csharp/IClient.cs @@ -28,6 +28,8 @@ namespace Org.Apache.Rocketmq Task<bool> NotifyClientTermination(); - void buildClientSetting(rmq::Settings settings); + void BuildClientSetting(rmq::Settings settings); + + } } \ No newline at end of file diff --git a/rocketmq-client-csharp/IClientConfig.cs b/rocketmq-client-csharp/IClientConfig.cs index 3726ac4..438d7a8 100644 --- a/rocketmq-client-csharp/IClientConfig.cs +++ b/rocketmq-client-csharp/IClientConfig.cs @@ -16,8 +16,10 @@ */ using System; -namespace Org.Apache.Rocketmq { - public interface IClientConfig { +namespace Org.Apache.Rocketmq +{ + public interface IClientConfig + { string region(); string serviceName(); diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IClientManager.cs index d5c3ea3..afccfde 100644 --- a/rocketmq-client-csharp/IClientManager.cs +++ b/rocketmq-client-csharp/IClientManager.cs @@ -22,8 +22,10 @@ using grpc = global::Grpc.Core; using rmq = Apache.Rocketmq.V2; -namespace Org.Apache.Rocketmq { - public interface IClientManager { +namespace Org.Apache.Rocketmq +{ + public interface IClientManager + { IRpcClient GetRpcClient(string target); grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(string target, grpc::Metadata metadata); diff --git a/rocketmq-client-csharp/ICredentialsProvider.cs b/rocketmq-client-csharp/ICredentialsProvider.cs index 80e908f..1fb892b 100644 --- a/rocketmq-client-csharp/ICredentialsProvider.cs +++ b/rocketmq-client-csharp/ICredentialsProvider.cs @@ -14,8 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -namespace Org.Apache.Rocketmq { - public interface ICredentialsProvider { +namespace Org.Apache.Rocketmq +{ + public interface ICredentialsProvider + { Credentials getCredentials(); } } \ No newline at end of file diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs index cbb82d4..9c30c6c 100644 --- a/rocketmq-client-csharp/IProducer.cs +++ b/rocketmq-client-csharp/IProducer.cs @@ -17,8 +17,10 @@ using System.Threading.Tasks; -namespace Org.Apache.Rocketmq { - public interface IProducer { +namespace Org.Apache.Rocketmq +{ + public interface IProducer + { void Start(); void Shutdown(); diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs index 5cbf1aa..b8b0e98 100644 --- a/rocketmq-client-csharp/Message.cs +++ b/rocketmq-client-csharp/Message.cs @@ -19,16 +19,20 @@ using System.Collections.Generic; namespace Org.Apache.Rocketmq { - public class Message { - public Message() : this(null, null) { + public class Message + { + public Message() : this(null, null) + { } - public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) {} + public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) { } - public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body) { + public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body) + { } - public Message(string topic, string tag, List<string> keys, byte[] body) { + public Message(string topic, string tag, List<string> keys, byte[] body) + { this.messageId = SequenceGenerator.Instance.Next(); this.maxAttemptTimes = 3; this.topic = topic; @@ -58,37 +62,43 @@ namespace Org.Apache.Rocketmq private string topic; - public string Topic { + public string Topic + { get { return topic; } set { this.topic = value; } } private byte[] body; - public byte[] Body { + public byte[] Body + { get { return body; } set { this.body = value; } } private string tag; - public string Tag { + public string Tag + { get { return tag; } set { this.tag = value; } } private List<string> keys; - public List<string> Keys{ + public List<string> Keys + { get { return keys; } set { this.keys = value; } } private Dictionary<string, string> userProperties; - public Dictionary<string, string> UserProperties { + public Dictionary<string, string> UserProperties + { get { return userProperties; } set { this.userProperties = value; } } private Dictionary<string, string> systemProperties; - internal Dictionary<string, string> SystemProperties { + internal Dictionary<string, string> SystemProperties + { get { return systemProperties; } set { this.systemProperties = value; } } diff --git a/rocketmq-client-csharp/MessageType.cs b/rocketmq-client-csharp/MessageType.cs index 8373496..a459e93 100644 --- a/rocketmq-client-csharp/MessageType.cs +++ b/rocketmq-client-csharp/MessageType.cs @@ -17,7 +17,8 @@ namespace Org.Apache.Rocketmq { - public enum MessageType { + public enum MessageType + { Normal, Fifo, Delay, diff --git a/rocketmq-client-csharp/MetadataConstants.cs b/rocketmq-client-csharp/MetadataConstants.cs index 33bd66e..5381595 100644 --- a/rocketmq-client-csharp/MetadataConstants.cs +++ b/rocketmq-client-csharp/MetadataConstants.cs @@ -17,8 +17,10 @@ using System; -namespace Org.Apache.Rocketmq { - public class MetadataConstants { +namespace Org.Apache.Rocketmq +{ + public class MetadataConstants + { public const string TENANT_ID_KEY = "x-mq-tenant-id"; public const string NAMESPACE_KEY = "x-mq-namespace"; public const string AUTHORIZATION = "authorization"; diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs index ccddc8c..bfcc1d3 100644 --- a/rocketmq-client-csharp/Producer.cs +++ b/rocketmq-client-csharp/Producer.cs @@ -139,6 +139,6 @@ namespace Org.Apache.Rocketmq } private ConcurrentDictionary<string, PublishLoadBalancer> loadBalancer; - private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); + private static new readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); } } \ No newline at end of file diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs index def9be4..909e7a2 100644 --- a/rocketmq-client-csharp/PushConsumer.cs +++ b/rocketmq-client-csharp/PushConsumer.cs @@ -201,7 +201,7 @@ namespace Org.Apache.Rocketmq } } } - catch (System.Exception e) + catch (System.Exception) { // TODO: log exception raised. } diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs index f56a07f..e0a2caf 100644 --- a/rocketmq-client-csharp/RpcClient.cs +++ b/rocketmq-client-csharp/RpcClient.cs @@ -125,7 +125,8 @@ namespace Org.Apache.Rocketmq var callOptions = new CallOptions(metadata, deadline); var call = _stub.ReceiveMessage(request, callOptions); var result = new List<rmq::ReceiveMessageResponse>(); - while(await call.ResponseStream.MoveNext()) { + while (await call.ResponseStream.MoveNext()) + { result.Add(call.ResponseStream.Current); } return result; diff --git a/rocketmq-client-csharp/SendStatus.cs b/rocketmq-client-csharp/SendStatus.cs index b20e1c5..7586d22 100644 --- a/rocketmq-client-csharp/SendStatus.cs +++ b/rocketmq-client-csharp/SendStatus.cs @@ -15,8 +15,10 @@ * limitations under the License. */ -namespace Org.Apache.Rocketmq { - public enum SendStatus { +namespace Org.Apache.Rocketmq +{ + public enum SendStatus + { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs index dc13dc9..3e234f2 100644 --- a/rocketmq-client-csharp/Session.cs +++ b/rocketmq-client-csharp/Session.cs @@ -33,6 +33,7 @@ namespace Org.Apache.Rocketmq { this._target = target; this._stream = stream; + this._client = client; } public async Task Loop() @@ -41,36 +42,40 @@ namespace Org.Apache.Rocketmq var writer = this._stream.RequestStream; var request = new rmq::TelemetryCommand(); request.Settings = new rmq::Settings(); - _client.buildClientSetting(request.Settings); + _client.BuildClientSetting(request.Settings); await writer.WriteAsync(request); + Logger.Debug($"Writing Client Settings Done: {request.Settings.ToString()}"); while (!_cts.IsCancellationRequested) { if (await reader.MoveNext(_cts.Token)) { var cmd = reader.Current; + Logger.Debug($"Received a TelemetryCommand: {cmd.ToString()}"); switch (cmd.CommandCase) { case rmq::TelemetryCommand.CommandOneofCase.None: - { - Logger.Warn($"Telemetry failed: {cmd.Status}"); - break; - } + { + Logger.Warn($"Telemetry failed: {cmd.Status}"); + break; + } case rmq::TelemetryCommand.CommandOneofCase.Settings: - { - break; - } + { + + Logger.Info($"Received settings from server {cmd.Settings.ToString()}"); + break; + } case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand: - { - break; - } + { + break; + } case rmq::TelemetryCommand.CommandOneofCase.RecoverOrphanedTransactionCommand: - { - break; - } + { + break; + } case rmq::TelemetryCommand.CommandOneofCase.VerifyMessageCommand: - { - break; - } + { + break; + } } } } diff --git a/rocketmq-client-csharp/Signature.cs b/rocketmq-client-csharp/Signature.cs index c249253..2331b53 100644 --- a/rocketmq-client-csharp/Signature.cs +++ b/rocketmq-client-csharp/Signature.cs @@ -19,30 +19,38 @@ using System.Text; using grpc = global::Grpc.Core; using System.Security.Cryptography; -namespace Org.Apache.Rocketmq { - public class Signature { - public static void sign(IClientConfig clientConfig, grpc::Metadata metadata) { +namespace Org.Apache.Rocketmq +{ + public class Signature + { + public static void sign(IClientConfig clientConfig, grpc::Metadata metadata) + { metadata.Add(MetadataConstants.LANGUAGE_KEY, "DOTNET"); metadata.Add(MetadataConstants.CLIENT_VERSION_KEY, "5.0.0"); metadata.Add(MetadataConstants.CLIENT_ID_KEY, clientConfig.clientId()); - if (!String.IsNullOrEmpty(clientConfig.tenantId())) { + if (!String.IsNullOrEmpty(clientConfig.tenantId())) + { metadata.Add(MetadataConstants.TENANT_ID_KEY, clientConfig.tenantId()); } - if (!String.IsNullOrEmpty(clientConfig.resourceNamespace())) { + if (!String.IsNullOrEmpty(clientConfig.resourceNamespace())) + { metadata.Add(MetadataConstants.NAMESPACE_KEY, clientConfig.resourceNamespace()); } string time = DateTime.Now.ToString(MetadataConstants.DATE_TIME_FORMAT); metadata.Add(MetadataConstants.DATE_TIME_KEY, time); - if (null != clientConfig.credentialsProvider()) { + if (null != clientConfig.credentialsProvider()) + { var credentials = clientConfig.credentialsProvider().getCredentials(); - if (null == credentials || credentials.expired()) { + if (null == credentials || credentials.expired()) + { return; } - if (!String.IsNullOrEmpty(credentials.SessionToken)) { + if (!String.IsNullOrEmpty(credentials.SessionToken)) + { metadata.Add(MetadataConstants.STS_SESSION_TOKEN, credentials.SessionToken); } @@ -51,7 +59,7 @@ namespace Org.Apache.Rocketmq { HMACSHA1 signer = new HMACSHA1(secretData); byte[] digest = signer.ComputeHash(data); string hmac = BitConverter.ToString(digest).Replace("-", ""); - string authorization = string.Format("{0} {1}={2}/{3}/{4}, {5}={6}, {7}={8}", + string authorization = string.Format("{0} {1}={2}/{3}/{4}, {5}={6}, {7}={8}", MetadataConstants.ALGORITHM_KEY, MetadataConstants.CREDENTIAL_KEY, credentials.AccessKey, diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs new file mode 100644 index 0000000..4c447c9 --- /dev/null +++ b/rocketmq-client-csharp/SimpleConsumer.cs @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using rmq = Apache.Rocketmq.V2; +using NLog; +using System.Collections.Generic; +using System.Collections.Concurrent; + +namespace Org.Apache.Rocketmq +{ + public class SimpleConsumer : Client + { + + public SimpleConsumer(AccessPoint accessPoint, + string resourceNamespace, string group) + : base(accessPoint, resourceNamespace) + { + fifo_ = false; + subscriptions_ = new ConcurrentDictionary<string, rmq.SubscriptionEntry>(); + group_ = group; + } + + public override void BuildClientSetting(rmq::Settings settings) + { + base.BuildClientSetting(settings); + + settings.ClientType = rmq::ClientType.SimpleConsumer; + settings.Subscription = new rmq::Subscription(); + settings.Subscription.Group = new rmq::Resource(); + settings.Subscription.Group.Name = Group; + settings.Subscription.Group.ResourceNamespace = ResourceNamespace; + + foreach (var entry in subscriptions_) + { + settings.Subscription.Subscriptions.Add(entry.Value); + } + } + + public override void Start() + { + base.Start(); + base.createSession(_accessPoint.TargetUrl()); + } + + public override void Shutdown() + { + base.Shutdown(); + } + + protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request) + { + } + + public void Subscribe(string topic, rmq::FilterType filterType, string expression) + { + var entry = new rmq::SubscriptionEntry(); + entry.Topic = new rmq::Resource(); + entry.Topic.Name = topic; + entry.Topic.ResourceNamespace = ResourceNamespace; + entry.Expression = new rmq::FilterExpression(); + entry.Expression.Type = filterType; + entry.Expression.Expression = expression; + subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; }); + } + + private string group_; + + public string Group + { + get { return group_; } + } + + private bool fifo_; + + private ConcurrentDictionary<string, rmq::SubscriptionEntry> subscriptions_; + + } +} \ No newline at end of file diff --git a/rocketmq-client-csharp/StaticCredentialsProvider.cs b/rocketmq-client-csharp/StaticCredentialsProvider.cs index d00dba6..edd810d 100644 --- a/rocketmq-client-csharp/StaticCredentialsProvider.cs +++ b/rocketmq-client-csharp/StaticCredentialsProvider.cs @@ -14,15 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -namespace Org.Apache.Rocketmq { - public class StaticCredentialsProvider : ICredentialsProvider { +namespace Org.Apache.Rocketmq +{ + public class StaticCredentialsProvider : ICredentialsProvider + { - public StaticCredentialsProvider(string accessKey, string accessSecret) { + public StaticCredentialsProvider(string accessKey, string accessSecret) + { this.accessKey = accessKey; this.accessSecret = accessSecret; } - public Credentials getCredentials() { + public Credentials getCredentials() + { return new Credentials(accessKey, accessSecret); } diff --git a/tests/ClientConfigTest.cs b/tests/ClientConfigTest.cs index 427d1d2..4d8dec1 100644 --- a/tests/ClientConfigTest.cs +++ b/tests/ClientConfigTest.cs @@ -17,11 +17,14 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using System; -namespace Org.Apache.Rocketmq { +namespace Org.Apache.Rocketmq +{ [TestClass] - public class ClientConfigTest { + public class ClientConfigTest + { [TestMethod] - public void testClientId() { + public void testClientId() + { var clientConfig = new ClientConfig(); string clientId = clientConfig.clientId(); Assert.IsTrue(clientId.Contains("@")); diff --git a/tests/ClientManagerTest.cs b/tests/ClientManagerTest.cs index 850db63..af5983c 100644 --- a/tests/ClientManagerTest.cs +++ b/tests/ClientManagerTest.cs @@ -19,13 +19,16 @@ using Grpc.Core; using Microsoft.VisualStudio.TestTools.UnitTesting; using rmq = Apache.Rocketmq.V2; -namespace Org.Apache.Rocketmq { +namespace Org.Apache.Rocketmq +{ [TestClass] - public class ClientManagerTest { - + public class ClientManagerTest + { + [TestMethod] - public void TestResolveRoute() { + public void TestResolveRoute() + { string topic = "cpp_sdk_standard"; string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7"; var request = new rmq::QueryRouteRequest(); diff --git a/tests/ConfigFileCredentialsProviderTest.cs b/tests/ConfigFileCredentialsProviderTest.cs index 0d46b98..7741295 100644 --- a/tests/ConfigFileCredentialsProviderTest.cs +++ b/tests/ConfigFileCredentialsProviderTest.cs @@ -18,11 +18,14 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using System; -namespace Org.Apache.Rocketmq { +namespace Org.Apache.Rocketmq +{ [TestClass] - public class ConfigFileCredentialsProviderTest { + public class ConfigFileCredentialsProviderTest + { [TestMethod] - public void testGetCredentials() { + public void testGetCredentials() + { var provider = new ConfigFileCredentialsProvider(); var credentials = provider.getCredentials(); Assert.IsNotNull(credentials); diff --git a/tests/DateTimeTest.cs b/tests/DateTimeTest.cs index 0d9a2a7..fdf7d53 100644 --- a/tests/DateTimeTest.cs +++ b/tests/DateTimeTest.cs @@ -14,18 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - using Microsoft.VisualStudio.TestTools.UnitTesting; +using Microsoft.VisualStudio.TestTools.UnitTesting; using System; -namespace Org.Apache.Rocketmq { - +namespace Org.Apache.Rocketmq +{ + [TestClass] - public class DateTimeTest { - + public class DateTimeTest + { + [TestMethod] - public void testFormat() { + public void testFormat() + { DateTime instant = new DateTime(2022, 02, 15, 08, 31, 56); - string time = instant.ToString(MetadataConstants.DATE_TIME_FORMAT); + string time = instant.ToString(MetadataConstants.DATE_TIME_FORMAT); string expected = "20220215T083156Z"; Assert.AreEqual(time, expected); } diff --git a/tests/MessageTest.cs b/tests/MessageTest.cs index 2de9f54..f1c71f8 100644 --- a/tests/MessageTest.cs +++ b/tests/MessageTest.cs @@ -19,12 +19,15 @@ using System; using System.Text; using System.Collections.Generic; -namespace Org.Apache.Rocketmq { +namespace Org.Apache.Rocketmq +{ [TestClass] - public class MessageTest { + public class MessageTest + { [TestMethod] - public void testCtor() { + public void testCtor() + { var msg1 = new Message(); Assert.IsNotNull(msg1.MessageId); Assert.IsTrue(msg1.MessageId.StartsWith("01")); @@ -36,7 +39,8 @@ namespace Org.Apache.Rocketmq { } [TestMethod] - public void testCtor2() { + public void testCtor2() + { string topic = "T1"; string bodyString = "body"; byte[] body = Encoding.ASCII.GetBytes(bodyString); @@ -49,7 +53,8 @@ namespace Org.Apache.Rocketmq { } [TestMethod] - public void testCtor3() { + public void testCtor3() + { string topic = "T1"; string bodyString = "body"; byte[] body = Encoding.ASCII.GetBytes(bodyString); @@ -63,7 +68,8 @@ namespace Org.Apache.Rocketmq { } [TestMethod] - public void testCtor4() { + public void testCtor4() + { string topic = "T1"; string bodyString = "body"; byte[] body = Encoding.ASCII.GetBytes(bodyString); @@ -81,7 +87,8 @@ namespace Org.Apache.Rocketmq { } [TestMethod] - public void testCtor5() { + public void testCtor5() + { string topic = "T1"; string bodyString = "body"; byte[] body = Encoding.ASCII.GetBytes(bodyString); diff --git a/tests/PushConsumerTest.cs b/tests/PushConsumerTest.cs index 444530b..78f01de 100644 --- a/tests/PushConsumerTest.cs +++ b/tests/PushConsumerTest.cs @@ -24,26 +24,28 @@ namespace Org.Apache.Rocketmq public class TestMessageListener : IMessageListener { - public async Task Consume(List<Message> messages, List<Message> failed) + public Task Consume(List<Message> messages, List<Message> failed) { foreach (var message in messages) { Console.WriteLine(""); } - } + return Task.CompletedTask; + } } public class CountableMessageListener : IMessageListener { - public async Task Consume(List<Message> messages, List<Message> failed) + public Task Consume(List<Message> messages, List<Message> failed) { foreach (var message in messages) { Console.WriteLine("{}", message.MessageId); } - } + return Task.CompletedTask; + } } [TestClass] diff --git a/tests/SendResultTest.cs b/tests/SendResultTest.cs index 475cf6d..4e3d9a0 100644 --- a/tests/SendResultTest.cs +++ b/tests/SendResultTest.cs @@ -17,13 +17,16 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; -namespace Org.Apache.Rocketmq { +namespace Org.Apache.Rocketmq +{ [TestClass] - public class SendResultTest { + public class SendResultTest + { [TestMethod] - public void testCtor() { + public void testCtor() + { string messageId = new string("abc"); var sendResult = new SendReceipt(messageId); Assert.AreEqual(messageId, sendResult.MessageId); @@ -32,7 +35,8 @@ namespace Org.Apache.Rocketmq { [TestMethod] - public void testCtor2() { + public void testCtor2() + { string messageId = new string("abc"); var sendResult = new SendReceipt(messageId, SendStatus.FLUSH_DISK_TIMEOUT); Assert.AreEqual(messageId, sendResult.MessageId); @@ -40,5 +44,5 @@ namespace Org.Apache.Rocketmq { } } - + } \ No newline at end of file diff --git a/tests/SignatureTest.cs b/tests/SignatureTest.cs index fd6b525..16d0f46 100644 --- a/tests/SignatureTest.cs +++ b/tests/SignatureTest.cs @@ -19,10 +19,12 @@ using grpc = global::Grpc.Core; using Moq; using System; -namespace Org.Apache.Rocketmq { +namespace Org.Apache.Rocketmq +{ [TestClass] - public class SignatureTest { + public class SignatureTest + { [TestMethod] public void testSign() @@ -33,7 +35,7 @@ namespace Org.Apache.Rocketmq { mock.Setup(x => x.resourceNamespace()).Returns("mq:arn:test:"); mock.Setup(x => x.serviceName()).Returns("mq"); mock.Setup(x => x.region()).Returns("cn-hangzhou"); - + string accessKey = "key"; string accessSecret = "secret"; var credentialsProvider = new StaticCredentialsProvider(accessKey, accessSecret); diff --git a/tests/SendResultTest.cs b/tests/SimpleConsumerTest.cs similarity index 55% copy from tests/SendResultTest.cs copy to tests/SimpleConsumerTest.cs index 475cf6d..1bc1a45 100644 --- a/tests/SendResultTest.cs +++ b/tests/SimpleConsumerTest.cs @@ -14,31 +14,36 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +using System.Threading; using Microsoft.VisualStudio.TestTools.UnitTesting; +using rmq = Apache.Rocketmq.V2; -namespace Org.Apache.Rocketmq { +namespace Org.Apache.Rocketmq +{ [TestClass] - public class SendResultTest { + public class SimpleConsumerTest + { [TestMethod] - public void testCtor() { - string messageId = new string("abc"); - var sendResult = new SendReceipt(messageId); - Assert.AreEqual(messageId, sendResult.MessageId); - Assert.AreEqual(SendStatus.SEND_OK, sendResult.Status); - } - + public void TestStart() + { + var accessPoint = new AccessPoint(); + var host = "11.166.42.94"; + var port = 8081; + accessPoint.Host = host; + accessPoint.Port = port; + var resourceNamespace = ""; + var group = "GID_cpp_sdk_standard"; + var topic = "cpp_sdk_standard"; - [TestMethod] - public void testCtor2() { - string messageId = new string("abc"); - var sendResult = new SendReceipt(messageId, SendStatus.FLUSH_DISK_TIMEOUT); - Assert.AreEqual(messageId, sendResult.MessageId); - Assert.AreEqual(SendStatus.FLUSH_DISK_TIMEOUT, sendResult.Status); + var simpleConsumer = new SimpleConsumer(accessPoint, resourceNamespace, group); + simpleConsumer.Subscribe(topic, rmq::FilterType.Tag, "*"); + simpleConsumer.Start(); + Thread.Sleep(10_000); } } - + + } \ No newline at end of file diff --git a/tests/StaticCredentialsProviderTest.cs b/tests/StaticCredentialsProviderTest.cs index 20b9450..8b5f012 100644 --- a/tests/StaticCredentialsProviderTest.cs +++ b/tests/StaticCredentialsProviderTest.cs @@ -17,12 +17,15 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; -namespace Org.Apache.Rocketmq { +namespace Org.Apache.Rocketmq +{ [TestClass] - public class StaticCredentialsProviderTest { + public class StaticCredentialsProviderTest + { [TestMethod] - public void testGetCredentials() { + public void testGetCredentials() + { var accessKey = "key"; var accessSecret = "secret"; var provider = new StaticCredentialsProvider(accessKey, accessSecret); diff --git a/tests/TopicTest.cs b/tests/TopicTest.cs index 7d9f3f4..9f386de 100644 --- a/tests/TopicTest.cs +++ b/tests/TopicTest.cs @@ -17,13 +17,16 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using System.Collections.Generic; -namespace Org.Apache.Rocketmq { - - [TestClass] - public class TopicTest { +namespace Org.Apache.Rocketmq +{ - [TestMethod] - public void testCompareTo() { + [TestClass] + public class TopicTest + { + + [TestMethod] + public void testCompareTo() + { List<Topic> topics = new List<Topic>(); topics.Add(new Topic("ns1", "t1")); topics.Add(new Topic("ns0", "t1")); @@ -36,13 +39,13 @@ namespace Org.Apache.Rocketmq { Assert.AreEqual(topics[1].ResourceNamespace, "ns0"); Assert.AreEqual(topics[1].Name, "t1"); - + Assert.AreEqual(topics[2].ResourceNamespace, "ns1"); Assert.AreEqual(topics[2].Name, "t1"); - + } - } - } \ No newline at end of file + } +} \ No newline at end of file diff --git a/tests/UnitTest1.cs b/tests/UnitTest1.cs index c0b9357..bbf537a 100644 --- a/tests/UnitTest1.cs +++ b/tests/UnitTest1.cs @@ -16,36 +16,38 @@ namespace tests public void TestMethod1() { rmq::Permission perm = rmq::Permission.None; - switch(perm) { - case rmq::Permission.None: - { - Console.WriteLine("None"); - break; - } + switch (perm) + { + case rmq::Permission.None: + { + Console.WriteLine("None"); + break; + } - case rmq::Permission.Read: - { - Console.WriteLine("Read"); - break; - } + case rmq::Permission.Read: + { + Console.WriteLine("Read"); + break; + } - case rmq::Permission.Write: - { - Console.WriteLine("Write"); - break; - } + case rmq::Permission.Write: + { + Console.WriteLine("Write"); + break; + } - case rmq::Permission.ReadWrite: - { - Console.WriteLine("ReadWrite"); - break; - } + case rmq::Permission.ReadWrite: + { + Console.WriteLine("ReadWrite"); + break; + } } } [TestMethod] - public void TestRpcClientImplCtor() { + public void TestRpcClientImplCtor() + { RpcClient impl = new RpcClient("https://localhost:5001"); }
