This is an automated email from the ASF dual-hosted git repository.
yangyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/main by this push:
new eef25df Rpc context implementation #58 (#83)
eef25df is described below
commit eef25df3ef360e8e2791f78233a8b82173eba64f
Author: daedalus2022 <[email protected]>
AuthorDate: Thu Dec 8 22:31:14 2022 +0800
Rpc context implementation #58 (#83)
* Rpc context implementation #58
* 修复建议补充测试场景
* 修复建议补充测试场景
* advice: use tracing to replace with println
* advice: use tracing to replace with println
Co-authored-by: qunwei <[email protected]>
---
dubbo/Cargo.toml | 3 ++
dubbo/src/context.rs | 99 ++++++++++++++++++++++------------------------------
2 files changed, 44 insertions(+), 58 deletions(-)
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 7b38a28..bdde2ea 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -34,3 +34,6 @@ async-stream = "0.3"
flate2 = "1.0"
dubbo-config = {path = "../config", version = "0.2.0"}
+
+#对象存储
+state = { version = "0.5", features = ["tls"] }
\ No newline at end of file
diff --git a/dubbo/src/context.rs b/dubbo/src/context.rs
index 2cf3692..c438bcd 100644
--- a/dubbo/src/context.rs
+++ b/dubbo/src/context.rs
@@ -15,33 +15,14 @@
* limitations under the License.
*/
-use core::cell::RefCell;
-use std::any::Any;
use std::collections::HashMap;
-use std::fmt;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
+use std::thread;
-///
-/// ```rust
-/// use std::collections::HashMap;
-/// use std::sync::Arc;
-///
-/// let mut map = HashMap::<String, SafetyValue>::new();
-/// map.insert("key1".into(), Arc::new("data-1"));
-///
-/// // get a typed value from SafetyValue
-/// let value = map
-/// .get("key1")
-/// .and_then(|f| f.downcast_ref::<String>())
-/// .unwrap();
-///
-/// assert_eq!(value, "data-1");
-/// ```
-type SafetyValue = Arc<dyn Any + Sync + Send>;
+use serde_json::Value;
+use state::Container;
-thread_local! {
- static SERVICE_CONTEXT: RefCell<RpcContext> =
RefCell::new(RpcContext::default());
-}
+pub static APPLICATION_CONTEXT: Container![Send + Sync] = <Container![Send +
Sync]>::new();
///
/// All environment information of during the current call will put into the
context
@@ -53,37 +34,38 @@ thread_local! {
/// After B call C,the RpcContext record the information of B call C
///
#[derive(Clone, Default)]
-pub struct RpcContext {
- pub attachments: HashMap<String, SafetyValue>,
- // TODO
-}
-
-impl RpcContext {
- pub fn current() -> Self {
- get_current(|ctx| ctx.clone())
- }
+pub struct RpcContext {}
- pub fn clear(&mut self) {
- self.attachments.clear();
- }
+pub trait Context {
+ fn get_attachments() -> Option<Arc<Mutex<HashMap<String, Value>>>>;
}
-fn get_current<F: FnMut(&RpcContext) -> T, T>(mut f: F) -> T {
- SERVICE_CONTEXT.try_with(|ctx| f(&ctx.borrow())).unwrap()
-}
+impl Context for RpcContext {
+ fn get_attachments() -> Option<Arc<Mutex<HashMap<String, Value>>>> {
+ let local =
APPLICATION_CONTEXT.try_get_local::<Arc<Mutex<HashMap<String, Value>>>>();
+
+ tracing::debug!("{:?} - {:?}", thread::current().id(), local);
-impl fmt::Debug for RpcContext {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("Context")
- .field("attachments", &self.attachments)
- .finish()
+ match local {
+ Some(attachment) => Some(attachment.clone()),
+ None => {
+ let attachment = HashMap::<String, Value>::new();
+ let mutex = Arc::new(Mutex::new(attachment));
+ let mutex_clone = Arc::clone(&mutex);
+ APPLICATION_CONTEXT.set_local(move || {
+ return Arc::clone(&mutex_clone);
+ });
+ Some(Arc::clone(&mutex))
+ }
+ }
}
}
#[cfg(test)]
mod tests {
+ use tokio::time;
+
use super::*;
- use std::thread::sleep;
use std::time::Duration;
#[test]
@@ -96,25 +78,26 @@ mod tests {
let mut handles = Vec::with_capacity(10);
- for i in 0..10 {
+ for i in 0..=10 {
handles.push(rt.spawn(async move {
- let mut attachments = RpcContext::current().attachments;
- attachments.insert("key1".into(),
Arc::new(format!("data-{i}")));
-
- if i == 10 {
- attachments.insert("key2".into(), Arc::new(2));
- assert_eq!(attachments.len(), 2);
- } else {
- assert_eq!(attachments.len(), 1);
- }
+ if let Some(attachments) = RpcContext::get_attachments() {
+ let mut attachments = attachments.lock().unwrap();
+ attachments.insert("key1".into(),
Value::from(format!("data-{i}")));
+
+ assert!(attachments.len() > 0);
+ };
+
+ time::sleep(Duration::from_millis(1000)).await;
+
+ if let Some(attachments) = RpcContext::get_attachments() {
+ let attachments = attachments.lock().unwrap();
+ assert!(attachments.len() > 0);
+ };
}));
}
- sleep(Duration::from_millis(500));
-
for handle in handles {
rt.block_on(handle).unwrap();
}
- assert_eq!(RpcContext::current().attachments.len(), 0);
}
}