This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new f99a835d test(rust): add unit tests (#484)
f99a835d is described below
commit f99a835dfac9b7e5ba3c1e67eece1cde3aee4b51
Author: SSpirits <[email protected]>
AuthorDate: Tue Apr 18 20:00:01 2023 +0800
test(rust): add unit tests (#484)
* test(rust): add unit tests
* doc(rust): fix README.md of rust sdk
---
rust/README.md | 6 +++
rust/src/client.rs | 37 +++++++------
rust/src/conf.rs | 33 +++++++++++-
rust/src/error.rs | 19 +++++++
rust/src/lib.rs | 2 -
rust/src/model/common.rs | 1 +
rust/src/model/message.rs | 107 ++++++++++++++++++++++++++++++--------
rust/src/producer.rs | 84 ++++++++++++++++++++++++++----
rust/src/session.rs | 17 ++++--
rust/src/simple_consumer.rs | 98 ++++++++++++++++++++++++++++++++++
rust/src/util.rs | 124 +++++++++++++++++++++++++++++++++++++++++++-
11 files changed, 470 insertions(+), 58 deletions(-)
diff --git a/rust/README.md b/rust/README.md
index 5c484837..e8621d5f 100644
--- a/rust/README.md
+++ b/rust/README.md
@@ -16,6 +16,12 @@ Here are some preparations you may need to know (or refer to
[here](https://rock
2. protoc 3.15.0+
3. setup name server, broker, and
[proxy](https://github.com/apache/rocketmq/tree/develop/proxy).
+### Run Tests
+
+```sh
+cargo llvm-cov --ignore-filename-regex pb/ --open
+```
+
### Run Example
Run the following command to start the example:
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 7c0fbdf3..fe5cf340 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -56,15 +56,15 @@ lazy_static::lazy_static! {
static ref CLIENT_ID_SEQUENCE: AtomicUsize = AtomicUsize::new(0);
}
+const OPERATION_CLIENT_NEW: &'static str = "client.new";
+const OPERATION_QUERY_ROUTE: &'static str = "client.query_route";
+const OPERATION_HEARTBEAT: &'static str = "client.heartbeat";
+const OPERATION_SEND_MESSAGE: &'static str = "client.send_message";
+const OPERATION_RECEIVE_MESSAGE: &'static str = "client.receive_message";
+const OPERATION_ACK_MESSAGE: &'static str = "client.ack_message";
+
#[automock]
impl Client {
- const OPERATION_CLIENT_NEW: &'static str = "client.new";
- const OPERATION_QUERY_ROUTE: &'static str = "client.query_route";
- const OPERATION_HEARTBEAT: &'static str = "client.heartbeat";
- const OPERATION_SEND_MESSAGE: &'static str = "client.send_message";
- const OPERATION_RECEIVE_MESSAGE: &'static str = "client.receive_message";
- const OPERATION_ACK_MESSAGE: &'static str = "client.ack_message";
-
pub(crate) fn new(
logger: &Logger,
option: ClientOption,
@@ -72,7 +72,7 @@ impl Client {
) -> Result<Self, ClientError> {
let id = Self::generate_client_id();
let endpoints = Endpoints::from_url(option.access_url())
- .map_err(|e| e.with_operation(Self::OPERATION_CLIENT_NEW))?;
+ .map_err(|e| e.with_operation(OPERATION_CLIENT_NEW))?;
let session_manager = SessionManager::new(logger, id.clone(), &option);
Ok(Client {
logger: logger.new(o!("component" => "client")),
@@ -120,7 +120,7 @@ impl Client {
continue;
}
let result =
-
Self::handle_response_status(response.unwrap().status,
Self::OPERATION_HEARTBEAT);
+
Self::handle_response_status(response.unwrap().status, OPERATION_HEARTBEAT);
if result.is_err() {
error!(
logger,
@@ -137,6 +137,7 @@ impl Client {
});
}
+ #[allow(dead_code)]
pub(crate) fn client_id(&self) -> &str {
&self.id
}
@@ -214,6 +215,7 @@ impl Client {
})
}
+ #[allow(dead_code)]
pub(crate) async fn topic_route(
&self,
topic: &str,
@@ -242,7 +244,7 @@ impl Client {
};
let response = rpc_client.query_route(request).await?;
- Self::handle_response_status(response.status,
Self::OPERATION_QUERY_ROUTE)?;
+ Self::handle_response_status(response.status, OPERATION_QUERY_ROUTE)?;
let route = Route {
index: AtomicUsize::new(0),
@@ -288,7 +290,7 @@ impl Client {
Err(_e) => Err(ClientError::new(
ErrorKind::ChannelReceive,
"wait for inflight query topic route request failed",
- Self::OPERATION_QUERY_ROUTE,
+ OPERATION_QUERY_ROUTE,
)),
};
}
@@ -327,7 +329,7 @@ impl Client {
let _ = item.send(Err(ClientError::new(
ErrorKind::Server,
"query topic route failed",
- Self::OPERATION_QUERY_ROUTE,
+ OPERATION_QUERY_ROUTE,
)));
}
};
@@ -352,6 +354,7 @@ impl Client {
Ok(response)
}
+ #[allow(dead_code)]
pub(crate) async fn send_message(
&self,
endpoints: &Endpoints,
@@ -372,7 +375,7 @@ impl Client {
let message_count = messages.len();
let request = SendMessageRequest { messages };
let response = rpc_client.send_message(request).await?;
- Self::handle_response_status(response.status,
Self::OPERATION_SEND_MESSAGE)?;
+ Self::handle_response_status(response.status, OPERATION_SEND_MESSAGE)?;
if response.entries.len() != message_count {
error!(self.logger, "server do not return illegal send result,
this may be a bug. except result count: {}, found: {}", response.entries.len(),
message_count);
@@ -381,6 +384,7 @@ impl Client {
Ok(response.entries)
}
+ #[allow(dead_code)]
pub(crate) async fn receive_message(
&self,
endpoints: &Endpoints,
@@ -427,7 +431,7 @@ impl Client {
for response in responses {
match response.content.unwrap() {
Content::Status(status) => {
- Self::handle_response_status(Some(status),
Self::OPERATION_RECEIVE_MESSAGE)?;
+ Self::handle_response_status(Some(status),
OPERATION_RECEIVE_MESSAGE)?;
}
Content::Message(message) => {
messages.push(message);
@@ -438,6 +442,7 @@ impl Client {
Ok(messages)
}
+ #[allow(dead_code)]
pub(crate) async fn ack_message<T: AckMessageEntry + 'static>(
&self,
ack_entry: T,
@@ -475,7 +480,7 @@ impl Client {
entries,
};
let response = rpc_client.ack_message(request).await?;
- Self::handle_response_status(response.status,
Self::OPERATION_ACK_MESSAGE)?;
+ Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
Ok(response.entries)
}
}
@@ -623,7 +628,7 @@ mod tests {
assert!(result.is_ok(), "should not return error when status is Ok");
}
- fn new_topic_route_response() -> Result<QueryRouteResponse, ClientError> {
+ pub(crate) fn new_topic_route_response() -> Result<QueryRouteResponse,
ClientError> {
Ok(QueryRouteResponse {
status: Some(Status {
code: Code::Ok as i32,
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index 8423d901..bb6faf6d 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -35,7 +35,7 @@ impl Default for ClientOption {
group: "".to_string(),
namespace: "".to_string(),
access_url: "localhost:8081".to_string(),
- enable_tls: false,
+ enable_tls: true,
timeout: Duration::from_secs(3),
long_polling_timeout: Duration::from_secs(40),
}
@@ -72,7 +72,7 @@ impl ClientOption {
}
}
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Eq, PartialEq)]
pub enum LoggingFormat {
Terminal,
Json,
@@ -195,3 +195,32 @@ impl SimpleConsumerOption {
self.namespace = name_space.into();
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn conf_client_option() {
+ let option = ClientOption::default();
+ assert_eq!(option.access_url(), "localhost:8081");
+ assert!(option.enable_tls());
+ assert_eq!(option.timeout(), &Duration::from_secs(3));
+ assert_eq!(option.long_polling_timeout(), &Duration::from_secs(40));
+ }
+
+ #[test]
+ fn conf_producer_option() {
+ let option = ProducerOption::default();
+ assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
+ assert!(option.prefetch_route());
+ assert!(option.validate_message_type());
+ }
+
+ #[test]
+ fn conf_simple_consumer_option() {
+ let option = SimpleConsumerOption::default();
+ assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
+ assert!(option.prefetch_route());
+ }
+}
diff --git a/rust/src/error.rs b/rust/src/error.rs
index e7299a5f..9eb3d519 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -158,3 +158,22 @@ impl Debug for ClientError {
Ok(())
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn error_client_error() {
+ let err = ClientError::new(ErrorKind::Config, "fake_message",
"error_client_error")
+ .with_operation("another_operation")
+ .with_context("context_key", "context_value")
+ .set_source(anyhow::anyhow!("fake_source_error"));
+ assert_eq!(
+ err.to_string(),
+ "Failed to parse config at another_operation, context: { called:
error_client_error, context_key: context_value } => fake_message, source:
fake_source_error"
+ );
+ assert_eq!(format!("{:?}", err), "Failed to parse config at
another_operation => fake_message\n\nContext:\n called: error_client_error\n
context_key: context_value\n\nSource: fake_source_error\n");
+ assert_eq!(format!("{:#?}", err), "Error {\n kind: Config,\n
message: \"fake_message\",\n operation: \"another_operation\",\n context:
[\n (\n \"called\",\n \"error_client_error\",\n
),\n (\n \"context_key\",\n
\"context_value\",\n ),\n ],\n source: Some(\n
\"fake_source_error\",\n ),\n}");
+ }
+}
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index c6d1cc26..7152611a 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -16,7 +16,6 @@
*/
#[allow(dead_code)]
pub mod conf;
-#[allow(dead_code)]
mod error;
#[allow(dead_code)]
mod log;
@@ -28,7 +27,6 @@ mod client;
mod pb;
mod session;
-#[allow(dead_code)]
pub mod model;
mod util;
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
index 9f1e947c..a488b152 100644
--- a/rust/src/model/common.rs
+++ b/rust/src/model/common.rs
@@ -24,6 +24,7 @@ use crate::error::{ClientError, ErrorKind};
use crate::pb;
use crate::pb::{Address, AddressScheme, MessageQueue};
+#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
Producer = 1,
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index 259a4847..5ca53df1 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -94,29 +94,33 @@ impl MessageImpl {
}
pub fn fifo_message_builder(
- topic: String,
+ topic: impl Into<String>,
body: Vec<u8>,
- message_group: String,
+ message_group: impl Into<String>,
) -> MessageBuilder {
MessageBuilder {
message: MessageImpl {
message_id: UNIQ_ID_GENERATOR.lock().next_id(),
- topic,
+ topic: topic.into(),
body: Some(body),
tags: None,
keys: None,
properties: None,
- message_group: Some(message_group),
+ message_group: Some(message_group.into()),
delivery_timestamp: None,
},
}
}
- pub fn delay_message_builder(topic: String, body: Vec<u8>, delay_time:
i64) -> MessageBuilder {
+ pub fn delay_message_builder(
+ topic: impl Into<String>,
+ body: Vec<u8>,
+ delay_time: i64,
+ ) -> MessageBuilder {
MessageBuilder {
message: MessageImpl {
message_id: UNIQ_ID_GENERATOR.lock().next_id(),
- topic,
+ topic: topic.into(),
body: Some(body),
tags: None,
keys: None,
@@ -313,14 +317,14 @@ mod tests {
use super::*;
#[test]
- fn test_message() {
+ fn common_test_message() {
let mut properties = HashMap::new();
- properties.insert("key".to_string(), "value".to_string());
+ properties.insert("key", "value".to_string());
let message = MessageImpl::builder()
- .set_topic("test".to_string())
+ .set_topic("test")
.set_body(vec![1, 2, 3])
- .set_tags("tag".to_string())
- .set_keys(vec!["key".to_string()])
+ .set_tags("tag")
+ .set_keys(vec!["key"])
.set_properties(properties)
.build();
assert!(message.is_ok());
@@ -337,9 +341,9 @@ mod tests {
});
let message = MessageImpl::builder()
- .set_topic("test".to_string())
+ .set_topic("test")
.set_body(vec![1, 2, 3])
- .set_message_group("message_group".to_string())
+ .set_message_group("message_group")
.set_delivery_timestamp(123456789)
.build();
assert!(message.is_err());
@@ -350,23 +354,80 @@ mod tests {
"message_group and delivery_timestamp can not be set at the same
time."
);
- let message = MessageImpl::builder()
- .set_topic("test".to_string())
- .set_body(vec![1, 2, 3])
- .set_message_group("message_group".to_string())
- .build();
+ let message =
+ MessageImpl::fifo_message_builder("test", vec![1, 2, 3],
"message_group").build();
let mut message = message.unwrap();
assert_eq!(
message.take_message_group(),
Some("message_group".to_string())
);
- let message = MessageImpl::builder()
- .set_topic("test".to_string())
- .set_body(vec![1, 2, 3])
- .set_delivery_timestamp(123456789)
- .build();
+ let message = MessageImpl::delay_message_builder("test", vec![1, 2,
3], 123456789).build();
let mut message = message.unwrap();
assert_eq!(message.take_delivery_timestamp(), Some(123456789));
}
+
+ #[test]
+ fn common_message() {
+ let message_view = MessageView::from_pb_message(
+ pb::Message {
+ topic: Some(pb::Resource {
+ name: "test".to_string(),
+ ..Default::default()
+ }),
+ body: vec![1, 2, 3],
+ user_properties: {
+ let mut properties = HashMap::new();
+ properties.insert("key".to_string(), "value".to_string());
+ properties
+ },
+ system_properties: Some(pb::SystemProperties {
+ message_id: "message_id".to_string(),
+ receipt_handle: Some("receipt_handle".to_string()),
+ tag: Some("tag".to_string()),
+ keys: vec!["key".to_string()],
+ message_group: Some("message_group".to_string()),
+ delivery_timestamp: Some(prost_types::Timestamp {
+ seconds: 123456789,
+ ..Default::default()
+ }),
+ born_host: "born_host".to_string(),
+ born_timestamp: Some(prost_types::Timestamp {
+ seconds: 987654321,
+ ..Default::default()
+ }),
+ delivery_attempt: Some(1),
+ ..Default::default()
+ }),
+ },
+ Endpoints::from_url("localhost:8081").unwrap(),
+ );
+
+ assert_eq!(message_view.message_id(), "message_id");
+ assert_eq!(message_view.topic(), "test");
+ assert_eq!(message_view.body(), &[1, 2, 3]);
+ assert_eq!(message_view.tag(), Some("tag"));
+ assert_eq!(message_view.keys(), &["key"]);
+ assert_eq!(message_view.properties(), &{
+ let mut properties = HashMap::new();
+ properties.insert("key".to_string(), "value".to_string());
+ properties
+ });
+ assert_eq!(message_view.message_group(), Some("message_group"));
+ assert_eq!(message_view.delivery_timestamp(), Some(123456789));
+ assert_eq!(message_view.born_host(), "born_host");
+ assert_eq!(message_view.born_timestamp(), 987654321);
+ assert_eq!(message_view.delivery_attempt(), 1);
+
+ assert_eq!(AckMessageEntry::message_id(&message_view), "message_id");
+ assert_eq!(AckMessageEntry::topic(&message_view), "test");
+ assert_eq!(
+ AckMessageEntry::receipt_handle(&message_view),
+ "receipt_handle"
+ );
+ assert_eq!(
+ AckMessageEntry::endpoints(&message_view).endpoint_url(),
+ "localhost:8081"
+ );
+ }
}
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 983d40d0..fc565673 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -209,7 +209,10 @@ impl Producer {
mod tests {
use crate::error::ErrorKind;
use crate::log::terminal_logger;
+ use crate::model::common::Route;
use crate::model::message::MessageImpl;
+ use crate::pb::{Broker, Code, MessageQueue, Status};
+ use std::sync::Arc;
use super::*;
@@ -221,15 +224,30 @@ mod tests {
}
}
- // #[tokio::test]
- // async fn producer_start() {
- // let mut producer_option = ProducerOption::default();
- // producer_option.set_topics(vec!["DefaultCluster".to_string()]);
- // let producer = Producer::new(producer_option,
ClientOption::default())
- // .await
- // .unwrap();
- // producer.start().await.unwrap();
- // }
+ #[tokio::test]
+ async fn producer_start() -> Result<(), ClientError> {
+ let ctx = Client::new_context();
+ ctx.expect().return_once(|_, _, _| {
+ let mut client = Client::default();
+ client.expect_topic_route().returning(|_, _| {
+ Ok(Arc::new(Route {
+ index: Default::default(),
+ queue: vec![],
+ }))
+ });
+ client.expect_start().returning(|| ());
+ client
+ .expect_client_id()
+ .return_const("fake_id".to_string());
+ Ok(client)
+ });
+ let mut producer_option = ProducerOption::default();
+ producer_option.set_topics(vec!["DefaultCluster".to_string()]);
+ Producer::new(producer_option, ClientOption::default())?
+ .start()
+ .await?;
+ Ok(())
+ }
#[tokio::test]
async fn producer_transform_messages_to_protobuf() {
@@ -327,4 +345,52 @@ mod tests {
assert_eq!(err.kind, ErrorKind::InvalidMessage);
assert_eq!(err.message, "not all messages have the same message
group");
}
+
+ #[tokio::test]
+ async fn producer_send_one() -> Result<(), ClientError> {
+ let mut producer = new_producer_for_test();
+ producer.client.expect_topic_route().returning(|_, _| {
+ Ok(Arc::new(Route {
+ index: Default::default(),
+ queue: vec![MessageQueue {
+ topic: Some(Resource {
+ name: "test_topic".to_string(),
+ resource_namespace: "".to_string(),
+ }),
+ id: 0,
+ permission: 0,
+ broker: Some(Broker {
+ name: "".to_string(),
+ id: 0,
+ endpoints: Some(pb::Endpoints {
+ scheme: 0,
+ addresses: vec![],
+ }),
+ }),
+ accept_message_types: vec![],
+ }],
+ }))
+ });
+ producer.client.expect_send_message().returning(|_, _| {
+ Ok(vec![SendResultEntry {
+ status: Some(Status {
+ code: Code::Ok as i32,
+ message: "".to_string(),
+ }),
+ message_id: "".to_string(),
+ transaction_id: "".to_string(),
+ offset: 0,
+ }])
+ });
+ producer
+ .send_one(
+ MessageImpl::builder()
+ .set_topic("test_topic")
+ .set_body(vec![])
+ .build()
+ .unwrap(),
+ )
+ .await?;
+ Ok(())
+ }
}
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 268e5fb4..6ef9b97d 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -27,7 +27,6 @@ use tonic::transport::{Channel, Endpoint};
use crate::conf::ClientOption;
use crate::error::ErrorKind;
-use crate::log::terminal_logger;
use crate::model::common::Endpoints;
use crate::pb::{
AckMessageRequest, AckMessageResponse, HeartbeatRequest,
HeartbeatResponse, QueryRouteRequest,
@@ -90,6 +89,7 @@ impl Session {
#[cfg(test)]
pub(crate) fn mock() -> Self {
+ use crate::log::terminal_logger;
Session {
logger: terminal_logger(),
client_id: "fake_id".to_string(),
@@ -440,6 +440,7 @@ impl SessionManager {
};
}
+ #[allow(dead_code)]
pub(crate) async fn get_all_sessions(&self) -> Result<Vec<Session>,
ClientError> {
let session_map = self.session_map.lock().await;
let mut sessions = Vec::new();
@@ -467,11 +468,13 @@ mod tests {
async fn session_new() {
let server = RocketMQMockServer::start_default().await;
let logger = terminal_logger();
+ let mut client_option = ClientOption::default();
+ client_option.set_enable_tls(false);
let session = Session::new(
&logger,
&Endpoints::from_url(&format!("localhost:{}",
server.address().port())).unwrap(),
"test_client".to_string(),
- &ClientOption::default(),
+ &client_option,
)
.await;
debug!(logger, "session: {:?}", session);
@@ -504,11 +507,13 @@ mod tests {
);
let logger = terminal_logger();
+ let mut client_option = ClientOption::default();
+ client_option.set_enable_tls(false);
let session = Session::new(
&logger,
&Endpoints::from_url(&format!("localhost:{}",
server.address().port())).unwrap(),
"test_client".to_string(),
- &ClientOption::default(),
+ &client_option,
)
.await;
debug!(logger, "session: {:?}", session);
@@ -538,12 +543,14 @@ mod tests {
);
let logger = terminal_logger();
+ let mut client_option = ClientOption::default();
+ client_option.set_enable_tls(false);
let session_manager =
- SessionManager::new(&logger, "test_client".to_string(),
&ClientOption::default());
+ SessionManager::new(&logger, "test_client".to_string(),
&client_option);
let session = session_manager
.get_or_create_session(
&Endpoints::from_url(&format!("localhost:{}",
server.address().port())).unwrap(),
- build_producer_settings(&ProducerOption::default(),
&ClientOption::default()),
+ build_producer_settings(&ProducerOption::default(),
&client_option),
)
.await
.unwrap();
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index bccada6b..d06c6410 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -16,8 +16,10 @@
*/
use std::time::Duration;
+use mockall_double::double;
use slog::{info, Logger};
+#[double]
use crate::client::Client;
use crate::conf::{ClientOption, SimpleConsumerOption};
use crate::error::{ClientError, ErrorKind};
@@ -125,3 +127,99 @@ impl SimpleConsumer {
Ok(())
}
}
+
+#[cfg(test)]
+mod tests {
+ use crate::log::terminal_logger;
+ use std::sync::Arc;
+
+ use crate::model::common::{FilterType, Route};
+ use crate::pb::{
+ AckMessageResultEntry, Broker, Message, MessageQueue, Resource,
SystemProperties,
+ };
+
+ use super::*;
+
+ #[tokio::test]
+ async fn simple_consumer_start() -> Result<(), ClientError> {
+ let ctx = Client::new_context();
+ ctx.expect().return_once(|_, _, _| {
+ let mut client = Client::default();
+ client.expect_topic_route().returning(|_, _| {
+ Ok(Arc::new(Route {
+ index: Default::default(),
+ queue: vec![],
+ }))
+ });
+ client.expect_start().returning(|| ());
+ client
+ .expect_client_id()
+ .return_const("fake_id".to_string());
+ Ok(client)
+ });
+ let mut option = SimpleConsumerOption::default();
+ option.set_consumer_group("test_group");
+ option.set_topics(vec!["test_topic"]);
+ SimpleConsumer::new(option, ClientOption::default())?
+ .start()
+ .await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn simple_consumer_consume_message() -> Result<(), ClientError> {
+ let mut client = Client::default();
+ client.expect_topic_route().returning(|_, _| {
+ Ok(Arc::new(Route {
+ index: Default::default(),
+ queue: vec![MessageQueue {
+ topic: Some(Resource {
+ name: "test_topic".to_string(),
+ ..Default::default()
+ }),
+ id: 0,
+ permission: 0,
+ broker: Some(Broker {
+ name: "".to_string(),
+ id: 0,
+ endpoints: Some(pb::Endpoints {
+ scheme: 0,
+ addresses: vec![],
+ }),
+ }),
+ accept_message_types: vec![],
+ }],
+ }))
+ });
+ client.expect_receive_message().returning(|_, _, _, _, _| {
+ Ok(vec![Message {
+ topic: Some(Resource {
+ name: "test_topic".to_string(),
+ ..Default::default()
+ }),
+ system_properties: Some(SystemProperties::default()),
+ ..Default::default()
+ }])
+ });
+ client
+ .expect_ack_message()
+ .returning(|_: MessageView| Ok(AckMessageResultEntry::default()));
+ let simple_consumer = SimpleConsumer {
+ option: SimpleConsumerOption::default(),
+ logger: terminal_logger(),
+ client,
+ };
+
+ let messages = simple_consumer
+ .receive(
+ "test_topic",
+ &FilterExpression::new(FilterType::Tag,
"test_tag".to_string()),
+ )
+ .await?;
+ assert_eq!(messages.len(), 1);
+ simple_consumer
+ .ack(messages.into_iter().next().unwrap())
+ .await?;
+ Ok(())
+ }
+}
diff --git a/rust/src/util.rs b/rust/src/util.rs
index 94e49840..583e7327 100644
--- a/rust/src/util.rs
+++ b/rust/src/util.rs
@@ -66,7 +66,8 @@ pub(crate) fn build_endpoints_by_message_queue(
"message queue do not have a available endpoint",
operation,
)
- .with_context("message_queue", format!("{:?}", message_queue)));
+ .with_context("topic", topic)
+ .with_context("queue_id", message_queue.id.to_string()));
}
let broker = message_queue.broker.clone().unwrap();
@@ -159,3 +160,124 @@ pub(crate) fn build_simple_consumer_settings(
..TelemetryCommand::default()
}
}
+
+#[cfg(test)]
+mod tests {
+ use crate::model::common::Route;
+ use crate::pb;
+ use crate::pb::{Broker, MessageQueue};
+ use std::sync::atomic::AtomicUsize;
+ use std::sync::Arc;
+
+ use super::*;
+
+ fn build_route() -> Arc<Route> {
+ let message_queue_1 = MessageQueue {
+ topic: None,
+ id: 1,
+ permission: 0,
+ broker: None,
+ accept_message_types: vec![],
+ };
+
+ let message_queue_2 = MessageQueue {
+ topic: None,
+ id: 2,
+ permission: 0,
+ broker: None,
+ accept_message_types: vec![],
+ };
+
+ Arc::new(Route {
+ index: AtomicUsize::new(0),
+ queue: vec![message_queue_1, message_queue_2],
+ })
+ }
+
+ #[test]
+ fn util_select_message_queue() {
+ let route = build_route();
+ let message_queue = select_message_queue(route.clone());
+ assert_eq!(message_queue.id, 1);
+ let message_queue = select_message_queue(route.clone());
+ assert_eq!(message_queue.id, 2);
+ let message_queue = select_message_queue(route);
+ assert_eq!(message_queue.id, 1);
+ }
+
+ #[test]
+ fn util_select_message_queue_by_message_group() {
+ let route = build_route();
+ let message_queue =
+ select_message_queue_by_message_group(route.clone(),
"group1".to_string());
+ assert_eq!(message_queue.id, 1);
+ let message_queue =
+ select_message_queue_by_message_group(route.clone(),
"group1".to_string());
+ assert_eq!(message_queue.id, 1);
+ let message_queue =
+ select_message_queue_by_message_group(route,
"another_group".to_string());
+ assert_eq!(message_queue.id, 2);
+ }
+
+ #[test]
+ fn util_build_endpoints_by_message_queue() {
+ let mut message_queue = MessageQueue {
+ topic: Some(Resource {
+ name: "topic".to_string(),
+ resource_namespace: "".to_string(),
+ }),
+ id: 1,
+ permission: 0,
+ broker: Some(Broker {
+ name: "".to_string(),
+ id: 0,
+ endpoints: Some(pb::Endpoints {
+ scheme: pb::AddressScheme::DomainName as i32,
+ addresses: vec![],
+ }),
+ }),
+ accept_message_types: vec![],
+ };
+ let result = build_endpoints_by_message_queue(&message_queue, "test");
+ assert!(result.is_ok());
+ assert_eq!(result.unwrap().scheme(), pb::AddressScheme::DomainName);
+
+ message_queue.broker = Some(Broker {
+ name: "".to_string(),
+ id: 0,
+ endpoints: None,
+ });
+ let result = build_endpoints_by_message_queue(&message_queue, "test");
+ assert!(result.is_err());
+ let error = result.unwrap_err();
+ assert_eq!(error.kind, ErrorKind::NoBrokerAvailable);
+ assert_eq!(error.operation, "test");
+ assert_eq!(
+ error.message,
+ "message queue do not have a available endpoint"
+ );
+ assert_eq!(error.context.len(), 3);
+
+ message_queue.broker.take();
+ let result = build_endpoints_by_message_queue(&message_queue, "test");
+ assert!(result.is_err());
+ let error = result.unwrap_err();
+ assert_eq!(error.kind, ErrorKind::NoBrokerAvailable);
+ assert_eq!(error.operation, "test");
+ assert_eq!(
+ error.message,
+ "message queue do not have a available endpoint"
+ );
+ assert_eq!(error.context.len(), 2);
+ }
+
+ #[test]
+ fn util_build_producer_settings() {
+ build_producer_settings(&ProducerOption::default(),
&ClientOption::default());
+ }
+
+ #[test]
+ fn util_build_simple_consumer_settings() {
+ build_simple_consumer_settings(&SimpleConsumerOption::default(),
&ClientOption::default());
+ }
+}