This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new fa8b4cbb [ISSUE #526] feat(rust): support ak/sk authorization (#527)
fa8b4cbb is described below
commit fa8b4cbbcb9d68be77863a4644bae8e888e5b1be
Author: SSpirits <[email protected]>
AuthorDate: Tue Jun 6 13:14:13 2023 +0800
[ISSUE #526] feat(rust): support ak/sk authorization (#527)
* feat(rust): support ak/sk authorization
Signed-off-by: SSpirits <[email protected]>
* feat(rust): change ak/sk type to option
Signed-off-by: SSpirits <[email protected]>
* Make code idiomatic
Signed-off-by: Li Zhanhui <[email protected]>
* feat(rust): fix license
Signed-off-by: SSpirits <[email protected]>
* feat(rust): optimize code
Signed-off-by: SSpirits <[email protected]>
* feat(rust): fix msrv test
Signed-off-by: SSpirits <[email protected]>
* fix(rust): fix msrv test
Signed-off-by: SSpirits <[email protected]>
---------
Signed-off-by: SSpirits <[email protected]>
Signed-off-by: Li Zhanhui <[email protected]>
Co-authored-by: Li Zhanhui <[email protected]>
---
.github/workflows/rust_build.yml | 2 +-
rust/.cargo/Cargo.lock.min | 65 ++--------------------------------------
rust/.cargo/config.toml | 18 +++++++++++
rust/Cargo.toml | 2 +-
rust/src/client.rs | 5 ----
rust/src/conf.rs | 25 +++++++++++++++-
rust/src/session.rs | 38 ++++++++++++++++++++++-
7 files changed, 83 insertions(+), 72 deletions(-)
diff --git a/.github/workflows/rust_build.yml b/.github/workflows/rust_build.yml
index b7e43500..6689dd70 100644
--- a/.github/workflows/rust_build.yml
+++ b/.github/workflows/rust_build.yml
@@ -65,7 +65,7 @@ jobs:
toolchain: ${{ matrix.msrv }}
- name: Check MSRV ${{ matrix.msrv }}
working-directory: ./rust
- run: cp .cargo/Cargo.lock.min Cargo.lock && cargo fetch && cargo +${{
matrix.msrv }} check --locked --frozen
+ run: cp .cargo/Cargo.lock.min Cargo.lock && cargo +${{ matrix.msrv }}
fetch && cargo +${{ matrix.msrv }} check --locked --frozen
build:
name: "${{ matrix.os }}"
runs-on: ${{ matrix.os }}
diff --git a/rust/.cargo/Cargo.lock.min b/rust/.cargo/Cargo.lock.min
index 9262a080..3b2ccb86 100644
--- a/rust/.cargo/Cargo.lock.min
+++ b/rust/.cargo/Cargo.lock.min
@@ -136,15 +136,6 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
-[[package]]
-name = "block-buffer"
-version = "0.10.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
-dependencies = [
- "generic-array",
-]
-
[[package]]
name = "bumpalo"
version = "3.12.0"
@@ -210,16 +201,6 @@ dependencies = [
"cfg-if",
]
-[[package]]
-name = "crypto-common"
-version = "0.1.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
-dependencies = [
- "generic-array",
- "typenum",
-]
-
[[package]]
name = "ctor"
version = "0.1.26"
@@ -249,17 +230,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
-[[package]]
-name = "digest"
-version = "0.10.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f"
-dependencies = [
- "block-buffer",
- "crypto-common",
- "subtle",
-]
-
[[package]]
name = "dirs-next"
version = "2.0.0"
@@ -439,16 +409,6 @@ dependencies = [
"slab",
]
-[[package]]
-name = "generic-array"
-version = "0.14.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
-dependencies = [
- "typenum",
- "version_check",
-]
-
[[package]]
name = "getrandom"
version = "0.2.9"
@@ -521,15 +481,6 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
-[[package]]
-name = "hmac"
-version = "0.12.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
-dependencies = [
- "digest",
-]
-
[[package]]
name = "hostname"
version = "0.3.1"
@@ -1315,7 +1266,7 @@ dependencies = [
[[package]]
name = "rocketmq"
-version = "0.1.0"
+version = "0.1.1"
dependencies = [
"anyhow",
"async-trait",
@@ -1323,7 +1274,6 @@ dependencies = [
"byteorder",
"futures",
"hex",
- "hmac",
"hostname",
"lazy_static",
"mac_address",
@@ -1338,6 +1288,7 @@ dependencies = [
"prost 0.11.9",
"prost-types",
"regex",
+ "ring",
"siphasher",
"slog",
"slog-async",
@@ -1578,12 +1529,6 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
-[[package]]
-name = "subtle"
-version = "2.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
-
[[package]]
name = "syn"
version = "1.0.109"
@@ -1974,12 +1919,6 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
-[[package]]
-name = "typenum"
-version = "1.16.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
-
[[package]]
name = "unicode-ident"
version = "1.0.8"
diff --git a/rust/.cargo/config.toml b/rust/.cargo/config.toml
new file mode 100644
index 00000000..311a0281
--- /dev/null
+++ b/rust/.cargo/config.toml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+[registries.crates-io]
+protocol = "sparse"
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 80c06fe2..d7655d1f 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -43,7 +43,6 @@ prost-types = "0.11.8"
thiserror = "1.0"
anyhow = "1.0.70"
parking_lot = "0.12"
-hmac = "0.12"
hostname = "0.3.1"
os_type = "2.6.0"
@@ -67,6 +66,7 @@ mockall = "0.11.4"
mockall_double= "0.3.0"
siphasher = "0.3.10"
+ring = "0.16.20"
[build-dependencies]
tonic-build = "0.9.0"
diff --git a/rust/src/client.rs b/rust/src/client.rs
index f13c3a68..4b601fac 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -372,15 +372,10 @@ impl Client {
mut rpc_client: T,
messages: Vec<Message>,
) -> Result<Vec<SendReceipt>, ClientError> {
- let message_count = messages.len();
let request = SendMessageRequest { messages };
let response = rpc_client.send_message(request).await?;
Self::handle_response_status(response.status, OPERATION_SEND_MESSAGE)?;
- if response.entries.len() != message_count {
- error!(self.logger, "server do not return illegal send result,
this may be a bug. except result count: {}, found: {}", response.entries.len(),
message_count);
- }
-
Ok(response
.entries
.iter()
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index 7ecb6917..95b7adcb 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -17,12 +17,13 @@
//! Configuration of RocketMQ rust client.
+use std::time::Duration;
+
use crate::model::common::ClientType;
#[allow(unused_imports)]
use crate::producer::Producer;
#[allow(unused_imports)]
use crate::simple_consumer::SimpleConsumer;
-use std::time::Duration;
/// [`ClientOption`] is the configuration of internal client, which manages
the connection and request with RocketMQ proxy.
#[derive(Debug, Clone)]
@@ -34,6 +35,8 @@ pub struct ClientOption {
pub(crate) enable_tls: bool,
pub(crate) timeout: Duration,
pub(crate) long_polling_timeout: Duration,
+ pub(crate) access_key: Option<String>,
+ pub(crate) secret_key: Option<String>,
}
impl Default for ClientOption {
@@ -46,6 +49,8 @@ impl Default for ClientOption {
enable_tls: true,
timeout: Duration::from_secs(3),
long_polling_timeout: Duration::from_secs(40),
+ access_key: None,
+ secret_key: None,
}
}
}
@@ -88,6 +93,24 @@ impl ClientOption {
pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration)
{
self.long_polling_timeout = long_polling_timeout;
}
+
+ /// Get the access key
+ pub fn access_key(&self) -> Option<&String> {
+ self.access_key.as_ref()
+ }
+ /// Set the access key
+ pub fn set_access_key(&mut self, access_key: impl Into<String>) {
+ self.access_key = Some(access_key.into());
+ }
+
+ /// Get the secret key
+ pub fn secret_key(&self) -> Option<&String> {
+ self.secret_key.as_ref()
+ }
+ /// Set the secret key
+ pub fn set_secret_key(&mut self, secret_key: impl Into<String>) {
+ self.secret_key = Some(secret_key.into());
+ }
}
/// Log format for output.
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 0660931f..229d9e56 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -18,7 +18,10 @@ use std::collections::HashMap;
use async_trait::async_trait;
use mockall::automock;
+use ring::hmac;
use slog::{debug, error, info, o, Logger};
+use time::format_description::well_known::Rfc3339;
+use time::OffsetDateTime;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
@@ -217,6 +220,39 @@ impl Session {
"x-mq-protocol-version",
AsciiMetadataValue::from_static(PROTOCOL_VERSION),
);
+
+ let date_time_result = OffsetDateTime::now_local();
+ let date_time = if let Ok(result) = date_time_result {
+ result
+ } else {
+ OffsetDateTime::now_utc()
+ };
+
+ let date_time = date_time.format(&Rfc3339).unwrap();
+
+ metadata.insert(
+ "x-mq-date-time",
+ AsciiMetadataValue::try_from(&date_time).unwrap(),
+ );
+
+ if let Some((access_key, access_secret)) =
+ self.option.access_key().zip(self.option.secret_key())
+ {
+ let key = hmac::Key::new(
+ hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
+ access_secret.as_bytes(),
+ );
+ let signature = hmac::sign(&key, date_time.as_bytes());
+ let signature = hex::encode(signature.as_ref());
+ let authorization = format!(
+ "MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time,
Signature={}",
+ access_key, signature
+ );
+ metadata.insert(
+ "authorization",
+ AsciiMetadataValue::try_from(authorization).unwrap(),
+ );
+ }
}
pub(crate) async fn start(&mut self, settings: TelemetryCommand) ->
Result<(), ClientError> {
@@ -458,10 +494,10 @@ impl SessionManager {
#[cfg(test)]
mod tests {
- use crate::conf::ProducerOption;
use slog::debug;
use wiremock_grpc::generate;
+ use crate::conf::ProducerOption;
use crate::log::terminal_logger;
use crate::util::build_producer_settings;