This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch csharp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/csharp_dev by this push:
new 761fe37 Remove unused code
761fe37 is described below
commit 761fe37acf502ec279f63cdf96aaec5f2cfffb8b
Author: Zhanhui Li <[email protected]>
AuthorDate: Tue Aug 30 20:30:00 2022 +0800
Remove unused code
---
csharp/examples/Program.cs | 3 +-
csharp/rocketmq-client-csharp/AccessPoint.cs | 20 ++
csharp/rocketmq-client-csharp/Client.cs | 35 ++-
.../rocketmq-client-csharp/ClientManagerFactory.cs | 45 ----
csharp/rocketmq-client-csharp/Producer.cs | 4 +-
csharp/rocketmq-client-csharp/PushConsumer.cs | 261 ---------------------
csharp/rocketmq-client-csharp/SimpleConsumer.cs | 13 +-
.../rocketmq-client-csharp.csproj | 4 +
csharp/tests/ProducerTest.cs | 18 +-
csharp/tests/PushConsumerTest.cs | 119 ----------
csharp/tests/SimpleConsumerTest.cs | 25 +-
11 files changed, 65 insertions(+), 482 deletions(-)
diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index 9eb0dd7..9f01e29 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -28,8 +28,7 @@ namespace examples
string accessUrl =
"rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080";
var topic = "sdk_standard";
var credentialsProvider = new ConfigFileCredentialsProvider();
- var accessPoint = new AccessPoint(accessUrl);
- var producer = new Producer(accessPoint, "");
+ var producer = new Producer(accessUrl);
producer.CredentialsProvider = credentialsProvider;
producer.AddTopicOfInterest(topic);
diff --git a/csharp/rocketmq-client-csharp/AccessPoint.cs
b/csharp/rocketmq-client-csharp/AccessPoint.cs
index f4ba47c..ab29273 100644
--- a/csharp/rocketmq-client-csharp/AccessPoint.cs
+++ b/csharp/rocketmq-client-csharp/AccessPoint.cs
@@ -16,6 +16,9 @@
*/
using System;
+using rmq = Apache.Rocketmq.V2;
+using System.Net;
+using System.Net.Sockets;
namespace Org.Apache.Rocketmq
{
@@ -58,5 +61,22 @@ namespace Org.Apache.Rocketmq
{
return $"https://{_host}:{_port}";
}
+
+ public rmq::AddressScheme HostScheme()
+ {
+ IPAddress ip;
+ bool result = IPAddress.TryParse(_host, out ip);
+ if (!result)
+ {
+ return rmq::AddressScheme.DomainName;
+ }
+
+ return ip.AddressFamily switch
+ {
+ AddressFamily.InterNetwork => rmq::AddressScheme.Ipv4,
+ AddressFamily.InterNetworkV6 => rmq::AddressScheme.Ipv6,
+ _ => rmq::AddressScheme.Unspecified
+ };
+ }
}
}
diff --git a/csharp/rocketmq-client-csharp/Client.cs
b/csharp/rocketmq-client-csharp/Client.cs
index c7abc12..0fe4ec9 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -32,18 +32,17 @@ namespace Org.Apache.Rocketmq
{
protected static readonly Logger Logger =
MqLogManager.Instance.GetCurrentClassLogger();
- protected Client(AccessPoint accessPoint, string resourceNamespace)
+ protected Client(string accessUrl)
{
- AccessPoint = accessPoint;
+ AccessPoint = new AccessPoint(accessUrl);
- // Support IPv4 for now
- AccessPointScheme = rmq::AddressScheme.Ipv4;
+ AccessPointScheme = AccessPoint.HostScheme();
var serviceEndpoint = new rmq::Address();
- serviceEndpoint.Host = accessPoint.Host;
- serviceEndpoint.Port = accessPoint.Port;
+ serviceEndpoint.Host = AccessPoint.Host;
+ serviceEndpoint.Port = AccessPoint.Port;
AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
- _resourceNamespace = resourceNamespace;
+ _resourceNamespace = "";
ClientSettings = new rmq::Settings();
@@ -59,7 +58,7 @@ namespace Org.Apache.Rocketmq
ClientSettings.UserAgent.Platform =
Environment.OSVersion.ToString();
ClientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
- Manager = ClientManagerFactory.getClientManager(resourceNamespace);
+ _manager = new ClientManager();
_topicRouteTable = new ConcurrentDictionary<string,
TopicRouteData>();
_updateTopicRouteCts = new CancellationTokenSource();
@@ -93,7 +92,7 @@ namespace Org.Apache.Rocketmq
Logger.Info($"Shutdown
client[resource-namespace={_resourceNamespace}");
_updateTopicRouteCts.Cancel();
_telemetryCts.Cancel();
- await Manager.Shutdown();
+ await _manager.Shutdown();
}
protected string FilterBroker(Func<string, bool> acceptor)
@@ -243,7 +242,7 @@ namespace Org.Apache.Rocketmq
try
{
Logger.Debug($"Resolving route for topic={topic}");
- topicRouteData = await Manager.ResolveRoute(target, metadata,
request, RequestTimeout);
+ topicRouteData = await _manager.ResolveRoute(target, metadata,
request, RequestTimeout);
if (null != topicRouteData)
{
Logger.Debug($"Got route entries for {topic} from name
server");
@@ -283,7 +282,7 @@ namespace Org.Apache.Rocketmq
List<Task> tasks = new List<Task>();
foreach (var endpoint in endpoints)
{
- tasks.Add(Manager.Heartbeat(endpoint, metadata, request,
RequestTimeout));
+ tasks.Add(_manager.Heartbeat(endpoint, metadata, request,
RequestTimeout));
}
await Task.WhenAll(tasks);
@@ -321,7 +320,7 @@ namespace Org.Apache.Rocketmq
{
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
- return await Manager.QueryLoadAssignment(target, metadata,
request, RequestTimeout);
+ return await _manager.QueryLoadAssignment(target, metadata,
request, RequestTimeout);
}
catch (System.Exception e)
{
@@ -349,7 +348,7 @@ namespace Org.Apache.Rocketmq
{
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
- var stream = Manager.Telemetry(url, metadata);
+ var stream = _manager.Telemetry(url, metadata);
var session = new Session(url, stream, this);
_sessions.TryAdd(url, session);
Task.Run(async () =>
@@ -369,7 +368,7 @@ namespace Org.Apache.Rocketmq
request.Group.ResourceNamespace = _resourceNamespace;
request.Group.Name = group;
request.MessageQueue = assignment.MessageQueue;
- var messages = await Manager.ReceiveMessage(targetUrl, metadata,
request, getLongPollingTimeout());
+ var messages = await _manager.ReceiveMessage(targetUrl, metadata,
request, getLongPollingTimeout());
return messages;
}
@@ -391,7 +390,7 @@ namespace Org.Apache.Rocketmq
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
- return await Manager.Ack(target, metadata, request,
RequestTimeout);
+ return await _manager.Ack(target, metadata, request,
RequestTimeout);
}
public async Task<Boolean> ChangeInvisibleDuration(string target,
string group, string topic, string receiptHandle, String messageId)
@@ -410,7 +409,7 @@ namespace Org.Apache.Rocketmq
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
- return await Manager.ChangeInvisibleDuration(target, metadata,
request, RequestTimeout);
+ return await _manager.ChangeInvisibleDuration(target, metadata,
request, RequestTimeout);
}
public async Task<bool> NotifyClientTermination()
@@ -426,7 +425,7 @@ namespace Org.Apache.Rocketmq
foreach (var endpoint in endpoints)
{
- tasks.Add(Manager.NotifyClientTermination(endpoint, metadata,
request, RequestTimeout));
+ tasks.Add(_manager.NotifyClientTermination(endpoint, metadata,
request, RequestTimeout));
}
bool[] results = await Task.WhenAll(tasks);
@@ -455,7 +454,7 @@ namespace Org.Apache.Rocketmq
}
}
- protected readonly IClientManager Manager;
+ protected readonly IClientManager _manager;
private readonly HashSet<string> _topicsOfInterest = new
HashSet<string>();
diff --git a/csharp/rocketmq-client-csharp/ClientManagerFactory.cs
b/csharp/rocketmq-client-csharp/ClientManagerFactory.cs
deleted file mode 100644
index 9d03994..0000000
--- a/csharp/rocketmq-client-csharp/ClientManagerFactory.cs
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-namespace Org.Apache.Rocketmq
-{
- public sealed class ClientManagerFactory
- {
- public static IClientManager getClientManager(string resourceNamespace)
- {
- if (clientManagers.ContainsKey(resourceNamespace))
- {
- return clientManagers[resourceNamespace];
- }
-
- var clientManager = new ClientManager();
- // TODO: configure client managers.
- if (clientManagers.TryAdd<string,
IClientManager>(resourceNamespace, clientManager))
- {
- return clientManager;
- }
-
- return clientManagers[resourceNamespace];
- }
-
- private static ConcurrentDictionary<string, IClientManager>
clientManagers = new ConcurrentDictionary<string, IClientManager>();
- }
-
-}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs
b/csharp/rocketmq-client-csharp/Producer.cs
index 333fc91..e337b1a 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -33,7 +33,7 @@ namespace Org.Apache.Rocketmq
{
public class Producer : Client, IProducer
{
- public Producer(AccessPoint accessPoint, string resourceNamespace) :
base(accessPoint, resourceNamespace)
+ public Producer(string accessUrl) : base(accessUrl)
{
_loadBalancer = new ConcurrentDictionary<string,
PublishLoadBalancer>();
_sendFailureTotal =
MetricMeter.CreateCounter<long>("rocketmq_send_failure_total");
@@ -171,7 +171,7 @@ namespace Org.Apache.Rocketmq
{
var stopWatch = new Stopwatch();
stopWatch.Start();
- rmq::SendMessageResponse response = await
Manager.SendMessage(target, metadata, request, RequestTimeout);
+ rmq::SendMessageResponse response = await
_manager.SendMessage(target, metadata, request, RequestTimeout);
if (null != response && rmq::Code.Ok ==
response.Status.Code)
{
var messageId = response.Entries[0].MessageId;
diff --git a/csharp/rocketmq-client-csharp/PushConsumer.cs
b/csharp/rocketmq-client-csharp/PushConsumer.cs
deleted file mode 100644
index 30c3c8f..0000000
--- a/csharp/rocketmq-client-csharp/PushConsumer.cs
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using rmq = Apache.Rocketmq.V2;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Rocketmq
-{
- public class PushConsumer : Client, IConsumer
- {
- public PushConsumer(AccessPoint accessPoint, string resourceNamespace,
string group) : base(accessPoint, resourceNamespace)
- {
- _group = group;
- _topicFilterExpressionMap = new ConcurrentDictionary<string,
FilterExpression>();
- _topicAssignmentsMap = new ConcurrentDictionary<string,
List<rmq::Assignment>>();
- _processQueueMap = new ConcurrentDictionary<rmq::Assignment,
ProcessQueue>();
- _scanAssignmentCTS = new CancellationTokenSource();
- _scanExpiredProcessQueueCTS = new CancellationTokenSource();
- }
-
- public override async Task Start()
- {
- if (null == _messageListener)
- {
- throw new System.Exception("Bad configuration: message
listener is required");
- }
-
- await base.Start();
-
- // Step-1: Resolve topic routes
- List<Task<TopicRouteData>> queryRouteTasks = new
List<Task<TopicRouteData>>();
- foreach (var item in _topicFilterExpressionMap)
- {
- queryRouteTasks.Add(GetRouteFor(item.Key, true));
- }
- Task.WhenAll(queryRouteTasks).GetAwaiter().GetResult();
-
- // Step-2: Send heartbeats to all involving brokers so that we may
get immediate, valid assignments.
- await Heartbeat();
-
- // Step-3: Scan load assignments that are assigned to current
client
- Schedule(async () =>
- {
- await scanLoadAssignments();
- }, 10, _scanAssignmentCTS.Token);
-
- Schedule(() =>
- {
- ScanExpiredProcessQueue();
- }, 10, _scanExpiredProcessQueueCTS.Token);
- }
-
- public override async Task Shutdown()
- {
- _scanAssignmentCTS.Cancel();
- _scanExpiredProcessQueueCTS.Cancel();
-
- // Shutdown resources of derived class
- await base.Shutdown();
- }
-
- private async Task scanLoadAssignments()
- {
- Logger.Debug("Start to scan load assignments from server");
- List<Task<List<rmq::Assignment>>> tasks = new
List<Task<List<rmq::Assignment>>>();
- foreach (var item in _topicFilterExpressionMap)
- {
- tasks.Add(ScanLoadAssignment(item.Key, _group));
- }
- var result = await Task.WhenAll(tasks);
-
- foreach (var assignments in result)
- {
- if (assignments.Count == 0)
- {
- continue;
- }
-
- checkAndUpdateAssignments(assignments);
- }
- Logger.Debug("Completed scanning load assignments");
- }
-
- private void ScanExpiredProcessQueue()
- {
- foreach (var item in _processQueueMap)
- {
- if (item.Value.Expired())
- {
- Task.Run(async () =>
- {
- await ExecutePop0(item.Key);
- });
- }
- }
- }
-
- private void checkAndUpdateAssignments(List<rmq::Assignment>
assignments)
- {
- if (assignments.Count == 0)
- {
- return;
- }
-
- string topic = assignments[0].MessageQueue.Topic.Name;
-
- // Compare to generate or cancel pop-cycles
- List<rmq::Assignment> existing;
- _topicAssignmentsMap.TryGetValue(topic, out existing);
-
- foreach (var assignment in assignments)
- {
- if (null == existing || !existing.Contains(assignment))
- {
- ExecutePop(assignment);
- }
- }
-
- if (null != existing)
- {
- foreach (var assignment in existing)
- {
- if (!assignments.Contains(assignment))
- {
- Logger.Info($"Stop receiving messages from
{assignment.MessageQueue.ToString()}");
- CancelPop(assignment);
- }
- }
- }
-
- }
-
- private void ExecutePop(rmq::Assignment assignment)
- {
- var processQueue = new ProcessQueue();
- if (_processQueueMap.TryAdd(assignment, processQueue))
- {
- Task.Run(async () =>
- {
- await ExecutePop0(assignment);
- });
- }
- }
-
- private async Task ExecutePop0(rmq::Assignment assignment)
- {
- Logger.Info($"Start to pop {assignment.MessageQueue.ToString()}");
- while (true)
- {
- try
- {
- ProcessQueue processQueue;
- if (!_processQueueMap.TryGetValue(assignment, out
processQueue))
- {
- break;
- }
-
- if (processQueue.Dropped)
- {
- break;
- }
-
- List<Message> messages = await
base.ReceiveMessage(assignment, _group);
- processQueue.LastReceiveTime = System.DateTime.UtcNow;
-
- // TODO: cache message and dispatch them
-
- List<Message> failed = new List<Message>();
- await _messageListener.Consume(messages, failed);
-
- foreach (var message in failed)
- {
- await
base.ChangeInvisibleDuration(message._sourceHost, _group, message.Topic,
message._receiptHandle, message.MessageId);
- }
-
- foreach (var message in messages)
- {
- if (!failed.Contains(message))
- {
- bool success = await base.Ack(message._sourceHost,
_group, message.Topic, message._receiptHandle, message.MessageId);
- if (!success)
- {
- //TODO: log error.
- }
- }
- }
- }
- catch (System.Exception)
- {
- // TODO: log exception raised.
- }
-
-
- }
- }
-
- private void CancelPop(rmq::Assignment assignment)
- {
- if (!_processQueueMap.ContainsKey(assignment))
- {
- return;
- }
-
- ProcessQueue processQueue;
- if (_processQueueMap.Remove(assignment, out processQueue))
- {
- processQueue.Dropped = true;
- }
- }
-
- protected override void PrepareHeartbeatData(rmq::HeartbeatRequest
request)
- {
- }
-
- public void Subscribe(string topic, string expression, ExpressionType
type)
- {
- var filterExpression = new FilterExpression(expression, type);
- _topicFilterExpressionMap[topic] = filterExpression;
-
- }
-
- public void RegisterListener(IMessageListener listener)
- {
- if (null != listener)
- {
- _messageListener = listener;
- }
- }
-
- private string _group;
-
- private ConcurrentDictionary<string, FilterExpression>
_topicFilterExpressionMap;
- private IMessageListener _messageListener;
-
- private CancellationTokenSource _scanAssignmentCTS;
-
- private ConcurrentDictionary<string, List<rmq::Assignment>>
_topicAssignmentsMap;
-
- private ConcurrentDictionary<rmq::Assignment, ProcessQueue>
_processQueueMap;
-
- private CancellationTokenSource _scanExpiredProcessQueueCTS;
-
- }
-
-}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index a0077ff..d4694ac 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -30,9 +30,8 @@ namespace Org.Apache.Rocketmq
public class SimpleConsumer : Client
{
- public SimpleConsumer(AccessPoint accessPoint,
- string resourceNamespace, string group)
- : base(accessPoint, resourceNamespace)
+ public SimpleConsumer(string accessUrl, string group)
+ : base(accessUrl)
{
_fifo = false;
_subscriptions = new ConcurrentDictionary<string,
rmq.SubscriptionEntry>();
@@ -106,7 +105,7 @@ namespace Org.Apache.Rocketmq
var metadata = new Metadata();
Signature.sign(this, metadata);
- tasks.Add(Manager.QueryLoadAssignment(AccessPoint.TargetUrl(),
metadata, request, TimeSpan.FromSeconds(3)));
+
tasks.Add(_manager.QueryLoadAssignment(AccessPoint.TargetUrl(), metadata,
request, TimeSpan.FromSeconds(3)));
}
List<rmq.Assignment>[] list = await Task.WhenAll(tasks);
@@ -184,7 +183,7 @@ namespace Org.Apache.Rocketmq
var metadata = new Metadata();
Signature.sign(this, metadata);
- return await Manager.ReceiveMessage(targetUrl, metadata, request,
timeout);
+ return await _manager.ReceiveMessage(targetUrl, metadata, request,
timeout);
}
@@ -207,7 +206,7 @@ namespace Org.Apache.Rocketmq
var targetUrl = message._sourceHost;
var metadata = new Metadata();
Signature.sign(this, metadata);
- await Manager.Ack(targetUrl, metadata, request, RequestTimeout);
+ await _manager.Ack(targetUrl, metadata, request, RequestTimeout);
}
public async Task ChangeInvisibleDuration(Message message, TimeSpan
invisibleDuration)
@@ -229,7 +228,7 @@ namespace Org.Apache.Rocketmq
var targetUrl = message._sourceHost;
var metadata = new Metadata();
Signature.sign(this, metadata);
- await Manager.ChangeInvisibleDuration(targetUrl, metadata,
request, RequestTimeout);
+ await _manager.ChangeInvisibleDuration(targetUrl, metadata,
request, RequestTimeout);
}
private rmq.MessageQueue NextQueue()
diff --git a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
index baf103f..e2e1844 100644
--- a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
+++ b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
@@ -40,4 +40,8 @@
</None>
</ItemGroup>
+ <ItemGroup>
+ <Compile Remove="ClientManagerFactory.cs" />
+ </ItemGroup>
+
</Project>
diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs
index 663980a..b5094a2 100644
--- a/csharp/tests/ProducerTest.cs
+++ b/csharp/tests/ProducerTest.cs
@@ -47,7 +47,7 @@ namespace tests
[TestMethod]
public async Task TestLifecycle()
{
- var producer = new Producer(_accessPoint, resourceNamespace);
+ var producer = new Producer($"{HOST}:{PORT}");
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
await producer.Start();
@@ -57,7 +57,7 @@ namespace tests
[TestMethod]
public async Task TestSendStandardMessage()
{
- var producer = new Producer(_accessPoint, resourceNamespace);
+ var producer = new Producer($"{HOST}:{PORT}");
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
await producer.Start();
@@ -82,7 +82,7 @@ namespace tests
[TestMethod]
public async Task TestSendMultipleMessages()
{
- var producer = new Producer(_accessPoint, resourceNamespace);
+ var producer = new Producer($"{HOST}:{PORT}");
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
await producer.Start();
@@ -109,7 +109,7 @@ namespace tests
[TestMethod]
public async Task TestSendFifoMessage()
{
- var producer = new Producer(_accessPoint, resourceNamespace);
+ var producer = new Producer($"{HOST}:{PORT}");
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
await producer.Start();
@@ -131,7 +131,7 @@ namespace tests
[TestMethod]
public async Task TestSendScheduledMessage()
{
- var producer = new Producer(_accessPoint, resourceNamespace);
+ var producer = new Producer($"{HOST}:{PORT}");
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
await producer.Start();
@@ -154,7 +154,7 @@ namespace tests
[TestMethod]
public async Task TestSendMessage_Failure()
{
- var producer = new Producer(_accessPoint, resourceNamespace);
+ var producer = new Producer($"{HOST}:{PORT}");
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
await producer.Start();
@@ -170,14 +170,12 @@ namespace tests
await producer.Send(msg);
Assert.Fail("Should have raised an exception");
}
- catch (MessageException e)
+ catch (MessageException)
{
}
await producer.Shutdown();
}
-
- private static string resourceNamespace = "";
-
+
private static string topic = "cpp_sdk_standard";
private static string HOST = "127.0.0.1";
diff --git a/csharp/tests/PushConsumerTest.cs b/csharp/tests/PushConsumerTest.cs
deleted file mode 100644
index 78f01de..0000000
--- a/csharp/tests/PushConsumerTest.cs
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System.Collections.Generic;
-using System;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Rocketmq
-{
-
- public class TestMessageListener : IMessageListener
- {
- public Task Consume(List<Message> messages, List<Message> failed)
- {
- foreach (var message in messages)
- {
- Console.WriteLine("");
- }
-
- return Task.CompletedTask;
- }
- }
-
- public class CountableMessageListener : IMessageListener
- {
- public Task Consume(List<Message> messages, List<Message> failed)
- {
- foreach (var message in messages)
- {
- Console.WriteLine("{}", message.MessageId);
- }
-
- return Task.CompletedTask;
- }
- }
-
- [TestClass]
- public class PushConsumerTest
- {
-
- [ClassInitialize]
- public static void SetUp(TestContext context)
- {
- credentialsProvider = new ConfigFileCredentialsProvider();
-
- }
-
- [ClassCleanup]
- public static void TearDown()
- {
-
- }
-
- [TestInitialize]
- public void SetUp()
- {
- accessPoint = new AccessPoint();
- accessPoint.Host = host;
- accessPoint.Port = port;
- }
-
- [TestMethod]
- public void testLifecycle()
- {
- var consumer = new PushConsumer(accessPoint, resourceNamespace,
group);
- consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
- consumer.Region = "cn-hangzhou-pre";
- consumer.Subscribe(topic, "*", ExpressionType.TAG);
- consumer.RegisterListener(new TestMessageListener());
- consumer.Start();
-
- consumer.Shutdown();
- }
-
-
- // [Ignore]
- [TestMethod]
- public void testConsumeMessage()
- {
- var consumer = new PushConsumer(accessPoint, resourceNamespace,
group);
- consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
- consumer.Region = "cn-hangzhou-pre";
- consumer.Subscribe(topic, "*", ExpressionType.TAG);
- consumer.RegisterListener(new CountableMessageListener());
- consumer.Start();
- System.Threading.Thread.Sleep(System.TimeSpan.FromSeconds(300));
- consumer.Shutdown();
- }
-
-
- private static string resourceNamespace =
"MQ_INST_1080056302921134_BXuIbML7";
-
- private static string topic = "cpp_sdk_standard";
-
- private static string group = "GID_cpp_sdk_standard";
-
- private static ICredentialsProvider credentialsProvider;
- private static string host = "116.62.231.199";
- private static int port = 80;
-
- private AccessPoint accessPoint;
-
- }
-
-}
\ No newline at end of file
diff --git a/csharp/tests/SimpleConsumerTest.cs
b/csharp/tests/SimpleConsumerTest.cs
index c986614..e5fc8f0 100644
--- a/csharp/tests/SimpleConsumerTest.cs
+++ b/csharp/tests/SimpleConsumerTest.cs
@@ -20,7 +20,6 @@ using System.Threading;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using rmq = Apache.Rocketmq.V2;
using System.Threading.Tasks;
-using Castle.Core.Logging;
using Org.Apache.Rocketmq;
namespace tests
@@ -30,26 +29,16 @@ namespace tests
public class SimpleConsumerTest
{
- private static AccessPoint accessPoint;
- private static string _resourceNamespace = "";
private static string _group = "GID_cpp_sdk_standard";
private static string _topic = "cpp_sdk_standard";
-
-
- [ClassInitialize]
- public static void SetUp(TestContext context)
- {
- accessPoint = new AccessPoint
- {
- Host = "127.0.0.1",
- Port = 8081
- };
- }
+ private const string HOST = "127.0.0.1";
+ private const int PORT = 8081;
+
[TestMethod]
public async Task TestLifecycle()
{
- var simpleConsumer = new SimpleConsumer(accessPoint,
_resourceNamespace, _group);
+ var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
await simpleConsumer.Start();
Thread.Sleep(1_000);
@@ -59,7 +48,7 @@ namespace tests
[TestMethod]
public async Task TestReceive()
{
- var simpleConsumer = new SimpleConsumer(accessPoint,
_resourceNamespace, _group);
+ var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
await simpleConsumer.Start();
var batchSize = 32;
@@ -74,7 +63,7 @@ namespace tests
[TestMethod]
public async Task TestAck()
{
- var simpleConsumer = new SimpleConsumer(accessPoint,
_resourceNamespace, _group);
+ var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
await simpleConsumer.Start();
var batchSize = 32;
@@ -91,7 +80,7 @@ namespace tests
[TestMethod]
public async Task TestChangeInvisibleDuration()
{
- var simpleConsumer = new SimpleConsumer(accessPoint,
_resourceNamespace, _group);
+ var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
await simpleConsumer.Start();
var batchSize = 32;