This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 29d332be [ISSUE #563] Optimize example and doc for Rust SDK (#564)
29d332be is described below

commit 29d332bedb090260ebb3965a3309d2d8ce0404b9
Author: SSpirits <[email protected]>
AuthorDate: Mon Jul 10 14:55:19 2023 +0800

    [ISSUE #563] Optimize example and doc for Rust SDK (#564)
    
    * feat(rust): optimize for example and doc
    
    Signed-off-by: SSpirits <[email protected]>
    
    * feat(rust): optimize readme
    
    Signed-off-by: SSpirits <[email protected]>
    
    * fix(rust): fix unit test
    
    Signed-off-by: SSpirits <[email protected]>
    
    ---------
    
    Signed-off-by: SSpirits <[email protected]>
---
 rust/README.md                        |  2 +-
 rust/examples/transaction_producer.rs | 39 +++++++++++++++++++++++++++++------
 rust/src/client.rs                    | 17 +++++++++++----
 rust/src/model/transaction.rs         |  8 ++++++-
 rust/src/producer.rs                  | 20 ++++++++++++++----
 rust/src/simple_consumer.rs           |  4 ++--
 6 files changed, 72 insertions(+), 18 deletions(-)

diff --git a/rust/README.md b/rust/README.md
index 8e5c1b7f..dc80d132 100644
--- a/rust/README.md
+++ b/rust/README.md
@@ -42,5 +42,5 @@ cargo run --example simple_consumer
 [codecov-url]: https://app.codecov.io/gh/apache/rocketmq-clients
 [crates-image]: https://img.shields.io/crates/v/rocketmq.svg
 [crates-url]: https://crates.io/crates/rocketmq
-[rust-doc-image]: https://img.shields.io/crates/v/rocketmq.svg
+[rust-doc-image]: https://img.shields.io/docsrs/rocketmq
 [rust-doc-url]: https://docs.rs/rocketmq
diff --git a/rust/examples/transaction_producer.rs b/rust/examples/transaction_producer.rs
index 07516eb0..7423cc9b 100644
--- a/rust/examples/transaction_producer.rs
+++ b/rust/examples/transaction_producer.rs
@@ -14,11 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+use std::collections::HashSet;
+use std::sync::Mutex;
+
 use rocketmq::conf::{ClientOption, ProducerOption};
 use rocketmq::model::message::MessageBuilder;
 use rocketmq::model::transaction::{Transaction, TransactionResolution};
 use rocketmq::Producer;
 
+lazy_static::lazy_static! {
+    static  ref MESSAGE_ID_SET: Mutex<HashSet<String>> = Mutex::new(HashSet::new());
+}
+
 #[tokio::main]
 async fn main() {
     // recommend to specify which topic(s) you would like to send message to
@@ -30,16 +37,28 @@ async fn main() {
     let mut client_option = ClientOption::default();
     client_option.set_access_url("localhost:8081");
 
-    // build and start producer
+    // build and start transaction producer, which has TransactionChecker
     let mut producer = Producer::new_transaction_producer(
         producer_option,
         client_option,
         Box::new(|transaction_id, message| {
-            println!(
-                "receive transaction check request: transaction_id: {}, message: {:?}",
-                transaction_id, message
-            );
-            TransactionResolution::COMMIT
+            if MESSAGE_ID_SET
+                .lock()
+                .unwrap()
+                .contains(message.message_id())
+            {
+                println!(
+                    "commit transaction: transaction_id: {}, message_id: {}",
+                    transaction_id, message.message_id()
+                );
+                TransactionResolution::COMMIT
+            } else {
+                println!(
+                    "rollback transaction due to unknown message: transaction_id: {}, message_id: {}",
+                    transaction_id, message.message_id()
+                );
+                TransactionResolution::ROLLBACK
+            }
         }),
     )
     .unwrap();
@@ -65,6 +84,14 @@ async fn main() {
         transaction.message_id(),
         transaction.transaction_id()
     );
+
+    MESSAGE_ID_SET
+        .lock()
+        .unwrap()
+        .insert(transaction.message_id().to_string());
+
+    // commit transaction manually
+    // delete following two lines so that RocketMQ server will check transaction status periodically
     let result = transaction.commit().await;
     debug_assert!(result.is_ok(), "commit transaction failed: {:?}", result);
 }
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 27cfb4df..57bddb84 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -31,7 +31,7 @@ use crate::conf::ClientOption;
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, SendReceipt};
 use crate::model::message::{AckMessageEntry, MessageView};
-use crate::model::transaction::TransactionChecker;
+use crate::model::transaction::{TransactionChecker, TransactionResolution};
 use crate::pb;
 use crate::pb::receive_message_response::Content;
 use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
@@ -109,6 +109,10 @@ impl Client {
         self.telemetry_command_tx.is_some()
     }
 
+    pub(crate) fn has_transaction_checker(&self) -> bool {
+        self.transaction_checker.is_some()
+    }
+
     pub(crate) fn set_transaction_checker(&mut self, transaction_checker: Box<TransactionChecker>) {
         if self.is_started() {
             panic!("client {} is started, can not be modified", self.id)
@@ -116,7 +120,7 @@ impl Client {
         self.transaction_checker = Some(transaction_checker);
     }
 
-    pub(crate) async fn start(&mut self) {
+    pub(crate) async fn start(&mut self) -> Result<(), ClientError> {
         let logger = self.logger.clone();
         let session_manager = self.session_manager.clone();
 
@@ -127,9 +131,13 @@ impl Client {
         // send heartbeat and handle telemetry command
         let (tx, mut rx) = mpsc::channel(16);
         self.telemetry_command_tx = Some(tx);
-        let rpc_client = self.get_session().await.unwrap();
+        let rpc_client = self.get_session().await?;
         let endpoints = self.access_endpoints.clone();
         let transaction_checker = self.transaction_checker.take();
+        // give a placeholder
+        if transaction_checker.is_some() {
+            self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN));
+        }
         tokio::spawn(async move {
             rpc_client.is_started();
             let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
@@ -181,6 +189,7 @@ impl Client {
                 }
             }
         });
+        Ok(())
     }
 
     async fn handle_telemetry_command<T: RPCClient + 'static>(
@@ -690,7 +699,7 @@ pub(crate) mod tests {
             .returning(|_, _, _| Ok(Session::mock()));
 
         let mut client = new_client_with_session_manager(session_manager);
-        client.start().await;
+        client.start().await?;
 
         // TODO use countdown latch instead sleeping
         // wait for run
diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs
index 62a4c9a7..2f74679f 100644
--- a/rust/src/model/transaction.rs
+++ b/rust/src/model/transaction.rs
@@ -125,12 +125,18 @@ pub enum TransactionResolution {
     COMMIT = 1,
     /// Notify server that current transaction should be roll-backed.
     ROLLBACK = 2,
-    /// Notify the server that the state of this transaction is not sure. You should be cautious before return unknown
+    /// Notify server that the state of this transaction is not sure. You should be cautious before return unknown
     /// because the examination from the server will be performed periodically.
     UNKNOWN = 0,
 }
 
 /// A closure to check the state of transaction.
+/// RocketMQ Server will call producer periodically to check the state of uncommitted transaction.
+///
+/// # Arguments
+///
+/// * transaction id
+/// * message
 pub type TransactionChecker = dyn Fn(String, MessageView) -> TransactionResolution + Send + Sync;
 
 #[cfg(test)]
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index fd82cdc4..d5e768c5 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -50,6 +50,7 @@ pub struct Producer {
 
 impl Producer {
     const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message";
+    const OPERATION_SEND_TRANSACTION_MESSAGE: &'static str = "producer.send_transaction_message";
 
     /// Create a new producer instance
     ///
@@ -79,7 +80,7 @@ impl Producer {
     ///
     /// * `option` - producer option
     /// * `client_option` - client option
-    /// * `transaction_checker` - A closure to check the state of transaction.
+    /// * `transaction_checker` - handle server query for uncommitted transaction status
     pub fn new_transaction_producer(
         option: ProducerOption,
         client_option: ClientOption,
@@ -103,7 +104,7 @@ impl Producer {
 
     /// Start the producer
     pub async fn start(&mut self) -> Result<(), ClientError> {
-        self.client.start().await;
+        self.client.start().await?;
         if let Some(topics) = self.option.topics() {
             for topic in topics {
                 self.client.topic_route(topic, true).await?;
@@ -265,6 +266,13 @@ impl Producer {
         &self,
         mut message: impl message::Message,
     ) -> Result<impl Transaction, ClientError> {
+        if !self.client.has_transaction_checker() {
+            return Err(ClientError::new(
+                ErrorKind::InvalidMessage,
+                "this producer can not send transaction message, please create a transaction producer using producer::new_transaction_producer",
+                Self::OPERATION_SEND_TRANSACTION_MESSAGE,
+            ));
+        }
         let topic = message.take_topic();
         let receipt = self.send(message).await?;
         Ok(TransactionImpl::new(
@@ -313,7 +321,7 @@ mod tests {
                     queue: vec![],
                 }))
             });
-            client.expect_start().returning(|| ());
+            client.expect_start().returning(|| Ok(()));
             client
                 .expect_client_id()
                 .return_const("fake_id".to_string());
@@ -340,7 +348,7 @@ mod tests {
                     queue: vec![],
                 }))
             });
-            client.expect_start().returning(|| ());
+            client.expect_start().returning(|| Ok(()));
             client.expect_set_transaction_checker().returning(|_| ());
             client
                 .expect_client_id()
@@ -543,6 +551,10 @@ mod tests {
             .client
             .expect_get_session()
             .return_once(|| Ok(Session::mock()));
+        producer
+            .client
+            .expect_has_transaction_checker()
+            .return_once(|| true);
 
         let _ = producer
             .send_transaction_message(
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index 48e0ce9f..d4e222ea 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -90,7 +90,7 @@ impl SimpleConsumer {
                 Self::OPERATION_START_SIMPLE_CONSUMER,
             ));
         }
-        self.client.start().await;
+        self.client.start().await?;
         if let Some(topics) = self.option.topics() {
             for topic in topics {
                 self.client.topic_route(topic, true).await?;
@@ -198,7 +198,7 @@ mod tests {
                     queue: vec![],
                 }))
             });
-            client.expect_start().returning(|| ());
+            client.expect_start().returning(|| Ok(()));
             client
                 .expect_client_id()
                 .return_const("fake_id".to_string());

Reply via email to