This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 26bb67c51c279013fabd32e4eeb7809aaf9e4f56 Author: Aaron Ai <[email protected]> AuthorDate: Mon Feb 13 17:13:57 2023 +0800 Implement simple consumer --- csharp/examples/ProducerDelayMessageExample.cs | 77 +++++---- csharp/examples/ProducerNormalMessageExample.cs | 6 +- csharp/examples/SimpleConsumerExample.cs | 68 ++++---- csharp/rocketmq-client-csharp/Address.cs | 25 ++- csharp/rocketmq-client-csharp/AddressScheme.cs | 10 +- csharp/rocketmq-client-csharp/Broker.cs | 10 ++ csharp/rocketmq-client-csharp/Client.cs | 25 +-- csharp/rocketmq-client-csharp/ClientManager.cs | 33 ++-- csharp/rocketmq-client-csharp/Consumer.cs | 99 +++++++++++ .../rocketmq-client-csharp/Error/StatusChecker.cs | 41 ----- csharp/rocketmq-client-csharp/FilterExpression.cs | 6 + csharp/rocketmq-client-csharp/IClientManager.cs | 33 ++++ csharp/rocketmq-client-csharp/MessageQueue.cs | 27 ++- csharp/rocketmq-client-csharp/MessageView.cs | 44 ++--- csharp/rocketmq-client-csharp/MqEncoding.cs | 10 +- csharp/rocketmq-client-csharp/Permission.cs | 22 ++- csharp/rocketmq-client-csharp/Producer.cs | 8 +- csharp/rocketmq-client-csharp/Publishing.cs | 4 +- .../PublishingLoadBalancer.cs | 33 ++-- .../rocketmq-client-csharp/PublishingSettings.cs | 17 +- .../{IProducer.cs => ReceiveMessageResult.cs} | 14 +- csharp/rocketmq-client-csharp/Resource.cs | 8 +- csharp/rocketmq-client-csharp/RpcClient.cs | 42 ++--- csharp/rocketmq-client-csharp/Settings.cs | 10 +- csharp/rocketmq-client-csharp/SimpleConsumer.cs | 192 +++++++++++++++++++++ .../SimpleSubscriptionSettings.cs | 95 ++++++++++ csharp/rocketmq-client-csharp/StatusChecker.cs | 1 + .../SubscriptionLoadBalancer.cs | 38 ++-- csharp/rocketmq-client-csharp/TopicRouteData.cs | 10 +- csharp/rocketmq-client-csharp/Utilities.cs | 5 +- csharp/tests/UnitTest1.cs | 51 +++--- 31 files changed, 748 insertions(+), 316 deletions(-) diff --git a/csharp/examples/ProducerDelayMessageExample.cs b/csharp/examples/ProducerDelayMessageExample.cs index 27e32e76..9ad5fbb9 100644 --- a/csharp/examples/ProducerDelayMessageExample.cs +++ b/csharp/examples/ProducerDelayMessageExample.cs @@ -24,49 +24,50 @@ using Org.Apache.Rocketmq; namespace examples { - static class ProducerDelayMessageExample + internal static class ProducerDelayMessageExample { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); internal static async Task QuickStart() { - // string accessKey = "yourAccessKey"; - // string secretKey = "yourSecretKey"; - // // Credential provider is optional for client configuration. - // var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); - // string endpoints = "foobar.com:8080"; - // // In most case, you don't need to create too many producers, single pattern is recommended. - // var producer = new Producer(endpoints) - // { - // CredentialsProvider = credentialsProvider - // }; - // string topic = "yourDelayTopic"; - // // Set the topic name(s), which is optional but recommended. It makes producer could prefetch - // // the topic route before message publishing. - // producer.AddTopicOfInterest(topic); - // - // await producer.Start(); - // // Define your message body. - // var bytes = Encoding.UTF8.GetBytes("foobar"); - // string tag = "yourMessageTagA"; - // // You could set multiple keys for the single message. - // var keys = new List<string> - // { - // "yourMessageKey-2f00df144e48", - // "yourMessageKey-49df1dd332b7" - // }; - // // Set topic for current message. - // var message = new Message(topic, bytes) - // { - // Tag = tag, - // Keys = keys, - // // Essential for DELAY message. - // DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(30) - // }; - // var sendReceipt = await producer.Send(message); - // Logger.Info($"Send message successfully, sendReceipt={sendReceipt}"); - // // Close the producer if you don't need it anymore. - // await producer.Shutdown(); + const string accessKey = "yourAccessKey"; + const string secretKey = "yourSecretKey"; + // Credential provider is optional for client configuration. + var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); + const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080"; + var clientConfig = new ClientConfig(endpoints) + { + CredentialsProvider = credentialsProvider + }; + // In most case, you don't need to create too many producers, single pattern is recommended. + var producer = new Producer(clientConfig); + const string topic = "yourDelayTopic"; + // Set the topic name(s), which is optional but recommended. It makes producer could prefetch + // the topic route before message publishing. + producer.SetTopics(topic); + + await producer.Start(); + // Define your message body. + var bytes = Encoding.UTF8.GetBytes("foobar"); + const string tag = "yourMessageTagA"; + // You could set multiple keys for the single message. + var keys = new List<string> + { + "yourMessageKey-2f00df144e48", + "yourMessageKey-49df1dd332b7" + }; + // Set topic for current message. + var message = new Message(topic, bytes) + { + Tag = tag, + Keys = keys, + // Essential for DELAY message. + DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(30) + }; + var sendReceipt = await producer.Send(message); + Logger.Info($"Send message successfully, sendReceipt={sendReceipt}"); + // Close the producer if you don't need it anymore. + await producer.Shutdown(); } } } \ No newline at end of file diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs index 16791a13..3274ade2 100644 --- a/csharp/examples/ProducerNormalMessageExample.cs +++ b/csharp/examples/ProducerNormalMessageExample.cs @@ -32,6 +32,7 @@ namespace examples { const string accessKey = "5jFk0wK7OU6Uq395"; const string secretKey = "V1u8z19URHs4o6RQ"; + // Credential provider is optional for client configuration. var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080"; @@ -41,12 +42,11 @@ namespace examples }; // In most case, you don't need to create too many producers, single pattern is recommended. var producer = new Producer(clientConfig); - + const string topic = "lingchu_normal_topic"; producer.SetTopics(topic); // Set the topic name(s), which is optional but recommended. It makes producer could prefetch // the topic route before message publishing. - await producer.Start(); // Define your message body. var bytes = Encoding.UTF8.GetBytes("foobar"); @@ -67,7 +67,7 @@ namespace examples Logger.Info($"Send message successfully, sendReceipt={sendReceipt}"); Thread.Sleep(9999999); // Close the producer if you don't need it anymore. - // await producer.Shutdown(); + await producer.Shutdown(); } } } \ No newline at end of file diff --git a/csharp/examples/SimpleConsumerExample.cs b/csharp/examples/SimpleConsumerExample.cs index 80299fbb..b41125c8 100644 --- a/csharp/examples/SimpleConsumerExample.cs +++ b/csharp/examples/SimpleConsumerExample.cs @@ -23,47 +23,43 @@ using Org.Apache.Rocketmq; namespace examples { - static class SimpleConsumerExample + internal static class SimpleConsumerExample { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); internal static async Task QuickStart() { - // string accessKey = "yourAccessKey"; - // string secretKey = "yourSecretKey"; - // // Credential provider is optional for client configuration. - // var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); - // string endpoints = "foobar.com:8080"; - // - // string consumerGroup = "yourConsumerGroup"; - // SimpleConsumer simpleConsumer = new SimpleConsumer(endpoints, consumerGroup) - // { - // CredentialsProvider = credentialsProvider - // }; - // - // string topic = "yourTopic"; - // string tag = "tagA"; - // // Set topic subscription for consumer. - // simpleConsumer.Subscribe(topic, new FilterExpression(tag, ExpressionType.Tag)); - // await simpleConsumer.Start(); - // - // int maxMessageNum = 16; - // TimeSpan invisibleDuration = TimeSpan.FromSeconds(15); - // var messages = await simpleConsumer.Receive(maxMessageNum, invisibleDuration); - // Logger.Info($"{messages.Count} messages has been received."); - // - // var tasks = new List<Task>(); - // foreach (var message in messages) - // { - // Logger.Info($"Received a message, topic={message.Topic}, message-id={message.MessageId}."); - // var task = simpleConsumer.Ack(message); - // tasks.Add(task); - // } - // - // await Task.WhenAll(tasks); - // Logger.Info($"{tasks.Count} messages have been acknowledged."); - // // Close the consumer if you don't need it anymore. - // await simpleConsumer.Shutdown(); + const string accessKey = "yourAccessKey"; + const string secretKey = "yourSecretKey"; + + // Credential provider is optional for client configuration. + var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey); + const string endpoints = "foobar.com:8080"; + var clientConfig = new ClientConfig(endpoints) + { + CredentialsProvider = credentialsProvider + }; + // Add your subscriptions. + const string consumerGroup = "yourConsumerGroup"; + var subscription = new Dictionary<string, FilterExpression> + { { consumerGroup, new FilterExpression("*") } }; + // In most case, you don't need to create too many consumers, single pattern is recommended. + var simpleConsumer = + new SimpleConsumer(clientConfig, consumerGroup, TimeSpan.FromSeconds(15), subscription); + + await simpleConsumer.Start(); + var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15)); + foreach (var message in messageViews) + { + Logger.Info($"Received a message, topic={message.Topic}, message-id={message.MessageId}"); + await simpleConsumer.Ack(message); + Logger.Info($"Message is acknowledged successfully, message-id={message.MessageId}"); + // await simpleConsumer.ChangeInvisibleDuration(message, TimeSpan.FromSeconds(15)); + // Logger.Info($"Changing message invisible duration successfully, message=id={message.MessageId}"); + } + + // Close the consumer if you don't need it anymore. + await simpleConsumer.Shutdown(); } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Address.cs b/csharp/rocketmq-client-csharp/Address.cs index fca83530..a66a6369 100644 --- a/csharp/rocketmq-client-csharp/Address.cs +++ b/csharp/rocketmq-client-csharp/Address.cs @@ -1,11 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + using System; -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { public class Address : IEquatable<Address> { - public Address(rmq.Address address) + public Address(Proto.Address address) { Host = address.Host; Port = address.Port; @@ -21,9 +38,9 @@ namespace Org.Apache.Rocketmq public int Port { get; } - public rmq.Address ToProtobuf() + public Proto.Address ToProtobuf() { - return new rmq.Address + return new Proto.Address { Host = Host, Port = Port diff --git a/csharp/rocketmq-client-csharp/AddressScheme.cs b/csharp/rocketmq-client-csharp/AddressScheme.cs index 6f36f546..06a5ea32 100644 --- a/csharp/rocketmq-client-csharp/AddressScheme.cs +++ b/csharp/rocketmq-client-csharp/AddressScheme.cs @@ -15,7 +15,7 @@ * limitations under the License. */ -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { @@ -28,17 +28,17 @@ namespace Org.Apache.Rocketmq public static class AddressSchemeHelper { - public static rmq.AddressScheme ToProtobuf(AddressScheme scheme) + public static Proto.AddressScheme ToProtobuf(AddressScheme scheme) { switch (scheme) { case AddressScheme.Ipv4: - return rmq.AddressScheme.Ipv4; + return Proto.AddressScheme.Ipv4; case AddressScheme.Ipv6: - return rmq.AddressScheme.Ipv6; + return Proto.AddressScheme.Ipv6; case AddressScheme.DomainName: default: - return rmq.AddressScheme.DomainName; + return Proto.AddressScheme.DomainName; } } } diff --git a/csharp/rocketmq-client-csharp/Broker.cs b/csharp/rocketmq-client-csharp/Broker.cs index 6b426a5a..6a2f957a 100644 --- a/csharp/rocketmq-client-csharp/Broker.cs +++ b/csharp/rocketmq-client-csharp/Broker.cs @@ -31,5 +31,15 @@ namespace Org.Apache.Rocketmq public string Name { get; } public int Id { get; } public Endpoints Endpoints { get; } + + public Proto.Broker ToProtobuf() + { + return new Proto.Broker() + { + Name = Name, + Id = Id, + Endpoints = Endpoints.ToProtobuf() + }; + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index 51ecb936..3b260022 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -44,8 +44,8 @@ namespace Org.Apache.Rocketmq protected readonly IClientManager ClientManager; protected readonly string ClientId; - protected readonly ConcurrentDictionary<string, bool> Topics; - + protected readonly ICollection<string> Topics; + protected readonly ConcurrentDictionary<Endpoints, bool> Isolated; private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteCache; private readonly CancellationTokenSource _telemetryCts; @@ -53,7 +53,7 @@ namespace Org.Apache.Rocketmq private readonly Dictionary<Endpoints, Session> _sessionsTable; private readonly ReaderWriterLockSlim _sessionLock; - protected Client(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics) + protected Client(ClientConfig clientConfig, ICollection<string> topics) { ClientConfig = clientConfig; Topics = topics; @@ -78,7 +78,7 @@ namespace Org.Apache.Rocketmq ScheduleWithFixedDelay(UpdateTopicRouteCache, TopicRouteUpdateScheduleDelay, _topicRouteUpdateCtx.Token); ScheduleWithFixedDelay(Heartbeat, HeartbeatScheduleDelay, _heartbeatCts.Token); ScheduleWithFixedDelay(SyncSettings, SettingsSyncScheduleDelay, _settingsSyncCtx.Token); - foreach (var topic in Topics.Keys) + foreach (var topic in Topics) { await FetchTopicRoute(topic); } @@ -135,7 +135,7 @@ namespace Org.Apache.Rocketmq protected abstract Proto::HeartbeatRequest WrapHeartbeatRequest(); - protected abstract void OnTopicRouteDataFetched0(string topic, TopicRouteData topicRouteData); + protected abstract void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData); private async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData) @@ -159,7 +159,7 @@ namespace Org.Apache.Rocketmq } _topicRouteCache[topic] = topicRouteData; - OnTopicRouteDataFetched0(topic, topicRouteData); + OnTopicRouteDataUpdated0(topic, topicRouteData); } @@ -183,7 +183,7 @@ namespace Org.Apache.Rocketmq private async void UpdateTopicRouteCache() { Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}"); - foreach (var topic in Topics.Keys) + foreach (var topic in Topics) { var topicRouteData = await FetchTopicRoute(topic); _topicRouteCache[topic] = topicRouteData; @@ -250,6 +250,7 @@ namespace Org.Apache.Rocketmq Logger.Error($"Failed to fetch topic route, clientId={ClientId}, topic={topic}, code={code}, " + $"statusMessage={response.Status.Message}"); } + StatusChecker.Check(response.Status, request); var messageQueues = response.MessageQueues.ToList(); @@ -266,13 +267,14 @@ namespace Org.Apache.Rocketmq } var request = WrapHeartbeatRequest(); - Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new (); + Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new(); // Collect task into a map. foreach (var item in endpoints) { var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout); - responses[item]= task; + responses[item] = task; } + foreach (var item in responses.Keys) { var response = await responses[item]; @@ -291,11 +293,10 @@ namespace Org.Apache.Rocketmq } var statusMessage = response.Status.Message; - Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}"); + Logger.Info( + $"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}"); } } - - public grpc.Metadata Sign() diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs index 3eef2fe6..967a2ac1 100644 --- a/csharp/rocketmq-client-csharp/ClientManager.cs +++ b/csharp/rocketmq-client-csharp/ClientManager.cs @@ -15,7 +15,7 @@ * limitations under the License. */ -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; using System; using System.Threading; using System.Threading.Tasks; @@ -78,7 +78,7 @@ namespace Org.Apache.Rocketmq _clientLock.EnterReadLock(); try { - List<Task> tasks = new List<Task>(); + var tasks = new List<Task>(); foreach (var item in _rpcClients) { tasks.Add(item.Value.Shutdown()); @@ -92,56 +92,57 @@ namespace Org.Apache.Rocketmq } } - public grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry( + public grpc::AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> Telemetry( Endpoints endpoints) { return GetRpcClient(endpoints).Telemetry(_client.Sign()); } - public async Task<rmq.QueryRouteResponse> QueryRoute(Endpoints endpoints, rmq.QueryRouteRequest request, + public async Task<Proto.QueryRouteResponse> QueryRoute(Endpoints endpoints, Proto.QueryRouteRequest request, TimeSpan timeout) { return await GetRpcClient(endpoints).QueryRoute(_client.Sign(), request, timeout); } - public async Task<rmq.HeartbeatResponse> Heartbeat(Endpoints endpoints, rmq.HeartbeatRequest request, + public async Task<Proto.HeartbeatResponse> Heartbeat(Endpoints endpoints, Proto.HeartbeatRequest request, TimeSpan timeout) { return await GetRpcClient(endpoints).Heartbeat(_client.Sign(), request, timeout); } - public async Task<rmq.NotifyClientTerminationResponse> NotifyClientTermination(Endpoints endpoints, - rmq.NotifyClientTerminationRequest request, TimeSpan timeout) + public async Task<Proto.NotifyClientTerminationResponse> NotifyClientTermination(Endpoints endpoints, + Proto.NotifyClientTerminationRequest request, TimeSpan timeout) { return await GetRpcClient(endpoints).NotifyClientTermination(_client.Sign(), request, timeout); } - public async Task<rmq::SendMessageResponse> SendMessage(Endpoints endpoints, rmq::SendMessageRequest request, + public async Task<Proto::SendMessageResponse> SendMessage(Endpoints endpoints, + Proto::SendMessageRequest request, TimeSpan timeout) { return await GetRpcClient(endpoints).SendMessage(_client.Sign(), request, timeout); } - public async Task<rmq::QueryAssignmentResponse> QueryAssignment(Endpoints endpoints, - rmq.QueryAssignmentRequest request, TimeSpan timeout) + public async Task<Proto::QueryAssignmentResponse> QueryAssignment(Endpoints endpoints, + Proto.QueryAssignmentRequest request, TimeSpan timeout) { return await GetRpcClient(endpoints).QueryAssignment(_client.Sign(), request, timeout); } - public async Task<List<rmq::ReceiveMessageResponse>> ReceiveMessage(Endpoints endpoints, - rmq.ReceiveMessageRequest request, TimeSpan timeout) + public async Task<List<Proto::ReceiveMessageResponse>> ReceiveMessage(Endpoints endpoints, + Proto.ReceiveMessageRequest request, TimeSpan timeout) { return await GetRpcClient(endpoints).ReceiveMessage(_client.Sign(), request, timeout); } - public async Task<rmq::AckMessageResponse> AckMessage(Endpoints endpoints, - rmq.AckMessageRequest request, TimeSpan timeout) + public async Task<Proto::AckMessageResponse> AckMessage(Endpoints endpoints, + Proto.AckMessageRequest request, TimeSpan timeout) { return await GetRpcClient(endpoints).AckMessage(_client.Sign(), request, timeout); } - public async Task<rmq::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Endpoints endpoints, - rmq.ChangeInvisibleDurationRequest request, TimeSpan timeout) + public async Task<Proto::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Endpoints endpoints, + Proto.ChangeInvisibleDurationRequest request, TimeSpan timeout) { return await GetRpcClient(endpoints).ChangeInvisibleDuration(_client.Sign(), request, timeout); } diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs new file mode 100644 index 00000000..25df84d3 --- /dev/null +++ b/csharp/rocketmq-client-csharp/Consumer.cs @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Google.Protobuf.WellKnownTypes; +using Proto = Apache.Rocketmq.V2; + +namespace Org.Apache.Rocketmq +{ + public abstract class Consumer : Client + { + protected readonly string ConsumerGroup; + + protected Consumer(ClientConfig clientConfig, string consumerGroup, ICollection<string> topics) : base( + clientConfig, topics) + { + ConsumerGroup = consumerGroup; + } + + protected async Task<ReceiveMessageResult> ReceiveMessage(Proto.ReceiveMessageRequest request, MessageQueue mq, + TimeSpan awaitDuration) + { + var tolerance = ClientConfig.RequestTimeout; + var timeout = tolerance.Add(awaitDuration); + var response = await ClientManager.ReceiveMessage(mq.Broker.Endpoints, request, timeout); + var status = new Proto.Status() + { + Code = Proto.Code.InternalServerError, + Message = "Status was not set by server" + }; + var messageList = new List<Proto.Message>(); + foreach (var entry in response) + { + switch (entry.ContentCase) + { + case Proto.ReceiveMessageResponse.ContentOneofCase.Status: + status = entry.Status; + break; + case Proto.ReceiveMessageResponse.ContentOneofCase.Message: + messageList.Add(entry.Message); + break; + case Proto.ReceiveMessageResponse.ContentOneofCase.DeliveryTimestamp: + case Proto.ReceiveMessageResponse.ContentOneofCase.None: + default: + break; + } + } + + var messages = messageList.Select(message => MessageView.FromProtobuf(message, mq)).ToList(); + StatusChecker.Check(status, request); + return new ReceiveMessageResult(mq.Broker.Endpoints, messages); + } + + private static Proto.FilterExpression WrapFilterExpression(FilterExpression filterExpression) + { + var filterType = Proto.FilterType.Tag; + if (ExpressionType.Sql92.Equals(filterExpression.Type)) + { + filterType = Proto.FilterType.Sql; + } + + return new Proto.FilterExpression + { + Type = filterType, + Expression = filterExpression.Expression + }; + } + + protected static Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq, + FilterExpression filterExpression, TimeSpan invisibleDuration) + { + return new Proto.ReceiveMessageRequest() + { + MessageQueue = mq.ToProtobuf(), + FilterExpression = WrapFilterExpression(filterExpression), + BatchSize = batchSize, + AutoRenew = false, + InvisibleDuration = Duration.FromTimeSpan(invisibleDuration) + }; + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Error/StatusChecker.cs b/csharp/rocketmq-client-csharp/Error/StatusChecker.cs deleted file mode 100644 index 55f56d57..00000000 --- a/csharp/rocketmq-client-csharp/Error/StatusChecker.cs +++ /dev/null @@ -1,41 +0,0 @@ -using Apache.Rocketmq.V2; -using Google.Protobuf; - -namespace Org.Apache.Rocketmq.Error -{ - public class StatusChecker - { - public static void check(Status status, IMessage message) - { - // var code = status.Code; - // switch (code) - // { - // case Code.Ok: - // case Code.MultipleResults: - // return; - // case Code.BadRequest: - // case Code.IllegalAccessPoint: - // case Code.IllegalTopic: - // case Code.IllegalConsumerGroup: - // case Code.IllegalMessageTag: - // case Code.IllegalMessageKey: - // case Code.IllegalMessageGroup: - // case Code.IllegalMessagePropertyKey: - // case Code.InvalidTransactionId: - // case Code.IllegalMessageId: - // case Code.IllegalFilterExpression: - // case Code.IllegalInvisibleTime: - // case Code.IllegalDeliveryTime: - // case Code.InvalidReceiptHandle: - // case Code.MessagePropertyConflictWithType: - // case Code.UnrecognizedClientType: - // case Code.MessageCorrupted: - // case Code.ClientIdRequired: - // case Code.IllegalPollingTime: - // throw new BadRequestException(code) - // - // case ILLEGAL_POLLING_TIME: - // } - } - } -} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/FilterExpression.cs b/csharp/rocketmq-client-csharp/FilterExpression.cs index 3bd432da..07a4af5a 100644 --- a/csharp/rocketmq-client-csharp/FilterExpression.cs +++ b/csharp/rocketmq-client-csharp/FilterExpression.cs @@ -25,6 +25,12 @@ namespace Org.Apache.Rocketmq Type = type; } + public FilterExpression(string expression) + { + Expression = expression; + Type = ExpressionType.Tag; + } + public ExpressionType Type { get; } public string Expression { get; } } diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs index f2e48e36..df2035ab 100644 --- a/csharp/rocketmq-client-csharp/IClientManager.cs +++ b/csharp/rocketmq-client-csharp/IClientManager.cs @@ -25,15 +25,48 @@ namespace Org.Apache.Rocketmq { public interface IClientManager { + /// <summary> + /// Establish a telemetry channel between client and remote endpoints. + /// </summary> + /// <param name="endpoints">The target endpoints.</param> + /// <returns>gRPC bi-directional stream.</returns> AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand> Telemetry(Endpoints endpoints); + /// <summary> + /// Query topic route info from remote. + /// </summary> + /// <param name="endpoints">The target endpoints.</param> + /// <param name="request">gRPC request of querying topic route.</param> + /// <param name="timeout">Request max duration.</param> + /// <returns>Task of response.</returns> Task<QueryRouteResponse> QueryRoute(Endpoints endpoints, QueryRouteRequest request, TimeSpan timeout); + /// <summary> + /// Send heartbeat to remote endpoints. + /// </summary> + /// <param name="endpoints">The target endpoints.</param> + /// <param name="request">gRPC request of heartbeat.</param> + /// <param name="timeout">Request max duration.</param> + /// <returns>Task of response.</returns> Task<HeartbeatResponse> Heartbeat(Endpoints endpoints, HeartbeatRequest request, TimeSpan timeout); + /// <summary> + /// Notify client's termination. + /// </summary> + /// <param name="endpoints">The target endpoints.</param> + /// <param name="request">gRPC request of notifying client's termination.</param> + /// <param name="timeout">Request max duration.</param> + /// <returns>Task of response.</returns> Task<NotifyClientTerminationResponse> NotifyClientTermination(Endpoints endpoints, NotifyClientTerminationRequest request, TimeSpan timeout); + /// <summary> + /// Send message to remote endpoints. + /// </summary> + /// <param name="endpoints"></param> + /// <param name="request"></param> + /// <param name="timeout"></param> + /// <returns></returns> Task<SendMessageResponse> SendMessage(Endpoints endpoints, SendMessageRequest request, TimeSpan timeout); diff --git a/csharp/rocketmq-client-csharp/MessageQueue.cs b/csharp/rocketmq-client-csharp/MessageQueue.cs index cd6f0ce3..580fbafe 100644 --- a/csharp/rocketmq-client-csharp/MessageQueue.cs +++ b/csharp/rocketmq-client-csharp/MessageQueue.cs @@ -16,23 +16,20 @@ */ using System.Collections.Generic; -using rmq = Apache.Rocketmq.V2; +using System.Linq; +using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { public class MessageQueue { - public MessageQueue(rmq::MessageQueue messageQueue) + public MessageQueue(Proto::MessageQueue messageQueue) { TopicResource = new Resource(messageQueue.Topic); QueueId = messageQueue.Id; Permission = PermissionHelper.FromProtobuf(messageQueue.Permission); - var messageTypes = new List<MessageType>(); - foreach (var acceptMessageType in messageQueue.AcceptMessageTypes) - { - var messageType = MessageTypeHelper.FromProtobuf(acceptMessageType); - messageTypes.Add(messageType); - } + var messageTypes = messageQueue.AcceptMessageTypes + .Select(MessageTypeHelper.FromProtobuf).ToList(); AcceptMessageTypes = messageTypes; Broker = new Broker(messageQueue.Broker); @@ -57,5 +54,19 @@ namespace Org.Apache.Rocketmq { return $"{Broker.Name}.{TopicResource}.{QueueId}"; } + + public Proto.MessageQueue ToProtobuf() + { + var messageTypes = AcceptMessageTypes.Select(MessageTypeHelper.ToProtobuf).ToList(); + + return new Proto.MessageQueue + { + Topic = TopicResource.ToProtobuf(), + Id = QueueId, + Permission = PermissionHelper.ToProtobuf(Permission), + Broker = Broker.ToProtobuf(), + AcceptMessageTypes = { messageTypes } + }; + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs index b790fcb9..dfb45e45 100644 --- a/csharp/rocketmq-client-csharp/MessageView.cs +++ b/csharp/rocketmq-client-csharp/MessageView.cs @@ -15,9 +15,10 @@ * limitations under the License. */ -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; using System; using System.Collections.Generic; +using System.Linq; using System.Security.Cryptography; using NLog; @@ -30,14 +31,14 @@ namespace Org.Apache.Rocketmq { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); - private readonly rmq.MessageQueue _messageQueue; - private readonly string _receiptHandle; + internal readonly MessageQueue MessageQueue; + internal readonly string ReceiptHandle; private readonly long _offset; private readonly bool _corrupted; internal MessageView(string messageId, string topic, byte[] body, string tag, string messageGroup, DateTime deliveryTime, List<string> keys, Dictionary<string, string> properties, string bornHost, - DateTime bornTime, int deliveryAttempt, rmq.MessageQueue messageQueue, string receiptHandle, long offset, + DateTime bornTime, int deliveryAttempt, MessageQueue messageQueue, string receiptHandle, long offset, bool corrupted) { MessageId = messageId; @@ -51,8 +52,8 @@ namespace Org.Apache.Rocketmq BornHost = bornHost; BornTime = bornTime; DeliveryAttempt = deliveryAttempt; - _messageQueue = messageQueue; - _receiptHandle = receiptHandle; + MessageQueue = messageQueue; + ReceiptHandle = receiptHandle; _offset = offset; _corrupted = corrupted; } @@ -79,7 +80,7 @@ namespace Org.Apache.Rocketmq public int DeliveryAttempt { get; } - public static MessageView FromProtobuf(rmq.Message message, rmq.MessageQueue messageQueue) + public static MessageView FromProtobuf(Proto.Message message, MessageQueue messageQueue) { var topic = message.Topic.Name; var systemProperties = message.SystemProperties; @@ -87,11 +88,11 @@ namespace Org.Apache.Rocketmq var bodyDigest = systemProperties.BodyDigest; var checkSum = bodyDigest.Checksum; var raw = message.Body.ToByteArray(); - bool corrupted = false; + var corrupted = false; var type = bodyDigest.Type; switch (type) { - case rmq.DigestType.Crc32: + case Proto.DigestType.Crc32: { var expectedCheckSum = Force.Crc32.Crc32Algorithm.Compute(raw, 0, raw.Length).ToString("X"); if (!expectedCheckSum.Equals(checkSum)) @@ -101,7 +102,7 @@ namespace Org.Apache.Rocketmq break; } - case rmq.DigestType.Md5: + case Proto.DigestType.Md5: { var expectedCheckSum = Convert.ToHexString(MD5.HashData(raw)); if (!expectedCheckSum.Equals(checkSum)) @@ -111,7 +112,7 @@ namespace Org.Apache.Rocketmq break; } - case rmq.DigestType.Sha1: + case Proto.DigestType.Sha1: { var expectedCheckSum = Convert.ToHexString(SHA1.HashData(raw)); if (!expectedCheckSum.Equals(checkSum)) @@ -121,6 +122,7 @@ namespace Org.Apache.Rocketmq break; } + case Proto.DigestType.Unspecified: default: { Logger.Error( @@ -131,18 +133,19 @@ namespace Org.Apache.Rocketmq } var bodyEncoding = systemProperties.BodyEncoding; - byte[] body = raw; + var body = raw; switch (bodyEncoding) { - case rmq.Encoding.Gzip: + case Proto.Encoding.Gzip: { body = Utilities.DecompressBytesGzip(message.Body.ToByteArray()); break; } - case rmq.Encoding.Identity: + case Proto.Encoding.Identity: { break; } + case Proto.Encoding.Unspecified: default: { Logger.Error($"Unsupported message encoding algorithm," + @@ -151,25 +154,22 @@ namespace Org.Apache.Rocketmq } } - string tag = systemProperties.HasTag ? systemProperties.Tag : null; - string messageGroup = systemProperties.HasMessageGroup ? systemProperties.MessageGroup : null; + var tag = systemProperties.HasTag ? systemProperties.Tag : null; + var messageGroup = systemProperties.HasMessageGroup ? systemProperties.MessageGroup : null; var deliveryTime = systemProperties.DeliveryTimestamp.ToDateTime(); - List<string> keys = new List<string>(); - foreach (var key in systemProperties.Keys) - { - keys.Add(key); - } + var keys = systemProperties.Keys.ToList(); var bornHost = systemProperties.BornHost; var bornTime = systemProperties.BornTimestamp.ToDateTime(); var deliveryAttempt = systemProperties.DeliveryAttempt; var queueOffset = systemProperties.QueueOffset; - Dictionary<string, string> properties = new Dictionary<string, string>(); + var properties = new Dictionary<string, string>(); foreach (var (key, value) in message.UserProperties) { properties.Add(key, value); } + var receiptHandle = systemProperties.ReceiptHandle; return new MessageView(messageId, topic, body, tag, messageGroup, deliveryTime, keys, properties, bornHost, bornTime, deliveryAttempt, messageQueue, receiptHandle, queueOffset, corrupted); diff --git a/csharp/rocketmq-client-csharp/MqEncoding.cs b/csharp/rocketmq-client-csharp/MqEncoding.cs index 27dfb052..ce6f4779 100644 --- a/csharp/rocketmq-client-csharp/MqEncoding.cs +++ b/csharp/rocketmq-client-csharp/MqEncoding.cs @@ -15,7 +15,7 @@ * limitations under the License. */ -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { @@ -27,13 +27,13 @@ namespace Org.Apache.Rocketmq public static class EncodingHelper { - public static rmq.Encoding ToProtobuf(MqEncoding mqEncoding) + public static Proto.Encoding ToProtobuf(MqEncoding mqEncoding) { return mqEncoding switch { - MqEncoding.Gzip => rmq.Encoding.Gzip, - MqEncoding.Identity => rmq.Encoding.Identity, - _ => rmq.Encoding.Unspecified + MqEncoding.Gzip => Proto.Encoding.Gzip, + MqEncoding.Identity => Proto.Encoding.Identity, + _ => Proto.Encoding.Unspecified }; } } diff --git a/csharp/rocketmq-client-csharp/Permission.cs b/csharp/rocketmq-client-csharp/Permission.cs index eedd6247..20d2828d 100644 --- a/csharp/rocketmq-client-csharp/Permission.cs +++ b/csharp/rocketmq-client-csharp/Permission.cs @@ -46,8 +46,24 @@ namespace Org.Apache.Rocketmq throw new InternalErrorException("Permission is not specified"); } } - - public static bool IsWritable(Permission permission) { + + public static Proto.Permission ToProtobuf(Permission permission) + { + switch (permission) + { + case Permission.Read: + return Proto.Permission.Read; + case Permission.Write: + return Proto.Permission.Write; + case Permission.ReadWrite: + return Proto.Permission.ReadWrite; + default: + throw new InternalErrorException("Permission is not specified"); + } + } + + public static bool IsWritable(Permission permission) + { switch (permission) { case Permission.Write: @@ -74,6 +90,4 @@ namespace Org.Apache.Rocketmq } } } - - } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index fc41926b..62387a98 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -25,7 +25,7 @@ using NLog; namespace Org.Apache.Rocketmq { - public class Producer : Client, IProducer + public class Producer : Client { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache; @@ -42,7 +42,7 @@ namespace Org.Apache.Rocketmq } private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) : - base(clientConfig, topics) + base(clientConfig, topics.Keys) { var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts); _publishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy, @@ -54,7 +54,7 @@ namespace Org.Apache.Rocketmq { foreach (var topic in topics) { - Topics[topic] = false; + Topics.Add(topic); } } @@ -94,7 +94,7 @@ namespace Org.Apache.Rocketmq return publishingLoadBalancer; } - protected override void OnTopicRouteDataFetched0(string topic, TopicRouteData topicRouteData) + protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData) { var publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData); _publishingRouteDataCache.TryAdd(topic, publishingLoadBalancer); diff --git a/csharp/rocketmq-client-csharp/Publishing.cs b/csharp/rocketmq-client-csharp/Publishing.cs index ffedd177..055e36ce 100644 --- a/csharp/rocketmq-client-csharp/Publishing.cs +++ b/csharp/rocketmq-client-csharp/Publishing.cs @@ -15,7 +15,7 @@ * limitations under the License. */ -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; using System.Collections.Generic; namespace Org.Apache.Rocketmq @@ -23,7 +23,7 @@ namespace Org.Apache.Rocketmq // Settings for publishing public class Publishing { - public List<rmq::Resource> Topics { get; set; } + public List<Proto::Resource> Topics { get; set; } public int CompressBodyThreshold { get; set; } public int MaxBodySize { get; set; } diff --git a/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs b/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs index c33bc7dc..2133af5a 100644 --- a/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs +++ b/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs @@ -18,23 +18,24 @@ using System; using System.Collections.Generic; using System.Linq; -using rmq = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { public class PublishingLoadBalancer { private readonly List<MessageQueue> _messageQueues; + + // TODO private int _roundRobinIndex; public PublishingLoadBalancer(TopicRouteData route) { _messageQueues = new List<MessageQueue>(); - foreach (var messageQueue in route.MessageQueues.Where(messageQueue => + foreach (var mq in route.MessageQueues.Where(messageQueue => PermissionHelper.IsWritable(messageQueue.Permission) && Utilities.MasterBrokerId == messageQueue.Broker.Id)) { - _messageQueues.Add(messageQueue); + _messageQueues.Add(mq); } var random = new Random(); @@ -70,21 +71,23 @@ namespace Org.Apache.Rocketmq } } - if (candidates.Count != 0) return candidates; + if (candidates.Count != 0) + { + return candidates; + } + + foreach (var mq in _messageQueues.Select(_ => Utilities.GetPositiveMod(next++, _messageQueues.Count)) + .Select(positiveMod => _messageQueues[positiveMod])) { - foreach (var mq in _messageQueues.Select(_ => Utilities.GetPositiveMod(next++, _messageQueues.Count)) - .Select(positiveMod => _messageQueues[positiveMod])) + if (!candidateBrokerNames.Contains(mq.Broker.Name)) { - if (!candidateBrokerNames.Contains(mq.Broker.Name)) - { - candidateBrokerNames.Add(mq.Broker.Name); - candidates.Add(mq); - } + candidateBrokerNames.Add(mq.Broker.Name); + candidates.Add(mq); + } - if (candidates.Count >= count) - { - break; - } + if (candidates.Count >= count) + { + break; } } diff --git a/csharp/rocketmq-client-csharp/PublishingSettings.cs b/csharp/rocketmq-client-csharp/PublishingSettings.cs index 023b0be3..b9f8f454 100644 --- a/csharp/rocketmq-client-csharp/PublishingSettings.cs +++ b/csharp/rocketmq-client-csharp/PublishingSettings.cs @@ -17,7 +17,7 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; +using System.Linq; using Google.Protobuf.WellKnownTypes; using Proto = Apache.Rocketmq.V2; @@ -28,8 +28,8 @@ namespace Org.Apache.Rocketmq private volatile int _maxBodySizeBytes = 4 * 1024 * 1024; private volatile bool _validateMessageType = true; - public PublishingSettings(string clientId, Endpoints accessPoint, ExponentialBackoffRetryPolicy retryPolicy, - TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(clientId, ClientType.Producer, accessPoint, + public PublishingSettings(string clientId, Endpoints endpoints, ExponentialBackoffRetryPolicy retryPolicy, + TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(clientId, ClientType.Producer, endpoints, retryPolicy, requestTimeout) { Topics = topics; @@ -54,14 +54,7 @@ namespace Org.Apache.Rocketmq public override Proto.Settings ToProtobuf() { - List<Proto.Resource> topics = new List<Proto.Resource>(); - foreach (var topic in Topics) - { - topics.Add(new Proto.Resource - { - Name = topic.Key - }); - } + var topics = Topics.Select(topic => new Proto.Resource { Name = topic.Key }).ToList(); var publishing = new Proto.Publishing(); publishing.Topics.Add(topics); @@ -69,7 +62,7 @@ namespace Org.Apache.Rocketmq return new Proto.Settings { Publishing = publishing, - AccessPoint = AccessPoint.ToProtobuf(), + AccessPoint = Endpoints.ToProtobuf(), ClientType = ClientTypeHelper.ToProtobuf(ClientType), RequestTimeout = Duration.FromTimeSpan(RequestTimeout), BackoffPolicy = RetryPolicy.ToProtobuf(), diff --git a/csharp/rocketmq-client-csharp/IProducer.cs b/csharp/rocketmq-client-csharp/ReceiveMessageResult.cs similarity index 71% rename from csharp/rocketmq-client-csharp/IProducer.cs rename to csharp/rocketmq-client-csharp/ReceiveMessageResult.cs index 420af202..1898838b 100644 --- a/csharp/rocketmq-client-csharp/IProducer.cs +++ b/csharp/rocketmq-client-csharp/ReceiveMessageResult.cs @@ -15,16 +15,20 @@ * limitations under the License. */ -using System.Threading.Tasks; +using System.Collections.Generic; namespace Org.Apache.Rocketmq { - public interface IProducer + public class ReceiveMessageResult { - Task Start(); + public ReceiveMessageResult(Endpoints endpoints, List<MessageView> messages) + { + Endpoints = endpoints; + Messages = messages; + } - Task Shutdown(); + public Endpoints Endpoints { get; } - Task<SendReceipt> Send(Message message); + public List<MessageView> Messages { get; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Resource.cs b/csharp/rocketmq-client-csharp/Resource.cs index a1825f15..76c0ba9e 100644 --- a/csharp/rocketmq-client-csharp/Resource.cs +++ b/csharp/rocketmq-client-csharp/Resource.cs @@ -11,6 +11,12 @@ namespace Org.Apache.Rocketmq Name = resource.Name; } + public Resource(string name) + { + Namespace = ""; + Name = name; + } + public string Namespace { get; } public string Name { get; } @@ -22,7 +28,7 @@ namespace Org.Apache.Rocketmq Name = Name }; } - + public override string ToString() { return String.IsNullOrEmpty(Namespace) ? Name : $"{Namespace}.{Name}"; diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs index de28c184..bf45410c 100644 --- a/csharp/rocketmq-client-csharp/RpcClient.cs +++ b/csharp/rocketmq-client-csharp/RpcClient.cs @@ -21,7 +21,7 @@ using System.Net.Http; using System.Net.Security; using System.Threading; using System.Threading.Tasks; -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; using Grpc.Core; using Grpc.Core.Interceptors; using Grpc.Net.Client; @@ -32,7 +32,7 @@ namespace Org.Apache.Rocketmq public class RpcClient : IRpcClient { private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); - private readonly rmq::MessagingService.MessagingServiceClient _stub; + private readonly Proto::MessagingService.MessagingServiceClient _stub; private readonly GrpcChannel _channel; private readonly string _target; @@ -44,7 +44,7 @@ namespace Org.Apache.Rocketmq HttpHandler = CreateHttpHandler() }); var invoker = _channel.Intercept(new ClientLoggerInterceptor()); - _stub = new rmq::MessagingService.MessagingServiceClient(invoker); + _stub = new Proto::MessagingService.MessagingServiceClient(invoker); } public async Task Shutdown() @@ -74,14 +74,14 @@ namespace Org.Apache.Rocketmq return handler; } - public AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(Metadata metadata) + public AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> Telemetry(Metadata metadata) { var deadline = DateTime.UtcNow.Add(TimeSpan.FromDays(3650)); var callOptions = new CallOptions(metadata, deadline); return _stub.Telemetry(callOptions); } - public async Task<rmq::QueryRouteResponse> QueryRoute(Metadata metadata, rmq::QueryRouteRequest request, + public async Task<Proto::QueryRouteResponse> QueryRoute(Metadata metadata, Proto::QueryRouteRequest request, TimeSpan timeout) { var deadline = DateTime.UtcNow.Add(timeout); @@ -92,7 +92,7 @@ namespace Org.Apache.Rocketmq } - public async Task<rmq::HeartbeatResponse> Heartbeat(Metadata metadata, rmq::HeartbeatRequest request, + public async Task<Proto::HeartbeatResponse> Heartbeat(Metadata metadata, Proto::HeartbeatRequest request, TimeSpan timeout) { var deadline = DateTime.UtcNow.Add(timeout); @@ -102,7 +102,7 @@ namespace Org.Apache.Rocketmq return await call.ResponseAsync; } - public async Task<rmq::SendMessageResponse> SendMessage(Metadata metadata, rmq::SendMessageRequest request, + public async Task<Proto::SendMessageResponse> SendMessage(Metadata metadata, Proto::SendMessageRequest request, TimeSpan timeout) { var deadline = DateTime.UtcNow.Add(timeout); @@ -112,8 +112,8 @@ namespace Org.Apache.Rocketmq return await call.ResponseAsync; } - public async Task<rmq::QueryAssignmentResponse> QueryAssignment(Metadata metadata, - rmq::QueryAssignmentRequest request, + public async Task<Proto::QueryAssignmentResponse> QueryAssignment(Metadata metadata, + Proto::QueryAssignmentRequest request, TimeSpan timeout) { var deadline = DateTime.UtcNow.Add(timeout); @@ -123,14 +123,14 @@ namespace Org.Apache.Rocketmq return await call.ResponseAsync; } - public async Task<List<rmq::ReceiveMessageResponse>> ReceiveMessage(Metadata metadata, - rmq::ReceiveMessageRequest request, TimeSpan timeout) + public async Task<List<Proto::ReceiveMessageResponse>> ReceiveMessage(Metadata metadata, + Proto::ReceiveMessageRequest request, TimeSpan timeout) { var deadline = DateTime.UtcNow.Add(timeout); var callOptions = new CallOptions(metadata, deadline); var call = _stub.ReceiveMessage(request, callOptions); Logger.Debug($"ReceiveMessageRequest has been written to {_target}"); - var result = new List<rmq::ReceiveMessageResponse>(); + var result = new List<Proto::ReceiveMessageResponse>(); var stream = call.ResponseStream; while (await stream.MoveNext()) { @@ -143,7 +143,7 @@ namespace Org.Apache.Rocketmq return result; } - public async Task<rmq::AckMessageResponse> AckMessage(Metadata metadata, rmq::AckMessageRequest request, + public async Task<Proto::AckMessageResponse> AckMessage(Metadata metadata, Proto::AckMessageRequest request, TimeSpan timeout) { var deadline = DateTime.UtcNow.Add(timeout); @@ -153,8 +153,8 @@ namespace Org.Apache.Rocketmq return await call.ResponseAsync; } - public async Task<rmq::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Metadata metadata, - rmq::ChangeInvisibleDurationRequest request, + public async Task<Proto::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Metadata metadata, + Proto::ChangeInvisibleDurationRequest request, TimeSpan timeout) { var deadline = DateTime.UtcNow.Add(timeout); @@ -164,9 +164,9 @@ namespace Org.Apache.Rocketmq return await call.ResponseAsync; } - public async Task<rmq::ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue( + public async Task<Proto::ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue( Metadata metadata, - rmq::ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout) + Proto::ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout) { var deadline = DateTime.UtcNow.Add(timeout); var callOptions = new CallOptions(metadata, deadline); @@ -175,8 +175,8 @@ namespace Org.Apache.Rocketmq return await call.ResponseAsync; } - public async Task<rmq::EndTransactionResponse> EndTransaction(Metadata metadata, - rmq::EndTransactionRequest request, + public async Task<Proto::EndTransactionResponse> EndTransaction(Metadata metadata, + Proto::EndTransactionRequest request, TimeSpan timeout) { var deadline = DateTime.UtcNow.Add(timeout); @@ -186,8 +186,8 @@ namespace Org.Apache.Rocketmq return await call.ResponseAsync; } - public async Task<rmq::NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata, - rmq::NotifyClientTerminationRequest request, TimeSpan timeout) + public async Task<Proto::NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata, + Proto::NotifyClientTerminationRequest request, TimeSpan timeout) { var deadline = DateTime.UtcNow.Add(timeout); var callOptions = new CallOptions(metadata, deadline); diff --git a/csharp/rocketmq-client-csharp/Settings.cs b/csharp/rocketmq-client-csharp/Settings.cs index 7716fc2d..491aa564 100644 --- a/csharp/rocketmq-client-csharp/Settings.cs +++ b/csharp/rocketmq-client-csharp/Settings.cs @@ -24,25 +24,25 @@ namespace Org.Apache.Rocketmq { protected readonly string ClientId; protected readonly ClientType ClientType; - protected readonly Endpoints AccessPoint; + protected readonly Endpoints Endpoints; protected volatile IRetryPolicy RetryPolicy; protected readonly TimeSpan RequestTimeout; - public Settings(string clientId, ClientType clientType, Endpoints accessPoint, IRetryPolicy retryPolicy, + public Settings(string clientId, ClientType clientType, Endpoints endpoints, IRetryPolicy retryPolicy, TimeSpan requestTimeout) { ClientId = clientId; ClientType = clientType; - AccessPoint = accessPoint; + Endpoints = endpoints; RetryPolicy = retryPolicy; RequestTimeout = requestTimeout; } - public Settings(string clientId, ClientType clientType, Endpoints accessPoint, TimeSpan requestTimeout) + public Settings(string clientId, ClientType clientType, Endpoints endpoints, TimeSpan requestTimeout) { ClientId = clientId; ClientType = clientType; - AccessPoint = accessPoint; + Endpoints = endpoints; RetryPolicy = null; RequestTimeout = requestTimeout; } diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs new file mode 100644 index 00000000..1a0f0ec2 --- /dev/null +++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs @@ -0,0 +1,192 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Google.Protobuf.WellKnownTypes; +using Proto = Apache.Rocketmq.V2; +using NLog; +using Org.Apache.Rocketmq.Error; + +namespace Org.Apache.Rocketmq +{ + public class SimpleConsumer : Consumer + { + private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); + private readonly ConcurrentDictionary<string /* topic */, SubscriptionLoadBalancer> _subscriptionRouteDataCache; + private readonly ConcurrentDictionary<string /* topic */, FilterExpression> _subscriptionExpressions; + private readonly TimeSpan _awaitDuration; + private readonly SimpleSubscriptionSettings _simpleSubscriptionSettings; + private int _topicRoundRobinIndex; + + public SimpleConsumer(ClientConfig clientConfig, string consumerGroup, TimeSpan awaitDuration, + Dictionary<string, FilterExpression> subscriptionExpressions) : this(clientConfig, consumerGroup, + awaitDuration, new ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions)) + { + } + + private SimpleConsumer(ClientConfig clientConfig, string consumerGroup, TimeSpan awaitDuration, + ConcurrentDictionary<string, FilterExpression> subscriptionExpressions) : base(clientConfig, consumerGroup, + subscriptionExpressions.Keys) + { + _awaitDuration = awaitDuration; + _subscriptionRouteDataCache = new ConcurrentDictionary<string, SubscriptionLoadBalancer>(); + _subscriptionExpressions = subscriptionExpressions; + _simpleSubscriptionSettings = new SimpleSubscriptionSettings(ClientId, clientConfig.Endpoints, + ConsumerGroup, clientConfig.RequestTimeout, awaitDuration, subscriptionExpressions); + _topicRoundRobinIndex = 0; + } + + public async Task Subscribe(string topic, FilterExpression filterExpression) + { + // TODO: check running status. + await GetSubscriptionLoadBalancer(topic); + _subscriptionExpressions.TryAdd(topic, filterExpression); + } + + public void Unsubscribe(string topic) + { + _subscriptionExpressions.TryRemove(topic, out _); + } + + public override async Task Start() + { + Logger.Info($"Begin to start the rocketmq simple consumer, clientId={ClientId}"); + await base.Start(); + Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}"); + } + + public override async Task Shutdown() + { + Logger.Info($"Begin to shutdown the rocketmq simple consumer, clientId={ClientId}"); + await base.Shutdown(); + Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}"); + } + + protected override Proto.HeartbeatRequest WrapHeartbeatRequest() + { + return new Proto::HeartbeatRequest + { + ClientType = Proto.ClientType.SimpleConsumer + }; + } + + protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData) + { + var subscriptionLoadBalancer = new SubscriptionLoadBalancer(topicRouteData); + _subscriptionRouteDataCache.TryAdd(topic, subscriptionLoadBalancer); + } + + public override Proto.Settings GetSettings() + { + return _simpleSubscriptionSettings.ToProtobuf(); + } + + public override void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings) + { + _simpleSubscriptionSettings.Sync(settings); + } + + + private async Task<SubscriptionLoadBalancer> GetSubscriptionLoadBalancer(string topic) + { + if (_subscriptionRouteDataCache.TryGetValue(topic, out var subscriptionLoadBalancer)) + { + return subscriptionLoadBalancer; + } + + var topicRouteData = await FetchTopicRoute(topic); + subscriptionLoadBalancer = new SubscriptionLoadBalancer(topicRouteData); + _subscriptionRouteDataCache.TryAdd(topic, subscriptionLoadBalancer); + + return subscriptionLoadBalancer; + } + + + public async Task<List<MessageView>> Receive(int maxMessageNum, TimeSpan invisibleDuration) + { + if (maxMessageNum <= 0) + { + throw new InternalErrorException("maxMessageNum must be greater than 0"); + } + + var copy = new ConcurrentDictionary<string, FilterExpression>(_subscriptionExpressions); + var topics = new List<string>(copy.Keys); + if (topics.Count <= 0) + { + throw new ArgumentException("There is no topic to receive message"); + } + + var index = Utilities.GetPositiveMod(Interlocked.Increment(ref _topicRoundRobinIndex), topics.Count); + var topic = topics[index]; + var filterExpression = _subscriptionExpressions[topic]; + var subscriptionLoadBalancer = await GetSubscriptionLoadBalancer(topic); + + var mq = subscriptionLoadBalancer.TakeMessageQueue(); + var request = WrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, invisibleDuration); + var receiveMessageResult = await ReceiveMessage(request, mq, _awaitDuration); + return receiveMessageResult.Messages; + } + + public async void ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration) + { + var request = WrapChangeInvisibleDuration(messageView, invisibleDuration); + var response = await ClientManager.ChangeInvisibleDuration(messageView.MessageQueue.Broker.Endpoints, + request, ClientConfig.RequestTimeout); + StatusChecker.Check(response.Status, request); + } + + + public async Task Ack(MessageView messageView) + { + var request = WrapAckMessageRequest(messageView); + var response = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request, + ClientConfig.RequestTimeout); + StatusChecker.Check(response.Status, request); + } + + private Proto.AckMessageRequest WrapAckMessageRequest(MessageView messageView) + { + var topicResource = new Proto.Resource + { + Name = messageView.Topic + }; + var entry = new Proto.AckMessageEntry + { + MessageId = messageView.MessageId, + ReceiptHandle = messageView.ReceiptHandle, + }; + return new Proto.AckMessageRequest + { + Group = GetProtobufGroup(), + Topic = topicResource, + Entries = { entry } + }; + } + + private Proto.ChangeInvisibleDurationRequest WrapChangeInvisibleDuration(MessageView messageView, + TimeSpan invisibleDuration) + { + var topicResource = new Proto.Resource + { + Name = messageView.Topic + }; + return new Proto.ChangeInvisibleDurationRequest + { + Topic = topicResource, + Group = GetProtobufGroup(), + ReceiptHandle = messageView.ReceiptHandle, + InvisibleDuration = Duration.FromTimeSpan(invisibleDuration), + MessageId = messageView.MessageId + }; + } + + private Proto.Resource GetProtobufGroup() + { + return new Proto.Resource() + { + Name = ConsumerGroup + }; + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs new file mode 100644 index 00000000..a6a409d7 --- /dev/null +++ b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using Google.Protobuf.WellKnownTypes; +using NLog; +using Proto = Apache.Rocketmq.V2; + +namespace Org.Apache.Rocketmq +{ + public class SimpleSubscriptionSettings : Settings + { + private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger(); + + private readonly Resource _group; + private readonly TimeSpan _longPollingTimeout; + private readonly ConcurrentDictionary<string /* topic */, FilterExpression> _subscriptionExpressions; + + public SimpleSubscriptionSettings(string clientId, Endpoints endpoints, string consumerGroup, + TimeSpan requestTimeout, TimeSpan longPollingTimeout, + ConcurrentDictionary<string, FilterExpression> subscriptionExpressions) : base( + clientId, ClientType.SimpleConsumer, endpoints, requestTimeout) + { + _group = new Resource(consumerGroup); + _longPollingTimeout = longPollingTimeout; + _subscriptionExpressions = subscriptionExpressions; + } + + public override void Sync(Proto::Settings settings) + { + // TODO + } + + public override Proto.Settings ToProtobuf() + { + var subscriptionEntries = new List<Proto.SubscriptionEntry>(); + foreach (var (key, value) in _subscriptionExpressions) + { + var topic = new Proto.Resource() + { + Name = key, + }; + var subscriptionEntry = new Proto.SubscriptionEntry(); + var filterExpression = new Proto.FilterExpression(); + switch (value.Type) + { + case ExpressionType.Tag: + filterExpression.Type = Proto.FilterType.Tag; + break; + case ExpressionType.Sql92: + filterExpression.Type = Proto.FilterType.Sql; + break; + default: + Logger.Warn($"[Bug] Unrecognized filter type={value.Type} for simple consumer"); + break; + } + + filterExpression.Expression = value.Expression; + subscriptionEntry.Topic = topic; + subscriptionEntries.Add(subscriptionEntry); + } + + var subscription = new Proto.Subscription + { + Group = _group.ToProtobuf(), + Subscriptions = { subscriptionEntries }, + LongPollingTimeout = Duration.FromTimeSpan(_longPollingTimeout) + }; + return new Proto.Settings + { + AccessPoint = Endpoints.ToProtobuf(), + ClientType = ClientTypeHelper.ToProtobuf(ClientType), + RequestTimeout = Duration.FromTimeSpan(RequestTimeout), + Subscription = subscription, + UserAgent = UserAgent.Instance.ToProtobuf() + }; + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/StatusChecker.cs b/csharp/rocketmq-client-csharp/StatusChecker.cs index 641fd097..2802fd78 100644 --- a/csharp/rocketmq-client-csharp/StatusChecker.cs +++ b/csharp/rocketmq-client-csharp/StatusChecker.cs @@ -31,6 +31,7 @@ namespace Org.Apache.Rocketmq var statusCode = status.Code; var statusMessage = status.Message; + // TODO switch (statusCode) { case Proto.Code.Ok: diff --git a/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs index b77da833..a8b1963e 100644 --- a/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs +++ b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs @@ -15,37 +15,35 @@ * limitations under the License. */ +using System; using System.Collections.Generic; -using System.Threading; -using rmq = Apache.Rocketmq.V2; +using System.Linq; namespace Org.Apache.Rocketmq { internal sealed class SubscriptionLoadBalancer { - public List<rmq.Assignment> Assignments { get; private set; } - private uint index = 0; + private readonly List<MessageQueue> _messageQueues; + private int _roundRobinIndex; - public SubscriptionLoadBalancer(List<rmq.Assignment> assignments) + public SubscriptionLoadBalancer(TopicRouteData topicRouteData) { - Assignments = assignments; - } + _messageQueues = new List<MessageQueue>(); + foreach (var mq in topicRouteData.MessageQueues.Where(mq => PermissionHelper.IsReadable(mq.Permission)) + .Where(mq => Utilities.MasterBrokerId == mq.Broker.Id)) + { + _messageQueues.Add(mq); + } - private SubscriptionLoadBalancer(uint oldIndex, List<rmq.Assignment> assignments) - { - index = oldIndex; - Assignments = assignments; - } - - public SubscriptionLoadBalancer Update(List<rmq.Assignment> newAssignments) - { - return new SubscriptionLoadBalancer(index, newAssignments); + var random = new Random(); + _roundRobinIndex = random.Next(0, _messageQueues.Count); } - public rmq.MessageQueue TakeMessageQueue() + public MessageQueue TakeMessageQueue() { - var i = Interlocked.Increment(ref index); - return Assignments[(int)(i % Assignments.Count)].MessageQueue; + var next = ++_roundRobinIndex; + var index = Utilities.GetPositiveMod(next, _messageQueues.Count); + return _messageQueues[index]; } } -} +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/TopicRouteData.cs b/csharp/rocketmq-client-csharp/TopicRouteData.cs index 2be2b9a5..885db5f6 100644 --- a/csharp/rocketmq-client-csharp/TopicRouteData.cs +++ b/csharp/rocketmq-client-csharp/TopicRouteData.cs @@ -18,19 +18,15 @@ using System; using System.Collections.Generic; using System.Linq; -using rmq = Apache.Rocketmq.V2; +using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { public class TopicRouteData : IEquatable<TopicRouteData> { - public TopicRouteData(List<rmq::MessageQueue> messageQueues) + public TopicRouteData(IEnumerable<Proto.MessageQueue> messageQueues) { - var messageQueuesList = new List<MessageQueue>(); - foreach (var mq in messageQueues) - { - messageQueuesList.Add(new MessageQueue(mq)); - } + var messageQueuesList = messageQueues.Select(mq => new MessageQueue(mq)).ToList(); MessageQueues = messageQueuesList; } diff --git a/csharp/rocketmq-client-csharp/Utilities.cs b/csharp/rocketmq-client-csharp/Utilities.cs index 592f364e..d032ae1e 100644 --- a/csharp/rocketmq-client-csharp/Utilities.cs +++ b/csharp/rocketmq-client-csharp/Utilities.cs @@ -22,7 +22,6 @@ using System; using System.IO; using System.IO.Compression; using System.Threading; -using rmq = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { @@ -81,10 +80,10 @@ namespace Org.Apache.Rocketmq public static string ByteArrayToHexString(byte[] bytes) { - StringBuilder result = new StringBuilder(bytes.Length * 2); + var result = new StringBuilder(bytes.Length * 2); const string hexAlphabet = "0123456789ABCDEF"; - foreach (byte b in bytes) + foreach (var b in bytes) { result.Append(hexAlphabet[(int)(b >> 4)]); result.Append(hexAlphabet[(int)(b & 0xF)]); diff --git a/csharp/tests/UnitTest1.cs b/csharp/tests/UnitTest1.cs index af13d3dc..2f6b468a 100644 --- a/csharp/tests/UnitTest1.cs +++ b/csharp/tests/UnitTest1.cs @@ -14,11 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -using Microsoft.VisualStudio.TestTools.UnitTesting; -using Org.Apache.Rocketmq; -using Grpc.Net.Client; -using rmq = Apache.Rocketmq.V2; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Proto = Apache.Rocketmq.V2; using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -31,33 +29,32 @@ namespace tests [TestMethod] public void TestMethod1() { - rmq::Permission perm = rmq::Permission.None; + Proto::Permission perm = Proto::Permission.None; switch (perm) { - case rmq::Permission.None: - { - Console.WriteLine("None"); - break; - } - - case rmq::Permission.Read: - { - Console.WriteLine("Read"); - break; - } + case Proto::Permission.None: + { + Console.WriteLine("None"); + break; + } - case rmq::Permission.Write: - { - Console.WriteLine("Write"); - break; - } + case Proto::Permission.Read: + { + Console.WriteLine("Read"); + break; + } - case rmq::Permission.ReadWrite: - { - Console.WriteLine("ReadWrite"); - break; - } + case Proto::Permission.Write: + { + Console.WriteLine("Write"); + break; + } + case Proto::Permission.ReadWrite: + { + Console.WriteLine("ReadWrite"); + break; + } } } @@ -82,4 +79,4 @@ namespace tests Assert.AreEqual(1, list.Count); } } -} +} \ No newline at end of file
