This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new f99a835d test(rust): add unit tests (#484)
f99a835d is described below

commit f99a835dfac9b7e5ba3c1e67eece1cde3aee4b51
Author: SSpirits <[email protected]>
AuthorDate: Tue Apr 18 20:00:01 2023 +0800

    test(rust): add unit tests (#484)
    
    * test(rust): add unit tests
    
    * doc(rust): fix README.md of rust sdk
---
 rust/README.md              |   6 +++
 rust/src/client.rs          |  37 +++++++------
 rust/src/conf.rs            |  33 +++++++++++-
 rust/src/error.rs           |  19 +++++++
 rust/src/lib.rs             |   2 -
 rust/src/model/common.rs    |   1 +
 rust/src/model/message.rs   | 107 ++++++++++++++++++++++++++++++--------
 rust/src/producer.rs        |  84 ++++++++++++++++++++++++++----
 rust/src/session.rs         |  17 ++++--
 rust/src/simple_consumer.rs |  98 ++++++++++++++++++++++++++++++++++
 rust/src/util.rs            | 124 +++++++++++++++++++++++++++++++++++++++++++-
 11 files changed, 470 insertions(+), 58 deletions(-)

diff --git a/rust/README.md b/rust/README.md
index 5c484837..e8621d5f 100644
--- a/rust/README.md
+++ b/rust/README.md
@@ -16,6 +16,12 @@ Here are some preparations you may need to know (or refer to 
[here](https://rock
 2. protoc 3.15.0+
 3. setup name server, broker, and 
[proxy](https://github.com/apache/rocketmq/tree/develop/proxy).
 
+### Run Tests
+
+```sh
+cargo llvm-cov --ignore-filename-regex pb/ --open
+```
+
 ### Run Example
 
 Run the following command to start the example:
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 7c0fbdf3..fe5cf340 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -56,15 +56,15 @@ lazy_static::lazy_static! {
     static ref CLIENT_ID_SEQUENCE: AtomicUsize = AtomicUsize::new(0);
 }
 
+const OPERATION_CLIENT_NEW: &'static str = "client.new";
+const OPERATION_QUERY_ROUTE: &'static str = "client.query_route";
+const OPERATION_HEARTBEAT: &'static str = "client.heartbeat";
+const OPERATION_SEND_MESSAGE: &'static str = "client.send_message";
+const OPERATION_RECEIVE_MESSAGE: &'static str = "client.receive_message";
+const OPERATION_ACK_MESSAGE: &'static str = "client.ack_message";
+
 #[automock]
 impl Client {
-    const OPERATION_CLIENT_NEW: &'static str = "client.new";
-    const OPERATION_QUERY_ROUTE: &'static str = "client.query_route";
-    const OPERATION_HEARTBEAT: &'static str = "client.heartbeat";
-    const OPERATION_SEND_MESSAGE: &'static str = "client.send_message";
-    const OPERATION_RECEIVE_MESSAGE: &'static str = "client.receive_message";
-    const OPERATION_ACK_MESSAGE: &'static str = "client.ack_message";
-
     pub(crate) fn new(
         logger: &Logger,
         option: ClientOption,
@@ -72,7 +72,7 @@ impl Client {
     ) -> Result<Self, ClientError> {
         let id = Self::generate_client_id();
         let endpoints = Endpoints::from_url(option.access_url())
-            .map_err(|e| e.with_operation(Self::OPERATION_CLIENT_NEW))?;
+            .map_err(|e| e.with_operation(OPERATION_CLIENT_NEW))?;
         let session_manager = SessionManager::new(logger, id.clone(), &option);
         Ok(Client {
             logger: logger.new(o!("component" => "client")),
@@ -120,7 +120,7 @@ impl Client {
                                 continue;
                             }
                             let result =
-                                
Self::handle_response_status(response.unwrap().status, 
Self::OPERATION_HEARTBEAT);
+                                
Self::handle_response_status(response.unwrap().status, OPERATION_HEARTBEAT);
                             if result.is_err() {
                                 error!(
                                     logger,
@@ -137,6 +137,7 @@ impl Client {
         });
     }
 
+    #[allow(dead_code)]
     pub(crate) fn client_id(&self) -> &str {
         &self.id
     }
@@ -214,6 +215,7 @@ impl Client {
         })
     }
 
+    #[allow(dead_code)]
     pub(crate) async fn topic_route(
         &self,
         topic: &str,
@@ -242,7 +244,7 @@ impl Client {
         };
 
         let response = rpc_client.query_route(request).await?;
-        Self::handle_response_status(response.status, 
Self::OPERATION_QUERY_ROUTE)?;
+        Self::handle_response_status(response.status, OPERATION_QUERY_ROUTE)?;
 
         let route = Route {
             index: AtomicUsize::new(0),
@@ -288,7 +290,7 @@ impl Client {
                 Err(_e) => Err(ClientError::new(
                     ErrorKind::ChannelReceive,
                     "wait for inflight query topic route request failed",
-                    Self::OPERATION_QUERY_ROUTE,
+                    OPERATION_QUERY_ROUTE,
                 )),
             };
         }
@@ -327,7 +329,7 @@ impl Client {
                     let _ = item.send(Err(ClientError::new(
                         ErrorKind::Server,
                         "query topic route failed",
-                        Self::OPERATION_QUERY_ROUTE,
+                        OPERATION_QUERY_ROUTE,
                     )));
                 }
             };
@@ -352,6 +354,7 @@ impl Client {
         Ok(response)
     }
 
+    #[allow(dead_code)]
     pub(crate) async fn send_message(
         &self,
         endpoints: &Endpoints,
@@ -372,7 +375,7 @@ impl Client {
         let message_count = messages.len();
         let request = SendMessageRequest { messages };
         let response = rpc_client.send_message(request).await?;
-        Self::handle_response_status(response.status, 
Self::OPERATION_SEND_MESSAGE)?;
+        Self::handle_response_status(response.status, OPERATION_SEND_MESSAGE)?;
 
         if response.entries.len() != message_count {
             error!(self.logger, "server do not return illegal send result, 
this may be a bug. except result count: {}, found: {}", response.entries.len(), 
message_count);
@@ -381,6 +384,7 @@ impl Client {
         Ok(response.entries)
     }
 
+    #[allow(dead_code)]
     pub(crate) async fn receive_message(
         &self,
         endpoints: &Endpoints,
@@ -427,7 +431,7 @@ impl Client {
         for response in responses {
             match response.content.unwrap() {
                 Content::Status(status) => {
-                    Self::handle_response_status(Some(status), 
Self::OPERATION_RECEIVE_MESSAGE)?;
+                    Self::handle_response_status(Some(status), 
OPERATION_RECEIVE_MESSAGE)?;
                 }
                 Content::Message(message) => {
                     messages.push(message);
@@ -438,6 +442,7 @@ impl Client {
         Ok(messages)
     }
 
+    #[allow(dead_code)]
     pub(crate) async fn ack_message<T: AckMessageEntry + 'static>(
         &self,
         ack_entry: T,
@@ -475,7 +480,7 @@ impl Client {
             entries,
         };
         let response = rpc_client.ack_message(request).await?;
-        Self::handle_response_status(response.status, 
Self::OPERATION_ACK_MESSAGE)?;
+        Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
         Ok(response.entries)
     }
 }
@@ -623,7 +628,7 @@ mod tests {
         assert!(result.is_ok(), "should not return error when status is Ok");
     }
 
-    fn new_topic_route_response() -> Result<QueryRouteResponse, ClientError> {
+    pub(crate) fn new_topic_route_response() -> Result<QueryRouteResponse, 
ClientError> {
         Ok(QueryRouteResponse {
             status: Some(Status {
                 code: Code::Ok as i32,
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index 8423d901..bb6faf6d 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -35,7 +35,7 @@ impl Default for ClientOption {
             group: "".to_string(),
             namespace: "".to_string(),
             access_url: "localhost:8081".to_string(),
-            enable_tls: false,
+            enable_tls: true,
             timeout: Duration::from_secs(3),
             long_polling_timeout: Duration::from_secs(40),
         }
@@ -72,7 +72,7 @@ impl ClientOption {
     }
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Eq, PartialEq)]
 pub enum LoggingFormat {
     Terminal,
     Json,
@@ -195,3 +195,32 @@ impl SimpleConsumerOption {
         self.namespace = name_space.into();
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn conf_client_option() {
+        let option = ClientOption::default();
+        assert_eq!(option.access_url(), "localhost:8081");
+        assert!(option.enable_tls());
+        assert_eq!(option.timeout(), &Duration::from_secs(3));
+        assert_eq!(option.long_polling_timeout(), &Duration::from_secs(40));
+    }
+
+    #[test]
+    fn conf_producer_option() {
+        let option = ProducerOption::default();
+        assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
+        assert!(option.prefetch_route());
+        assert!(option.validate_message_type());
+    }
+
+    #[test]
+    fn conf_simple_consumer_option() {
+        let option = SimpleConsumerOption::default();
+        assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
+        assert!(option.prefetch_route());
+    }
+}
diff --git a/rust/src/error.rs b/rust/src/error.rs
index e7299a5f..9eb3d519 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -158,3 +158,22 @@ impl Debug for ClientError {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn error_client_error() {
+        let err = ClientError::new(ErrorKind::Config, "fake_message", 
"error_client_error")
+            .with_operation("another_operation")
+            .with_context("context_key", "context_value")
+            .set_source(anyhow::anyhow!("fake_source_error"));
+        assert_eq!(
+            err.to_string(),
+            "Failed to parse config at another_operation, context: { called: 
error_client_error, context_key: context_value } => fake_message, source: 
fake_source_error"
+        );
+        assert_eq!(format!("{:?}", err), "Failed to parse config at 
another_operation => fake_message\n\nContext:\n    called: error_client_error\n 
   context_key: context_value\n\nSource: fake_source_error\n");
+        assert_eq!(format!("{:#?}", err), "Error {\n    kind: Config,\n    
message: \"fake_message\",\n    operation: \"another_operation\",\n    context: 
[\n        (\n            \"called\",\n            \"error_client_error\",\n    
    ),\n        (\n            \"context_key\",\n            
\"context_value\",\n        ),\n    ],\n    source: Some(\n        
\"fake_source_error\",\n    ),\n}");
+    }
+}
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index c6d1cc26..7152611a 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -16,7 +16,6 @@
  */
 #[allow(dead_code)]
 pub mod conf;
-#[allow(dead_code)]
 mod error;
 #[allow(dead_code)]
 mod log;
@@ -28,7 +27,6 @@ mod client;
 mod pb;
 mod session;
 
-#[allow(dead_code)]
 pub mod model;
 mod util;
 
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
index 9f1e947c..a488b152 100644
--- a/rust/src/model/common.rs
+++ b/rust/src/model/common.rs
@@ -24,6 +24,7 @@ use crate::error::{ClientError, ErrorKind};
 use crate::pb;
 use crate::pb::{Address, AddressScheme, MessageQueue};
 
+#[allow(dead_code)]
 #[derive(Debug, Clone)]
 pub(crate) enum ClientType {
     Producer = 1,
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index 259a4847..5ca53df1 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -94,29 +94,33 @@ impl MessageImpl {
     }
 
     pub fn fifo_message_builder(
-        topic: String,
+        topic: impl Into<String>,
         body: Vec<u8>,
-        message_group: String,
+        message_group: impl Into<String>,
     ) -> MessageBuilder {
         MessageBuilder {
             message: MessageImpl {
                 message_id: UNIQ_ID_GENERATOR.lock().next_id(),
-                topic,
+                topic: topic.into(),
                 body: Some(body),
                 tags: None,
                 keys: None,
                 properties: None,
-                message_group: Some(message_group),
+                message_group: Some(message_group.into()),
                 delivery_timestamp: None,
             },
         }
     }
 
-    pub fn delay_message_builder(topic: String, body: Vec<u8>, delay_time: 
i64) -> MessageBuilder {
+    pub fn delay_message_builder(
+        topic: impl Into<String>,
+        body: Vec<u8>,
+        delay_time: i64,
+    ) -> MessageBuilder {
         MessageBuilder {
             message: MessageImpl {
                 message_id: UNIQ_ID_GENERATOR.lock().next_id(),
-                topic,
+                topic: topic.into(),
                 body: Some(body),
                 tags: None,
                 keys: None,
@@ -313,14 +317,14 @@ mod tests {
     use super::*;
 
     #[test]
-    fn test_message() {
+    fn common_test_message() {
         let mut properties = HashMap::new();
-        properties.insert("key".to_string(), "value".to_string());
+        properties.insert("key", "value".to_string());
         let message = MessageImpl::builder()
-            .set_topic("test".to_string())
+            .set_topic("test")
             .set_body(vec![1, 2, 3])
-            .set_tags("tag".to_string())
-            .set_keys(vec!["key".to_string()])
+            .set_tags("tag")
+            .set_keys(vec!["key"])
             .set_properties(properties)
             .build();
         assert!(message.is_ok());
@@ -337,9 +341,9 @@ mod tests {
         });
 
         let message = MessageImpl::builder()
-            .set_topic("test".to_string())
+            .set_topic("test")
             .set_body(vec![1, 2, 3])
-            .set_message_group("message_group".to_string())
+            .set_message_group("message_group")
             .set_delivery_timestamp(123456789)
             .build();
         assert!(message.is_err());
@@ -350,23 +354,80 @@ mod tests {
             "message_group and delivery_timestamp can not be set at the same 
time."
         );
 
-        let message = MessageImpl::builder()
-            .set_topic("test".to_string())
-            .set_body(vec![1, 2, 3])
-            .set_message_group("message_group".to_string())
-            .build();
+        let message =
+            MessageImpl::fifo_message_builder("test", vec![1, 2, 3], 
"message_group").build();
         let mut message = message.unwrap();
         assert_eq!(
             message.take_message_group(),
             Some("message_group".to_string())
         );
 
-        let message = MessageImpl::builder()
-            .set_topic("test".to_string())
-            .set_body(vec![1, 2, 3])
-            .set_delivery_timestamp(123456789)
-            .build();
+        let message = MessageImpl::delay_message_builder("test", vec![1, 2, 
3], 123456789).build();
         let mut message = message.unwrap();
         assert_eq!(message.take_delivery_timestamp(), Some(123456789));
     }
+
+    #[test]
+    fn common_message() {
+        let message_view = MessageView::from_pb_message(
+            pb::Message {
+                topic: Some(pb::Resource {
+                    name: "test".to_string(),
+                    ..Default::default()
+                }),
+                body: vec![1, 2, 3],
+                user_properties: {
+                    let mut properties = HashMap::new();
+                    properties.insert("key".to_string(), "value".to_string());
+                    properties
+                },
+                system_properties: Some(pb::SystemProperties {
+                    message_id: "message_id".to_string(),
+                    receipt_handle: Some("receipt_handle".to_string()),
+                    tag: Some("tag".to_string()),
+                    keys: vec!["key".to_string()],
+                    message_group: Some("message_group".to_string()),
+                    delivery_timestamp: Some(prost_types::Timestamp {
+                        seconds: 123456789,
+                        ..Default::default()
+                    }),
+                    born_host: "born_host".to_string(),
+                    born_timestamp: Some(prost_types::Timestamp {
+                        seconds: 987654321,
+                        ..Default::default()
+                    }),
+                    delivery_attempt: Some(1),
+                    ..Default::default()
+                }),
+            },
+            Endpoints::from_url("localhost:8081").unwrap(),
+        );
+
+        assert_eq!(message_view.message_id(), "message_id");
+        assert_eq!(message_view.topic(), "test");
+        assert_eq!(message_view.body(), &[1, 2, 3]);
+        assert_eq!(message_view.tag(), Some("tag"));
+        assert_eq!(message_view.keys(), &["key"]);
+        assert_eq!(message_view.properties(), &{
+            let mut properties = HashMap::new();
+            properties.insert("key".to_string(), "value".to_string());
+            properties
+        });
+        assert_eq!(message_view.message_group(), Some("message_group"));
+        assert_eq!(message_view.delivery_timestamp(), Some(123456789));
+        assert_eq!(message_view.born_host(), "born_host");
+        assert_eq!(message_view.born_timestamp(), 987654321);
+        assert_eq!(message_view.delivery_attempt(), 1);
+
+        assert_eq!(AckMessageEntry::message_id(&message_view), "message_id");
+        assert_eq!(AckMessageEntry::topic(&message_view), "test");
+        assert_eq!(
+            AckMessageEntry::receipt_handle(&message_view),
+            "receipt_handle"
+        );
+        assert_eq!(
+            AckMessageEntry::endpoints(&message_view).endpoint_url(),
+            "localhost:8081"
+        );
+    }
 }
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 983d40d0..fc565673 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -209,7 +209,10 @@ impl Producer {
 mod tests {
     use crate::error::ErrorKind;
     use crate::log::terminal_logger;
+    use crate::model::common::Route;
     use crate::model::message::MessageImpl;
+    use crate::pb::{Broker, Code, MessageQueue, Status};
+    use std::sync::Arc;
 
     use super::*;
 
@@ -221,15 +224,30 @@ mod tests {
         }
     }
 
-    // #[tokio::test]
-    // async fn producer_start() {
-    //     let mut producer_option = ProducerOption::default();
-    //     producer_option.set_topics(vec!["DefaultCluster".to_string()]);
-    //     let producer = Producer::new(producer_option, 
ClientOption::default())
-    //         .await
-    //         .unwrap();
-    //     producer.start().await.unwrap();
-    // }
+    #[tokio::test]
+    async fn producer_start() -> Result<(), ClientError> {
+        let ctx = Client::new_context();
+        ctx.expect().return_once(|_, _, _| {
+            let mut client = Client::default();
+            client.expect_topic_route().returning(|_, _| {
+                Ok(Arc::new(Route {
+                    index: Default::default(),
+                    queue: vec![],
+                }))
+            });
+            client.expect_start().returning(|| ());
+            client
+                .expect_client_id()
+                .return_const("fake_id".to_string());
+            Ok(client)
+        });
+        let mut producer_option = ProducerOption::default();
+        producer_option.set_topics(vec!["DefaultCluster".to_string()]);
+        Producer::new(producer_option, ClientOption::default())?
+            .start()
+            .await?;
+        Ok(())
+    }
 
     #[tokio::test]
     async fn producer_transform_messages_to_protobuf() {
@@ -327,4 +345,52 @@ mod tests {
         assert_eq!(err.kind, ErrorKind::InvalidMessage);
         assert_eq!(err.message, "not all messages have the same message 
group");
     }
+
+    #[tokio::test]
+    async fn producer_send_one() -> Result<(), ClientError> {
+        let mut producer = new_producer_for_test();
+        producer.client.expect_topic_route().returning(|_, _| {
+            Ok(Arc::new(Route {
+                index: Default::default(),
+                queue: vec![MessageQueue {
+                    topic: Some(Resource {
+                        name: "test_topic".to_string(),
+                        resource_namespace: "".to_string(),
+                    }),
+                    id: 0,
+                    permission: 0,
+                    broker: Some(Broker {
+                        name: "".to_string(),
+                        id: 0,
+                        endpoints: Some(pb::Endpoints {
+                            scheme: 0,
+                            addresses: vec![],
+                        }),
+                    }),
+                    accept_message_types: vec![],
+                }],
+            }))
+        });
+        producer.client.expect_send_message().returning(|_, _| {
+            Ok(vec![SendResultEntry {
+                status: Some(Status {
+                    code: Code::Ok as i32,
+                    message: "".to_string(),
+                }),
+                message_id: "".to_string(),
+                transaction_id: "".to_string(),
+                offset: 0,
+            }])
+        });
+        producer
+            .send_one(
+                MessageImpl::builder()
+                    .set_topic("test_topic")
+                    .set_body(vec![])
+                    .build()
+                    .unwrap(),
+            )
+            .await?;
+        Ok(())
+    }
 }
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 268e5fb4..6ef9b97d 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -27,7 +27,6 @@ use tonic::transport::{Channel, Endpoint};
 
 use crate::conf::ClientOption;
 use crate::error::ErrorKind;
-use crate::log::terminal_logger;
 use crate::model::common::Endpoints;
 use crate::pb::{
     AckMessageRequest, AckMessageResponse, HeartbeatRequest, 
HeartbeatResponse, QueryRouteRequest,
@@ -90,6 +89,7 @@ impl Session {
 
     #[cfg(test)]
     pub(crate) fn mock() -> Self {
+        use crate::log::terminal_logger;
         Session {
             logger: terminal_logger(),
             client_id: "fake_id".to_string(),
@@ -440,6 +440,7 @@ impl SessionManager {
         };
     }
 
+    #[allow(dead_code)]
     pub(crate) async fn get_all_sessions(&self) -> Result<Vec<Session>, 
ClientError> {
         let session_map = self.session_map.lock().await;
         let mut sessions = Vec::new();
@@ -467,11 +468,13 @@ mod tests {
     async fn session_new() {
         let server = RocketMQMockServer::start_default().await;
         let logger = terminal_logger();
+        let mut client_option = ClientOption::default();
+        client_option.set_enable_tls(false);
         let session = Session::new(
             &logger,
             &Endpoints::from_url(&format!("localhost:{}", 
server.address().port())).unwrap(),
             "test_client".to_string(),
-            &ClientOption::default(),
+            &client_option,
         )
         .await;
         debug!(logger, "session: {:?}", session);
@@ -504,11 +507,13 @@ mod tests {
         );
 
         let logger = terminal_logger();
+        let mut client_option = ClientOption::default();
+        client_option.set_enable_tls(false);
         let session = Session::new(
             &logger,
             &Endpoints::from_url(&format!("localhost:{}", 
server.address().port())).unwrap(),
             "test_client".to_string(),
-            &ClientOption::default(),
+            &client_option,
         )
         .await;
         debug!(logger, "session: {:?}", session);
@@ -538,12 +543,14 @@ mod tests {
         );
 
         let logger = terminal_logger();
+        let mut client_option = ClientOption::default();
+        client_option.set_enable_tls(false);
         let session_manager =
-            SessionManager::new(&logger, "test_client".to_string(), 
&ClientOption::default());
+            SessionManager::new(&logger, "test_client".to_string(), 
&client_option);
         let session = session_manager
             .get_or_create_session(
                 &Endpoints::from_url(&format!("localhost:{}", 
server.address().port())).unwrap(),
-                build_producer_settings(&ProducerOption::default(), 
&ClientOption::default()),
+                build_producer_settings(&ProducerOption::default(), 
&client_option),
             )
             .await
             .unwrap();
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index bccada6b..d06c6410 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -16,8 +16,10 @@
  */
 use std::time::Duration;
 
+use mockall_double::double;
 use slog::{info, Logger};
 
+#[double]
 use crate::client::Client;
 use crate::conf::{ClientOption, SimpleConsumerOption};
 use crate::error::{ClientError, ErrorKind};
@@ -125,3 +127,99 @@ impl SimpleConsumer {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::log::terminal_logger;
+    use std::sync::Arc;
+
+    use crate::model::common::{FilterType, Route};
+    use crate::pb::{
+        AckMessageResultEntry, Broker, Message, MessageQueue, Resource, 
SystemProperties,
+    };
+
+    use super::*;
+
+    #[tokio::test]
+    async fn simple_consumer_start() -> Result<(), ClientError> {
+        let ctx = Client::new_context();
+        ctx.expect().return_once(|_, _, _| {
+            let mut client = Client::default();
+            client.expect_topic_route().returning(|_, _| {
+                Ok(Arc::new(Route {
+                    index: Default::default(),
+                    queue: vec![],
+                }))
+            });
+            client.expect_start().returning(|| ());
+            client
+                .expect_client_id()
+                .return_const("fake_id".to_string());
+            Ok(client)
+        });
+        let mut option = SimpleConsumerOption::default();
+        option.set_consumer_group("test_group");
+        option.set_topics(vec!["test_topic"]);
+        SimpleConsumer::new(option, ClientOption::default())?
+            .start()
+            .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn simple_consumer_consume_message() -> Result<(), ClientError> {
+        let mut client = Client::default();
+        client.expect_topic_route().returning(|_, _| {
+            Ok(Arc::new(Route {
+                index: Default::default(),
+                queue: vec![MessageQueue {
+                    topic: Some(Resource {
+                        name: "test_topic".to_string(),
+                        ..Default::default()
+                    }),
+                    id: 0,
+                    permission: 0,
+                    broker: Some(Broker {
+                        name: "".to_string(),
+                        id: 0,
+                        endpoints: Some(pb::Endpoints {
+                            scheme: 0,
+                            addresses: vec![],
+                        }),
+                    }),
+                    accept_message_types: vec![],
+                }],
+            }))
+        });
+        client.expect_receive_message().returning(|_, _, _, _, _| {
+            Ok(vec![Message {
+                topic: Some(Resource {
+                    name: "test_topic".to_string(),
+                    ..Default::default()
+                }),
+                system_properties: Some(SystemProperties::default()),
+                ..Default::default()
+            }])
+        });
+        client
+            .expect_ack_message()
+            .returning(|_: MessageView| Ok(AckMessageResultEntry::default()));
+        let simple_consumer = SimpleConsumer {
+            option: SimpleConsumerOption::default(),
+            logger: terminal_logger(),
+            client,
+        };
+
+        let messages = simple_consumer
+            .receive(
+                "test_topic",
+                &FilterExpression::new(FilterType::Tag, 
"test_tag".to_string()),
+            )
+            .await?;
+        assert_eq!(messages.len(), 1);
+        simple_consumer
+            .ack(messages.into_iter().next().unwrap())
+            .await?;
+        Ok(())
+    }
+}
diff --git a/rust/src/util.rs b/rust/src/util.rs
index 94e49840..583e7327 100644
--- a/rust/src/util.rs
+++ b/rust/src/util.rs
@@ -66,7 +66,8 @@ pub(crate) fn build_endpoints_by_message_queue(
             "message queue do not have a available endpoint",
             operation,
         )
-        .with_context("message_queue", format!("{:?}", message_queue)));
+        .with_context("topic", topic)
+        .with_context("queue_id", message_queue.id.to_string()));
     }
 
     let broker = message_queue.broker.clone().unwrap();
@@ -159,3 +160,124 @@ pub(crate) fn build_simple_consumer_settings(
         ..TelemetryCommand::default()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::model::common::Route;
+    use crate::pb;
+    use crate::pb::{Broker, MessageQueue};
+    use std::sync::atomic::AtomicUsize;
+    use std::sync::Arc;
+
+    use super::*;
+
+    fn build_route() -> Arc<Route> {
+        let message_queue_1 = MessageQueue {
+            topic: None,
+            id: 1,
+            permission: 0,
+            broker: None,
+            accept_message_types: vec![],
+        };
+
+        let message_queue_2 = MessageQueue {
+            topic: None,
+            id: 2,
+            permission: 0,
+            broker: None,
+            accept_message_types: vec![],
+        };
+
+        Arc::new(Route {
+            index: AtomicUsize::new(0),
+            queue: vec![message_queue_1, message_queue_2],
+        })
+    }
+
+    #[test]
+    fn util_select_message_queue() {
+        let route = build_route();
+        let message_queue = select_message_queue(route.clone());
+        assert_eq!(message_queue.id, 1);
+        let message_queue = select_message_queue(route.clone());
+        assert_eq!(message_queue.id, 2);
+        let message_queue = select_message_queue(route);
+        assert_eq!(message_queue.id, 1);
+    }
+
+    #[test]
+    fn util_select_message_queue_by_message_group() {
+        let route = build_route();
+        let message_queue =
+            select_message_queue_by_message_group(route.clone(), 
"group1".to_string());
+        assert_eq!(message_queue.id, 1);
+        let message_queue =
+            select_message_queue_by_message_group(route.clone(), 
"group1".to_string());
+        assert_eq!(message_queue.id, 1);
+        let message_queue =
+            select_message_queue_by_message_group(route, 
"another_group".to_string());
+        assert_eq!(message_queue.id, 2);
+    }
+
+    #[test]
+    fn util_build_endpoints_by_message_queue() {
+        let mut message_queue = MessageQueue {
+            topic: Some(Resource {
+                name: "topic".to_string(),
+                resource_namespace: "".to_string(),
+            }),
+            id: 1,
+            permission: 0,
+            broker: Some(Broker {
+                name: "".to_string(),
+                id: 0,
+                endpoints: Some(pb::Endpoints {
+                    scheme: pb::AddressScheme::DomainName as i32,
+                    addresses: vec![],
+                }),
+            }),
+            accept_message_types: vec![],
+        };
+        let result = build_endpoints_by_message_queue(&message_queue, "test");
+        assert!(result.is_ok());
+        assert_eq!(result.unwrap().scheme(), pb::AddressScheme::DomainName);
+
+        message_queue.broker = Some(Broker {
+            name: "".to_string(),
+            id: 0,
+            endpoints: None,
+        });
+        let result = build_endpoints_by_message_queue(&message_queue, "test");
+        assert!(result.is_err());
+        let error = result.unwrap_err();
+        assert_eq!(error.kind, ErrorKind::NoBrokerAvailable);
+        assert_eq!(error.operation, "test");
+        assert_eq!(
+            error.message,
+            "message queue do not have a available endpoint"
+        );
+        assert_eq!(error.context.len(), 3);
+
+        message_queue.broker.take();
+        let result = build_endpoints_by_message_queue(&message_queue, "test");
+        assert!(result.is_err());
+        let error = result.unwrap_err();
+        assert_eq!(error.kind, ErrorKind::NoBrokerAvailable);
+        assert_eq!(error.operation, "test");
+        assert_eq!(
+            error.message,
+            "message queue do not have a available endpoint"
+        );
+        assert_eq!(error.context.len(), 2);
+    }
+
+    #[test]
+    fn util_build_producer_settings() {
+        build_producer_settings(&ProducerOption::default(), 
&ClientOption::default());
+    }
+
+    #[test]
+    fn util_build_simple_consumer_settings() {
+        build_simple_consumer_settings(&SimpleConsumerOption::default(), 
&ClientOption::default());
+    }
+}

Reply via email to