This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 4b99c037 [ISSUE #575] Gracefully shutdown for Rust client (#576)
4b99c037 is described below
commit 4b99c03706921ad9bdbfa2ea0a9a02ebd9d4731d
Author: SSpirits <[email protected]>
AuthorDate: Tue Jul 25 19:52:53 2023 +0800
[ISSUE #575] Gracefully shutdown for Rust client (#576)
* feat(rust): add gracefully shutdown for rust sdk
Signed-off-by: SSpirits <[email protected]>
* fix(rust): fix test
Signed-off-by: SSpirits <[email protected]>
* fix(rust): apply reviewer changes
Signed-off-by: SSpirits <[email protected]>
---------
Signed-off-by: SSpirits <[email protected]>
---
rust/examples/delay_producer.rs | 29 +++++++--
rust/examples/fifo_producer.rs | 29 +++++++--
rust/examples/producer.rs | 29 +++++++--
rust/examples/simple_consumer.rs | 72 +++++++++++++---------
rust/examples/transaction_producer.rs | 25 ++++++--
rust/src/client.rs | 112 ++++++++++++++++++++++++----------
rust/src/conf.rs | 4 +-
rust/src/producer.rs | 7 ++-
rust/src/session.rs | 105 +++++++++++++++++++++++++------
rust/src/simple_consumer.rs | 10 ++-
10 files changed, 316 insertions(+), 106 deletions(-)
diff --git a/rust/examples/delay_producer.rs b/rust/examples/delay_producer.rs
index 3e237c38..cca3baea 100644
--- a/rust/examples/delay_producer.rs
+++ b/rust/examples/delay_producer.rs
@@ -23,8 +23,8 @@ use rocketmq::Producer;
#[tokio::main]
async fn main() {
- // recommend to specify which topic(s) you would like to send message to
- // producer will prefetch topic route when start and failed fast if topic
not exist
+ // It's recommended to specify the topics that applications will publish
messages to
+ // because the producer will prefetch topic routes for them on start and
fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["delay_test"]);
@@ -34,7 +34,11 @@ async fn main() {
// build and start producer
let mut producer = Producer::new(producer_option, client_option).unwrap();
- producer.start().await.unwrap();
+ let start_result = producer.start().await;
+ if start_result.is_err() {
+ eprintln!("producer start failed: {:?}", start_result.unwrap_err());
+ return;
+ }
// build message
let message = MessageBuilder::delay_message_builder(
@@ -51,10 +55,23 @@ async fn main() {
.unwrap();
// send message to rocketmq proxy
- let result = producer.send(message).await;
- debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+ let send_result = producer.send(message).await;
+ if send_result.is_err() {
+ eprintln!("send message failed: {:?}", send_result.unwrap_err());
+ return;
+ }
println!(
"send message success, message_id={}",
- result.unwrap().message_id()
+ send_result.unwrap().message_id()
);
+
+ // shutdown the producer when you don't need it anymore.
+ // recommend shutdown manually to gracefully stop and unregister from
server
+ let shutdown_result = producer.shutdown().await;
+ if shutdown_result.is_err() {
+ eprintln!(
+ "producer shutdown failed: {:?}",
+ shutdown_result.unwrap_err()
+ );
+ }
}
diff --git a/rust/examples/fifo_producer.rs b/rust/examples/fifo_producer.rs
index 38562e2b..211ae683 100644
--- a/rust/examples/fifo_producer.rs
+++ b/rust/examples/fifo_producer.rs
@@ -20,8 +20,8 @@ use rocketmq::Producer;
#[tokio::main]
async fn main() {
- // recommend to specify which topic(s) you would like to send message to
- // producer will prefetch topic route when start and failed fast if topic
not exist
+ // It's recommended to specify the topics that applications will publish
messages to
+ // because the producer will prefetch topic routes for them on start and
fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["fifo_test"]);
@@ -31,7 +31,11 @@ async fn main() {
// build and start producer
let mut producer = Producer::new(producer_option, client_option).unwrap();
- producer.start().await.unwrap();
+ let start_result = producer.start().await;
+ if start_result.is_err() {
+ eprintln!("producer start failed: {:?}", start_result.unwrap_err());
+ return;
+ }
// build message
let message = MessageBuilder::fifo_message_builder(
@@ -44,10 +48,23 @@ async fn main() {
.unwrap();
// send message to rocketmq proxy
- let result = producer.send(message).await;
- debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+ let send_result = producer.send(message).await;
+ if send_result.is_err() {
+ eprintln!("send message failed: {:?}", send_result.unwrap_err());
+ return;
+ }
println!(
"send message success, message_id={}",
- result.unwrap().message_id()
+ send_result.unwrap().message_id()
);
+
+ // shutdown the producer when you don't need it anymore.
+ // you should shutdown it manually to gracefully stop and unregister from
server
+ let shutdown_result = producer.shutdown().await;
+ if shutdown_result.is_err() {
+ eprintln!(
+ "producer shutdown failed: {:?}",
+ shutdown_result.unwrap_err()
+ );
+ }
}
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index 335b31fb..3e818e91 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -20,8 +20,8 @@ use rocketmq::Producer;
#[tokio::main]
async fn main() {
- // recommend to specify which topic(s) you would like to send message to
- // producer will prefetch topic route when start and failed fast if topic
not exist
+ // It's recommended to specify the topics that applications will publish
messages to
+ // because the producer will prefetch topic routes for them on start and
fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["test_topic"]);
@@ -31,7 +31,11 @@ async fn main() {
// build and start producer
let mut producer = Producer::new(producer_option, client_option).unwrap();
- producer.start().await.unwrap();
+ let start_result = producer.start().await;
+ if start_result.is_err() {
+ eprintln!("producer start failed: {:?}", start_result.unwrap_err());
+ return;
+ }
// build message
let message = MessageBuilder::builder()
@@ -42,10 +46,23 @@ async fn main() {
.unwrap();
// send message to rocketmq proxy
- let result = producer.send(message).await;
- debug_assert!(result.is_ok(), "send message failed: {:?}", result);
+ let send_result = producer.send(message).await;
+ if send_result.is_err() {
+ eprintln!("send message failed: {:?}", send_result.unwrap_err());
+ return;
+ }
println!(
"send message success, message_id={}",
- result.unwrap().message_id()
+ send_result.unwrap().message_id()
);
+
+ // shutdown the producer when you don't need it anymore.
+ // you should shutdown it manually to gracefully stop and unregister from
server
+ let shutdown_result = producer.shutdown().await;
+ if shutdown_result.is_ok() {
+ eprintln!(
+ "producer shutdown failed: {:?}",
+ shutdown_result.unwrap_err()
+ );
+ }
}
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
index a5f94085..fc1a8388 100644
--- a/rust/examples/simple_consumer.rs
+++ b/rust/examples/simple_consumer.rs
@@ -20,8 +20,8 @@ use rocketmq::SimpleConsumer;
#[tokio::main]
async fn main() {
- // recommend to specify which topic(s) you would like to send message to
- // simple consumer will prefetch topic route when start and failed fast if
topic not exist
+ // It's recommended to specify the topics that applications will publish
messages to
+ // because the simple consumer will prefetch topic routes for them on
start and fail fast in case they do not exist
let mut consumer_option = SimpleConsumerOption::default();
consumer_option.set_topics(vec!["test_topic"]);
consumer_option.set_consumer_group("SimpleConsumerGroup");
@@ -33,38 +33,54 @@ async fn main() {
// build and start simple consumer
let mut consumer = SimpleConsumer::new(consumer_option,
client_option).unwrap();
- consumer.start().await.unwrap();
-
- loop {
- // pop message from rocketmq proxy
- let receive_result = consumer
- .receive(
- "test_topic".to_string(),
- &FilterExpression::new(FilterType::Tag, "test_tag"),
- )
- .await;
- debug_assert!(
- receive_result.is_ok(),
- "receive message failed: {:?}",
- receive_result.unwrap_err()
+ let start_result = consumer.start().await;
+ if start_result.is_err() {
+ eprintln!(
+ "simple consumer start failed: {:?}",
+ start_result.unwrap_err()
);
+ return;
+ }
- let messages = receive_result.unwrap();
+ // pop message from rocketmq proxy
+ let receive_result = consumer
+ .receive(
+ "test_topic".to_string(),
+ &FilterExpression::new(FilterType::Tag, "test_tag"),
+ )
+ .await;
+ if receive_result.is_err() {
+ eprintln!("receive message failed: {:?}", receive_result.unwrap_err());
+ return;
+ }
- if messages.is_empty() {
- println!("no message received");
- return;
- }
+ let messages = receive_result.unwrap();
- for message in messages {
- println!("receive message: {:?}", message);
- // ack message to rocketmq proxy
- let ack_result = consumer.ack(&message).await;
- debug_assert!(
- ack_result.is_ok(),
- "ack message failed: {:?}",
+ if messages.is_empty() {
+ println!("no message received");
+ return;
+ }
+
+ for message in messages {
+ println!("receive message: {:?}", message);
+ // ack message to rocketmq proxy
+ let ack_result = consumer.ack(&message).await;
+ if ack_result.is_err() {
+ eprintln!(
+ "ack message {} failed: {:?}",
+ message.message_id(),
ack_result.unwrap_err()
);
}
}
+
+ // shutdown the simple consumer when you don't need it anymore.
+ // you should shutdown it manually to gracefully stop and unregister from
server
+ let shutdown_result = consumer.shutdown().await;
+ if shutdown_result.is_err() {
+ eprintln!(
+ "simple consumer shutdown failed: {:?}",
+ shutdown_result.unwrap_err()
+ );
+ }
}
diff --git a/rust/examples/transaction_producer.rs
b/rust/examples/transaction_producer.rs
index 7423cc9b..6df6cb65 100644
--- a/rust/examples/transaction_producer.rs
+++ b/rust/examples/transaction_producer.rs
@@ -28,8 +28,8 @@ lazy_static::lazy_static! {
#[tokio::main]
async fn main() {
- // recommend to specify which topic(s) you would like to send message to
- // producer will prefetch topic route when start and failed fast if topic
not exist
+ // It's recommended to specify the topics that applications will publish
messages to
+ // because the producer will prefetch topic routes for them on start and
fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["transaction_test"]);
@@ -62,7 +62,11 @@ async fn main() {
}),
)
.unwrap();
- producer.start().await.unwrap();
+ let start_result = producer.start().await;
+ if start_result.is_err() {
+ eprintln!("producer start failed: {:?}", start_result.unwrap_err());
+ return;
+ }
// build message
let message = MessageBuilder::transaction_message_builder(
@@ -93,5 +97,18 @@ async fn main() {
// commit transaction manually
// delete following two lines so that RocketMQ server will check
transaction status periodically
let result = transaction.commit().await;
- debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result);
+ if result.is_err() {
+ eprintln!("commit transaction failed: {:?}", result.unwrap_err());
+ return;
+ }
+
+ // shutdown the producer when you don't need it anymore.
+ // you should shutdown it manually to gracefully stop and unregister from
server
+ let shutdown_result = producer.shutdown().await;
+ if shutdown_result.is_err() {
+ eprintln!(
+ "transaction producer shutdown failed: {:?}",
+ shutdown_result.unwrap_err()
+ );
+ }
}
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 57bddb84..3804b4ae 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -37,9 +37,9 @@ use crate::pb::receive_message_response::Content;
use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
use crate::pb::{
AckMessageRequest, AckMessageResultEntry, Code, EndTransactionRequest,
FilterExpression,
- HeartbeatRequest, HeartbeatResponse, Message, MessageQueue,
QueryRouteRequest,
- ReceiveMessageRequest, Resource, SendMessageRequest, Status,
TelemetryCommand,
- TransactionSource,
+ HeartbeatRequest, HeartbeatResponse, Message, MessageQueue,
NotifyClientTerminationRequest,
+ QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest,
Status,
+ TelemetryCommand, TransactionSource,
};
#[double]
use crate::session::SessionManager;
@@ -55,6 +55,7 @@ pub(crate) struct Client {
settings: TelemetryCommand,
transaction_checker: Option<Box<TransactionChecker>>,
telemetry_command_tx: Option<mpsc::Sender<pb::telemetry_command::Command>>,
+ shutdown_tx: Option<oneshot::Sender<()>>,
}
lazy_static::lazy_static! {
@@ -62,6 +63,8 @@ lazy_static::lazy_static! {
}
const OPERATION_CLIENT_NEW: &str = "client.new";
+const OPERATION_CLIENT_START: &str = "client.start";
+const OPERATION_CLIENT_SHUTDOWN: &str = "client.shutdown";
const OPERATION_GET_SESSION: &str = "client.get_session";
const OPERATION_QUERY_ROUTE: &str = "client.query_route";
const OPERATION_HEARTBEAT: &str = "client.heartbeat";
@@ -102,11 +105,12 @@ impl Client {
settings,
transaction_checker: None,
telemetry_command_tx: None,
+ shutdown_tx: None,
})
}
pub(crate) fn is_started(&self) -> bool {
- self.telemetry_command_tx.is_some()
+ self.shutdown_tx.is_some()
}
pub(crate) fn has_transaction_checker(&self) -> bool {
@@ -124,20 +128,27 @@ impl Client {
let logger = self.logger.clone();
let session_manager = self.session_manager.clone();
- let group = self.option.group.to_string();
+ let group = self.option.group.clone();
let namespace = self.option.namespace.to_string();
let client_type = self.option.client_type.clone();
+ let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+ self.shutdown_tx = Some(shutdown_tx);
+
// send heartbeat and handle telemetry command
- let (tx, mut rx) = mpsc::channel(16);
- self.telemetry_command_tx = Some(tx);
- let rpc_client = self.get_session().await?;
+ let (telemetry_command_tx, mut telemetry_command_rx) =
mpsc::channel(16);
+ self.telemetry_command_tx = Some(telemetry_command_tx);
+ let rpc_client = self
+ .get_session()
+ .await
+ .map_err(|error| error.with_operation(OPERATION_CLIENT_START))?;
let endpoints = self.access_endpoints.clone();
let transaction_checker = self.transaction_checker.take();
// give a placeholder
if transaction_checker.is_some() {
self.transaction_checker = Some(Box::new(|_, _|
TransactionResolution::UNKNOWN));
}
+
tokio::spawn(async move {
rpc_client.is_started();
let mut interval =
tokio::time::interval(std::time::Duration::from_secs(30));
@@ -178,17 +189,54 @@ impl Client {
debug!(logger,"send heartbeat to server success,
peer={}",peer);
}
},
- command = rx.recv() => {
+ command = telemetry_command_rx.recv() => {
if let Some(command) = command {
let result =
Self::handle_telemetry_command(rpc_client.clone(), &transaction_checker,
endpoints.clone(), command).await;
if let Err(error) = result {
- error!(logger, "handle telemetry command
failed: {:?}", error)
+ error!(logger, "handle telemetry command
failed: {:?}", error);
}
}
},
+ _ = &mut shutdown_rx => {
+ debug!(logger, "receive shutdown signal, stop
heartbeat task and telemetry command handler");
+ break;
+ }
}
}
+ info!(
+ logger,
+ "heartbeat task and telemetry command handler are stopped"
+ );
+ });
+ Ok(())
+ }
+
+ fn check_started(&self, operation: &'static str) -> Result<(),
ClientError> {
+ if !self.is_started() {
+ return Err(ClientError::new(
+ ErrorKind::ClientIsNotRunning,
+ "client is not started",
+ operation,
+ )
+ .with_context("client_id", self.id.clone()));
+ }
+ Ok(())
+ }
+
+ pub(crate) async fn shutdown(mut self) -> Result<(), ClientError> {
+ self.check_started(OPERATION_CLIENT_SHUTDOWN)?;
+ let mut rpc_client = self.get_session().await?;
+ self.telemetry_command_tx = None;
+ if let Some(tx) = self.shutdown_tx.take() {
+ let _ = tx.send(());
+ }
+ let group = self.option.group.as_ref().map(|group| Resource {
+ name: group.to_string(),
+ resource_namespace: self.option.namespace.to_string(),
});
+ let response =
rpc_client.notify_shutdown(NotifyClientTerminationRequest { group });
+ Self::handle_response_status(response.await?.status,
OPERATION_CLIENT_SHUTDOWN)?;
+ self.session_manager.shutdown().await;
Ok(())
}
@@ -268,14 +316,7 @@ impl Client {
}
pub(crate) async fn get_session(&self) -> Result<Session, ClientError> {
- if !self.is_started() {
- return Err(ClientError::new(
- ErrorKind::ClientIsNotRunning,
- "client is not started",
- OPERATION_GET_SESSION,
- )
- .with_context("client_id", self.id.clone()));
- }
+ self.check_started(OPERATION_GET_SESSION)?;
let session = self
.session_manager
.get_or_create_session(
@@ -348,8 +389,8 @@ impl Client {
return Ok(route);
}
}
- self.topic_route_inner(self.get_session().await.unwrap(), topic)
- .await
+ let rpc_client = self.get_session().await?;
+ self.topic_route_inner(rpc_client, topic).await
}
async fn query_topic_route<T: RPCClient + 'static>(
@@ -461,15 +502,16 @@ impl Client {
async fn heart_beat_inner<T: RPCClient + 'static>(
mut rpc_client: T,
- group: &str,
+ group: &Option<String>,
namespace: &str,
client_type: &ClientType,
) -> Result<HeartbeatResponse, ClientError> {
+ let group = group.as_ref().map(|group| Resource {
+ name: group.to_string(),
+ resource_namespace: namespace.to_string(),
+ });
let request = HeartbeatRequest {
- group: Some(Resource {
- name: group.to_string(),
- resource_namespace: namespace.to_string(),
- }),
+ group,
client_type: client_type.clone() as i32,
};
let response = rpc_client.heartbeat(request).await?;
@@ -534,7 +576,7 @@ impl Client {
) -> Result<Vec<Message>, ClientError> {
let request = ReceiveMessageRequest {
group: Some(Resource {
- name: self.option.group.to_string(),
+ name: self.option.group.as_ref().unwrap().to_string(),
resource_namespace: self.option.namespace.to_string(),
}),
message_queue: Some(message_queue),
@@ -594,7 +636,7 @@ impl Client {
) -> Result<Vec<AckMessageResultEntry>, ClientError> {
let request = AckMessageRequest {
group: Some(Resource {
- name: self.option.group.to_string(),
+ name: self.option.group.as_ref().unwrap().to_string(),
resource_namespace: self.option.namespace.to_string(),
}),
topic: Some(Resource {
@@ -642,7 +684,10 @@ pub(crate) mod tests {
fn new_client_for_test() -> Client {
Client {
logger: terminal_logger(),
- option: ClientOption::default(),
+ option: ClientOption {
+ group: Some("group".to_string()),
+ ..Default::default()
+ },
session_manager: Arc::new(SessionManager::default()),
route_table: Mutex::new(HashMap::new()),
id: Client::generate_client_id(),
@@ -650,6 +695,7 @@ pub(crate) mod tests {
settings: TelemetryCommand::default(),
transaction_checker: None,
telemetry_command_tx: None,
+ shutdown_tx: None,
}
}
@@ -665,6 +711,7 @@ pub(crate) mod tests {
settings: TelemetryCommand::default(),
transaction_checker: None,
telemetry_command_tx: Some(tx),
+ shutdown_tx: None,
}
}
@@ -714,7 +761,8 @@ pub(crate) mod tests {
.expect_get_or_create_session()
.returning(|_, _, _| Ok(Session::mock()));
- let client = new_client_with_session_manager(session_manager);
+ let mut client = new_client_with_session_manager(session_manager);
+ let _ = client.start().await;
let result = client.get_session().await;
assert!(result.is_ok());
let result = client
@@ -893,7 +941,9 @@ pub(crate) mod tests {
mock.expect_heartbeat()
.return_once(|_| Box::pin(futures::future::ready(response)));
- let send_result = Client::heart_beat_inner(mock, "", "",
&ClientType::Producer).await;
+ let send_result =
+ Client::heart_beat_inner(mock, &Some("group".to_string()), "",
&ClientType::Producer)
+ .await;
assert!(send_result.is_ok());
}
@@ -1034,7 +1084,7 @@ pub(crate) mod tests {
let result = Client::handle_telemetry_command(
mock,
&Some(Box::new(|_, _| TransactionResolution::COMMIT)),
- Endpoints::from_url("localhopst:8081").unwrap(),
+ Endpoints::from_url("localhost:8081").unwrap(),
RecoverOrphanedTransactionCommand(pb::RecoverOrphanedTransactionCommand {
message: Some(Message {
topic: Some(Resource::default()),
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index ea27d279..fa16a41a 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -29,7 +29,7 @@ use crate::simple_consumer::SimpleConsumer;
#[derive(Debug, Clone)]
pub struct ClientOption {
pub(crate) client_type: ClientType,
- pub(crate) group: String,
+ pub(crate) group: Option<String>,
pub(crate) namespace: String,
pub(crate) access_url: String,
pub(crate) enable_tls: bool,
@@ -43,7 +43,7 @@ impl Default for ClientOption {
fn default() -> Self {
ClientOption {
client_type: ClientType::Producer,
- group: "".to_string(),
+ group: None,
namespace: "".to_string(),
access_url: "localhost:8081".to_string(),
enable_tls: false,
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index d5e768c5..7e3f3996 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -275,8 +275,9 @@ impl Producer {
}
let topic = message.take_topic();
let receipt = self.send(message).await?;
+ let rpc_client = self.client.get_session().await?;
Ok(TransactionImpl::new(
- Box::new(self.client.get_session().await.unwrap()),
+ Box::new(rpc_client),
Resource {
resource_namespace: self.option.namespace().to_string(),
name: topic,
@@ -284,6 +285,10 @@ impl Producer {
receipt,
))
}
+
+ pub async fn shutdown(self) -> Result<(), ClientError> {
+ self.client.shutdown().await
+ }
}
#[cfg(test)]
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 5762d662..9441f7c5 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -22,7 +22,7 @@ use ring::hmac;
use slog::{debug, error, info, o, Logger};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
-use tokio::sync::{mpsc, Mutex};
+use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::metadata::{AsciiMetadataValue, MetadataMap};
@@ -34,9 +34,9 @@ use crate::model::common::Endpoints;
use crate::pb::telemetry_command::Command;
use crate::pb::{
AckMessageRequest, AckMessageResponse, EndTransactionRequest,
EndTransactionResponse,
- HeartbeatRequest, HeartbeatResponse, QueryRouteRequest, QueryRouteResponse,
- ReceiveMessageRequest, ReceiveMessageResponse, SendMessageRequest,
SendMessageResponse,
- TelemetryCommand,
+ HeartbeatRequest, HeartbeatResponse, NotifyClientTerminationRequest,
+ NotifyClientTerminationResponse, QueryRouteRequest, QueryRouteResponse,
ReceiveMessageRequest,
+ ReceiveMessageResponse, SendMessageRequest, SendMessageResponse,
TelemetryCommand,
};
use crate::util::{PROTOCOL_VERSION, SDK_LANGUAGE, SDK_VERSION};
use crate::{error::ClientError,
pb::messaging_service_client::MessagingServiceClient};
@@ -50,6 +50,7 @@ const OPERATION_SEND_MESSAGE: &str = "rpc.send_message";
const OPERATION_RECEIVE_MESSAGE: &str = "rpc.receive_message";
const OPERATION_ACK_MESSAGE: &str = "rpc.ack_message";
const OPERATION_END_TRANSACTION: &str = "rpc.end_transaction";
+const OPERATION_NOTIFY_CLIENT_TERMINATION: &str =
"rpc.notify_client_termination";
#[async_trait]
#[automock]
@@ -78,17 +79,36 @@ pub(crate) trait RPCClient {
&mut self,
request: EndTransactionRequest,
) -> Result<EndTransactionResponse, ClientError>;
+ async fn notify_shutdown(
+ &mut self,
+ request: NotifyClientTerminationRequest,
+ ) -> Result<NotifyClientTerminationResponse, ClientError>;
}
#[allow(dead_code)]
-#[derive(Debug, Clone)]
+#[derive(Debug)]
pub(crate) struct Session {
logger: Logger,
client_id: String,
option: ClientOption,
endpoints: Endpoints,
stub: MessagingServiceClient<Channel>,
- telemetry_tx: Box<Option<mpsc::Sender<TelemetryCommand>>>,
+ telemetry_tx: Option<mpsc::Sender<TelemetryCommand>>,
+ shutdown_tx: Option<oneshot::Sender<()>>,
+}
+
+impl Clone for Session {
+ fn clone(&self) -> Self {
+ Session {
+ logger: self.logger.clone(),
+ client_id: self.client_id.clone(),
+ option: self.option.clone(),
+ endpoints: self.endpoints.clone(),
+ stub: self.stub.clone(),
+ telemetry_tx: None,
+ shutdown_tx: None,
+ }
+ }
}
impl Session {
@@ -108,7 +128,8 @@ impl Session {
stub: MessagingServiceClient::new(
Channel::from_static("http://localhost:8081").connect_lazy(),
),
- telemetry_tx: Box::new(None),
+ telemetry_tx: None,
+ shutdown_tx: None,
}
}
@@ -159,7 +180,8 @@ impl Session {
endpoints: endpoints.clone(),
client_id,
stub,
- telemetry_tx: Box::new(None),
+ telemetry_tx: None,
+ shutdown_tx: None,
})
}
@@ -288,24 +310,35 @@ impl Session {
.set_source(e)
})?;
+ let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+ self.shutdown_tx = Some(shutdown_tx);
+
let logger = self.logger.clone();
tokio::spawn(async move {
let mut stream = response.into_inner();
loop {
- match stream.message().await {
- Ok(Some(item)) => {
- debug!(logger, "receive telemetry command: {:?}",
item);
- if let Some(command) = item.command {
- _ = telemetry_command_tx.send(command).await;
+ tokio::select! {
+ message = stream.message() => {
+ match message {
+ Ok(Some(item)) => {
+ debug!(logger, "receive telemetry command:
{:?}", item);
+ if let Some(command) = item.command {
+ _ =
telemetry_command_tx.send(command).await;
+ }
+ }
+ Ok(None) => {
+ info!(logger, "telemetry command stream closed
by server");
+ break;
+ }
+ Err(e) => {
+ error!(logger, "telemetry response error:
{:?}", e);
+ }
}
}
- Ok(None) => {
- debug!(logger, "request stream closed");
+ _ = &mut shutdown_rx => {
+ info!(logger, "receive shutdown signal, stop dealing
with telemetry command");
break;
}
- Err(e) => {
- error!(logger, "telemetry response error: {:?}", e);
- }
}
}
});
@@ -314,9 +347,15 @@ impl Session {
Ok(())
}
+ pub(crate) fn shutdown(&mut self) {
+ if let Some(tx) = self.shutdown_tx.take() {
+ let _ = tx.send(());
+ }
+ }
+
#[allow(dead_code)]
pub(crate) fn is_started(&self) -> bool {
- self.telemetry_tx.is_some()
+ self.shutdown_tx.is_some()
}
#[allow(dead_code)]
@@ -465,6 +504,26 @@ impl RPCClient for Session {
})?;
Ok(response.into_inner())
}
+
+ async fn notify_shutdown(
+ &mut self,
+ request: NotifyClientTerminationRequest,
+ ) -> Result<NotifyClientTerminationResponse, ClientError> {
+ let request = self.sign(request);
+ let response = self
+ .stub
+ .notify_client_termination(request)
+ .await
+ .map_err(|e| {
+ ClientError::new(
+ ErrorKind::ClientInternal,
+ "send rpc notify_client_termination failed",
+ OPERATION_NOTIFY_CLIENT_TERMINATION,
+ )
+ .set_source(e)
+ })?;
+ Ok(response.into_inner())
+ }
}
#[derive(Debug)]
@@ -521,6 +580,14 @@ impl SessionManager {
}
Ok(sessions)
}
+
+ pub(crate) async fn shutdown(&self) {
+ let mut session_map = self.session_map.lock().await;
+ for (_, session) in session_map.iter_mut() {
+ session.shutdown();
+ }
+ session_map.clear();
+ }
}
#[cfg(test)]
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index d4e222ea..a8777053 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -67,7 +67,7 @@ impl SimpleConsumer {
let client_option = ClientOption {
client_type: ClientType::SimpleConsumer,
- group: option.consumer_group().to_string(),
+ group: Some(option.consumer_group().to_string()),
namespace: option.namespace().to_string(),
..client_option
};
@@ -104,6 +104,10 @@ impl SimpleConsumer {
Ok(())
}
+ pub async fn shutdown(self) -> Result<(), ClientError> {
+ self.client.shutdown().await
+ }
+
/// receive messages from the specified topic
///
/// # Arguments
@@ -115,7 +119,7 @@ impl SimpleConsumer {
topic: impl AsRef<str>,
expression: &FilterExpression,
) -> Result<Vec<MessageView>, ClientError> {
- self.receive_with_batch_size(topic.as_ref(), expression, 32,
Duration::from_secs(15))
+ self.receive_with(topic.as_ref(), expression, 32,
Duration::from_secs(15))
.await
}
@@ -127,7 +131,7 @@ impl SimpleConsumer {
/// * `expression` - the subscription for the topic
/// * `batch_size` - max message num of server returned
/// * `invisible_duration` - set the invisible duration of messages that
return from the server, these messages will not be visible to other consumers
unless timeout
- pub async fn receive_with_batch_size(
+ pub async fn receive_with(
&self,
topic: impl AsRef<str>,
expression: &FilterExpression,