This is an automated email from the ASF dual-hosted git repository.
sspirits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 430ed16c [rust]sync client settings periodically (#691)
430ed16c is described below
commit 430ed16c13c407c56a096c599a38cad383b223dd
Author: Qiping Luo <[email protected]>
AuthorDate: Tue Mar 12 15:54:31 2024 +0800
[rust]sync client settings periodically (#691)
* [rust]sync client settings periodically
* fix ugly import
* reuse existing update_settings function
* simplify code
* fix: avoid unwrap in main code
Signed-off-by: Li Zhanhui <[email protected]>
* fix: fix tests
Signed-off-by: Li Zhanhui <[email protected]>
---------
Signed-off-by: Li Zhanhui <[email protected]>
Co-authored-by: Li Zhanhui <[email protected]>
---
rust/src/client.rs | 164 ++++++++++++++++++++++++++----------------
rust/src/conf.rs | 36 ++++++++++
rust/src/model/transaction.rs | 4 +-
rust/src/producer.rs | 46 ++++++------
rust/src/session.rs | 9 +--
rust/src/simple_consumer.rs | 15 ++--
rust/src/util.rs | 41 +++++------
7 files changed, 192 insertions(+), 123 deletions(-)
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 884f98b0..4183256f 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -26,9 +26,10 @@ use parking_lot::Mutex;
use prost_types::Duration;
use slog::{debug, error, info, o, warn, Logger};
use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, RwLock};
+use tokio::time::Instant;
-use crate::conf::ClientOption;
+use crate::conf::{ClientOption, SettingsAware};
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, Endpoints, Route, RouteStatus,
SendReceipt};
use crate::model::message::AckMessageEntry;
@@ -44,14 +45,14 @@ use crate::pb::{
use crate::session::SessionManager;
use crate::session::{RPCClient, Session};
-pub(crate) struct Client {
+pub(crate) struct Client<S> {
logger: Logger,
option: ClientOption,
session_manager: Arc<SessionManager>,
route_table: Mutex<HashMap<String /* topic */, RouteStatus>>,
id: String,
access_endpoints: Endpoints,
- settings: TelemetryCommand,
+ settings: Arc<RwLock<S>>,
telemetry_command_tx: Option<mpsc::Sender<pb::telemetry_command::Command>>,
shutdown_tx: Option<oneshot::Sender<()>>,
}
@@ -68,7 +69,10 @@ const OPERATION_SEND_MESSAGE: &str = "client.send_message";
const OPERATION_RECEIVE_MESSAGE: &str = "client.receive_message";
const OPERATION_ACK_MESSAGE: &str = "client.ack_message";
-impl Debug for Client {
+impl<S> Debug for Client<S>
+where
+ S: SettingsAware + 'static,
+{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Client")
.field("id", &self.id)
@@ -79,11 +83,14 @@ impl Debug for Client {
}
#[automock]
-impl Client {
+impl<S> Client<S>
+where
+ S: SettingsAware + 'static + Send + Sync,
+{
pub(crate) fn new(
logger: &Logger,
option: ClientOption,
- settings: TelemetryCommand,
+ settings: Arc<RwLock<S>>,
) -> Result<Self, ClientError> {
let id = Self::generate_client_id();
let endpoints = Endpoints::from_url(option.access_url())
@@ -131,12 +138,16 @@ impl Client {
.await
.map_err(|error| error.with_operation(OPERATION_CLIENT_START))?;
+ let settings = Arc::clone(&self.settings);
tokio::spawn(async move {
rpc_client.is_started();
- let mut interval =
tokio::time::interval(std::time::Duration::from_secs(30));
+ let seconds_30 = std::time::Duration::from_secs(30);
+ let mut heartbeat_interval = tokio::time::interval(seconds_30);
+ let mut sync_settings_interval =
+ tokio::time::interval_at(Instant::now() + seconds_30,
seconds_30);
loop {
select! {
- _ = interval.tick() => {
+ _ = heartbeat_interval.tick() => {
let sessions =
session_manager.get_all_sessions().await;
if sessions.is_err() {
error!(
@@ -159,7 +170,7 @@ impl Client {
continue;
}
let result =
-
Self::handle_response_status(response.unwrap().status, OPERATION_HEARTBEAT);
+
handle_response_status(response.unwrap().status, OPERATION_HEARTBEAT);
if result.is_err() {
error!(
logger,
@@ -171,13 +182,34 @@ impl Client {
debug!(logger,"send heartbeat to server success,
peer={}",peer);
}
},
+ _ = sync_settings_interval.tick() => {
+ let sessions =
session_manager.get_all_sessions().await;
+ if sessions.is_err() {
+ error!(logger, "sync settings failed: failed to
get sessions: {}", sessions.unwrap_err());
+ continue;
+ }
+ for mut session in sessions.unwrap() {
+ let command;
+ {
+ command =
settings.read().await.build_telemetry_command();
+ }
+ let peer = session.peer().to_string();
+ let result =
session.update_settings(command).await;
+ if result.is_err() {
+ error!(logger, "sync settings failed: failed
to call rpc: {}", result.unwrap_err());
+ continue;
+ }
+ debug!(logger, "sync settings success, peer = {}",
peer);
+ }
+
+ },
_ = &mut shutdown_rx => {
- info!(logger, "receive shutdown signal, stop heartbeat
task.");
+ info!(logger, "receive shutdown signal, stop heartbeat
and telemetry tasks.");
break;
}
}
}
- info!(logger, "heartbeat task is stopped");
+ info!(logger, "heartbeat and telemetry task were stopped");
});
Ok(())
}
@@ -206,7 +238,7 @@ impl Client {
resource_namespace: self.option.namespace.to_string(),
});
let response =
rpc_client.notify_shutdown(NotifyClientTerminationRequest { group });
- Self::handle_response_status(response.await?.status,
OPERATION_CLIENT_SHUTDOWN)?;
+ handle_response_status(response.await?.status,
OPERATION_CLIENT_SHUTDOWN)?;
self.session_manager.shutdown().await;
Ok(())
}
@@ -234,13 +266,17 @@ impl Client {
)
}
+ async fn build_telemetry_command(&self) -> TelemetryCommand {
+ self.settings.read().await.build_telemetry_command()
+ }
+
pub(crate) async fn get_session(&self) -> Result<Session, ClientError> {
self.check_started(OPERATION_GET_SESSION)?;
let session = self
.session_manager
.get_or_create_session(
&self.access_endpoints,
- self.settings.clone(),
+ self.build_telemetry_command().await,
self.telemetry_command_tx.clone().unwrap(),
)
.await?;
@@ -255,37 +291,13 @@ impl Client {
.session_manager
.get_or_create_session(
endpoints,
- self.settings.clone(),
+ self.build_telemetry_command().await,
self.telemetry_command_tx.clone().unwrap(),
)
.await?;
Ok(session)
}
- pub(crate) fn handle_response_status(
- status: Option<Status>,
- operation: &'static str,
- ) -> Result<(), ClientError> {
- if status.is_none() {
- return Err(ClientError::new(
- ErrorKind::Server,
- "server do not return status, this may be a bug",
- operation,
- ));
- }
-
- let status = status.unwrap();
- let status_code = Code::from_i32(status.code).unwrap();
- if !status_code.eq(&Code::Ok) {
- return Err(
- ClientError::new(ErrorKind::Server, "server return an error",
operation)
- .with_context("code", status_code.as_str_name())
- .with_context("message", status.message),
- );
- }
- Ok(())
- }
-
pub(crate) fn topic_route_from_cache(&self, topic: &str) ->
Option<Arc<Route>> {
self.route_table.lock().get(topic).and_then(|route_status| {
if let RouteStatus::Found(route) = route_status {
@@ -325,7 +337,7 @@ impl Client {
};
let response = rpc_client.query_route(request).await?;
- Self::handle_response_status(response.status, OPERATION_QUERY_ROUTE)?;
+ handle_response_status(response.status, OPERATION_QUERY_ROUTE)?;
let route = Route {
index: AtomicUsize::new(0),
@@ -454,7 +466,7 @@ impl Client {
) -> Result<Vec<SendReceipt>, ClientError> {
let request = SendMessageRequest { messages };
let response = rpc_client.send_message(request).await?;
- Self::handle_response_status(response.status, OPERATION_SEND_MESSAGE)?;
+ handle_response_status(response.status, OPERATION_SEND_MESSAGE)?;
Ok(response
.entries
@@ -512,7 +524,7 @@ impl Client {
if status.code() == Code::MessageNotFound {
return Ok(vec![]);
}
- Self::handle_response_status(Some(status),
OPERATION_RECEIVE_MESSAGE)?;
+ handle_response_status(Some(status),
OPERATION_RECEIVE_MESSAGE)?;
}
Content::Message(message) => {
messages.push(message);
@@ -560,7 +572,7 @@ impl Client {
entries,
};
let response = rpc_client.ack_message(request).await?;
- Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
+ handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
Ok(response.entries)
}
@@ -605,11 +617,31 @@ impl Client {
message_id,
};
let response = rpc_client.change_invisible_duration(request).await?;
- Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
+ handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
Ok(response.receipt_handle)
}
}
+pub fn handle_response_status(
+ status: Option<Status>,
+ operation: &'static str,
+) -> Result<(), ClientError> {
+ let status = status.ok_or(ClientError::new(
+ ErrorKind::Server,
+ "server do not return status, this may be a bug",
+ operation,
+ ))?;
+
+ if status.code != Code::Ok as i32 {
+ return Err(
+ ClientError::new(ErrorKind::Server, "server return an error",
operation)
+ .with_context("code", format!("{}", status.code))
+ .with_context("message", status.message),
+ );
+ }
+ Ok(())
+}
+
#[cfg(test)]
pub(crate) mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -624,7 +656,6 @@ pub(crate) mod tests {
use crate::error::{ClientError, ErrorKind};
use crate::log::terminal_logger;
use crate::model::common::{ClientType, Route};
- use crate::pb::receive_message_response::Content;
use crate::pb::{
AckMessageEntry, AckMessageResponse, ChangeInvisibleDurationResponse,
Code,
FilterExpression, HeartbeatResponse, Message, MessageQueue,
QueryRouteResponse,
@@ -637,7 +668,16 @@ pub(crate) mod tests {
// The lock is used to prevent the mocking static function at same time
during parallel testing.
pub(crate) static MTX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
- fn new_client_for_test() -> Client {
+ #[derive(Default)]
+ struct MockSettings {}
+
+ impl SettingsAware for MockSettings {
+ fn build_telemetry_command(&self) -> TelemetryCommand {
+ TelemetryCommand::default()
+ }
+ }
+
+ fn new_client_for_test() -> Client<MockSettings> {
Client {
logger: terminal_logger(),
option: ClientOption {
@@ -646,24 +686,24 @@ pub(crate) mod tests {
},
session_manager: Arc::new(SessionManager::default()),
route_table: Mutex::new(HashMap::new()),
- id: Client::generate_client_id(),
+ id: Client::<MockSettings>::generate_client_id(),
access_endpoints:
Endpoints::from_url("http://localhost:8081").unwrap(),
- settings: TelemetryCommand::default(),
+ settings: Arc::new(RwLock::new(MockSettings::default())),
telemetry_command_tx: None,
shutdown_tx: None,
}
}
- fn new_client_with_session_manager(session_manager: SessionManager) ->
Client {
+ fn new_client_with_session_manager(session_manager: SessionManager) ->
Client<MockSettings> {
let (tx, _) = mpsc::channel(16);
Client {
logger: terminal_logger(),
option: ClientOption::default(),
session_manager: Arc::new(session_manager),
route_table: Mutex::new(HashMap::new()),
- id: Client::generate_client_id(),
+ id: Client::<MockSettings>::generate_client_id(),
access_endpoints:
Endpoints::from_url("http://localhost:8081").unwrap(),
- settings: TelemetryCommand::default(),
+ settings: Arc::new(RwLock::new(MockSettings::default())),
telemetry_command_tx: Some(tx),
shutdown_tx: None,
}
@@ -684,7 +724,7 @@ pub(crate) mod tests {
Client::new(
&terminal_logger(),
ClientOption::default(),
- TelemetryCommand::default(),
+ Arc::new(RwLock::new(MockSettings::default())),
)?;
Ok(())
}
@@ -728,8 +768,8 @@ pub(crate) mod tests {
}
#[test]
- fn handle_response_status() {
- let result = Client::handle_response_status(None, "test");
+ fn test_handle_response_status() {
+ let result = handle_response_status(None, "test");
assert!(result.is_err(), "should return error when status is None");
let result = result.unwrap_err();
assert_eq!(result.kind, ErrorKind::Server);
@@ -739,7 +779,7 @@ pub(crate) mod tests {
);
assert_eq!(result.operation, "test");
- let result = Client::handle_response_status(
+ let result = handle_response_status(
Some(Status {
code: Code::BadRequest as i32,
message: "test failed".to_string(),
@@ -757,12 +797,12 @@ pub(crate) mod tests {
assert_eq!(
result.context,
vec![
- ("code", "BAD_REQUEST".to_string()),
+ ("code", format!("{}", Code::BadRequest as i32)),
("message", "test failed".to_string()),
]
);
- let result = Client::handle_response_status(
+ let result = handle_response_status(
Some(Status {
code: Code::Ok as i32,
message: "test success".to_string(),
@@ -897,9 +937,13 @@ pub(crate) mod tests {
mock.expect_heartbeat()
.return_once(|_| Box::pin(futures::future::ready(response)));
- let send_result =
- Client::heart_beat_inner(mock, &Some("group".to_string()), "",
&ClientType::Producer)
- .await;
+ let send_result = Client::<MockSettings>::heart_beat_inner(
+ mock,
+ &Some("group".to_string()),
+ "",
+ &ClientType::Producer,
+ )
+ .await;
assert!(send_result.is_ok());
}
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index fa16a41a..8b270612 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -20,10 +20,12 @@
use std::time::Duration;
use crate::model::common::ClientType;
+use crate::pb::TelemetryCommand;
#[allow(unused_imports)]
use crate::producer::Producer;
#[allow(unused_imports)]
use crate::simple_consumer::SimpleConsumer;
+use crate::util::{build_producer_settings, build_simple_consumer_settings};
/// [`ClientOption`] is the configuration of internal client, which manages
the connection and request with RocketMQ proxy.
#[derive(Debug, Clone)]
@@ -130,6 +132,7 @@ pub struct ProducerOption {
topics: Option<Vec<String>>,
namespace: String,
validate_message_type: bool,
+ timeout: Duration,
}
impl Default for ProducerOption {
@@ -140,6 +143,7 @@ impl Default for ProducerOption {
topics: None,
namespace: "".to_string(),
validate_message_type: true,
+ timeout: Duration::from_secs(3),
}
}
}
@@ -188,6 +192,10 @@ impl ProducerOption {
pub fn set_validate_message_type(&mut self, validate_message_type: bool) {
self.validate_message_type = validate_message_type;
}
+
+ pub fn timeout(&self) -> &Duration {
+ &self.timeout
+ }
}
/// The configuration of [`SimpleConsumer`].
@@ -198,6 +206,8 @@ pub struct SimpleConsumerOption {
prefetch_route: bool,
topics: Option<Vec<String>>,
namespace: String,
+ timeout: Duration,
+ long_polling_timeout: Duration,
}
impl Default for SimpleConsumerOption {
@@ -208,6 +218,8 @@ impl Default for SimpleConsumerOption {
prefetch_route: true,
topics: None,
namespace: "".to_string(),
+ timeout: Duration::from_secs(3),
+ long_polling_timeout: Duration::from_secs(40),
}
}
}
@@ -256,6 +268,30 @@ impl SimpleConsumerOption {
pub(crate) fn set_namespace(&mut self, name_space: impl Into<String>) {
self.namespace = name_space.into();
}
+
+ pub fn timeout(&self) -> &Duration {
+ &self.timeout
+ }
+
+ pub fn long_polling_timeout(&self) -> &Duration {
+ &self.long_polling_timeout
+ }
+}
+
+pub trait SettingsAware {
+ fn build_telemetry_command(&self) -> TelemetryCommand;
+}
+
+impl SettingsAware for ProducerOption {
+ fn build_telemetry_command(&self) -> TelemetryCommand {
+ build_producer_settings(self)
+ }
+}
+
+impl SettingsAware for SimpleConsumerOption {
+ fn build_telemetry_command(&self) -> TelemetryCommand {
+ build_simple_consumer_settings(self)
+ }
}
#[cfg(test)]
diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs
index 2a40d7bf..0a4c6871 100644
--- a/rust/src/model/transaction.rs
+++ b/rust/src/model/transaction.rs
@@ -21,7 +21,7 @@ use std::fmt::{Debug, Formatter};
use async_trait::async_trait;
-use crate::client::Client;
+use crate::client::handle_response_status;
use crate::error::ClientError;
use crate::model::common::SendReceipt;
use crate::model::message::MessageView;
@@ -95,7 +95,7 @@ impl TransactionImpl {
trace_context: "".to_string(),
})
.await?;
- Client::handle_response_status(response.status, "end transaction")
+ handle_response_status(response.status, "end transaction")
}
}
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index e456cbe7..26ba496a 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -26,6 +26,7 @@ use tokio::select;
use tokio::sync::RwLock;
use tokio::sync::{mpsc, oneshot};
+use crate::client::handle_response_status;
#[double]
use crate::client::Client;
use crate::conf::{ClientOption, ProducerOption};
@@ -40,8 +41,8 @@ use
crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, S
use crate::pb::{Encoding, EndTransactionRequest, Resource, SystemProperties,
TransactionSource};
use crate::session::RPCClient;
use crate::util::{
- build_endpoints_by_message_queue, build_producer_settings,
select_message_queue,
- select_message_queue_by_message_group, HOST_NAME,
+ build_endpoints_by_message_queue, select_message_queue,
select_message_queue_by_message_group,
+ HOST_NAME,
};
use crate::{log, pb};
@@ -54,7 +55,7 @@ use crate::{log, pb};
pub struct Producer {
option: Arc<RwLock<ProducerOption>>,
logger: Logger,
- client: Client,
+ client: Client<ProducerOption>,
transaction_checker: Option<Box<TransactionChecker>>,
shutdown_tx: Option<oneshot::Sender<()>>,
}
@@ -85,9 +86,8 @@ impl Producer {
..client_option
};
let logger = log::logger(option.logging_format());
- let settings = build_producer_settings(&option, &client_option);
- let client = Client::new(&logger, client_option, settings)?;
let option = Arc::new(RwLock::new(option));
+ let client = Client::new(&logger, client_option, Arc::clone(&option))?;
Ok(Producer {
option,
logger,
@@ -115,9 +115,8 @@ impl Producer {
..client_option
};
let logger = log::logger(option.logging_format());
- let settings = build_producer_settings(&option, &client_option);
- let client = Client::new(&logger, client_option, settings)?;
let option = Arc::new(RwLock::new(option));
+ let client = Client::<ProducerOption>::new(&logger, client_option,
Arc::clone(&option))?;
Ok(Producer {
option,
logger,
@@ -128,9 +127,7 @@ impl Producer {
}
async fn get_resource_namespace(&self) -> String {
- let option_guard = self.option.read();
- let resource_namespace = option_guard.await.namespace().to_string();
- resource_namespace
+ self.option.read().await.namespace().to_string()
}
/// Start the producer
@@ -139,14 +136,15 @@ impl Producer {
let telemetry_command_tx: mpsc::Sender<pb::telemetry_command::Command>
=
telemetry_command_tx;
self.client.start(telemetry_command_tx).await?;
- let option_guard = self.option.read().await;
- let topics = option_guard.topics();
- if let Some(topics) = topics {
- for topic in topics {
- self.client.topic_route(topic, true).await?;
+ {
+ let option_guard = self.option.read().await;
+ let topics = option_guard.topics();
+ if let Some(topics) = topics {
+ for topic in topics {
+ self.client.topic_route(topic, true).await?;
+ }
}
}
- drop(option_guard);
let transaction_checker = self.transaction_checker.take();
if transaction_checker.is_some() {
self.transaction_checker = Some(Box::new(|_, _|
TransactionResolution::UNKNOWN));
@@ -245,7 +243,7 @@ impl Producer {
trace_context: "".to_string(),
})
.await?;
- Client::handle_response_status(response.status,
Self::OPERATION_END_TRANSACTION)
+ handle_response_status(response.status,
Self::OPERATION_END_TRANSACTION)
} else {
Err(ClientError::new(
ErrorKind::Config,
@@ -396,9 +394,7 @@ impl Producer {
select_message_queue(route)
};
- let option_guard = self.option.read().await;
- let validate_message_type = option_guard.validate_message_type();
- drop(option_guard);
+ let validate_message_type = self.validate_message_type().await;
if validate_message_type {
for message_type in message_types {
if !message_queue.accept_type(message_type) {
@@ -424,6 +420,10 @@ impl Producer {
self.client.send_message(&endpoints, pb_messages).await
}
+ async fn validate_message_type(&self) -> bool {
+ self.option.read().await.validate_message_type()
+ }
+
pub fn has_transaction_checker(&self) -> bool {
self.transaction_checker.is_some()
}
@@ -500,7 +500,7 @@ mod tests {
async fn producer_start() -> Result<(), ClientError> {
let _m = crate::client::tests::MTX.lock();
- let ctx = Client::new_context();
+ let ctx = Client::<ProducerOption>::new_context();
ctx.expect().return_once(|_, _, _| {
let mut client = Client::default();
client.expect_topic_route().returning(|_, _| {
@@ -533,7 +533,7 @@ mod tests {
async fn transaction_producer_start() -> Result<(), ClientError> {
let _m = crate::client::tests::MTX.lock();
- let ctx = Client::new_context();
+ let ctx = Client::<ProducerOption>::new_context();
ctx.expect().return_once(|_, _, _| {
let mut client = Client::default();
client.expect_topic_route().returning(|_, _| {
@@ -774,8 +774,6 @@ mod tests {
mock.expect_end_transaction()
.return_once(|_| Box::pin(futures::future::ready(response)));
- let context = MockClient::handle_response_status_context();
- context.expect().return_once(|_, _| Result::Ok(()));
let result = Producer::handle_recover_orphaned_transaction_command(
mock,
pb::RecoverOrphanedTransactionCommand {
diff --git a/rust/src/session.rs b/rust/src/session.rs
index c69e9ee9..7b2643c8 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -110,7 +110,7 @@ impl Session {
option: self.option.clone(),
endpoints: self.endpoints.clone(),
stub: self.stub.clone(),
- telemetry_tx: None,
+ telemetry_tx: self.telemetry_tx.clone(),
shutdown_tx: None,
}
}
@@ -686,10 +686,7 @@ mod tests {
let (tx, _) = mpsc::channel(16);
let result = session
- .start(
- build_producer_settings(&ProducerOption::default(),
&ClientOption::default()),
- tx,
- )
+ .start(build_producer_settings(&ProducerOption::default()), tx)
.await;
assert!(result.is_ok());
assert!(session.is_started());
@@ -714,7 +711,7 @@ mod tests {
let session = session_manager
.get_or_create_session(
&Endpoints::from_url(&format!("localhost:{}",
server.address().port())).unwrap(),
- build_producer_settings(&ProducerOption::default(),
&client_option),
+ build_producer_settings(&ProducerOption::default()),
tx,
)
.await
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index e891d384..8d655000 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -15,12 +15,13 @@
* limitations under the License.
*/
+use std::sync::Arc;
use std::time::Duration;
use mockall_double::double;
use slog::{info, warn, Logger};
use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, RwLock};
#[double]
use crate::client::Client;
@@ -28,9 +29,7 @@ use crate::conf::{ClientOption, SimpleConsumerOption};
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, FilterExpression};
use crate::model::message::{AckMessageEntry, MessageView};
-use crate::util::{
- build_endpoints_by_message_queue, build_simple_consumer_settings,
select_message_queue,
-};
+use crate::util::{build_endpoints_by_message_queue, select_message_queue};
use crate::{log, pb};
/// [`SimpleConsumer`] is a lightweight consumer to consume messages from
RocketMQ proxy.
@@ -46,7 +45,7 @@ use crate::{log, pb};
pub struct SimpleConsumer {
option: SimpleConsumerOption,
logger: Logger,
- client: Client,
+ client: Client<SimpleConsumerOption>,
shutdown_tx: Option<oneshot::Sender<()>>,
}
@@ -75,8 +74,8 @@ impl SimpleConsumer {
..client_option
};
let logger = log::logger(option.logging_format());
- let settings = build_simple_consumer_settings(&option, &client_option);
- let client = Client::new(&logger, client_option, settings)?;
+ let settings = Arc::new(RwLock::new(option.clone()));
+ let client = Client::<SimpleConsumerOption>::new(&logger,
client_option, settings)?;
Ok(SimpleConsumer {
option,
logger,
@@ -230,7 +229,7 @@ mod tests {
async fn simple_consumer_start() -> Result<(), ClientError> {
let _m = crate::client::tests::MTX.lock();
- let ctx = Client::new_context();
+ let ctx = Client::<SimpleConsumerOption>::new_context();
ctx.expect().return_once(|_, _, _| {
let mut client = Client::default();
client.expect_topic_route().returning(|_, _| {
diff --git a/rust/src/util.rs b/rust/src/util.rs
index ac935b6b..1e5cf11c 100644
--- a/rust/src/util.rs
+++ b/rust/src/util.rs
@@ -14,16 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-use once_cell::sync::Lazy;
use std::hash::Hasher;
use std::sync::atomic::Ordering;
use std::sync::Arc;
-use crate::conf::{ClientOption, ProducerOption, SimpleConsumerOption};
+use once_cell::sync::Lazy;
use siphasher::sip::SipHasher24;
+use crate::conf::{ProducerOption, SimpleConsumerOption};
use crate::error::{ClientError, ErrorKind};
-use crate::model::common::{Endpoints, Route};
+use crate::model::common::{ClientType, Endpoints, Route};
use crate::pb::settings::PubSub;
use crate::pb::telemetry_command::Command;
use crate::pb::{
@@ -84,10 +84,7 @@ pub(crate) fn build_endpoints_by_message_queue(
Ok(Endpoints::from_pb_endpoints(broker.endpoints.unwrap()))
}
-pub(crate) fn build_producer_settings(
- option: &ProducerOption,
- client_options: &ClientOption,
-) -> TelemetryCommand {
+pub(crate) fn build_producer_settings(option: &ProducerOption) ->
TelemetryCommand {
let topics = option
.topics()
.clone()
@@ -101,10 +98,10 @@ pub(crate) fn build_producer_settings(
let platform = os_info::get();
TelemetryCommand {
command: Some(Command::Settings(Settings {
- client_type: Some(client_options.client_type.clone() as i32),
+ client_type: Some(ClientType::Producer as i32),
request_timeout: Some(prost_types::Duration {
- seconds: client_options.timeout().as_secs() as i64,
- nanos: client_options.timeout().subsec_nanos() as i32,
+ seconds: option.timeout().as_secs() as i64,
+ nanos: option.timeout().subsec_nanos() as i32,
}),
pub_sub: Some(PubSub::Publishing(Publishing {
topics,
@@ -123,17 +120,14 @@ pub(crate) fn build_producer_settings(
}
}
-pub(crate) fn build_simple_consumer_settings(
- option: &SimpleConsumerOption,
- client_option: &ClientOption,
-) -> TelemetryCommand {
+pub(crate) fn build_simple_consumer_settings(option: &SimpleConsumerOption) ->
TelemetryCommand {
let platform = os_info::get();
TelemetryCommand {
command: Some(Command::Settings(Settings {
- client_type: Some(client_option.client_type.clone() as i32),
+ client_type: Some(ClientType::SimpleConsumer as i32),
request_timeout: Some(prost_types::Duration {
- seconds: client_option.timeout().as_secs() as i64,
- nanos: client_option.timeout().subsec_nanos() as i32,
+ seconds: option.timeout().as_secs() as i64,
+ nanos: option.timeout().subsec_nanos() as i32,
}),
pub_sub: Some(PubSub::Subscription(Subscription {
group: Some(Resource {
@@ -144,8 +138,8 @@ pub(crate) fn build_simple_consumer_settings(
fifo: Some(false),
receive_batch_size: None,
long_polling_timeout: Some(prost_types::Duration {
- seconds: client_option.long_polling_timeout().as_secs() as
i64,
- nanos: client_option.long_polling_timeout().subsec_nanos()
as i32,
+ seconds: option.long_polling_timeout().as_secs() as i64,
+ nanos: option.long_polling_timeout().subsec_nanos() as i32,
}),
})),
user_agent: Some(Ua {
@@ -162,11 +156,12 @@ pub(crate) fn build_simple_consumer_settings(
#[cfg(test)]
mod tests {
+ use std::sync::atomic::AtomicUsize;
+ use std::sync::Arc;
+
use crate::model::common::Route;
use crate::pb;
use crate::pb::{Broker, MessageQueue};
- use std::sync::atomic::AtomicUsize;
- use std::sync::Arc;
use super::*;
@@ -272,11 +267,11 @@ mod tests {
#[test]
fn util_build_producer_settings() {
- build_producer_settings(&ProducerOption::default(),
&ClientOption::default());
+ build_producer_settings(&ProducerOption::default());
}
#[test]
fn util_build_simple_consumer_settings() {
- build_simple_consumer_settings(&SimpleConsumerOption::default(),
&ClientOption::default());
+ build_simple_consumer_settings(&SimpleConsumerOption::default());
}
}