This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 2d3cdf73 fix(rust): fix session life cycle (#646)
2d3cdf73 is described below
commit 2d3cdf735e27a3c3a9e1786d0ee7c8318ba1578c
Author: SSpirits <[email protected]>
AuthorDate: Wed Dec 6 19:35:13 2023 +0800
fix(rust): fix session life cycle (#646)
* fix(rust): fix session life cycle
Signed-off-by: SSpirits <[email protected]>
* fix(rust): fix session life cycle
Signed-off-by: SSpirits <[email protected]>
---------
Signed-off-by: SSpirits <[email protected]>
---
rust/src/client.rs | 4 ++--
rust/src/session.rs | 13 +++++++------
2 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 69000be2..91bd3692 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -190,14 +190,14 @@ impl Client {
},
command = telemetry_command_rx.recv() => {
if let Some(command) = command {
- let result = Self::handle_telemetry_command(rpc_client.clone(), &transaction_checker, endpoints.clone(), command).await;
+ let result = Self::handle_telemetry_command(rpc_client.shadow_session(), &transaction_checker, endpoints.clone(), command).await;
if let Err(error) = result {
error!(logger, "handle telemetry command failed: {:?}", error);
}
}
},
_ = &mut shutdown_rx => {
- debug!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler");
+ info!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler");
break;
}
}
diff --git a/rust/src/session.rs b/rust/src/session.rs
index d54894d8..c69e9ee9 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -102,8 +102,8 @@ pub(crate) struct Session {
shutdown_tx: Option<oneshot::Sender<()>>,
}
-impl Clone for Session {
- fn clone(&self) -> Self {
+impl Session {
+ pub(crate) fn shadow_session(&self) -> Self {
Session {
logger: self.logger.clone(),
client_id: self.client_id.clone(),
@@ -580,7 +580,7 @@ impl SessionManager {
let mut session_map = self.session_map.lock().await;
let endpoint_url = endpoints.endpoint_url().to_string();
return if session_map.contains_key(&endpoint_url) {
- Ok(session_map.get(&endpoint_url).unwrap().clone())
+ Ok(session_map.get(&endpoint_url).unwrap().shadow_session())
} else {
let mut session = Session::new(
&self.logger,
@@ -590,8 +590,9 @@ impl SessionManager {
)
.await?;
session.start(settings, telemetry_command_tx).await?;
- session_map.insert(endpoint_url.clone(), session.clone());
- Ok(session)
+ let shadow_session = session.shadow_session();
+ session_map.insert(endpoint_url.clone(), session);
+ Ok(shadow_session)
};
}
@@ -599,7 +600,7 @@ impl SessionManager {
let session_map = self.session_map.lock().await;
let mut sessions = Vec::new();
for (_, session) in session_map.iter() {
- sessions.push(session.clone());
+ sessions.push(session.shadow_session());
}
Ok(sessions)
}