This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 0bc97568 api(rust): change api signature (#487)
0bc97568 is described below
commit 0bc97568082051f7c0f96f59f7b12a415ce642f7
Author: SSpirits <[email protected]>
AuthorDate: Fri Apr 21 12:10:23 2023 +0800
api(rust): change api signature (#487)
* api(rust): change api signature
* chore(rust): rename SendResult to SendReceipt
---
rust/Cargo.toml | 2 +-
rust/examples/producer.rs | 2 +-
rust/examples/simple_consumer.rs | 51 +++++++++++++++++++++++-----------------
rust/src/client.rs | 21 +++++++++++------
rust/src/error.rs | 38 ++++++++++++++++++++++++++----
rust/src/lib.rs | 6 ++---
rust/src/model/common.rs | 26 ++++++++++++++++++++
rust/src/producer.rs | 20 +++++++---------
rust/src/session.rs | 27 ++++++++++++---------
rust/src/simple_consumer.rs | 22 +++++++++++++----
10 files changed, 150 insertions(+), 65 deletions(-)
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 689ab0e5..fc84e0d8 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -26,7 +26,7 @@ authors = [
license = "MIT/Apache-2.0"
readme = "./README.md"
-repository = "https://github.com/apache/rocketmq-clients"
+repository = "https://github.com/apache/rocketmq-clients/tree/master/rust"
documentation = "https://docs.rs/rocketmq"
description = "Rust client for Apache RocketMQ"
keywords = ["rocketmq", "api", "client", "sdk", "grpc"]
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index a7ad0ab7..acacabb8 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -46,6 +46,6 @@ async fn main() {
debug_assert!(result.is_ok(), "send message failed: {:?}", result);
println!(
"send message success, message_id={}",
- result.unwrap().message_id
+ result.unwrap().message_id()
);
}
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
index 4011556d..33aae9d4 100644
--- a/rust/examples/simple_consumer.rs
+++ b/rust/examples/simple_consumer.rs
@@ -29,33 +29,42 @@ async fn main() {
// set which rocketmq proxy to connect
let mut client_option = ClientOption::default();
client_option.set_access_url("localhost:8081");
+ client_option.set_enable_tls(false);
// build and start simple consumer
let consumer = SimpleConsumer::new(consumer_option,
client_option).unwrap();
consumer.start().await.unwrap();
- // pop message from rocketmq proxy
- let receive_result = consumer
- .receive(
- "test_topic".to_string(),
- &FilterExpression::new(FilterType::Tag, "test_tag"),
- )
- .await;
- debug_assert!(
- receive_result.is_ok(),
- "receive message failed: {:?}",
- receive_result.unwrap_err()
- );
-
- let messages = receive_result.unwrap();
- for message in messages {
- println!("receive message: {:?}", message);
- // ack message to rocketmq proxy
- let ack_result = consumer.ack(message).await;
+ loop {
+ // pop message from rocketmq proxy
+ let receive_result = consumer
+ .receive(
+ "test_topic".to_string(),
+ &FilterExpression::new(FilterType::Tag, "test_tag"),
+ )
+ .await;
debug_assert!(
- ack_result.is_ok(),
- "ack message failed: {:?}",
- ack_result.unwrap_err()
+ receive_result.is_ok(),
+ "receive message failed: {:?}",
+ receive_result.unwrap_err()
);
+
+ let messages = receive_result.unwrap();
+
+ if messages.is_empty() {
+ println!("no message received");
+ return;
+ }
+
+ for message in messages {
+ println!("receive message: {:?}", message);
+ // ack message to rocketmq proxy
+ let ack_result = consumer.ack(&message).await;
+ debug_assert!(
+ ack_result.is_ok(),
+ "ack message failed: {:?}",
+ ack_result.unwrap_err()
+ );
+ }
}
}
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 11e4419a..99d410f2 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -28,14 +28,14 @@ use tokio::sync::oneshot;
use crate::conf::ClientOption;
use crate::error::{ClientError, ErrorKind};
-use crate::model::common::{ClientType, Endpoints, Route, RouteStatus};
+use crate::model::common::{ClientType, Endpoints, Route, RouteStatus,
SendReceipt};
use crate::model::message::AckMessageEntry;
use crate::pb;
use crate::pb::receive_message_response::Content;
use crate::pb::{
AckMessageRequest, AckMessageResultEntry, Code, FilterExpression,
HeartbeatRequest,
HeartbeatResponse, Message, MessageQueue, QueryRouteRequest,
ReceiveMessageRequest, Resource,
- SendMessageRequest, SendResultEntry, Status, TelemetryCommand,
+ SendMessageRequest, Status, TelemetryCommand,
};
#[double]
use crate::session::SessionManager;
@@ -207,7 +207,7 @@ impl Client {
pub(crate) fn topic_route_from_cache(&self, topic: &str) ->
Option<Arc<Route>> {
self.route_table.lock().get(topic).and_then(|route_status| {
if let RouteStatus::Found(route) = route_status {
- debug!(self.logger, "get route for topic={} from cache",
topic);
+ // debug!(self.logger, "get route for topic={} from cache",
topic);
Some(Arc::clone(route))
} else {
None
@@ -359,7 +359,7 @@ impl Client {
&self,
endpoints: &Endpoints,
messages: Vec<Message>,
- ) -> Result<Vec<SendResultEntry>, ClientError> {
+ ) -> Result<Vec<SendReceipt>, ClientError> {
self.send_message_inner(
self.get_session_with_endpoints(endpoints).await.unwrap(),
messages,
@@ -371,7 +371,7 @@ impl Client {
&self,
mut rpc_client: T,
messages: Vec<Message>,
- ) -> Result<Vec<SendResultEntry>, ClientError> {
+ ) -> Result<Vec<SendReceipt>, ClientError> {
let message_count = messages.len();
let request = SendMessageRequest { messages };
let response = rpc_client.send_message(request).await?;
@@ -381,7 +381,11 @@ impl Client {
error!(self.logger, "server do not return illegal send result,
this may be a bug. except result count: {}, found: {}", response.entries.len(),
message_count);
}
- Ok(response.entries)
+ Ok(response
+ .entries
+ .iter()
+ .map(SendReceipt::from_pb_send_result)
+ .collect())
}
#[allow(dead_code)]
@@ -431,6 +435,9 @@ impl Client {
for response in responses {
match response.content.unwrap() {
Content::Status(status) => {
+ if status.code() == Code::MessageNotFound {
+ return Ok(vec![]);
+ }
Self::handle_response_status(Some(status),
OPERATION_RECEIVE_MESSAGE)?;
}
Content::Message(message) => {
@@ -445,7 +452,7 @@ impl Client {
#[allow(dead_code)]
pub(crate) async fn ack_message<T: AckMessageEntry + 'static>(
&self,
- ack_entry: T,
+ ack_entry: &T,
) -> Result<AckMessageResultEntry, ClientError> {
let result = self
.ack_message_inner(
diff --git a/rust/src/error.rs b/rust/src/error.rs
index 9eb3d519..59d1eeb4 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -14,10 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+//! Error data model of RocketMQ rust client.
+
use std::error::Error;
use std::fmt;
use std::fmt::{Debug, Display, Formatter};
+/// Error type using by [`ClientError`].
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum ErrorKind {
#[error("Failed to parse config")]
@@ -51,6 +55,7 @@ pub enum ErrorKind {
Unknown,
}
+/// Error returned by producer or consumer.
pub struct ClientError {
pub(crate) kind: ErrorKind,
pub(crate) message: String,
@@ -62,7 +67,7 @@ pub struct ClientError {
impl Error for ClientError {}
impl ClientError {
- pub fn new(kind: ErrorKind, message: &str, operation: &'static str) ->
Self {
+ pub(crate) fn new(kind: ErrorKind, message: &str, operation: &'static str)
-> Self {
Self {
kind,
message: message.to_string(),
@@ -72,7 +77,32 @@ impl ClientError {
}
}
- pub fn with_operation(mut self, operation: &'static str) -> Self {
+ /// Error type
+ pub fn kind(&self) -> &ErrorKind {
+ &self.kind
+ }
+
+ /// Error message
+ pub fn message(&self) -> &str {
+ &self.message
+ }
+
+ /// Name of operation that produced this error
+ pub fn operation(&self) -> &str {
+ self.operation
+ }
+
+ /// Error context, formatted in key-value pairs
+ pub fn context(&self) -> &Vec<(&'static str, String)> {
+ &self.context
+ }
+
+ /// Source error
+ pub fn source(&self) -> Option<&anyhow::Error> {
+ self.source.as_ref()
+ }
+
+ pub(crate) fn with_operation(mut self, operation: &'static str) -> Self {
if !self.operation.is_empty() {
self.context.push(("called", self.operation.to_string()));
}
@@ -81,12 +111,12 @@ impl ClientError {
self
}
- pub fn with_context(mut self, key: &'static str, value: impl Into<String>)
-> Self {
+ pub(crate) fn with_context(mut self, key: &'static str, value: impl
Into<String>) -> Self {
self.context.push((key, value.into()));
self
}
- pub fn set_source(mut self, src: impl Into<anyhow::Error>) -> Self {
+ pub(crate) fn set_source(mut self, src: impl Into<anyhow::Error>) -> Self {
debug_assert!(self.source.is_none(), "the source error has been set");
self.source = Some(src.into());
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 1a13a5a9..fcac3202 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -64,7 +64,7 @@
//! debug_assert!(result.is_ok(), "send message failed: {:?}", result);
//! println!(
//! "send message success, message_id={}",
-//! result.unwrap().message_id
+//! result.unwrap().message_id()
//! );
//! }
//! ```
@@ -108,7 +108,7 @@
//! for message in messages {
//! println!("receive message: {:?}", message);
//! // ack message to rocketmq proxy
-//! let ack_result = consumer.ack(message).await;
+//! let ack_result = consumer.ack(&message).await;
//! debug_assert!(
//! ack_result.is_ok(),
//! "ack message failed: {:?}",
@@ -121,7 +121,7 @@
#[allow(dead_code)]
pub mod conf;
-mod error;
+pub mod error;
#[allow(dead_code)]
mod log;
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
index 4023b959..deaa1235 100644
--- a/rust/src/model/common.rs
+++ b/rust/src/model/common.rs
@@ -228,6 +228,32 @@ impl FilterExpression {
}
}
+/// Send result returned by producer.
+#[derive(Clone, Debug)]
+pub struct SendReceipt {
+ pub(crate) message_id: String,
+ pub(crate) transaction_id: String,
+}
+
+impl SendReceipt {
+ pub(crate) fn from_pb_send_result(entry: &pb::SendResultEntry) -> Self {
+ SendReceipt {
+ message_id: entry.message_id.clone(),
+ transaction_id: entry.transaction_id.clone(),
+ }
+ }
+
+ /// Get message id
+ pub fn message_id(&self) -> &str {
+ &self.message_id
+ }
+
+ /// Get transaction id
+ pub fn transaction_id(&self) -> &str {
+ &self.transaction_id
+ }
+}
+
#[cfg(test)]
mod tests {
use crate::error::ErrorKind;
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 697b70e3..8de941c8 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -25,9 +25,9 @@ use slog::{info, Logger};
use crate::client::Client;
use crate::conf::{ClientOption, ProducerOption};
use crate::error::{ClientError, ErrorKind};
-use crate::model::common::ClientType;
+use crate::model::common::{ClientType, SendReceipt};
use crate::model::message;
-use crate::pb::{Encoding, Resource, SendResultEntry, SystemProperties};
+use crate::pb::{Encoding, Resource, SystemProperties};
use crate::util::{
build_endpoints_by_message_queue, build_producer_settings,
select_message_queue,
select_message_queue_by_message_group, HOST_NAME,
@@ -185,7 +185,7 @@ impl Producer {
pub async fn send_one(
&self,
message: impl message::Message,
- ) -> Result<SendResultEntry, ClientError> {
+ ) -> Result<SendReceipt, ClientError> {
let results = self.send(vec![message]).await?;
Ok(results[0].clone())
}
@@ -198,7 +198,7 @@ impl Producer {
pub async fn send(
&self,
messages: Vec<impl message::Message>,
- ) -> Result<Vec<SendResultEntry>, ClientError> {
+ ) -> Result<Vec<SendReceipt>, ClientError> {
let (topic, message_group, mut pb_messages) =
self.transform_messages_to_protobuf(messages)?;
@@ -222,12 +222,13 @@ impl Producer {
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
use crate::error::ErrorKind;
use crate::log::terminal_logger;
use crate::model::common::Route;
use crate::model::message::{MessageBuilder, MessageImpl};
- use crate::pb::{Broker, Code, MessageQueue, Status};
- use std::sync::Arc;
+ use crate::pb::{Broker, MessageQueue};
use super::*;
@@ -389,14 +390,9 @@ mod tests {
}))
});
producer.client.expect_send_message().returning(|_, _| {
- Ok(vec![SendResultEntry {
- status: Some(Status {
- code: Code::Ok as i32,
- message: "".to_string(),
- }),
+ Ok(vec![SendReceipt {
message_id: "".to_string(),
transaction_id: "".to_string(),
- offset: 0,
}])
});
producer
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 6ef9b97d..0660931f 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -22,7 +22,7 @@ use slog::{debug, error, info, o, Logger};
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
-use tonic::metadata::AsciiMetadataValue;
+use tonic::metadata::{AsciiMetadataValue, MetadataMap};
use tonic::transport::{Channel, Endpoint};
use crate::conf::ClientOption;
@@ -193,8 +193,15 @@ impl Session {
self.endpoints.endpoint_url()
}
- fn sign<T>(&self, mut request: tonic::Request<T>) -> tonic::Request<T> {
+ fn sign<T>(&self, message: T) -> tonic::Request<T> {
+ let mut request = tonic::Request::new(message);
let metadata = request.metadata_mut();
+ self.sign_without_timeout(metadata);
+ request.set_timeout(*self.option.timeout());
+ request
+ }
+
+ fn sign_without_timeout(&self, metadata: &mut MetadataMap) {
let _ = AsciiMetadataValue::try_from(&self.client_id)
.map(|v| metadata.insert("x-mq-client-id", v));
@@ -210,9 +217,6 @@ impl Session {
"x-mq-protocol-version",
AsciiMetadataValue::from_static(PROTOCOL_VERSION),
);
-
- request.set_timeout(*self.option.timeout());
- request
}
pub(crate) async fn start(&mut self, settings: TelemetryCommand) ->
Result<(), ClientError> {
@@ -226,7 +230,8 @@ impl Session {
.set_source(e)
})?;
- let request = self.sign(tonic::Request::new(ReceiverStream::new(rx)));
+ let mut request = tonic::Request::new(ReceiverStream::new(rx));
+ self.sign_without_timeout(request.metadata_mut());
let response = self.stub.telemetry(request).await.map_err(|e| {
ClientError::new(
ErrorKind::ClientInternal,
@@ -299,7 +304,7 @@ impl RPCClient for Session {
&mut self,
request: QueryRouteRequest,
) -> Result<QueryRouteResponse, ClientError> {
- let request = self.sign(tonic::Request::new(request));
+ let request = self.sign(request);
let response = self.stub.query_route(request).await.map_err(|e| {
ClientError::new(
ErrorKind::ClientInternal,
@@ -315,7 +320,7 @@ impl RPCClient for Session {
&mut self,
request: HeartbeatRequest,
) -> Result<HeartbeatResponse, ClientError> {
- let request = self.sign(tonic::Request::new(request));
+ let request = self.sign(request);
let response = self.stub.heartbeat(request).await.map_err(|e| {
ClientError::new(
ErrorKind::ClientInternal,
@@ -331,7 +336,7 @@ impl RPCClient for Session {
&mut self,
request: SendMessageRequest,
) -> Result<SendMessageResponse, ClientError> {
- let request = self.sign(tonic::Request::new(request));
+ let request = self.sign(request);
let response = self.stub.send_message(request).await.map_err(|e| {
ClientError::new(
ErrorKind::ClientInternal,
@@ -348,7 +353,7 @@ impl RPCClient for Session {
request: ReceiveMessageRequest,
) -> Result<Vec<ReceiveMessageResponse>, ClientError> {
let batch_size = request.batch_size;
- let mut request = self.sign(tonic::Request::new(request));
+ let mut request = self.sign(request);
request.set_timeout(*self.option.long_polling_timeout());
let mut stream = self
.stub
@@ -383,7 +388,7 @@ impl RPCClient for Session {
&mut self,
request: AckMessageRequest,
) -> Result<AckMessageResponse, ClientError> {
- let request = self.sign(tonic::Request::new(request));
+ let request = self.sign(request);
let response = self.stub.ack_message(request).await.map_err(|e| {
ClientError::new(
ErrorKind::ClientInternal,
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index eefa93b1..cf74e820 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -48,6 +48,7 @@ pub struct SimpleConsumer {
}
impl SimpleConsumer {
+ const OPERATION_NEW_SIMPLE_CONSUMER: &'static str = "simple_consumer.new";
const OPERATION_START_SIMPLE_CONSUMER: &'static str =
"simple_consumer.start";
const OPERATION_RECEIVE_MESSAGE: &'static str =
"simple_consumer.receive_message";
@@ -56,6 +57,14 @@ impl SimpleConsumer {
option: SimpleConsumerOption,
client_option: ClientOption,
) -> Result<Self, ClientError> {
+ if option.consumer_group().is_empty() {
+ return Err(ClientError::new(
+ ErrorKind::Config,
+ "required option is missing: consumer group is empty",
+ Self::OPERATION_NEW_SIMPLE_CONSUMER,
+ ));
+ }
+
let client_option = ClientOption {
client_type: ClientType::SimpleConsumer,
group: option.consumer_group().to_string(),
@@ -120,12 +129,12 @@ impl SimpleConsumer {
/// * `invisible_duration` - set the invisible duration of messages that
return from the server, these messages will not be visible to other consumers
unless timeout
pub async fn receive_with_batch_size(
&self,
- topic: &str,
+ topic: impl AsRef<str>,
expression: &FilterExpression,
batch_size: i32,
invisible_duration: Duration,
) -> Result<Vec<MessageView>, ClientError> {
- let route = self.client.topic_route(topic, true).await?;
+ let route = self.client.topic_route(topic.as_ref(), true).await?;
let message_queue = select_message_queue(route);
let endpoints =
build_endpoints_by_message_queue(&message_queue,
Self::OPERATION_RECEIVE_MESSAGE)?;
@@ -155,7 +164,10 @@ impl SimpleConsumer {
/// # Arguments
///
/// * `ack_entry` - special message view with handle want to ack
- pub async fn ack(&self, ack_entry: impl AckMessageEntry + 'static) ->
Result<(), ClientError> {
+ pub async fn ack(
+ &self,
+ ack_entry: &(impl AckMessageEntry + 'static),
+ ) -> Result<(), ClientError> {
self.client.ack_message(ack_entry).await?;
Ok(())
}
@@ -238,7 +250,7 @@ mod tests {
});
client
.expect_ack_message()
- .returning(|_: MessageView| Ok(AckMessageResultEntry::default()));
+ .returning(|_: &MessageView| Ok(AckMessageResultEntry::default()));
let simple_consumer = SimpleConsumer {
option: SimpleConsumerOption::default(),
logger: terminal_logger(),
@@ -253,7 +265,7 @@ mod tests {
.await?;
assert_eq!(messages.len(), 1);
simple_consumer
- .ack(messages.into_iter().next().unwrap())
+ .ack(&messages.into_iter().next().unwrap())
.await?;
Ok(())
}