This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new f54b8b77 feat: add some basic message model(id,view,message) (#462)
f54b8b77 is described below

commit f54b8b779d380263828d28ddb56782e489004b3a
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Wed Apr 5 09:57:16 2023 +0800

    feat: add some basic message model(id,view,message) (#462)
    
    * feat: add some basic message model(id,view,message)
    
    * feat: add some basic message model(id,view,message)
---
 rust/Cargo.toml                             |   7 ++
 rust/src/lib.rs                             |   2 +
 rust/src/models/message.rs                  |  47 ++++++++++
 rust/src/models/message_id.rs               | 140 ++++++++++++++++++++++++++++
 rust/src/{lib.rs => models/message_view.rs} |  28 ++----
 rust/src/{lib.rs => models/mod.rs}          |  22 +----
 6 files changed, 208 insertions(+), 38 deletions(-)

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 1850b95d..d4154951 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -42,6 +42,13 @@ slog-json = "2.6.1"
 opentelemetry = { version = "0.19.0", features = ["metrics", "rt-tokio"] }
 opentelemetry-otlp = { version = "0.12.0", features = ["metrics", "grpc-tonic"] }
 
+byteorder = "1"
+mac_address = "1.1.4"
+hex = "0.4.3"
+time = "0.3.19"
+once_cell = "1.9.0"
+tokio-stream="0.1.12"
+
 minitrace = "0.4.1"
 
 mockall = "0.11.4"
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 35d0d907..17b9bd0d 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -33,3 +33,5 @@ pub(crate) mod producer;
 
 // Export structs that are part of crate API.
 pub use producer::Producer;
+
+pub mod models;
diff --git a/rust/src/models/message.rs b/rust/src/models/message.rs
new file mode 100644
index 00000000..f8366aa1
--- /dev/null
+++ b/rust/src/models/message.rs
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::{
+    collections::HashMap,
+    io::Write,
+    mem, process,
+    sync::Arc,
+    sync::{atomic::AtomicUsize, Weak},
+};
+pub(crate) struct MessageImpl {
+    pub(crate) keys: Vec<String>,
+    pub(crate) body: Vec<u8>,
+    pub(crate) topic: String,
+    pub(crate) tags: String,
+    pub(crate) message_group: String,
+    pub(crate) delivery_timestamp: i64,
+    pub(crate) properties: HashMap<String, String>,
+}
+
+impl MessageImpl {
+    pub fn new(topic: &str, tags: &str, keys: Vec<String>, body: &str) -> Self {
+        MessageImpl {
+            keys: keys,
+            body: body.as_bytes().to_vec(),
+            topic: topic.to_string(),
+            tags: tags.to_string(),
+            message_group: "".to_string(),
+            delivery_timestamp: 0,
+            properties: HashMap::new(),
+        }
+    }
+}
diff --git a/rust/src/models/message_id.rs b/rust/src/models/message_id.rs
new file mode 100644
index 00000000..92d4e446
--- /dev/null
+++ b/rust/src/models/message_id.rs
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use byteorder::{BigEndian, WriteBytesExt};
+use once_cell::sync::Lazy;
+use parking_lot::Mutex;
+use std::io::Write;
+use std::process;
+use std::time::SystemTime;
+use time::{Date, OffsetDateTime, PrimitiveDateTime, Time};
+
+/**
+ * The codec for the message-id.
+ *
+ * <p>Codec here provides the following two functions:
+ * 1. Provide decoding function of message-id of all versions above v0.
+ * 2. Provide a generator of message-id of v1 version.
+ *
+ * <p>The message-id of versions above V1 consists of 17 bytes in total. The first two bytes represent the version
+ * number. For V1, these two bytes are 0x0001.
+ *
+ * <h3>V1 message id example</h3>
+ *
+ * <pre>
+ * ┌──┬────────────┬────┬────────┬────────┐
+ * │01│56F7E71C361B│21BC│024CCDBE│00000000│
+ * └──┴────────────┴────┴────────┴────────┘
+ * </pre>
+ *
+ * <h3>V1 version message id generation rules</h3>
+ *
+ * <pre>
+ *                     process id(lower 2bytes)
+ *                             ▲
+ * mac address(lower 6bytes)   │   sequence number(big endian)
+ *                    ▲        │          ▲ (4bytes)
+ *                    │        │          │
+ *              ┌─────┴─────┐ ┌┴┐ ┌───┐ ┌─┴─┐
+ *       0x01+  │     6     │ │2│ │ 4 │ │ 4 │
+ *              └───────────┘ └─┘ └─┬─┘ └───┘
+ *                                  │
+ *                                  ▼
+ *           seconds since 2021-01-01 00:00:00(UTC+0)
+ *                         (lower 4bytes)
+ * </pre>
+ */
+
+// inspired by https://github.com/messense/rocketmq-rs
+pub(crate) static UNIQ_ID_GENERATOR: Lazy<Mutex<UniqueIdGenerator>> = Lazy::new(|| {
+    let mut wtr = Vec::new();
+    wtr.write_u8(1).unwrap();
+    //mac
+    let x = mac_address::get_mac_address().unwrap();
+    let ma = match x {
+        Some(ma) => ma,
+        None => {
+            panic!("mac address is none")
+        }
+    };
+    wtr.write_all(&ma.bytes()).unwrap();
+    //processid
+    wtr.write_u16::<byteorder::BigEndian>(process::id() as u16)
+        .unwrap();
+    let generator = UniqueIdGenerator {
+        counter: 0,
+        start_timestamp: 0,
+        next_timestamp: 0,
+        prefix: hex::encode_upper(wtr),
+    };
+    Mutex::new(generator)
+});
+
+pub struct UniqueIdGenerator {
+    counter: i16,
+    prefix: String,
+    start_timestamp: i64,
+    next_timestamp: i64,
+}
+
+impl UniqueIdGenerator {
+    pub fn generate(&mut self) -> String {
+        if SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_secs() as i64
+            > self.next_timestamp
+        {
+            // update timestamp
+            let now = OffsetDateTime::now_utc();
+            let year = now.year();
+            let month = now.month();
+            self.start_timestamp = PrimitiveDateTime::new(
+                Date::from_calendar_date(year, month, 1).unwrap(),
+                Time::from_hms(0, 0, 0).unwrap(),
+            )
+            .assume_offset(now.offset())
+            .unix_timestamp();
+            self.next_timestamp = (PrimitiveDateTime::new(
+                Date::from_calendar_date(year, month, 1).unwrap(),
+                Time::from_hms(0, 0, 0).unwrap(),
+            )
+            .assume_offset(now.offset())
+                + time::Duration::days(30))
+            .unix_timestamp();
+        }
+        self.counter = self.counter.wrapping_add(1);
+        let mut buf = Vec::new();
+        buf.write_i32::<BigEndian>(
+            ((OffsetDateTime::now_utc().unix_timestamp() - self.start_timestamp) * 1000) as i32,
+        )
+        .unwrap();
+        buf.write_i16::<BigEndian>(self.counter).unwrap();
+        self.prefix.clone() + &hex::encode(buf)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    #[test]
+    fn text_generate_uniq_id() {
+        use super::UNIQ_ID_GENERATOR;
+        for i in 0..10 {
+            let uid = UNIQ_ID_GENERATOR.lock().generate();
+            println!("i: {}, uid: {}", i, uid);
+        }
+    }
+}
diff --git a/rust/src/lib.rs b/rust/src/models/message_view.rs
similarity index 73%
copy from rust/src/lib.rs
copy to rust/src/models/message_view.rs
index 35d0d907..f0849b0b 100644
--- a/rust/src/lib.rs
+++ b/rust/src/models/message_view.rs
@@ -14,22 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#[allow(dead_code)]
-mod conf;
-#[allow(dead_code)]
-mod error;
-#[allow(dead_code)]
-mod log;
-
-mod client;
-mod model;
-
-#[allow(clippy::all)]
-#[path = "pb/apache.rocketmq.v2.rs"]
-mod pb;
-mod session;
-
-pub(crate) mod producer;
-
-// Export structs that are part of crate API.
-pub use producer::Producer;
+#[derive(Debug)]
+pub(crate) struct MessageView {
+    pub(crate) body: Vec<u8>,
+    pub(crate) message_id: String,
+    pub(crate) topic: String,
+    pub(crate) consume_group: String,
+    pub(crate) endpoint: String,
+    pub(crate) receipt_handle: String,
+}
diff --git a/rust/src/lib.rs b/rust/src/models/mod.rs
similarity index 73%
copy from rust/src/lib.rs
copy to rust/src/models/mod.rs
index 35d0d907..8496998d 100644
--- a/rust/src/lib.rs
+++ b/rust/src/models/mod.rs
@@ -14,22 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#[allow(dead_code)]
-mod conf;
-#[allow(dead_code)]
-mod error;
-#[allow(dead_code)]
-mod log;
-
-mod client;
-mod model;
-
-#[allow(clippy::all)]
-#[path = "pb/apache.rocketmq.v2.rs"]
-mod pb;
-mod session;
-
-pub(crate) mod producer;
-
-// Export structs that are part of crate API.
-pub use producer::Producer;
+pub(crate) mod message;
+pub(crate) mod message_id;
+pub(crate) mod message_view;

Reply via email to