This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit f5c2878af5c8e542148f81180c35ce47735d23a5 Author: Aaron Ai <[email protected]> AuthorDate: Wed Feb 22 17:52:56 2023 +0800 Add state machine for rocketmq producer/simpleConsumer --- csharp/rocketmq-client-csharp/Client.cs | 4 +++ csharp/rocketmq-client-csharp/Producer.cs | 34 +++++++++++++++---- csharp/rocketmq-client-csharp/SimpleConsumer.cs | 45 ++++++++++++++++++++----- csharp/rocketmq-client-csharp/State.cs | 7 +++- 4 files changed, 74 insertions(+), 16 deletions(-) diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index e0a4c553..a9cd093a 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -58,6 +58,8 @@ namespace Org.Apache.Rocketmq private readonly Dictionary<Endpoints, Session> _sessionsTable; private readonly ReaderWriterLockSlim _sessionLock; + protected volatile State State; + protected Client(ClientConfig clientConfig) { ClientConfig = clientConfig; @@ -75,6 +77,8 @@ namespace Org.Apache.Rocketmq _sessionsTable = new Dictionary<Endpoints, Session>(); _sessionLock = new ReaderWriterLockSlim(); + + State = State.New; } public virtual async Task Start() diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index c4e78e91..d376d14c 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -73,16 +73,36 @@ namespace Org.Apache.Rocketmq public override async Task Start() { - Logger.Info($"Begin to start the rocketmq producer, clientId={ClientId}"); - await base.Start(); - Logger.Info($"The rocketmq producer starts successfully, clientId={ClientId}"); + try + { + State = State.Starting; + Logger.Info($"Begin to start the rocketmq producer, clientId={ClientId}"); + await base.Start(); + Logger.Info($"The rocketmq producer starts successfully, clientId={ClientId}"); + State = State.Running; + } + catch (Exception) + { + State = State.Failed; + throw; + } } public override async Task Shutdown() { - Logger.Info($"Begin to shutdown the rocketmq producer, clientId={ClientId}"); - await base.Shutdown(); - Logger.Info($"Shutdown the rocketmq producer successfully, clientId={ClientId}"); + try + { + State = State.Stopping; + Logger.Info($"Begin to shutdown the rocketmq producer, clientId={ClientId}"); + await base.Shutdown(); + Logger.Info($"Shutdown the rocketmq producer successfully, clientId={ClientId}"); + State = State.Terminated; + } + catch (Exception) + { + State = State.Failed; + throw; + } } protected override Proto::HeartbeatRequest WrapHeartbeatRequest() @@ -165,7 +185,7 @@ namespace Org.Apache.Rocketmq public async Task<SendReceipt> Send(Message message, ITransaction transaction) { - var tx = (Transaction) transaction; + var tx = (Transaction)transaction; var publishingMessage = tx.TryAddMessage(message); var sendReceipt = await Send(message, true); tx.TryAddReceipt(publishingMessage, sendReceipt); diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs index f6abd5ea..bf9614e1 100644 --- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs +++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs @@ -55,30 +55,59 @@ namespace Org.Apache.Rocketmq public async Task Subscribe(string topic, FilterExpression filterExpression) { - // TODO: check running status. + if (State.Running != State) + { + throw new InvalidOperationException("Simple consumer is not running"); + } + await GetSubscriptionLoadBalancer(topic); _subscriptionExpressions.TryAdd(topic, filterExpression); } public void Unsubscribe(string topic) { + if (State.Running != State) + { + throw new InvalidOperationException("Simple consumer is not running"); + } + _subscriptionExpressions.TryRemove(topic, out _); } public override async Task Start() { - Logger.Info($"Begin to start the rocketmq simple consumer, clientId={ClientId}"); - await base.Start(); - Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}"); + try + { + State = State.Starting; + Logger.Info($"Begin to start the rocketmq simple consumer, clientId={ClientId}"); + await base.Start(); + Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}"); + State = State.Running; + } + catch (Exception) + { + State = State.Failed; + throw; + } } public override async Task Shutdown() { - Logger.Info($"Begin to shutdown the rocketmq simple consumer, clientId={ClientId}"); - await base.Shutdown(); - Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}"); + try + { + State = State.Stopping; + Logger.Info($"Begin to shutdown the rocketmq simple consumer, clientId={ClientId}"); + await base.Shutdown(); + Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}"); + State = State.Terminated; + } + catch (Exception) + { + State = State.Failed; + throw; + } } - + protected override IEnumerable<string> GetTopics() { return _subscriptionExpressions.Keys; diff --git a/csharp/rocketmq-client-csharp/State.cs b/csharp/rocketmq-client-csharp/State.cs index 1dbd6b30..e353df50 100644 --- a/csharp/rocketmq-client-csharp/State.cs +++ b/csharp/rocketmq-client-csharp/State.cs @@ -19,6 +19,11 @@ namespace Org.Apache.Rocketmq { public enum State { - + New, + Starting, + Running, + Stopping, + Terminated, + Failed } } \ No newline at end of file
