This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 708f051  Clean up unused code: Remove PushConsumer (#207)
708f051 is described below

commit 708f051fa147ff328623261eb1c8f846d6581d3e
Author: Zhanhui Li <[email protected]>
AuthorDate: Tue Aug 30 20:35:57 2022 +0800

    Clean up unused code: Remove PushConsumer (#207)
    
    * Clean up code
    
    * Remove unused code
---
 csharp/examples/Program.cs                         |  11 +-
 csharp/rocketmq-client-csharp/AccessPoint.cs       |  20 ++
 csharp/rocketmq-client-csharp/Client.cs            | 106 ++++-----
 .../rocketmq-client-csharp/ClientManagerFactory.cs |  45 ----
 csharp/rocketmq-client-csharp/Producer.cs          |   8 +-
 csharp/rocketmq-client-csharp/PushConsumer.cs      | 261 ---------------------
 csharp/rocketmq-client-csharp/SimpleConsumer.cs    |  19 +-
 .../rocketmq-client-csharp.csproj                  |   4 +
 csharp/tests/ProducerTest.cs                       |  18 +-
 csharp/tests/PushConsumerTest.cs                   | 119 ----------
 csharp/tests/SimpleConsumerTest.cs                 |  25 +-
 11 files changed, 108 insertions(+), 528 deletions(-)

diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index 28ad6f3..9f01e29 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -17,7 +17,6 @@
 using System;
 using System.Collections.Generic;
 using System.Threading.Tasks;
-using System.Threading;
 using Org.Apache.Rocketmq;
 
 namespace examples
@@ -26,15 +25,15 @@ namespace examples
     {
         static async Task Main(string[] args)
         {
-            string accessUrl = 
"rmq-cn-7mz2uk4nn0p.cn-hangzhou.rmq.aliyuncs.com:8080";
+            string accessUrl = 
"rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080";
+            var topic = "sdk_standard";
             var credentialsProvider = new ConfigFileCredentialsProvider();
-            var accessPoint = new AccessPoint(accessUrl);
-            var producer = new Producer(accessPoint, "");
+            var producer = new Producer(accessUrl);
             producer.CredentialsProvider = credentialsProvider;
+            producer.AddTopicOfInterest(topic);
+            
             await producer.Start();
 
-            var topic = "sdk_standard";
-            
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
             // Associate the message with one or multiple keys
diff --git a/csharp/rocketmq-client-csharp/AccessPoint.cs 
b/csharp/rocketmq-client-csharp/AccessPoint.cs
index f4ba47c..ab29273 100644
--- a/csharp/rocketmq-client-csharp/AccessPoint.cs
+++ b/csharp/rocketmq-client-csharp/AccessPoint.cs
@@ -16,6 +16,9 @@
  */
 
 using System;
+using rmq = Apache.Rocketmq.V2;
+using System.Net;
+using System.Net.Sockets;
 
 namespace Org.Apache.Rocketmq
 {
@@ -58,5 +61,22 @@ namespace Org.Apache.Rocketmq
         {
             return $"https://{_host}:{_port}";;
         }
+
+        public rmq::AddressScheme HostScheme()
+        {
+            IPAddress ip;
+            bool result = IPAddress.TryParse(_host, out ip);
+            if (!result)
+            {
+                return rmq::AddressScheme.DomainName;
+            }
+
+            return ip.AddressFamily switch
+            {
+                AddressFamily.InterNetwork => rmq::AddressScheme.Ipv4,
+                AddressFamily.InterNetworkV6 => rmq::AddressScheme.Ipv6,
+                _ => rmq::AddressScheme.Unspecified
+            };
+        }
     }
 }
diff --git a/csharp/rocketmq-client-csharp/Client.cs 
b/csharp/rocketmq-client-csharp/Client.cs
index 32dffae..0fe4ec9 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -25,9 +25,6 @@ using rmq = Apache.Rocketmq.V2;
 using grpc = global::Grpc.Core;
 using NLog;
 using System.Diagnostics.Metrics;
-using OpenTelemetry;
-using OpenTelemetry.Metrics;
-
 
 namespace Org.Apache.Rocketmq
 {
@@ -35,46 +32,45 @@ namespace Org.Apache.Rocketmq
     {
         protected static readonly Logger Logger = 
MqLogManager.Instance.GetCurrentClassLogger();
 
-        protected Client(AccessPoint accessPoint, string resourceNamespace)
+        protected Client(string accessUrl)
         {
-            _accessPoint = accessPoint;
+            AccessPoint = new AccessPoint(accessUrl);
 
-            // Support IPv4 for now
-            AccessPointScheme = rmq::AddressScheme.Ipv4;
+            AccessPointScheme = AccessPoint.HostScheme();
             var serviceEndpoint = new rmq::Address();
-            serviceEndpoint.Host = accessPoint.Host;
-            serviceEndpoint.Port = accessPoint.Port;
+            serviceEndpoint.Host = AccessPoint.Host;
+            serviceEndpoint.Port = AccessPoint.Port;
             AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
 
-            _resourceNamespace = resourceNamespace;
+            _resourceNamespace = "";
 
-            _clientSettings = new rmq::Settings();
+            ClientSettings = new rmq::Settings();
 
-            _clientSettings.AccessPoint = new rmq::Endpoints();
-            _clientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
-            _clientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
+            ClientSettings.AccessPoint = new rmq::Endpoints();
+            ClientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
+            ClientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
 
-            _clientSettings.RequestTimeout = 
Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
+            ClientSettings.RequestTimeout = 
Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
 
-            _clientSettings.UserAgent = new rmq.UA();
-            _clientSettings.UserAgent.Language = rmq::Language.DotNet;
-            _clientSettings.UserAgent.Version = "5.0.0";
-            _clientSettings.UserAgent.Platform = 
Environment.OSVersion.ToString();
-            _clientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+            ClientSettings.UserAgent = new rmq.UA();
+            ClientSettings.UserAgent.Language = rmq::Language.DotNet;
+            ClientSettings.UserAgent.Version = "5.0.0";
+            ClientSettings.UserAgent.Platform = 
Environment.OSVersion.ToString();
+            ClientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
 
-            Manager = ClientManagerFactory.getClientManager(resourceNamespace);
+            _manager = new ClientManager();
 
             _topicRouteTable = new ConcurrentDictionary<string, 
TopicRouteData>();
             _updateTopicRouteCts = new CancellationTokenSource();
 
             _healthCheckCts = new CancellationTokenSource();
 
-            telemetryCts_ = new CancellationTokenSource();
+            _telemetryCts = new CancellationTokenSource();
         }
 
         public virtual async Task Start()
         {
-            schedule(async () =>
+            Schedule(async () =>
             {
                 await UpdateTopicRoute();
 
@@ -83,8 +79,8 @@ namespace Org.Apache.Rocketmq
             // Get routes for topics of interest.
             await UpdateTopicRoute();
 
-            string accessPointUrl = _accessPoint.TargetUrl();
-            createSession(accessPointUrl);
+            string accessPointUrl = AccessPoint.TargetUrl();
+            CreateSession(accessPointUrl);
 
             await 
_sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
 
@@ -95,8 +91,8 @@ namespace Org.Apache.Rocketmq
         {
             Logger.Info($"Shutdown 
client[resource-namespace={_resourceNamespace}");
             _updateTopicRouteCts.Cancel();
-            telemetryCts_.Cancel();
-            await Manager.Shutdown();
+            _telemetryCts.Cancel();
+            await _manager.Shutdown();
         }
 
         protected string FilterBroker(Func<string, bool> acceptor)
@@ -138,7 +134,7 @@ namespace Org.Apache.Rocketmq
         private async Task UpdateTopicRoute()
         {
             HashSet<string> topics = new HashSet<string>();
-            foreach (var topic in topicsOfInterest_)
+            foreach (var topic in _topicsOfInterest)
             {
                 topics.Add(topic);
             }
@@ -192,7 +188,7 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        public void schedule(Action action, int seconds, CancellationToken 
token)
+        public void Schedule(Action action, int seconds, CancellationToken 
token)
         {
             if (null == action)
             {
@@ -246,7 +242,7 @@ namespace Org.Apache.Rocketmq
             try
             {
                 Logger.Debug($"Resolving route for topic={topic}");
-                topicRouteData = await Manager.ResolveRoute(target, metadata, 
request, RequestTimeout);
+                topicRouteData = await _manager.ResolveRoute(target, metadata, 
request, RequestTimeout);
                 if (null != topicRouteData)
                 {
                     Logger.Debug($"Got route entries for {topic} from name 
server");
@@ -286,7 +282,7 @@ namespace Org.Apache.Rocketmq
             List<Task> tasks = new List<Task>();
             foreach (var endpoint in endpoints)
             {
-                tasks.Add(Manager.Heartbeat(endpoint, metadata, request, 
RequestTimeout));
+                tasks.Add(_manager.Heartbeat(endpoint, metadata, request, 
RequestTimeout));
             }
 
             await Task.WhenAll(tasks);
@@ -303,7 +299,7 @@ namespace Org.Apache.Rocketmq
 
         }
 
-        protected async Task<List<rmq::Assignment>> scanLoadAssignment(string 
topic, string group)
+        protected async Task<List<rmq::Assignment>> ScanLoadAssignment(string 
topic, string group)
         {
             // Pick a broker randomly
             string target = FilterBroker((s) => true);
@@ -324,7 +320,7 @@ namespace Org.Apache.Rocketmq
             {
                 var metadata = new grpc::Metadata();
                 Signature.sign(this, metadata);
-                return await Manager.QueryLoadAssignment(target, metadata, 
request, RequestTimeout);
+                return await _manager.QueryLoadAssignment(target, metadata, 
request, RequestTimeout);
             }
             catch (System.Exception e)
             {
@@ -345,14 +341,14 @@ namespace Org.Apache.Rocketmq
 
         public virtual void BuildClientSetting(rmq::Settings settings)
         {
-            settings.MergeFrom(_clientSettings);
+            settings.MergeFrom(ClientSettings);
         }
 
-        public void createSession(string url)
+        public void CreateSession(string url)
         {
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
-            var stream = Manager.Telemetry(url, metadata);
+            var stream = _manager.Telemetry(url, metadata);
             var session = new Session(url, stream, this);
             _sessions.TryAdd(url, session);
             Task.Run(async () =>
@@ -372,7 +368,7 @@ namespace Org.Apache.Rocketmq
             request.Group.ResourceNamespace = _resourceNamespace;
             request.Group.Name = group;
             request.MessageQueue = assignment.MessageQueue;
-            var messages = await Manager.ReceiveMessage(targetUrl, metadata, 
request, getLongPollingTimeout());
+            var messages = await _manager.ReceiveMessage(targetUrl, metadata, 
request, getLongPollingTimeout());
             return messages;
         }
 
@@ -394,7 +390,7 @@ namespace Org.Apache.Rocketmq
 
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
-            return await Manager.Ack(target, metadata, request, 
RequestTimeout);
+            return await _manager.Ack(target, metadata, request, 
RequestTimeout);
         }
 
         public async Task<Boolean> ChangeInvisibleDuration(string target, 
string group, string topic, string receiptHandle, String messageId)
@@ -413,7 +409,7 @@ namespace Org.Apache.Rocketmq
 
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
-            return await Manager.ChangeInvisibleDuration(target, metadata, 
request, RequestTimeout);
+            return await _manager.ChangeInvisibleDuration(target, metadata, 
request, RequestTimeout);
         }
 
         public async Task<bool> NotifyClientTermination()
@@ -429,7 +425,7 @@ namespace Org.Apache.Rocketmq
 
             foreach (var endpoint in endpoints)
             {
-                tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, 
request, RequestTimeout));
+                tasks.Add(_manager.NotifyClientTermination(endpoint, metadata, 
request, RequestTimeout));
             }
 
             bool[] results = await Task.WhenAll(tasks);
@@ -447,24 +443,24 @@ namespace Org.Apache.Rocketmq
         {
             if (null != settings.Metric)
             {
-                _clientSettings.Metric = new rmq::Metric();
-                _clientSettings.Metric.MergeFrom(settings.Metric);
+                ClientSettings.Metric = new rmq::Metric();
+                ClientSettings.Metric.MergeFrom(settings.Metric);
             }
 
             if (null != settings.BackoffPolicy)
             {
-                _clientSettings.BackoffPolicy = new rmq::RetryPolicy();
-                
_clientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
+                ClientSettings.BackoffPolicy = new rmq::RetryPolicy();
+                ClientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
             }
         }
 
-        protected readonly IClientManager Manager;
+        protected readonly IClientManager _manager;
 
-        private readonly HashSet<string> topicsOfInterest_ = new 
HashSet<string>();
+        private readonly HashSet<string> _topicsOfInterest = new 
HashSet<string>();
 
         public void AddTopicOfInterest(string topic)
         {
-            topicsOfInterest_.Add(topic);
+            _topicsOfInterest.Add(topic);
         }
 
         private readonly ConcurrentDictionary<string, TopicRouteData> 
_topicRouteTable;
@@ -472,24 +468,24 @@ namespace Org.Apache.Rocketmq
 
         private readonly CancellationTokenSource _healthCheckCts;
 
-        private readonly CancellationTokenSource telemetryCts_ = new 
CancellationTokenSource();
+        private readonly CancellationTokenSource _telemetryCts;
 
         public CancellationTokenSource TelemetryCts()
         {
-            return telemetryCts_;
+            return _telemetryCts;
         }
 
-        protected readonly AccessPoint _accessPoint;
+        protected readonly AccessPoint AccessPoint;
 
         // This field is subject changes from servers.
-        protected readonly rmq::Settings _clientSettings;
+        protected readonly rmq::Settings ClientSettings;
 
         private readonly Random _random = new Random();
-        
-        protected readonly ConcurrentDictionary<string, Session> _sessions = 
new ConcurrentDictionary<string, Session>();
 
-        public static readonly string MeterName = "Apache.RocketMQ.Client";
-        
+        private readonly ConcurrentDictionary<string, Session> _sessions = new 
ConcurrentDictionary<string, Session>();
+
+        protected const string MeterName = "Apache.RocketMQ.Client";
+
         protected static readonly Meter MetricMeter = new(MeterName, "1.0");
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ClientManagerFactory.cs 
b/csharp/rocketmq-client-csharp/ClientManagerFactory.cs
deleted file mode 100644
index 9d03994..0000000
--- a/csharp/rocketmq-client-csharp/ClientManagerFactory.cs
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-namespace Org.Apache.Rocketmq
-{
-    public sealed class ClientManagerFactory
-    {
-        public static IClientManager getClientManager(string resourceNamespace)
-        {
-            if (clientManagers.ContainsKey(resourceNamespace))
-            {
-                return clientManagers[resourceNamespace];
-            }
-
-            var clientManager = new ClientManager();
-            // TODO: configure client managers.
-            if (clientManagers.TryAdd<string, 
IClientManager>(resourceNamespace, clientManager))
-            {
-                return clientManager;
-            }
-
-            return clientManagers[resourceNamespace];
-        }
-
-        private static ConcurrentDictionary<string, IClientManager> 
clientManagers = new ConcurrentDictionary<string, IClientManager>();
-    }
-
-}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs 
b/csharp/rocketmq-client-csharp/Producer.cs
index 37aebb9..e337b1a 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -33,7 +33,7 @@ namespace Org.Apache.Rocketmq
 {
     public class Producer : Client, IProducer
     {
-        public Producer(AccessPoint accessPoint, string resourceNamespace) : 
base(accessPoint, resourceNamespace)
+        public Producer(string accessUrl) : base(accessUrl)
         {
             _loadBalancer = new ConcurrentDictionary<string, 
PublishLoadBalancer>();
             _sendFailureTotal = 
MetricMeter.CreateCounter<long>("rocketmq_send_failure_total");
@@ -51,8 +51,8 @@ namespace Org.Apache.Rocketmq
                 .AddOtlpExporter(delegate(OtlpExporterOptions options, 
MetricReaderOptions readerOptions)
                 {
                     options.Protocol = OtlpExportProtocol.Grpc;
-                    options.Endpoint = new Uri(_accessPoint.TargetUrl());
-                    options.TimeoutMilliseconds = (int) 
_clientSettings.RequestTimeout.ToTimeSpan().TotalMilliseconds;
+                    options.Endpoint = new Uri(AccessPoint.TargetUrl());
+                    options.TimeoutMilliseconds = (int) 
ClientSettings.RequestTimeout.ToTimeSpan().TotalMilliseconds;
 
                     
readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds = 
60 * 1000;
                 })
@@ -171,7 +171,7 @@ namespace Org.Apache.Rocketmq
                 {
                     var stopWatch = new Stopwatch();
                     stopWatch.Start();
-                    rmq::SendMessageResponse response = await 
Manager.SendMessage(target, metadata, request, RequestTimeout);
+                    rmq::SendMessageResponse response = await 
_manager.SendMessage(target, metadata, request, RequestTimeout);
                     if (null != response && rmq::Code.Ok == 
response.Status.Code)
                     {
                         var messageId = response.Entries[0].MessageId;
diff --git a/csharp/rocketmq-client-csharp/PushConsumer.cs 
b/csharp/rocketmq-client-csharp/PushConsumer.cs
deleted file mode 100644
index cc30943..0000000
--- a/csharp/rocketmq-client-csharp/PushConsumer.cs
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using rmq = Apache.Rocketmq.V2;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Rocketmq
-{
-    public class PushConsumer : Client, IConsumer
-    {
-        public PushConsumer(AccessPoint accessPoint, string resourceNamespace, 
string group) : base(accessPoint, resourceNamespace)
-        {
-            _group = group;
-            _topicFilterExpressionMap = new ConcurrentDictionary<string, 
FilterExpression>();
-            _topicAssignmentsMap = new ConcurrentDictionary<string, 
List<rmq::Assignment>>();
-            _processQueueMap = new ConcurrentDictionary<rmq::Assignment, 
ProcessQueue>();
-            _scanAssignmentCTS = new CancellationTokenSource();
-            _scanExpiredProcessQueueCTS = new CancellationTokenSource();
-        }
-
-        public override async Task Start()
-        {
-            if (null == _messageListener)
-            {
-                throw new System.Exception("Bad configuration: message 
listener is required");
-            }
-
-            await base.Start();
-
-            // Step-1: Resolve topic routes
-            List<Task<TopicRouteData>> queryRouteTasks = new 
List<Task<TopicRouteData>>();
-            foreach (var item in _topicFilterExpressionMap)
-            {
-                queryRouteTasks.Add(GetRouteFor(item.Key, true));
-            }
-            Task.WhenAll(queryRouteTasks).GetAwaiter().GetResult();
-
-            // Step-2: Send heartbeats to all involving brokers so that we may 
get immediate, valid assignments.
-            await Heartbeat();
-
-            // Step-3: Scan load assignments that are assigned to current 
client
-            schedule(async () =>
-            {
-                await scanLoadAssignments();
-            }, 10, _scanAssignmentCTS.Token);
-
-            schedule(() =>
-            {
-                ScanExpiredProcessQueue();
-            }, 10, _scanExpiredProcessQueueCTS.Token);
-        }
-
-        public override async Task Shutdown()
-        {
-            _scanAssignmentCTS.Cancel();
-            _scanExpiredProcessQueueCTS.Cancel();
-
-            // Shutdown resources of derived class
-            await base.Shutdown();
-        }
-
-        private async Task scanLoadAssignments()
-        {
-            Logger.Debug("Start to scan load assignments from server");
-            List<Task<List<rmq::Assignment>>> tasks = new 
List<Task<List<rmq::Assignment>>>();
-            foreach (var item in _topicFilterExpressionMap)
-            {
-                tasks.Add(scanLoadAssignment(item.Key, _group));
-            }
-            var result = await Task.WhenAll(tasks);
-
-            foreach (var assignments in result)
-            {
-                if (assignments.Count == 0)
-                {
-                    continue;
-                }
-
-                checkAndUpdateAssignments(assignments);
-            }
-            Logger.Debug("Completed scanning load assignments");
-        }
-
-        private void ScanExpiredProcessQueue()
-        {
-            foreach (var item in _processQueueMap)
-            {
-                if (item.Value.Expired())
-                {
-                    Task.Run(async () =>
-                    {
-                        await ExecutePop0(item.Key);
-                    });
-                }
-            }
-        }
-
-        private void checkAndUpdateAssignments(List<rmq::Assignment> 
assignments)
-        {
-            if (assignments.Count == 0)
-            {
-                return;
-            }
-
-            string topic = assignments[0].MessageQueue.Topic.Name;
-
-            // Compare to generate or cancel pop-cycles
-            List<rmq::Assignment> existing;
-            _topicAssignmentsMap.TryGetValue(topic, out existing);
-
-            foreach (var assignment in assignments)
-            {
-                if (null == existing || !existing.Contains(assignment))
-                {
-                    ExecutePop(assignment);
-                }
-            }
-
-            if (null != existing)
-            {
-                foreach (var assignment in existing)
-                {
-                    if (!assignments.Contains(assignment))
-                    {
-                        Logger.Info($"Stop receiving messages from 
{assignment.MessageQueue.ToString()}");
-                        CancelPop(assignment);
-                    }
-                }
-            }
-
-        }
-
-        private void ExecutePop(rmq::Assignment assignment)
-        {
-            var processQueue = new ProcessQueue();
-            if (_processQueueMap.TryAdd(assignment, processQueue))
-            {
-                Task.Run(async () =>
-                {
-                    await ExecutePop0(assignment);
-                });
-            }
-        }
-
-        private async Task ExecutePop0(rmq::Assignment assignment)
-        {
-            Logger.Info($"Start to pop {assignment.MessageQueue.ToString()}");
-            while (true)
-            {
-                try
-                {
-                    ProcessQueue processQueue;
-                    if (!_processQueueMap.TryGetValue(assignment, out 
processQueue))
-                    {
-                        break;
-                    }
-
-                    if (processQueue.Dropped)
-                    {
-                        break;
-                    }
-
-                    List<Message> messages = await 
base.ReceiveMessage(assignment, _group);
-                    processQueue.LastReceiveTime = System.DateTime.UtcNow;
-
-                    // TODO: cache message and dispatch them 
-
-                    List<Message> failed = new List<Message>();
-                    await _messageListener.Consume(messages, failed);
-
-                    foreach (var message in failed)
-                    {
-                        await 
base.ChangeInvisibleDuration(message._sourceHost, _group, message.Topic, 
message._receiptHandle, message.MessageId);
-                    }
-
-                    foreach (var message in messages)
-                    {
-                        if (!failed.Contains(message))
-                        {
-                            bool success = await base.Ack(message._sourceHost, 
_group, message.Topic, message._receiptHandle, message.MessageId);
-                            if (!success)
-                            {
-                                //TODO: log error.
-                            }
-                        }
-                    }
-                }
-                catch (System.Exception)
-                {
-                    // TODO: log exception raised.
-                }
-
-
-            }
-        }
-
-        private void CancelPop(rmq::Assignment assignment)
-        {
-            if (!_processQueueMap.ContainsKey(assignment))
-            {
-                return;
-            }
-
-            ProcessQueue processQueue;
-            if (_processQueueMap.Remove(assignment, out processQueue))
-            {
-                processQueue.Dropped = true;
-            }
-        }
-
-        protected override void PrepareHeartbeatData(rmq::HeartbeatRequest 
request)
-        {
-        }
-
-        public void Subscribe(string topic, string expression, ExpressionType 
type)
-        {
-            var filterExpression = new FilterExpression(expression, type);
-            _topicFilterExpressionMap[topic] = filterExpression;
-
-        }
-
-        public void RegisterListener(IMessageListener listener)
-        {
-            if (null != listener)
-            {
-                _messageListener = listener;
-            }
-        }
-
-        private string _group;
-
-        private ConcurrentDictionary<string, FilterExpression> 
_topicFilterExpressionMap;
-        private IMessageListener _messageListener;
-
-        private CancellationTokenSource _scanAssignmentCTS;
-
-        private ConcurrentDictionary<string, List<rmq::Assignment>> 
_topicAssignmentsMap;
-
-        private ConcurrentDictionary<rmq::Assignment, ProcessQueue> 
_processQueueMap;
-
-        private CancellationTokenSource _scanExpiredProcessQueueCTS;
-
-    }
-
-}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs 
b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 154efa0..d4694ac 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -30,9 +30,8 @@ namespace Org.Apache.Rocketmq
     public class SimpleConsumer : Client
     {
 
-        public SimpleConsumer(AccessPoint accessPoint,
-        string resourceNamespace, string group)
-        : base(accessPoint, resourceNamespace)
+        public SimpleConsumer(string accessUrl, string group)
+        : base(accessUrl)
         {
             _fifo = false;
             _subscriptions = new ConcurrentDictionary<string, 
rmq.SubscriptionEntry>();
@@ -61,7 +60,7 @@ namespace Org.Apache.Rocketmq
             await base.Start();
             
             // Scan load assignment periodically
-            schedule(async () =>
+            Schedule(async () =>
             {
                 while (!_scanAssignmentCts.IsCancellationRequested)
                 {
@@ -100,13 +99,13 @@ namespace Org.Apache.Rocketmq
                 request.Endpoints = new rmq::Endpoints();
                 request.Endpoints.Scheme = rmq.AddressScheme.Ipv4;
                 var address = new rmq::Address();
-                address.Host = _accessPoint.Host;
-                address.Port = _accessPoint.Port;
+                address.Host = AccessPoint.Host;
+                address.Port = AccessPoint.Port;
                 request.Endpoints.Addresses.Add(address);
 
                 var metadata = new Metadata();
                 Signature.sign(this, metadata);
-                
tasks.Add(Manager.QueryLoadAssignment(_accessPoint.TargetUrl(), metadata, 
request, TimeSpan.FromSeconds(3)));
+                
tasks.Add(_manager.QueryLoadAssignment(AccessPoint.TargetUrl(), metadata, 
request, TimeSpan.FromSeconds(3)));
             }
 
             List<rmq.Assignment>[] list = await Task.WhenAll(tasks);
@@ -184,7 +183,7 @@ namespace Org.Apache.Rocketmq
             var metadata = new Metadata();
             Signature.sign(this, metadata);
             
-            return await Manager.ReceiveMessage(targetUrl, metadata, request, 
timeout);
+            return await _manager.ReceiveMessage(targetUrl, metadata, request, 
timeout);
         }
 
 
@@ -207,7 +206,7 @@ namespace Org.Apache.Rocketmq
             var targetUrl = message._sourceHost;
             var metadata = new Metadata();
             Signature.sign(this, metadata);
-            await Manager.Ack(targetUrl, metadata, request, RequestTimeout);
+            await _manager.Ack(targetUrl, metadata, request, RequestTimeout);
         }
 
         public async Task ChangeInvisibleDuration(Message message, TimeSpan 
invisibleDuration)
@@ -229,7 +228,7 @@ namespace Org.Apache.Rocketmq
             var targetUrl = message._sourceHost;
             var metadata = new Metadata();
             Signature.sign(this, metadata);
-            await Manager.ChangeInvisibleDuration(targetUrl, metadata, 
request, RequestTimeout);
+            await _manager.ChangeInvisibleDuration(targetUrl, metadata, 
request, RequestTimeout);
         }
         
         private rmq.MessageQueue NextQueue()
diff --git a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj 
b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
index baf103f..e2e1844 100644
--- a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
+++ b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
@@ -40,4 +40,8 @@
     </None>
   </ItemGroup>
 
+  <ItemGroup>
+    <Compile Remove="ClientManagerFactory.cs" />
+  </ItemGroup>
+
 </Project>
diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs
index 663980a..b5094a2 100644
--- a/csharp/tests/ProducerTest.cs
+++ b/csharp/tests/ProducerTest.cs
@@ -47,7 +47,7 @@ namespace tests
         [TestMethod]
         public async Task TestLifecycle()
         {
-            var producer = new Producer(_accessPoint, resourceNamespace);
+            var producer = new Producer($"{HOST}:{PORT}");
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             await producer.Start();
@@ -57,7 +57,7 @@ namespace tests
         [TestMethod]
         public async Task TestSendStandardMessage()
         {
-            var producer = new Producer(_accessPoint, resourceNamespace);
+            var producer = new Producer($"{HOST}:{PORT}");
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             await producer.Start();
@@ -82,7 +82,7 @@ namespace tests
         [TestMethod]
         public async Task TestSendMultipleMessages()
         {
-            var producer = new Producer(_accessPoint, resourceNamespace);
+            var producer = new Producer($"{HOST}:{PORT}");
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             await producer.Start();
@@ -109,7 +109,7 @@ namespace tests
         [TestMethod]
         public async Task TestSendFifoMessage()
         {
-            var producer = new Producer(_accessPoint, resourceNamespace);
+            var producer = new Producer($"{HOST}:{PORT}");
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             await producer.Start();
@@ -131,7 +131,7 @@ namespace tests
         [TestMethod]
         public async Task TestSendScheduledMessage()
         {
-            var producer = new Producer(_accessPoint, resourceNamespace);
+            var producer = new Producer($"{HOST}:{PORT}");
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             await producer.Start();
@@ -154,7 +154,7 @@ namespace tests
         [TestMethod]
         public async Task TestSendMessage_Failure()
         {
-            var producer = new Producer(_accessPoint, resourceNamespace);
+            var producer = new Producer($"{HOST}:{PORT}");
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             await producer.Start();
@@ -170,14 +170,12 @@ namespace tests
                 await producer.Send(msg);
                 Assert.Fail("Should have raised an exception");
             }
-            catch (MessageException e)
+            catch (MessageException)
             {
             }
             await producer.Shutdown();
         }
-
-        private static string resourceNamespace = "";
-
+        
         private static string topic = "cpp_sdk_standard";
 
         private static string HOST = "127.0.0.1";
diff --git a/csharp/tests/PushConsumerTest.cs b/csharp/tests/PushConsumerTest.cs
deleted file mode 100644
index 78f01de..0000000
--- a/csharp/tests/PushConsumerTest.cs
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System.Collections.Generic;
-using System;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Rocketmq
-{
-
-    public class TestMessageListener : IMessageListener
-    {
-        public Task Consume(List<Message> messages, List<Message> failed)
-        {
-            foreach (var message in messages)
-            {
-                Console.WriteLine("");
-            }
-
-            return Task.CompletedTask;
-        }
-    }
-
-    public class CountableMessageListener : IMessageListener
-    {
-        public Task Consume(List<Message> messages, List<Message> failed)
-        {
-            foreach (var message in messages)
-            {
-                Console.WriteLine("{}", message.MessageId);
-            }
-
-            return Task.CompletedTask;
-        }
-    }
-
-    [TestClass]
-    public class PushConsumerTest
-    {
-
-        [ClassInitialize]
-        public static void SetUp(TestContext context)
-        {
-            credentialsProvider = new ConfigFileCredentialsProvider();
-
-        }
-
-        [ClassCleanup]
-        public static void TearDown()
-        {
-
-        }
-
-        [TestInitialize]
-        public void SetUp()
-        {
-            accessPoint = new AccessPoint();
-            accessPoint.Host = host;
-            accessPoint.Port = port;
-        }
-
-        [TestMethod]
-        public void testLifecycle()
-        {
-            var consumer = new PushConsumer(accessPoint, resourceNamespace, 
group);
-            consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            consumer.Region = "cn-hangzhou-pre";
-            consumer.Subscribe(topic, "*", ExpressionType.TAG);
-            consumer.RegisterListener(new TestMessageListener());
-            consumer.Start();
-
-            consumer.Shutdown();
-        }
-
-
-        // [Ignore]
-        [TestMethod]
-        public void testConsumeMessage()
-        {
-            var consumer = new PushConsumer(accessPoint, resourceNamespace, 
group);
-            consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            consumer.Region = "cn-hangzhou-pre";
-            consumer.Subscribe(topic, "*", ExpressionType.TAG);
-            consumer.RegisterListener(new CountableMessageListener());
-            consumer.Start();
-            System.Threading.Thread.Sleep(System.TimeSpan.FromSeconds(300));
-            consumer.Shutdown();
-        }
-
-
-        private static string resourceNamespace = 
"MQ_INST_1080056302921134_BXuIbML7";
-
-        private static string topic = "cpp_sdk_standard";
-
-        private static string group = "GID_cpp_sdk_standard";
-
-        private static ICredentialsProvider credentialsProvider;
-        private static string host = "116.62.231.199";
-        private static int port = 80;
-
-        private AccessPoint accessPoint;
-
-    }
-
-}
\ No newline at end of file
diff --git a/csharp/tests/SimpleConsumerTest.cs 
b/csharp/tests/SimpleConsumerTest.cs
index c986614..e5fc8f0 100644
--- a/csharp/tests/SimpleConsumerTest.cs
+++ b/csharp/tests/SimpleConsumerTest.cs
@@ -20,7 +20,6 @@ using System.Threading;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using rmq = Apache.Rocketmq.V2;
 using System.Threading.Tasks;
-using Castle.Core.Logging;
 using Org.Apache.Rocketmq;
 
 namespace tests
@@ -30,26 +29,16 @@ namespace tests
     public class SimpleConsumerTest
     {
 
-        private static AccessPoint accessPoint;
-        private static string _resourceNamespace = "";
         private static string _group = "GID_cpp_sdk_standard";
         private static string _topic = "cpp_sdk_standard";
-
-
-        [ClassInitialize]
-        public static void SetUp(TestContext context)
-        {
-            accessPoint = new AccessPoint
-            {
-                Host = "127.0.0.1",
-                Port = 8081
-            };
-        }
+        private const string HOST = "127.0.0.1";
+        private const int PORT = 8081;
+        
 
         [TestMethod]
         public async Task TestLifecycle()
         {
-            var simpleConsumer = new SimpleConsumer(accessPoint, 
_resourceNamespace, _group);
+            var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
             simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
             await simpleConsumer.Start();
             Thread.Sleep(1_000);
@@ -59,7 +48,7 @@ namespace tests
         [TestMethod]
         public async Task TestReceive()
         {
-            var simpleConsumer = new SimpleConsumer(accessPoint, 
_resourceNamespace, _group);
+            var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
             simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
             await simpleConsumer.Start();
             var batchSize = 32;
@@ -74,7 +63,7 @@ namespace tests
         [TestMethod]
         public async Task TestAck()
         {
-            var simpleConsumer = new SimpleConsumer(accessPoint, 
_resourceNamespace, _group);
+            var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
             simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
             await simpleConsumer.Start();
             var batchSize = 32;
@@ -91,7 +80,7 @@ namespace tests
         [TestMethod]
         public async Task TestChangeInvisibleDuration()
         {
-            var simpleConsumer = new SimpleConsumer(accessPoint, 
_resourceNamespace, _group);
+            var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
             simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
             await simpleConsumer.Start();
             var batchSize = 32;

Reply via email to