This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 90231bde fix: Fix Rust clippy warnings (#458)
90231bde is described below
commit 90231bde4940bbc914c544759a385dffaf1ed415
Author: Zhanhui Li <[email protected]>
AuthorDate: Tue Apr 4 14:25:41 2023 +0800
fix: Fix Rust clippy warnings (#458)
* Fix clippy warnings
Signed-off-by: Li Zhanhui <[email protected]>
* Add doc for Producer
Signed-off-by: Li Zhanhui <[email protected]>
---------
Signed-off-by: Li Zhanhui <[email protected]>
---
rust/Cargo.toml | 7 ++++---
rust/src/client.rs | 20 ++++++++++++++++----
rust/src/lib.rs | 7 ++++++-
rust/src/model.rs | 1 +
rust/src/producer.rs | 13 +++++++++++--
rust/src/session.rs | 8 ++++----
6 files changed, 42 insertions(+), 14 deletions(-)
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index da016646..1850b95d 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -5,9 +5,9 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,6 +23,7 @@ edition = "2021"
tokio = { version = "1", features = ["full"] }
tokio-rustls = {version = "0.24.0", features = ["default", "dangerous_configuration"] }
async-trait = "0.1.68"
+lazy_static = "1.4"
tonic = {version = "0.9.0", features = ["tls", "default", "channel", "tls-roots"]}
prost = "0.11.8"
prost-types = "0.11.8"
@@ -50,4 +51,4 @@ tonic-build = "0.9.0"
[dev-dependencies]
wiremock-grpc = "0.0.3-alpha2"
-futures = "0.3"
\ No newline at end of file
+futures = "0.3"
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 326c8b9e..0c80d049 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -28,8 +28,7 @@ use crate::pb::{
};
use crate::session::{RPCClient, Session, SessionManager};
-pub trait Foo {}
-
+#[derive(Debug)]
pub(crate) struct Client {
logger: Logger,
option: ClientOption,
@@ -39,7 +38,9 @@ pub(crate) struct Client {
endpoints: Endpoints,
}
-static CLIENT_ID_SEQUENCE: AtomicUsize = AtomicUsize::new(0);
+lazy_static::lazy_static! {
+ static ref CLIENT_ID_SEQUENCE: AtomicUsize = AtomicUsize::new(0);
+}
impl Client {
const OPERATION_CLIENT_NEW: &'static str = "client.new";
@@ -50,7 +51,7 @@ impl Client {
let id = Self::generate_client_id();
let endpoints =
Endpoints::from_access_url(option.access_url().to_string())
.map_err(|e| e.with_operation(Self::OPERATION_CLIENT_NEW))?;
- let session_manager = SessionManager::new(&logger, id.clone(), &option);
+ let session_manager = SessionManager::new(logger, id.clone(), &option);
Ok(Client {
logger: logger.new(o!("component" => "client")),
option,
@@ -250,12 +251,23 @@ impl Client {
#[cfg(test)]
mod tests {
+ use std::sync::atomic::Ordering;
+
use crate::client::Client;
use crate::conf::ClientOption;
use crate::log::terminal_logger;
use crate::pb::{Code, MessageQueue, QueryRouteResponse, Resource, Status};
use crate::session;
+ use super::CLIENT_ID_SEQUENCE;
+
+ #[test]
+ fn test_client_id_sequence() {
+ let v1 = CLIENT_ID_SEQUENCE.fetch_add(1, Ordering::Relaxed);
+ let v2 = CLIENT_ID_SEQUENCE.fetch_add(1, Ordering::Relaxed);
+ assert!(v2 > v1, "Client ID sequence should be increasing");
+ }
+
#[tokio::test]
async fn client_query_route() {
let logger = terminal_logger();
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 856017a0..35d0d907 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -23,8 +23,13 @@ mod log;
mod client;
mod model;
+
+#[allow(clippy::all)]
#[path = "pb/apache.rocketmq.v2.rs"]
mod pb;
mod session;
-mod producer;
+pub(crate) mod producer;
+
+// Export structs that are part of crate API.
+pub use producer::Producer;
diff --git a/rust/src/model.rs b/rust/src/model.rs
index 902d0723..e549b06b 100644
--- a/rust/src/model.rs
+++ b/rust/src/model.rs
@@ -34,6 +34,7 @@ pub(crate) enum RouteStatus {
Found(Arc<Route>),
}
+#[derive(Debug)]
pub(crate) struct Endpoints {
access_url: String,
scheme: AddressScheme,
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 598920c3..891c9c4a 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -14,15 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-use slog::{info, Logger};
+
+//! Publish messages of various types to brokers.
+//!
+//! `Producer` is a thin wrapper of internal `Client` struct that shoulders the actual workloads.
+//! Most of its methods take shared reference so that application developers may use it at will.
use crate::client::Client;
use crate::conf::{ClientOption, ProducerOption};
use crate::error::ClientError;
use crate::log;
use crate::pb::{Message, SendResultEntry};
+use slog::{info, Logger};
-struct Producer {
+/// `Producer` is the core struct, to which application developers should turn, when publishing messages to brokers.
+///
+/// `Producer` is `Send` and `Sync` by design, so that developers may get started easily.
+#[derive(Debug)]
+pub struct Producer {
option: ProducerOption,
logger: Logger,
client: Client,
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 08bc997c..182a2354 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -64,7 +64,7 @@ impl Session {
client_id: String,
option: &ClientOption,
) -> Result<Self, ClientError> {
- let peer = endpoints.access_url().clone();
+ let peer = endpoints.access_url().to_owned();
debug!(logger, "creating session, peer={}", peer);
let mut channel_endpoints = Vec::new();
@@ -78,7 +78,7 @@ impl Session {
"No endpoint available.".to_string(),
Self::OPERATION_CREATE,
)
- .with_context("peer", peer));
+ .with_context("peer", peer.clone()));
}
let channel = if channel_endpoints.len() == 1 {
@@ -89,7 +89,7 @@ impl Session {
Self::OPERATION_CREATE,
)
.set_source(e)
- .with_context("peer", peer)
+ .with_context("peer", peer.clone())
})?
} else {
Channel::balance_list(channel_endpoints.into_iter())
@@ -104,7 +104,7 @@ impl Session {
);
Ok(Session {
- logger: logger.new(o!("component" => "session", "peer" => peer.to_owned())),
+ logger: logger.new(o!("component" => "session", "peer" => peer.clone())),
client_id,
stub,
})