This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch csharp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/csharp_dev by this push:
     new 761fe37  Remove unused code
761fe37 is described below

commit 761fe37acf502ec279f63cdf96aaec5f2cfffb8b
Author: Zhanhui Li <[email protected]>
AuthorDate: Tue Aug 30 20:30:00 2022 +0800

    Remove unused code
---
 csharp/examples/Program.cs                         |   3 +-
 csharp/rocketmq-client-csharp/AccessPoint.cs       |  20 ++
 csharp/rocketmq-client-csharp/Client.cs            |  35 ++-
 .../rocketmq-client-csharp/ClientManagerFactory.cs |  45 ----
 csharp/rocketmq-client-csharp/Producer.cs          |   4 +-
 csharp/rocketmq-client-csharp/PushConsumer.cs      | 261 ---------------------
 csharp/rocketmq-client-csharp/SimpleConsumer.cs    |  13 +-
 .../rocketmq-client-csharp.csproj                  |   4 +
 csharp/tests/ProducerTest.cs                       |  18 +-
 csharp/tests/PushConsumerTest.cs                   | 119 ----------
 csharp/tests/SimpleConsumerTest.cs                 |  25 +-
 11 files changed, 65 insertions(+), 482 deletions(-)

diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index 9eb0dd7..9f01e29 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -28,8 +28,7 @@ namespace examples
             string accessUrl = 
"rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080";
             var topic = "sdk_standard";
             var credentialsProvider = new ConfigFileCredentialsProvider();
-            var accessPoint = new AccessPoint(accessUrl);
-            var producer = new Producer(accessPoint, "");
+            var producer = new Producer(accessUrl);
             producer.CredentialsProvider = credentialsProvider;
             producer.AddTopicOfInterest(topic);
             
diff --git a/csharp/rocketmq-client-csharp/AccessPoint.cs 
b/csharp/rocketmq-client-csharp/AccessPoint.cs
index f4ba47c..ab29273 100644
--- a/csharp/rocketmq-client-csharp/AccessPoint.cs
+++ b/csharp/rocketmq-client-csharp/AccessPoint.cs
@@ -16,6 +16,9 @@
  */
 
 using System;
+using rmq = Apache.Rocketmq.V2;
+using System.Net;
+using System.Net.Sockets;
 
 namespace Org.Apache.Rocketmq
 {
@@ -58,5 +61,22 @@ namespace Org.Apache.Rocketmq
         {
             return $"https://{_host}:{_port}";;
         }
+
+        public rmq::AddressScheme HostScheme()
+        {
+            IPAddress ip;
+            bool result = IPAddress.TryParse(_host, out ip);
+            if (!result)
+            {
+                return rmq::AddressScheme.DomainName;
+            }
+
+            return ip.AddressFamily switch
+            {
+                AddressFamily.InterNetwork => rmq::AddressScheme.Ipv4,
+                AddressFamily.InterNetworkV6 => rmq::AddressScheme.Ipv6,
+                _ => rmq::AddressScheme.Unspecified
+            };
+        }
     }
 }
diff --git a/csharp/rocketmq-client-csharp/Client.cs 
b/csharp/rocketmq-client-csharp/Client.cs
index c7abc12..0fe4ec9 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -32,18 +32,17 @@ namespace Org.Apache.Rocketmq
     {
         protected static readonly Logger Logger = 
MqLogManager.Instance.GetCurrentClassLogger();
 
-        protected Client(AccessPoint accessPoint, string resourceNamespace)
+        protected Client(string accessUrl)
         {
-            AccessPoint = accessPoint;
+            AccessPoint = new AccessPoint(accessUrl);
 
-            // Support IPv4 for now
-            AccessPointScheme = rmq::AddressScheme.Ipv4;
+            AccessPointScheme = AccessPoint.HostScheme();
             var serviceEndpoint = new rmq::Address();
-            serviceEndpoint.Host = accessPoint.Host;
-            serviceEndpoint.Port = accessPoint.Port;
+            serviceEndpoint.Host = AccessPoint.Host;
+            serviceEndpoint.Port = AccessPoint.Port;
             AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
 
-            _resourceNamespace = resourceNamespace;
+            _resourceNamespace = "";
 
             ClientSettings = new rmq::Settings();
 
@@ -59,7 +58,7 @@ namespace Org.Apache.Rocketmq
             ClientSettings.UserAgent.Platform = 
Environment.OSVersion.ToString();
             ClientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
 
-            Manager = ClientManagerFactory.getClientManager(resourceNamespace);
+            _manager = new ClientManager();
 
             _topicRouteTable = new ConcurrentDictionary<string, 
TopicRouteData>();
             _updateTopicRouteCts = new CancellationTokenSource();
@@ -93,7 +92,7 @@ namespace Org.Apache.Rocketmq
             Logger.Info($"Shutdown 
client[resource-namespace={_resourceNamespace}");
             _updateTopicRouteCts.Cancel();
             _telemetryCts.Cancel();
-            await Manager.Shutdown();
+            await _manager.Shutdown();
         }
 
         protected string FilterBroker(Func<string, bool> acceptor)
@@ -243,7 +242,7 @@ namespace Org.Apache.Rocketmq
             try
             {
                 Logger.Debug($"Resolving route for topic={topic}");
-                topicRouteData = await Manager.ResolveRoute(target, metadata, 
request, RequestTimeout);
+                topicRouteData = await _manager.ResolveRoute(target, metadata, 
request, RequestTimeout);
                 if (null != topicRouteData)
                 {
                     Logger.Debug($"Got route entries for {topic} from name 
server");
@@ -283,7 +282,7 @@ namespace Org.Apache.Rocketmq
             List<Task> tasks = new List<Task>();
             foreach (var endpoint in endpoints)
             {
-                tasks.Add(Manager.Heartbeat(endpoint, metadata, request, 
RequestTimeout));
+                tasks.Add(_manager.Heartbeat(endpoint, metadata, request, 
RequestTimeout));
             }
 
             await Task.WhenAll(tasks);
@@ -321,7 +320,7 @@ namespace Org.Apache.Rocketmq
             {
                 var metadata = new grpc::Metadata();
                 Signature.sign(this, metadata);
-                return await Manager.QueryLoadAssignment(target, metadata, 
request, RequestTimeout);
+                return await _manager.QueryLoadAssignment(target, metadata, 
request, RequestTimeout);
             }
             catch (System.Exception e)
             {
@@ -349,7 +348,7 @@ namespace Org.Apache.Rocketmq
         {
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
-            var stream = Manager.Telemetry(url, metadata);
+            var stream = _manager.Telemetry(url, metadata);
             var session = new Session(url, stream, this);
             _sessions.TryAdd(url, session);
             Task.Run(async () =>
@@ -369,7 +368,7 @@ namespace Org.Apache.Rocketmq
             request.Group.ResourceNamespace = _resourceNamespace;
             request.Group.Name = group;
             request.MessageQueue = assignment.MessageQueue;
-            var messages = await Manager.ReceiveMessage(targetUrl, metadata, 
request, getLongPollingTimeout());
+            var messages = await _manager.ReceiveMessage(targetUrl, metadata, 
request, getLongPollingTimeout());
             return messages;
         }
 
@@ -391,7 +390,7 @@ namespace Org.Apache.Rocketmq
 
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
-            return await Manager.Ack(target, metadata, request, 
RequestTimeout);
+            return await _manager.Ack(target, metadata, request, 
RequestTimeout);
         }
 
         public async Task<Boolean> ChangeInvisibleDuration(string target, 
string group, string topic, string receiptHandle, String messageId)
@@ -410,7 +409,7 @@ namespace Org.Apache.Rocketmq
 
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
-            return await Manager.ChangeInvisibleDuration(target, metadata, 
request, RequestTimeout);
+            return await _manager.ChangeInvisibleDuration(target, metadata, 
request, RequestTimeout);
         }
 
         public async Task<bool> NotifyClientTermination()
@@ -426,7 +425,7 @@ namespace Org.Apache.Rocketmq
 
             foreach (var endpoint in endpoints)
             {
-                tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, 
request, RequestTimeout));
+                tasks.Add(_manager.NotifyClientTermination(endpoint, metadata, 
request, RequestTimeout));
             }
 
             bool[] results = await Task.WhenAll(tasks);
@@ -455,7 +454,7 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        protected readonly IClientManager Manager;
+        protected readonly IClientManager _manager;
 
         private readonly HashSet<string> _topicsOfInterest = new 
HashSet<string>();
 
diff --git a/csharp/rocketmq-client-csharp/ClientManagerFactory.cs 
b/csharp/rocketmq-client-csharp/ClientManagerFactory.cs
deleted file mode 100644
index 9d03994..0000000
--- a/csharp/rocketmq-client-csharp/ClientManagerFactory.cs
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-namespace Org.Apache.Rocketmq
-{
-    public sealed class ClientManagerFactory
-    {
-        public static IClientManager getClientManager(string resourceNamespace)
-        {
-            if (clientManagers.ContainsKey(resourceNamespace))
-            {
-                return clientManagers[resourceNamespace];
-            }
-
-            var clientManager = new ClientManager();
-            // TODO: configure client managers.
-            if (clientManagers.TryAdd<string, 
IClientManager>(resourceNamespace, clientManager))
-            {
-                return clientManager;
-            }
-
-            return clientManagers[resourceNamespace];
-        }
-
-        private static ConcurrentDictionary<string, IClientManager> 
clientManagers = new ConcurrentDictionary<string, IClientManager>();
-    }
-
-}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs 
b/csharp/rocketmq-client-csharp/Producer.cs
index 333fc91..e337b1a 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -33,7 +33,7 @@ namespace Org.Apache.Rocketmq
 {
     public class Producer : Client, IProducer
     {
-        public Producer(AccessPoint accessPoint, string resourceNamespace) : 
base(accessPoint, resourceNamespace)
+        public Producer(string accessUrl) : base(accessUrl)
         {
             _loadBalancer = new ConcurrentDictionary<string, 
PublishLoadBalancer>();
             _sendFailureTotal = 
MetricMeter.CreateCounter<long>("rocketmq_send_failure_total");
@@ -171,7 +171,7 @@ namespace Org.Apache.Rocketmq
                 {
                     var stopWatch = new Stopwatch();
                     stopWatch.Start();
-                    rmq::SendMessageResponse response = await 
Manager.SendMessage(target, metadata, request, RequestTimeout);
+                    rmq::SendMessageResponse response = await 
_manager.SendMessage(target, metadata, request, RequestTimeout);
                     if (null != response && rmq::Code.Ok == 
response.Status.Code)
                     {
                         var messageId = response.Entries[0].MessageId;
diff --git a/csharp/rocketmq-client-csharp/PushConsumer.cs 
b/csharp/rocketmq-client-csharp/PushConsumer.cs
deleted file mode 100644
index 30c3c8f..0000000
--- a/csharp/rocketmq-client-csharp/PushConsumer.cs
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using rmq = Apache.Rocketmq.V2;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Rocketmq
-{
-    public class PushConsumer : Client, IConsumer
-    {
-        public PushConsumer(AccessPoint accessPoint, string resourceNamespace, 
string group) : base(accessPoint, resourceNamespace)
-        {
-            _group = group;
-            _topicFilterExpressionMap = new ConcurrentDictionary<string, 
FilterExpression>();
-            _topicAssignmentsMap = new ConcurrentDictionary<string, 
List<rmq::Assignment>>();
-            _processQueueMap = new ConcurrentDictionary<rmq::Assignment, 
ProcessQueue>();
-            _scanAssignmentCTS = new CancellationTokenSource();
-            _scanExpiredProcessQueueCTS = new CancellationTokenSource();
-        }
-
-        public override async Task Start()
-        {
-            if (null == _messageListener)
-            {
-                throw new System.Exception("Bad configuration: message 
listener is required");
-            }
-
-            await base.Start();
-
-            // Step-1: Resolve topic routes
-            List<Task<TopicRouteData>> queryRouteTasks = new 
List<Task<TopicRouteData>>();
-            foreach (var item in _topicFilterExpressionMap)
-            {
-                queryRouteTasks.Add(GetRouteFor(item.Key, true));
-            }
-            Task.WhenAll(queryRouteTasks).GetAwaiter().GetResult();
-
-            // Step-2: Send heartbeats to all involving brokers so that we may 
get immediate, valid assignments.
-            await Heartbeat();
-
-            // Step-3: Scan load assignments that are assigned to current 
client
-            Schedule(async () =>
-            {
-                await scanLoadAssignments();
-            }, 10, _scanAssignmentCTS.Token);
-
-            Schedule(() =>
-            {
-                ScanExpiredProcessQueue();
-            }, 10, _scanExpiredProcessQueueCTS.Token);
-        }
-
-        public override async Task Shutdown()
-        {
-            _scanAssignmentCTS.Cancel();
-            _scanExpiredProcessQueueCTS.Cancel();
-
-            // Shutdown resources of derived class
-            await base.Shutdown();
-        }
-
-        private async Task scanLoadAssignments()
-        {
-            Logger.Debug("Start to scan load assignments from server");
-            List<Task<List<rmq::Assignment>>> tasks = new 
List<Task<List<rmq::Assignment>>>();
-            foreach (var item in _topicFilterExpressionMap)
-            {
-                tasks.Add(ScanLoadAssignment(item.Key, _group));
-            }
-            var result = await Task.WhenAll(tasks);
-
-            foreach (var assignments in result)
-            {
-                if (assignments.Count == 0)
-                {
-                    continue;
-                }
-
-                checkAndUpdateAssignments(assignments);
-            }
-            Logger.Debug("Completed scanning load assignments");
-        }
-
-        private void ScanExpiredProcessQueue()
-        {
-            foreach (var item in _processQueueMap)
-            {
-                if (item.Value.Expired())
-                {
-                    Task.Run(async () =>
-                    {
-                        await ExecutePop0(item.Key);
-                    });
-                }
-            }
-        }
-
-        private void checkAndUpdateAssignments(List<rmq::Assignment> 
assignments)
-        {
-            if (assignments.Count == 0)
-            {
-                return;
-            }
-
-            string topic = assignments[0].MessageQueue.Topic.Name;
-
-            // Compare to generate or cancel pop-cycles
-            List<rmq::Assignment> existing;
-            _topicAssignmentsMap.TryGetValue(topic, out existing);
-
-            foreach (var assignment in assignments)
-            {
-                if (null == existing || !existing.Contains(assignment))
-                {
-                    ExecutePop(assignment);
-                }
-            }
-
-            if (null != existing)
-            {
-                foreach (var assignment in existing)
-                {
-                    if (!assignments.Contains(assignment))
-                    {
-                        Logger.Info($"Stop receiving messages from 
{assignment.MessageQueue.ToString()}");
-                        CancelPop(assignment);
-                    }
-                }
-            }
-
-        }
-
-        private void ExecutePop(rmq::Assignment assignment)
-        {
-            var processQueue = new ProcessQueue();
-            if (_processQueueMap.TryAdd(assignment, processQueue))
-            {
-                Task.Run(async () =>
-                {
-                    await ExecutePop0(assignment);
-                });
-            }
-        }
-
-        private async Task ExecutePop0(rmq::Assignment assignment)
-        {
-            Logger.Info($"Start to pop {assignment.MessageQueue.ToString()}");
-            while (true)
-            {
-                try
-                {
-                    ProcessQueue processQueue;
-                    if (!_processQueueMap.TryGetValue(assignment, out 
processQueue))
-                    {
-                        break;
-                    }
-
-                    if (processQueue.Dropped)
-                    {
-                        break;
-                    }
-
-                    List<Message> messages = await 
base.ReceiveMessage(assignment, _group);
-                    processQueue.LastReceiveTime = System.DateTime.UtcNow;
-
-                    // TODO: cache message and dispatch them 
-
-                    List<Message> failed = new List<Message>();
-                    await _messageListener.Consume(messages, failed);
-
-                    foreach (var message in failed)
-                    {
-                        await 
base.ChangeInvisibleDuration(message._sourceHost, _group, message.Topic, 
message._receiptHandle, message.MessageId);
-                    }
-
-                    foreach (var message in messages)
-                    {
-                        if (!failed.Contains(message))
-                        {
-                            bool success = await base.Ack(message._sourceHost, 
_group, message.Topic, message._receiptHandle, message.MessageId);
-                            if (!success)
-                            {
-                                //TODO: log error.
-                            }
-                        }
-                    }
-                }
-                catch (System.Exception)
-                {
-                    // TODO: log exception raised.
-                }
-
-
-            }
-        }
-
-        private void CancelPop(rmq::Assignment assignment)
-        {
-            if (!_processQueueMap.ContainsKey(assignment))
-            {
-                return;
-            }
-
-            ProcessQueue processQueue;
-            if (_processQueueMap.Remove(assignment, out processQueue))
-            {
-                processQueue.Dropped = true;
-            }
-        }
-
-        protected override void PrepareHeartbeatData(rmq::HeartbeatRequest 
request)
-        {
-        }
-
-        public void Subscribe(string topic, string expression, ExpressionType 
type)
-        {
-            var filterExpression = new FilterExpression(expression, type);
-            _topicFilterExpressionMap[topic] = filterExpression;
-
-        }
-
-        public void RegisterListener(IMessageListener listener)
-        {
-            if (null != listener)
-            {
-                _messageListener = listener;
-            }
-        }
-
-        private string _group;
-
-        private ConcurrentDictionary<string, FilterExpression> 
_topicFilterExpressionMap;
-        private IMessageListener _messageListener;
-
-        private CancellationTokenSource _scanAssignmentCTS;
-
-        private ConcurrentDictionary<string, List<rmq::Assignment>> 
_topicAssignmentsMap;
-
-        private ConcurrentDictionary<rmq::Assignment, ProcessQueue> 
_processQueueMap;
-
-        private CancellationTokenSource _scanExpiredProcessQueueCTS;
-
-    }
-
-}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs 
b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index a0077ff..d4694ac 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -30,9 +30,8 @@ namespace Org.Apache.Rocketmq
     public class SimpleConsumer : Client
     {
 
-        public SimpleConsumer(AccessPoint accessPoint,
-        string resourceNamespace, string group)
-        : base(accessPoint, resourceNamespace)
+        public SimpleConsumer(string accessUrl, string group)
+        : base(accessUrl)
         {
             _fifo = false;
             _subscriptions = new ConcurrentDictionary<string, 
rmq.SubscriptionEntry>();
@@ -106,7 +105,7 @@ namespace Org.Apache.Rocketmq
 
                 var metadata = new Metadata();
                 Signature.sign(this, metadata);
-                tasks.Add(Manager.QueryLoadAssignment(AccessPoint.TargetUrl(), 
metadata, request, TimeSpan.FromSeconds(3)));
+                
tasks.Add(_manager.QueryLoadAssignment(AccessPoint.TargetUrl(), metadata, 
request, TimeSpan.FromSeconds(3)));
             }
 
             List<rmq.Assignment>[] list = await Task.WhenAll(tasks);
@@ -184,7 +183,7 @@ namespace Org.Apache.Rocketmq
             var metadata = new Metadata();
             Signature.sign(this, metadata);
             
-            return await Manager.ReceiveMessage(targetUrl, metadata, request, 
timeout);
+            return await _manager.ReceiveMessage(targetUrl, metadata, request, 
timeout);
         }
 
 
@@ -207,7 +206,7 @@ namespace Org.Apache.Rocketmq
             var targetUrl = message._sourceHost;
             var metadata = new Metadata();
             Signature.sign(this, metadata);
-            await Manager.Ack(targetUrl, metadata, request, RequestTimeout);
+            await _manager.Ack(targetUrl, metadata, request, RequestTimeout);
         }
 
         public async Task ChangeInvisibleDuration(Message message, TimeSpan 
invisibleDuration)
@@ -229,7 +228,7 @@ namespace Org.Apache.Rocketmq
             var targetUrl = message._sourceHost;
             var metadata = new Metadata();
             Signature.sign(this, metadata);
-            await Manager.ChangeInvisibleDuration(targetUrl, metadata, 
request, RequestTimeout);
+            await _manager.ChangeInvisibleDuration(targetUrl, metadata, 
request, RequestTimeout);
         }
         
         private rmq.MessageQueue NextQueue()
diff --git a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj 
b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
index baf103f..e2e1844 100644
--- a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
+++ b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
@@ -40,4 +40,8 @@
     </None>
   </ItemGroup>
 
+  <ItemGroup>
+    <Compile Remove="ClientManagerFactory.cs" />
+  </ItemGroup>
+
 </Project>
diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs
index 663980a..b5094a2 100644
--- a/csharp/tests/ProducerTest.cs
+++ b/csharp/tests/ProducerTest.cs
@@ -47,7 +47,7 @@ namespace tests
         [TestMethod]
         public async Task TestLifecycle()
         {
-            var producer = new Producer(_accessPoint, resourceNamespace);
+            var producer = new Producer($"{HOST}:{PORT}");
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             await producer.Start();
@@ -57,7 +57,7 @@ namespace tests
         [TestMethod]
         public async Task TestSendStandardMessage()
         {
-            var producer = new Producer(_accessPoint, resourceNamespace);
+            var producer = new Producer($"{HOST}:{PORT}");
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             await producer.Start();
@@ -82,7 +82,7 @@ namespace tests
         [TestMethod]
         public async Task TestSendMultipleMessages()
         {
-            var producer = new Producer(_accessPoint, resourceNamespace);
+            var producer = new Producer($"{HOST}:{PORT}");
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             await producer.Start();
@@ -109,7 +109,7 @@ namespace tests
         [TestMethod]
         public async Task TestSendFifoMessage()
         {
-            var producer = new Producer(_accessPoint, resourceNamespace);
+            var producer = new Producer($"{HOST}:{PORT}");
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             await producer.Start();
@@ -131,7 +131,7 @@ namespace tests
         [TestMethod]
         public async Task TestSendScheduledMessage()
         {
-            var producer = new Producer(_accessPoint, resourceNamespace);
+            var producer = new Producer($"{HOST}:{PORT}");
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             await producer.Start();
@@ -154,7 +154,7 @@ namespace tests
         [TestMethod]
         public async Task TestSendMessage_Failure()
         {
-            var producer = new Producer(_accessPoint, resourceNamespace);
+            var producer = new Producer($"{HOST}:{PORT}");
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             await producer.Start();
@@ -170,14 +170,12 @@ namespace tests
                 await producer.Send(msg);
                 Assert.Fail("Should have raised an exception");
             }
-            catch (MessageException e)
+            catch (MessageException)
             {
             }
             await producer.Shutdown();
         }
-
-        private static string resourceNamespace = "";
-
+        
         private static string topic = "cpp_sdk_standard";
 
         private static string HOST = "127.0.0.1";
diff --git a/csharp/tests/PushConsumerTest.cs b/csharp/tests/PushConsumerTest.cs
deleted file mode 100644
index 78f01de..0000000
--- a/csharp/tests/PushConsumerTest.cs
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System.Collections.Generic;
-using System;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Rocketmq
-{
-
-    public class TestMessageListener : IMessageListener
-    {
-        public Task Consume(List<Message> messages, List<Message> failed)
-        {
-            foreach (var message in messages)
-            {
-                Console.WriteLine("");
-            }
-
-            return Task.CompletedTask;
-        }
-    }
-
-    public class CountableMessageListener : IMessageListener
-    {
-        public Task Consume(List<Message> messages, List<Message> failed)
-        {
-            foreach (var message in messages)
-            {
-                Console.WriteLine("{}", message.MessageId);
-            }
-
-            return Task.CompletedTask;
-        }
-    }
-
-    [TestClass]
-    public class PushConsumerTest
-    {
-
-        [ClassInitialize]
-        public static void SetUp(TestContext context)
-        {
-            credentialsProvider = new ConfigFileCredentialsProvider();
-
-        }
-
-        [ClassCleanup]
-        public static void TearDown()
-        {
-
-        }
-
-        [TestInitialize]
-        public void SetUp()
-        {
-            accessPoint = new AccessPoint();
-            accessPoint.Host = host;
-            accessPoint.Port = port;
-        }
-
-        [TestMethod]
-        public void testLifecycle()
-        {
-            var consumer = new PushConsumer(accessPoint, resourceNamespace, 
group);
-            consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            consumer.Region = "cn-hangzhou-pre";
-            consumer.Subscribe(topic, "*", ExpressionType.TAG);
-            consumer.RegisterListener(new TestMessageListener());
-            consumer.Start();
-
-            consumer.Shutdown();
-        }
-
-
-        // [Ignore]
-        [TestMethod]
-        public void testConsumeMessage()
-        {
-            var consumer = new PushConsumer(accessPoint, resourceNamespace, 
group);
-            consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            consumer.Region = "cn-hangzhou-pre";
-            consumer.Subscribe(topic, "*", ExpressionType.TAG);
-            consumer.RegisterListener(new CountableMessageListener());
-            consumer.Start();
-            System.Threading.Thread.Sleep(System.TimeSpan.FromSeconds(300));
-            consumer.Shutdown();
-        }
-
-
-        private static string resourceNamespace = 
"MQ_INST_1080056302921134_BXuIbML7";
-
-        private static string topic = "cpp_sdk_standard";
-
-        private static string group = "GID_cpp_sdk_standard";
-
-        private static ICredentialsProvider credentialsProvider;
-        private static string host = "116.62.231.199";
-        private static int port = 80;
-
-        private AccessPoint accessPoint;
-
-    }
-
-}
\ No newline at end of file
diff --git a/csharp/tests/SimpleConsumerTest.cs 
b/csharp/tests/SimpleConsumerTest.cs
index c986614..e5fc8f0 100644
--- a/csharp/tests/SimpleConsumerTest.cs
+++ b/csharp/tests/SimpleConsumerTest.cs
@@ -20,7 +20,6 @@ using System.Threading;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using rmq = Apache.Rocketmq.V2;
 using System.Threading.Tasks;
-using Castle.Core.Logging;
 using Org.Apache.Rocketmq;
 
 namespace tests
@@ -30,26 +29,16 @@ namespace tests
     public class SimpleConsumerTest
     {
 
-        private static AccessPoint accessPoint;
-        private static string _resourceNamespace = "";
         private static string _group = "GID_cpp_sdk_standard";
         private static string _topic = "cpp_sdk_standard";
-
-
-        [ClassInitialize]
-        public static void SetUp(TestContext context)
-        {
-            accessPoint = new AccessPoint
-            {
-                Host = "127.0.0.1",
-                Port = 8081
-            };
-        }
+        private const string HOST = "127.0.0.1";
+        private const int PORT = 8081;
+        
 
         [TestMethod]
         public async Task TestLifecycle()
         {
-            var simpleConsumer = new SimpleConsumer(accessPoint, 
_resourceNamespace, _group);
+            var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
             simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
             await simpleConsumer.Start();
             Thread.Sleep(1_000);
@@ -59,7 +48,7 @@ namespace tests
         [TestMethod]
         public async Task TestReceive()
         {
-            var simpleConsumer = new SimpleConsumer(accessPoint, 
_resourceNamespace, _group);
+            var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
             simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
             await simpleConsumer.Start();
             var batchSize = 32;
@@ -74,7 +63,7 @@ namespace tests
         [TestMethod]
         public async Task TestAck()
         {
-            var simpleConsumer = new SimpleConsumer(accessPoint, 
_resourceNamespace, _group);
+            var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
             simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
             await simpleConsumer.Start();
             var batchSize = 32;
@@ -91,7 +80,7 @@ namespace tests
         [TestMethod]
         public async Task TestChangeInvisibleDuration()
         {
-            var simpleConsumer = new SimpleConsumer(accessPoint, 
_resourceNamespace, _group);
+            var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
             simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
             await simpleConsumer.Start();
             var batchSize = 32;

Reply via email to