This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 708f051 Clean up unused code: Remove PushConsumer (#207)
708f051 is described below
commit 708f051fa147ff328623261eb1c8f846d6581d3e
Author: Zhanhui Li <[email protected]>
AuthorDate: Tue Aug 30 20:35:57 2022 +0800
Clean up unused code: Remove PushConsumer (#207)
* Clean up code
* Remove unused code
---
csharp/examples/Program.cs | 11 +-
csharp/rocketmq-client-csharp/AccessPoint.cs | 20 ++
csharp/rocketmq-client-csharp/Client.cs | 106 ++++-----
.../rocketmq-client-csharp/ClientManagerFactory.cs | 45 ----
csharp/rocketmq-client-csharp/Producer.cs | 8 +-
csharp/rocketmq-client-csharp/PushConsumer.cs | 261 ---------------------
csharp/rocketmq-client-csharp/SimpleConsumer.cs | 19 +-
.../rocketmq-client-csharp.csproj | 4 +
csharp/tests/ProducerTest.cs | 18 +-
csharp/tests/PushConsumerTest.cs | 119 ----------
csharp/tests/SimpleConsumerTest.cs | 25 +-
11 files changed, 108 insertions(+), 528 deletions(-)
diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index 28ad6f3..9f01e29 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -17,7 +17,6 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
-using System.Threading;
using Org.Apache.Rocketmq;
namespace examples
@@ -26,15 +25,15 @@ namespace examples
{
static async Task Main(string[] args)
{
- string accessUrl =
"rmq-cn-7mz2uk4nn0p.cn-hangzhou.rmq.aliyuncs.com:8080";
+ string accessUrl =
"rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080";
+ var topic = "sdk_standard";
var credentialsProvider = new ConfigFileCredentialsProvider();
- var accessPoint = new AccessPoint(accessUrl);
- var producer = new Producer(accessPoint, "");
+ var producer = new Producer(accessUrl);
producer.CredentialsProvider = credentialsProvider;
+ producer.AddTopicOfInterest(topic);
+
await producer.Start();
- var topic = "sdk_standard";
-
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
// Associate the message with one or multiple keys
diff --git a/csharp/rocketmq-client-csharp/AccessPoint.cs b/csharp/rocketmq-client-csharp/AccessPoint.cs
index f4ba47c..ab29273 100644
--- a/csharp/rocketmq-client-csharp/AccessPoint.cs
+++ b/csharp/rocketmq-client-csharp/AccessPoint.cs
@@ -16,6 +16,9 @@
*/
using System;
+using rmq = Apache.Rocketmq.V2;
+using System.Net;
+using System.Net.Sockets;
namespace Org.Apache.Rocketmq
{
@@ -58,5 +61,22 @@ namespace Org.Apache.Rocketmq
{
return $"https://{_host}:{_port}";
}
+
+ public rmq::AddressScheme HostScheme()
+ {
+ IPAddress ip;
+ bool result = IPAddress.TryParse(_host, out ip);
+ if (!result)
+ {
+ return rmq::AddressScheme.DomainName;
+ }
+
+ return ip.AddressFamily switch
+ {
+ AddressFamily.InterNetwork => rmq::AddressScheme.Ipv4,
+ AddressFamily.InterNetworkV6 => rmq::AddressScheme.Ipv6,
+ _ => rmq::AddressScheme.Unspecified
+ };
+ }
}
}
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 32dffae..0fe4ec9 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -25,9 +25,6 @@ using rmq = Apache.Rocketmq.V2;
using grpc = global::Grpc.Core;
using NLog;
using System.Diagnostics.Metrics;
-using OpenTelemetry;
-using OpenTelemetry.Metrics;
-
namespace Org.Apache.Rocketmq
{
@@ -35,46 +32,45 @@ namespace Org.Apache.Rocketmq
{
protected static readonly Logger Logger =
MqLogManager.Instance.GetCurrentClassLogger();
- protected Client(AccessPoint accessPoint, string resourceNamespace)
+ protected Client(string accessUrl)
{
- _accessPoint = accessPoint;
+ AccessPoint = new AccessPoint(accessUrl);
- // Support IPv4 for now
- AccessPointScheme = rmq::AddressScheme.Ipv4;
+ AccessPointScheme = AccessPoint.HostScheme();
var serviceEndpoint = new rmq::Address();
- serviceEndpoint.Host = accessPoint.Host;
- serviceEndpoint.Port = accessPoint.Port;
+ serviceEndpoint.Host = AccessPoint.Host;
+ serviceEndpoint.Port = AccessPoint.Port;
AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
- _resourceNamespace = resourceNamespace;
+ _resourceNamespace = "";
- _clientSettings = new rmq::Settings();
+ ClientSettings = new rmq::Settings();
- _clientSettings.AccessPoint = new rmq::Endpoints();
- _clientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
- _clientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
+ ClientSettings.AccessPoint = new rmq::Endpoints();
+ ClientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
+ ClientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
- _clientSettings.RequestTimeout =
Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
+ ClientSettings.RequestTimeout =
Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
- _clientSettings.UserAgent = new rmq.UA();
- _clientSettings.UserAgent.Language = rmq::Language.DotNet;
- _clientSettings.UserAgent.Version = "5.0.0";
- _clientSettings.UserAgent.Platform =
Environment.OSVersion.ToString();
- _clientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+ ClientSettings.UserAgent = new rmq.UA();
+ ClientSettings.UserAgent.Language = rmq::Language.DotNet;
+ ClientSettings.UserAgent.Version = "5.0.0";
+ ClientSettings.UserAgent.Platform =
Environment.OSVersion.ToString();
+ ClientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
- Manager = ClientManagerFactory.getClientManager(resourceNamespace);
+ _manager = new ClientManager();
_topicRouteTable = new ConcurrentDictionary<string,
TopicRouteData>();
_updateTopicRouteCts = new CancellationTokenSource();
_healthCheckCts = new CancellationTokenSource();
- telemetryCts_ = new CancellationTokenSource();
+ _telemetryCts = new CancellationTokenSource();
}
public virtual async Task Start()
{
- schedule(async () =>
+ Schedule(async () =>
{
await UpdateTopicRoute();
@@ -83,8 +79,8 @@ namespace Org.Apache.Rocketmq
// Get routes for topics of interest.
await UpdateTopicRoute();
- string accessPointUrl = _accessPoint.TargetUrl();
- createSession(accessPointUrl);
+ string accessPointUrl = AccessPoint.TargetUrl();
+ CreateSession(accessPointUrl);
await
_sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
@@ -95,8 +91,8 @@ namespace Org.Apache.Rocketmq
{
Logger.Info($"Shutdown
client[resource-namespace={_resourceNamespace}");
_updateTopicRouteCts.Cancel();
- telemetryCts_.Cancel();
- await Manager.Shutdown();
+ _telemetryCts.Cancel();
+ await _manager.Shutdown();
}
protected string FilterBroker(Func<string, bool> acceptor)
@@ -138,7 +134,7 @@ namespace Org.Apache.Rocketmq
private async Task UpdateTopicRoute()
{
HashSet<string> topics = new HashSet<string>();
- foreach (var topic in topicsOfInterest_)
+ foreach (var topic in _topicsOfInterest)
{
topics.Add(topic);
}
@@ -192,7 +188,7 @@ namespace Org.Apache.Rocketmq
}
}
- public void schedule(Action action, int seconds, CancellationToken
token)
+ public void Schedule(Action action, int seconds, CancellationToken
token)
{
if (null == action)
{
@@ -246,7 +242,7 @@ namespace Org.Apache.Rocketmq
try
{
Logger.Debug($"Resolving route for topic={topic}");
- topicRouteData = await Manager.ResolveRoute(target, metadata,
request, RequestTimeout);
+ topicRouteData = await _manager.ResolveRoute(target, metadata,
request, RequestTimeout);
if (null != topicRouteData)
{
Logger.Debug($"Got route entries for {topic} from name
server");
@@ -286,7 +282,7 @@ namespace Org.Apache.Rocketmq
List<Task> tasks = new List<Task>();
foreach (var endpoint in endpoints)
{
- tasks.Add(Manager.Heartbeat(endpoint, metadata, request,
RequestTimeout));
+ tasks.Add(_manager.Heartbeat(endpoint, metadata, request,
RequestTimeout));
}
await Task.WhenAll(tasks);
@@ -303,7 +299,7 @@ namespace Org.Apache.Rocketmq
}
- protected async Task<List<rmq::Assignment>> scanLoadAssignment(string
topic, string group)
+ protected async Task<List<rmq::Assignment>> ScanLoadAssignment(string
topic, string group)
{
// Pick a broker randomly
string target = FilterBroker((s) => true);
@@ -324,7 +320,7 @@ namespace Org.Apache.Rocketmq
{
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
- return await Manager.QueryLoadAssignment(target, metadata,
request, RequestTimeout);
+ return await _manager.QueryLoadAssignment(target, metadata,
request, RequestTimeout);
}
catch (System.Exception e)
{
@@ -345,14 +341,14 @@ namespace Org.Apache.Rocketmq
public virtual void BuildClientSetting(rmq::Settings settings)
{
- settings.MergeFrom(_clientSettings);
+ settings.MergeFrom(ClientSettings);
}
- public void createSession(string url)
+ public void CreateSession(string url)
{
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
- var stream = Manager.Telemetry(url, metadata);
+ var stream = _manager.Telemetry(url, metadata);
var session = new Session(url, stream, this);
_sessions.TryAdd(url, session);
Task.Run(async () =>
@@ -372,7 +368,7 @@ namespace Org.Apache.Rocketmq
request.Group.ResourceNamespace = _resourceNamespace;
request.Group.Name = group;
request.MessageQueue = assignment.MessageQueue;
- var messages = await Manager.ReceiveMessage(targetUrl, metadata,
request, getLongPollingTimeout());
+ var messages = await _manager.ReceiveMessage(targetUrl, metadata,
request, getLongPollingTimeout());
return messages;
}
@@ -394,7 +390,7 @@ namespace Org.Apache.Rocketmq
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
- return await Manager.Ack(target, metadata, request,
RequestTimeout);
+ return await _manager.Ack(target, metadata, request,
RequestTimeout);
}
public async Task<Boolean> ChangeInvisibleDuration(string target,
string group, string topic, string receiptHandle, String messageId)
@@ -413,7 +409,7 @@ namespace Org.Apache.Rocketmq
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
- return await Manager.ChangeInvisibleDuration(target, metadata,
request, RequestTimeout);
+ return await _manager.ChangeInvisibleDuration(target, metadata,
request, RequestTimeout);
}
public async Task<bool> NotifyClientTermination()
@@ -429,7 +425,7 @@ namespace Org.Apache.Rocketmq
foreach (var endpoint in endpoints)
{
- tasks.Add(Manager.NotifyClientTermination(endpoint, metadata,
request, RequestTimeout));
+ tasks.Add(_manager.NotifyClientTermination(endpoint, metadata,
request, RequestTimeout));
}
bool[] results = await Task.WhenAll(tasks);
@@ -447,24 +443,24 @@ namespace Org.Apache.Rocketmq
{
if (null != settings.Metric)
{
- _clientSettings.Metric = new rmq::Metric();
- _clientSettings.Metric.MergeFrom(settings.Metric);
+ ClientSettings.Metric = new rmq::Metric();
+ ClientSettings.Metric.MergeFrom(settings.Metric);
}
if (null != settings.BackoffPolicy)
{
- _clientSettings.BackoffPolicy = new rmq::RetryPolicy();
-
_clientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
+ ClientSettings.BackoffPolicy = new rmq::RetryPolicy();
+ ClientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
}
}
- protected readonly IClientManager Manager;
+ protected readonly IClientManager _manager;
- private readonly HashSet<string> topicsOfInterest_ = new
HashSet<string>();
+ private readonly HashSet<string> _topicsOfInterest = new
HashSet<string>();
public void AddTopicOfInterest(string topic)
{
- topicsOfInterest_.Add(topic);
+ _topicsOfInterest.Add(topic);
}
private readonly ConcurrentDictionary<string, TopicRouteData>
_topicRouteTable;
@@ -472,24 +468,24 @@ namespace Org.Apache.Rocketmq
private readonly CancellationTokenSource _healthCheckCts;
- private readonly CancellationTokenSource telemetryCts_ = new
CancellationTokenSource();
+ private readonly CancellationTokenSource _telemetryCts;
public CancellationTokenSource TelemetryCts()
{
- return telemetryCts_;
+ return _telemetryCts;
}
- protected readonly AccessPoint _accessPoint;
+ protected readonly AccessPoint AccessPoint;
// This field is subject changes from servers.
- protected readonly rmq::Settings _clientSettings;
+ protected readonly rmq::Settings ClientSettings;
private readonly Random _random = new Random();
-
- protected readonly ConcurrentDictionary<string, Session> _sessions =
new ConcurrentDictionary<string, Session>();
- public static readonly string MeterName = "Apache.RocketMQ.Client";
-
+ private readonly ConcurrentDictionary<string, Session> _sessions = new
ConcurrentDictionary<string, Session>();
+
+ protected const string MeterName = "Apache.RocketMQ.Client";
+
protected static readonly Meter MetricMeter = new(MeterName, "1.0");
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ClientManagerFactory.cs b/csharp/rocketmq-client-csharp/ClientManagerFactory.cs
deleted file mode 100644
index 9d03994..0000000
--- a/csharp/rocketmq-client-csharp/ClientManagerFactory.cs
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-namespace Org.Apache.Rocketmq
-{
- public sealed class ClientManagerFactory
- {
- public static IClientManager getClientManager(string resourceNamespace)
- {
- if (clientManagers.ContainsKey(resourceNamespace))
- {
- return clientManagers[resourceNamespace];
- }
-
- var clientManager = new ClientManager();
- // TODO: configure client managers.
- if (clientManagers.TryAdd<string,
IClientManager>(resourceNamespace, clientManager))
- {
- return clientManager;
- }
-
- return clientManagers[resourceNamespace];
- }
-
- private static ConcurrentDictionary<string, IClientManager>
clientManagers = new ConcurrentDictionary<string, IClientManager>();
- }
-
-}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 37aebb9..e337b1a 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -33,7 +33,7 @@ namespace Org.Apache.Rocketmq
{
public class Producer : Client, IProducer
{
- public Producer(AccessPoint accessPoint, string resourceNamespace) :
base(accessPoint, resourceNamespace)
+ public Producer(string accessUrl) : base(accessUrl)
{
_loadBalancer = new ConcurrentDictionary<string,
PublishLoadBalancer>();
_sendFailureTotal =
MetricMeter.CreateCounter<long>("rocketmq_send_failure_total");
@@ -51,8 +51,8 @@ namespace Org.Apache.Rocketmq
.AddOtlpExporter(delegate(OtlpExporterOptions options,
MetricReaderOptions readerOptions)
{
options.Protocol = OtlpExportProtocol.Grpc;
- options.Endpoint = new Uri(_accessPoint.TargetUrl());
- options.TimeoutMilliseconds = (int)
_clientSettings.RequestTimeout.ToTimeSpan().TotalMilliseconds;
+ options.Endpoint = new Uri(AccessPoint.TargetUrl());
+ options.TimeoutMilliseconds = (int)
ClientSettings.RequestTimeout.ToTimeSpan().TotalMilliseconds;
readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds =
60 * 1000;
})
@@ -171,7 +171,7 @@ namespace Org.Apache.Rocketmq
{
var stopWatch = new Stopwatch();
stopWatch.Start();
- rmq::SendMessageResponse response = await
Manager.SendMessage(target, metadata, request, RequestTimeout);
+ rmq::SendMessageResponse response = await
_manager.SendMessage(target, metadata, request, RequestTimeout);
if (null != response && rmq::Code.Ok ==
response.Status.Code)
{
var messageId = response.Entries[0].MessageId;
diff --git a/csharp/rocketmq-client-csharp/PushConsumer.cs b/csharp/rocketmq-client-csharp/PushConsumer.cs
deleted file mode 100644
index cc30943..0000000
--- a/csharp/rocketmq-client-csharp/PushConsumer.cs
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using rmq = Apache.Rocketmq.V2;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Rocketmq
-{
- public class PushConsumer : Client, IConsumer
- {
- public PushConsumer(AccessPoint accessPoint, string resourceNamespace,
string group) : base(accessPoint, resourceNamespace)
- {
- _group = group;
- _topicFilterExpressionMap = new ConcurrentDictionary<string,
FilterExpression>();
- _topicAssignmentsMap = new ConcurrentDictionary<string,
List<rmq::Assignment>>();
- _processQueueMap = new ConcurrentDictionary<rmq::Assignment,
ProcessQueue>();
- _scanAssignmentCTS = new CancellationTokenSource();
- _scanExpiredProcessQueueCTS = new CancellationTokenSource();
- }
-
- public override async Task Start()
- {
- if (null == _messageListener)
- {
- throw new System.Exception("Bad configuration: message
listener is required");
- }
-
- await base.Start();
-
- // Step-1: Resolve topic routes
- List<Task<TopicRouteData>> queryRouteTasks = new
List<Task<TopicRouteData>>();
- foreach (var item in _topicFilterExpressionMap)
- {
- queryRouteTasks.Add(GetRouteFor(item.Key, true));
- }
- Task.WhenAll(queryRouteTasks).GetAwaiter().GetResult();
-
- // Step-2: Send heartbeats to all involving brokers so that we may
get immediate, valid assignments.
- await Heartbeat();
-
- // Step-3: Scan load assignments that are assigned to current
client
- schedule(async () =>
- {
- await scanLoadAssignments();
- }, 10, _scanAssignmentCTS.Token);
-
- schedule(() =>
- {
- ScanExpiredProcessQueue();
- }, 10, _scanExpiredProcessQueueCTS.Token);
- }
-
- public override async Task Shutdown()
- {
- _scanAssignmentCTS.Cancel();
- _scanExpiredProcessQueueCTS.Cancel();
-
- // Shutdown resources of derived class
- await base.Shutdown();
- }
-
- private async Task scanLoadAssignments()
- {
- Logger.Debug("Start to scan load assignments from server");
- List<Task<List<rmq::Assignment>>> tasks = new
List<Task<List<rmq::Assignment>>>();
- foreach (var item in _topicFilterExpressionMap)
- {
- tasks.Add(scanLoadAssignment(item.Key, _group));
- }
- var result = await Task.WhenAll(tasks);
-
- foreach (var assignments in result)
- {
- if (assignments.Count == 0)
- {
- continue;
- }
-
- checkAndUpdateAssignments(assignments);
- }
- Logger.Debug("Completed scanning load assignments");
- }
-
- private void ScanExpiredProcessQueue()
- {
- foreach (var item in _processQueueMap)
- {
- if (item.Value.Expired())
- {
- Task.Run(async () =>
- {
- await ExecutePop0(item.Key);
- });
- }
- }
- }
-
- private void checkAndUpdateAssignments(List<rmq::Assignment>
assignments)
- {
- if (assignments.Count == 0)
- {
- return;
- }
-
- string topic = assignments[0].MessageQueue.Topic.Name;
-
- // Compare to generate or cancel pop-cycles
- List<rmq::Assignment> existing;
- _topicAssignmentsMap.TryGetValue(topic, out existing);
-
- foreach (var assignment in assignments)
- {
- if (null == existing || !existing.Contains(assignment))
- {
- ExecutePop(assignment);
- }
- }
-
- if (null != existing)
- {
- foreach (var assignment in existing)
- {
- if (!assignments.Contains(assignment))
- {
- Logger.Info($"Stop receiving messages from
{assignment.MessageQueue.ToString()}");
- CancelPop(assignment);
- }
- }
- }
-
- }
-
- private void ExecutePop(rmq::Assignment assignment)
- {
- var processQueue = new ProcessQueue();
- if (_processQueueMap.TryAdd(assignment, processQueue))
- {
- Task.Run(async () =>
- {
- await ExecutePop0(assignment);
- });
- }
- }
-
- private async Task ExecutePop0(rmq::Assignment assignment)
- {
- Logger.Info($"Start to pop {assignment.MessageQueue.ToString()}");
- while (true)
- {
- try
- {
- ProcessQueue processQueue;
- if (!_processQueueMap.TryGetValue(assignment, out
processQueue))
- {
- break;
- }
-
- if (processQueue.Dropped)
- {
- break;
- }
-
- List<Message> messages = await
base.ReceiveMessage(assignment, _group);
- processQueue.LastReceiveTime = System.DateTime.UtcNow;
-
- // TODO: cache message and dispatch them
-
- List<Message> failed = new List<Message>();
- await _messageListener.Consume(messages, failed);
-
- foreach (var message in failed)
- {
- await
base.ChangeInvisibleDuration(message._sourceHost, _group, message.Topic,
message._receiptHandle, message.MessageId);
- }
-
- foreach (var message in messages)
- {
- if (!failed.Contains(message))
- {
- bool success = await base.Ack(message._sourceHost,
_group, message.Topic, message._receiptHandle, message.MessageId);
- if (!success)
- {
- //TODO: log error.
- }
- }
- }
- }
- catch (System.Exception)
- {
- // TODO: log exception raised.
- }
-
-
- }
- }
-
- private void CancelPop(rmq::Assignment assignment)
- {
- if (!_processQueueMap.ContainsKey(assignment))
- {
- return;
- }
-
- ProcessQueue processQueue;
- if (_processQueueMap.Remove(assignment, out processQueue))
- {
- processQueue.Dropped = true;
- }
- }
-
- protected override void PrepareHeartbeatData(rmq::HeartbeatRequest
request)
- {
- }
-
- public void Subscribe(string topic, string expression, ExpressionType
type)
- {
- var filterExpression = new FilterExpression(expression, type);
- _topicFilterExpressionMap[topic] = filterExpression;
-
- }
-
- public void RegisterListener(IMessageListener listener)
- {
- if (null != listener)
- {
- _messageListener = listener;
- }
- }
-
- private string _group;
-
- private ConcurrentDictionary<string, FilterExpression>
_topicFilterExpressionMap;
- private IMessageListener _messageListener;
-
- private CancellationTokenSource _scanAssignmentCTS;
-
- private ConcurrentDictionary<string, List<rmq::Assignment>>
_topicAssignmentsMap;
-
- private ConcurrentDictionary<rmq::Assignment, ProcessQueue>
_processQueueMap;
-
- private CancellationTokenSource _scanExpiredProcessQueueCTS;
-
- }
-
-}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 154efa0..d4694ac 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -30,9 +30,8 @@ namespace Org.Apache.Rocketmq
public class SimpleConsumer : Client
{
- public SimpleConsumer(AccessPoint accessPoint,
- string resourceNamespace, string group)
- : base(accessPoint, resourceNamespace)
+ public SimpleConsumer(string accessUrl, string group)
+ : base(accessUrl)
{
_fifo = false;
_subscriptions = new ConcurrentDictionary<string,
rmq.SubscriptionEntry>();
@@ -61,7 +60,7 @@ namespace Org.Apache.Rocketmq
await base.Start();
// Scan load assignment periodically
- schedule(async () =>
+ Schedule(async () =>
{
while (!_scanAssignmentCts.IsCancellationRequested)
{
@@ -100,13 +99,13 @@ namespace Org.Apache.Rocketmq
request.Endpoints = new rmq::Endpoints();
request.Endpoints.Scheme = rmq.AddressScheme.Ipv4;
var address = new rmq::Address();
- address.Host = _accessPoint.Host;
- address.Port = _accessPoint.Port;
+ address.Host = AccessPoint.Host;
+ address.Port = AccessPoint.Port;
request.Endpoints.Addresses.Add(address);
var metadata = new Metadata();
Signature.sign(this, metadata);
-
tasks.Add(Manager.QueryLoadAssignment(_accessPoint.TargetUrl(), metadata,
request, TimeSpan.FromSeconds(3)));
+
tasks.Add(_manager.QueryLoadAssignment(AccessPoint.TargetUrl(), metadata,
request, TimeSpan.FromSeconds(3)));
}
List<rmq.Assignment>[] list = await Task.WhenAll(tasks);
@@ -184,7 +183,7 @@ namespace Org.Apache.Rocketmq
var metadata = new Metadata();
Signature.sign(this, metadata);
- return await Manager.ReceiveMessage(targetUrl, metadata, request,
timeout);
+ return await _manager.ReceiveMessage(targetUrl, metadata, request,
timeout);
}
@@ -207,7 +206,7 @@ namespace Org.Apache.Rocketmq
var targetUrl = message._sourceHost;
var metadata = new Metadata();
Signature.sign(this, metadata);
- await Manager.Ack(targetUrl, metadata, request, RequestTimeout);
+ await _manager.Ack(targetUrl, metadata, request, RequestTimeout);
}
public async Task ChangeInvisibleDuration(Message message, TimeSpan
invisibleDuration)
@@ -229,7 +228,7 @@ namespace Org.Apache.Rocketmq
var targetUrl = message._sourceHost;
var metadata = new Metadata();
Signature.sign(this, metadata);
- await Manager.ChangeInvisibleDuration(targetUrl, metadata,
request, RequestTimeout);
+ await _manager.ChangeInvisibleDuration(targetUrl, metadata,
request, RequestTimeout);
}
private rmq.MessageQueue NextQueue()
diff --git a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
index baf103f..e2e1844 100644
--- a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
+++ b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
@@ -40,4 +40,8 @@
</None>
</ItemGroup>
+ <ItemGroup>
+ <Compile Remove="ClientManagerFactory.cs" />
+ </ItemGroup>
+
</Project>
diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs
index 663980a..b5094a2 100644
--- a/csharp/tests/ProducerTest.cs
+++ b/csharp/tests/ProducerTest.cs
@@ -47,7 +47,7 @@ namespace tests
[TestMethod]
public async Task TestLifecycle()
{
- var producer = new Producer(_accessPoint, resourceNamespace);
+ var producer = new Producer($"{HOST}:{PORT}");
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
await producer.Start();
@@ -57,7 +57,7 @@ namespace tests
[TestMethod]
public async Task TestSendStandardMessage()
{
- var producer = new Producer(_accessPoint, resourceNamespace);
+ var producer = new Producer($"{HOST}:{PORT}");
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
await producer.Start();
@@ -82,7 +82,7 @@ namespace tests
[TestMethod]
public async Task TestSendMultipleMessages()
{
- var producer = new Producer(_accessPoint, resourceNamespace);
+ var producer = new Producer($"{HOST}:{PORT}");
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
await producer.Start();
@@ -109,7 +109,7 @@ namespace tests
[TestMethod]
public async Task TestSendFifoMessage()
{
- var producer = new Producer(_accessPoint, resourceNamespace);
+ var producer = new Producer($"{HOST}:{PORT}");
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
await producer.Start();
@@ -131,7 +131,7 @@ namespace tests
[TestMethod]
public async Task TestSendScheduledMessage()
{
- var producer = new Producer(_accessPoint, resourceNamespace);
+ var producer = new Producer($"{HOST}:{PORT}");
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
await producer.Start();
@@ -154,7 +154,7 @@ namespace tests
[TestMethod]
public async Task TestSendMessage_Failure()
{
- var producer = new Producer(_accessPoint, resourceNamespace);
+ var producer = new Producer($"{HOST}:{PORT}");
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
await producer.Start();
@@ -170,14 +170,12 @@ namespace tests
await producer.Send(msg);
Assert.Fail("Should have raised an exception");
}
- catch (MessageException e)
+ catch (MessageException)
{
}
await producer.Shutdown();
}
-
- private static string resourceNamespace = "";
-
+
private static string topic = "cpp_sdk_standard";
private static string HOST = "127.0.0.1";
diff --git a/csharp/tests/PushConsumerTest.cs b/csharp/tests/PushConsumerTest.cs
deleted file mode 100644
index 78f01de..0000000
--- a/csharp/tests/PushConsumerTest.cs
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System.Collections.Generic;
-using System;
-using System.Threading.Tasks;
-
-namespace Org.Apache.Rocketmq
-{
-
- public class TestMessageListener : IMessageListener
- {
- public Task Consume(List<Message> messages, List<Message> failed)
- {
- foreach (var message in messages)
- {
- Console.WriteLine("");
- }
-
- return Task.CompletedTask;
- }
- }
-
- public class CountableMessageListener : IMessageListener
- {
- public Task Consume(List<Message> messages, List<Message> failed)
- {
- foreach (var message in messages)
- {
- Console.WriteLine("{}", message.MessageId);
- }
-
- return Task.CompletedTask;
- }
- }
-
- [TestClass]
- public class PushConsumerTest
- {
-
- [ClassInitialize]
- public static void SetUp(TestContext context)
- {
- credentialsProvider = new ConfigFileCredentialsProvider();
-
- }
-
- [ClassCleanup]
- public static void TearDown()
- {
-
- }
-
- [TestInitialize]
- public void SetUp()
- {
- accessPoint = new AccessPoint();
- accessPoint.Host = host;
- accessPoint.Port = port;
- }
-
- [TestMethod]
- public void testLifecycle()
- {
- var consumer = new PushConsumer(accessPoint, resourceNamespace,
group);
- consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
- consumer.Region = "cn-hangzhou-pre";
- consumer.Subscribe(topic, "*", ExpressionType.TAG);
- consumer.RegisterListener(new TestMessageListener());
- consumer.Start();
-
- consumer.Shutdown();
- }
-
-
- // [Ignore]
- [TestMethod]
- public void testConsumeMessage()
- {
- var consumer = new PushConsumer(accessPoint, resourceNamespace,
group);
- consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
- consumer.Region = "cn-hangzhou-pre";
- consumer.Subscribe(topic, "*", ExpressionType.TAG);
- consumer.RegisterListener(new CountableMessageListener());
- consumer.Start();
- System.Threading.Thread.Sleep(System.TimeSpan.FromSeconds(300));
- consumer.Shutdown();
- }
-
-
- private static string resourceNamespace =
"MQ_INST_1080056302921134_BXuIbML7";
-
- private static string topic = "cpp_sdk_standard";
-
- private static string group = "GID_cpp_sdk_standard";
-
- private static ICredentialsProvider credentialsProvider;
- private static string host = "116.62.231.199";
- private static int port = 80;
-
- private AccessPoint accessPoint;
-
- }
-
-}
\ No newline at end of file
diff --git a/csharp/tests/SimpleConsumerTest.cs b/csharp/tests/SimpleConsumerTest.cs
index c986614..e5fc8f0 100644
--- a/csharp/tests/SimpleConsumerTest.cs
+++ b/csharp/tests/SimpleConsumerTest.cs
@@ -20,7 +20,6 @@ using System.Threading;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using rmq = Apache.Rocketmq.V2;
using System.Threading.Tasks;
-using Castle.Core.Logging;
using Org.Apache.Rocketmq;
namespace tests
@@ -30,26 +29,16 @@ namespace tests
public class SimpleConsumerTest
{
- private static AccessPoint accessPoint;
- private static string _resourceNamespace = "";
private static string _group = "GID_cpp_sdk_standard";
private static string _topic = "cpp_sdk_standard";
-
-
- [ClassInitialize]
- public static void SetUp(TestContext context)
- {
- accessPoint = new AccessPoint
- {
- Host = "127.0.0.1",
- Port = 8081
- };
- }
+ private const string HOST = "127.0.0.1";
+ private const int PORT = 8081;
+
[TestMethod]
public async Task TestLifecycle()
{
- var simpleConsumer = new SimpleConsumer(accessPoint,
_resourceNamespace, _group);
+ var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
await simpleConsumer.Start();
Thread.Sleep(1_000);
@@ -59,7 +48,7 @@ namespace tests
[TestMethod]
public async Task TestReceive()
{
- var simpleConsumer = new SimpleConsumer(accessPoint,
_resourceNamespace, _group);
+ var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
await simpleConsumer.Start();
var batchSize = 32;
@@ -74,7 +63,7 @@ namespace tests
[TestMethod]
public async Task TestAck()
{
- var simpleConsumer = new SimpleConsumer(accessPoint,
_resourceNamespace, _group);
+ var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
await simpleConsumer.Start();
var batchSize = 32;
@@ -91,7 +80,7 @@ namespace tests
[TestMethod]
public async Task TestChangeInvisibleDuration()
{
- var simpleConsumer = new SimpleConsumer(accessPoint,
_resourceNamespace, _group);
+ var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
await simpleConsumer.Start();
var batchSize = 32;