This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new fa8b4cbb [ISSUE #526] feat(rust): support ak/sk authorization (#527)
fa8b4cbb is described below

commit fa8b4cbbcb9d68be77863a4644bae8e888e5b1be
Author: SSpirits <[email protected]>
AuthorDate: Tue Jun 6 13:14:13 2023 +0800

    [ISSUE #526] feat(rust): support ak/sk authorization (#527)
    
    * feat(rust): support ak/sk authorization
    
    Signed-off-by: SSpirits <[email protected]>
    
    * feat(rust): change ak/sk type to option
    
    Signed-off-by: SSpirits <[email protected]>
    
    * Make code idiomatic
    
    Signed-off-by: Li Zhanhui <[email protected]>
    
    * feat(rust): fix license
    
    Signed-off-by: SSpirits <[email protected]>
    
    * feat(rust): optimize code
    
    Signed-off-by: SSpirits <[email protected]>
    
    * feat(rust): fix msrv test
    
    Signed-off-by: SSpirits <[email protected]>
    
    * fix(rust): fix msrv test
    
    Signed-off-by: SSpirits <[email protected]>
    
    ---------
    
    Signed-off-by: SSpirits <[email protected]>
    Signed-off-by: Li Zhanhui <[email protected]>
    Co-authored-by: Li Zhanhui <[email protected]>
---
 .github/workflows/rust_build.yml |  2 +-
 rust/.cargo/Cargo.lock.min       | 65 ++--------------------------------------
 rust/.cargo/config.toml          | 18 +++++++++++
 rust/Cargo.toml                  |  2 +-
 rust/src/client.rs               |  5 ----
 rust/src/conf.rs                 | 25 +++++++++++++++-
 rust/src/session.rs              | 38 ++++++++++++++++++++++-
 7 files changed, 83 insertions(+), 72 deletions(-)

diff --git a/.github/workflows/rust_build.yml b/.github/workflows/rust_build.yml
index b7e43500..6689dd70 100644
--- a/.github/workflows/rust_build.yml
+++ b/.github/workflows/rust_build.yml
@@ -65,7 +65,7 @@ jobs:
           toolchain: ${{ matrix.msrv }}
       - name: Check MSRV ${{ matrix.msrv }}
         working-directory: ./rust
-        run: cp .cargo/Cargo.lock.min Cargo.lock && cargo fetch && cargo +${{ matrix.msrv }} check --locked --frozen
+        run: cp .cargo/Cargo.lock.min Cargo.lock && cargo +${{ matrix.msrv }} fetch && cargo +${{ matrix.msrv }} check --locked --frozen
   build:
     name: "${{ matrix.os }}"
     runs-on: ${{ matrix.os }}
diff --git a/rust/.cargo/Cargo.lock.min b/rust/.cargo/Cargo.lock.min
index 9262a080..3b2ccb86 100644
--- a/rust/.cargo/Cargo.lock.min
+++ b/rust/.cargo/Cargo.lock.min
@@ -136,15 +136,6 @@ version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
 
-[[package]]
-name = "block-buffer"
-version = "0.10.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
-dependencies = [
- "generic-array",
-]
-
 [[package]]
 name = "bumpalo"
 version = "3.12.0"
@@ -210,16 +201,6 @@ dependencies = [
  "cfg-if",
 ]
 
-[[package]]
-name = "crypto-common"
-version = "0.1.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
-dependencies = [
- "generic-array",
- "typenum",
-]
-
 [[package]]
 name = "ctor"
 version = "0.1.26"
@@ -249,17 +230,6 @@ version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
 
-[[package]]
-name = "digest"
-version = "0.10.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f"
-dependencies = [
- "block-buffer",
- "crypto-common",
- "subtle",
-]
-
 [[package]]
 name = "dirs-next"
 version = "2.0.0"
@@ -439,16 +409,6 @@ dependencies = [
  "slab",
 ]
 
-[[package]]
-name = "generic-array"
-version = "0.14.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
-dependencies = [
- "typenum",
- "version_check",
-]
-
 [[package]]
 name = "getrandom"
 version = "0.2.9"
@@ -521,15 +481,6 @@ version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
 
-[[package]]
-name = "hmac"
-version = "0.12.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
-dependencies = [
- "digest",
-]
-
 [[package]]
 name = "hostname"
 version = "0.3.1"
@@ -1315,7 +1266,7 @@ dependencies = [
 
 [[package]]
 name = "rocketmq"
-version = "0.1.0"
+version = "0.1.1"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -1323,7 +1274,6 @@ dependencies = [
  "byteorder",
  "futures",
  "hex",
- "hmac",
  "hostname",
  "lazy_static",
  "mac_address",
@@ -1338,6 +1288,7 @@ dependencies = [
  "prost 0.11.9",
  "prost-types",
  "regex",
+ "ring",
  "siphasher",
  "slog",
  "slog-async",
@@ -1578,12 +1529,6 @@ version = "0.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
 
-[[package]]
-name = "subtle"
-version = "2.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
-
 [[package]]
 name = "syn"
 version = "1.0.109"
@@ -1974,12 +1919,6 @@ version = "0.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
 
-[[package]]
-name = "typenum"
-version = "1.16.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
-
 [[package]]
 name = "unicode-ident"
 version = "1.0.8"
diff --git a/rust/.cargo/config.toml b/rust/.cargo/config.toml
new file mode 100644
index 00000000..311a0281
--- /dev/null
+++ b/rust/.cargo/config.toml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+[registries.crates-io]
+protocol = "sparse"
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 80c06fe2..d7655d1f 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -43,7 +43,6 @@ prost-types = "0.11.8"
 thiserror = "1.0"
 anyhow = "1.0.70"
 parking_lot = "0.12"
-hmac = "0.12"
 hostname = "0.3.1"
 os_type = "2.6.0"
 
@@ -67,6 +66,7 @@ mockall = "0.11.4"
 mockall_double= "0.3.0"
 
 siphasher = "0.3.10"
+ring = "0.16.20"
 
 [build-dependencies]
 tonic-build = "0.9.0"
diff --git a/rust/src/client.rs b/rust/src/client.rs
index f13c3a68..4b601fac 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -372,15 +372,10 @@ impl Client {
         mut rpc_client: T,
         messages: Vec<Message>,
     ) -> Result<Vec<SendReceipt>, ClientError> {
-        let message_count = messages.len();
         let request = SendMessageRequest { messages };
         let response = rpc_client.send_message(request).await?;
         Self::handle_response_status(response.status, OPERATION_SEND_MESSAGE)?;
 
-        if response.entries.len() != message_count {
-            error!(self.logger, "server do not return illegal send result, this may be a bug. except result count: {}, found: {}", response.entries.len(), message_count);
-        }
-
         Ok(response
             .entries
             .iter()
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index 7ecb6917..95b7adcb 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -17,12 +17,13 @@
 
 //! Configuration of RocketMQ rust client.
 
+use std::time::Duration;
+
 use crate::model::common::ClientType;
 #[allow(unused_imports)]
 use crate::producer::Producer;
 #[allow(unused_imports)]
 use crate::simple_consumer::SimpleConsumer;
-use std::time::Duration;
 
 /// [`ClientOption`] is the configuration of internal client, which manages the connection and request with RocketMQ proxy.
 #[derive(Debug, Clone)]
@@ -34,6 +35,8 @@ pub struct ClientOption {
     pub(crate) enable_tls: bool,
     pub(crate) timeout: Duration,
     pub(crate) long_polling_timeout: Duration,
+    pub(crate) access_key: Option<String>,
+    pub(crate) secret_key: Option<String>,
 }
 
 impl Default for ClientOption {
@@ -46,6 +49,8 @@ impl Default for ClientOption {
             enable_tls: true,
             timeout: Duration::from_secs(3),
             long_polling_timeout: Duration::from_secs(40),
+            access_key: None,
+            secret_key: None,
         }
     }
 }
@@ -88,6 +93,24 @@ impl ClientOption {
     pub fn set_long_polling_timeout(&mut self, long_polling_timeout: Duration) {
         self.long_polling_timeout = long_polling_timeout;
     }
+
+    /// Get the access key
+    pub fn access_key(&self) -> Option<&String> {
+        self.access_key.as_ref()
+    }
+    /// Set the access key
+    pub fn set_access_key(&mut self, access_key: impl Into<String>) {
+        self.access_key = Some(access_key.into());
+    }
+
+    /// Get the secret key
+    pub fn secret_key(&self) -> Option<&String> {
+        self.secret_key.as_ref()
+    }
+    /// Set the secret key
+    pub fn set_secret_key(&mut self, secret_key: impl Into<String>) {
+        self.secret_key = Some(secret_key.into());
+    }
 }
 
 /// Log format for output.
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 0660931f..229d9e56 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -18,7 +18,10 @@ use std::collections::HashMap;
 
 use async_trait::async_trait;
 use mockall::automock;
+use ring::hmac;
 use slog::{debug, error, info, o, Logger};
+use time::format_description::well_known::Rfc3339;
+use time::OffsetDateTime;
 use tokio::sync::{mpsc, Mutex};
 use tokio_stream::wrappers::ReceiverStream;
 use tokio_stream::StreamExt;
@@ -217,6 +220,39 @@ impl Session {
             "x-mq-protocol-version",
             AsciiMetadataValue::from_static(PROTOCOL_VERSION),
         );
+
+        let date_time_result = OffsetDateTime::now_local();
+        let date_time = if let Ok(result) = date_time_result {
+            result
+        } else {
+            OffsetDateTime::now_utc()
+        };
+
+        let date_time = date_time.format(&Rfc3339).unwrap();
+
+        metadata.insert(
+            "x-mq-date-time",
+            AsciiMetadataValue::try_from(&date_time).unwrap(),
+        );
+
+        if let Some((access_key, access_secret)) =
+            self.option.access_key().zip(self.option.secret_key())
+        {
+            let key = hmac::Key::new(
+                hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
+                access_secret.as_bytes(),
+            );
+            let signature = hmac::sign(&key, date_time.as_bytes());
+            let signature = hex::encode(signature.as_ref());
+            let authorization = format!(
+                "MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, Signature={}",
+                access_key, signature
+            );
+            metadata.insert(
+                "authorization",
+                AsciiMetadataValue::try_from(authorization).unwrap(),
+            );
+        }
     }
 
     pub(crate) async fn start(&mut self, settings: TelemetryCommand) -> Result<(), ClientError> {
@@ -458,10 +494,10 @@ impl SessionManager {
 
 #[cfg(test)]
 mod tests {
-    use crate::conf::ProducerOption;
     use slog::debug;
     use wiremock_grpc::generate;
 
+    use crate::conf::ProducerOption;
     use crate::log::terminal_logger;
     use crate::util::build_producer_settings;
 

Reply via email to