This is an automated email from the ASF dual-hosted git repository.
yangyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 7227060 feat:tls (#104)
7227060 is described below
commit 72270600b1f359ea96ab6bf6efe5feef333d82c1
Author: baerwang <[email protected]>
AuthorDate: Tue Apr 11 08:32:24 2023 +0800
feat:tls (#104)
Co-authored-by: wangxiaoxiong <[email protected]>
Co-authored-by: Yang Yang <[email protected]>
---
dubbo/Cargo.toml | 6 ++++-
dubbo/src/triple/server/builder.rs | 38 ++++++++++++++++++++++++++-
dubbo/src/triple/transport/service.rs | 48 ++++++++++++++++++++++++++++++++---
dubbo/src/utils/mod.rs | 1 +
dubbo/src/utils/{mod.rs => tls.rs} | 21 +++++++++++++--
5 files changed, 106 insertions(+), 8 deletions(-)
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 5f79fe8..8b4b1aa 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -16,8 +16,12 @@ tower-service.workspace = true
http-body = "0.4.4"
tower = { workspace = true, features = ["timeout"] }
futures-util = "0.3.23"
+futures-core ="0.3.23"
+argh = "0.1"
+rustls-pemfile = "1.0.0"
+tokio-rustls="0.23.4"
+tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs",
"macros", "net", "signal", "full" ] }
futures-core = "0.3.23"
-tokio = { workspace = true, features = ["rt-multi-thread", "time", "fs",
"macros", "net", "signal"] }
prost = "0.10.4"
async-trait = "0.1.56"
tower-layer.workspace = true
diff --git a/dubbo/src/triple/server/builder.rs
b/dubbo/src/triple/server/builder.rs
index 15a7e93..c85f83b 100644
--- a/dubbo/src/triple/server/builder.rs
+++ b/dubbo/src/triple/server/builder.rs
@@ -17,6 +17,7 @@
use std::{
net::{SocketAddr, ToSocketAddrs},
+ path::Path,
str::FromStr,
};
@@ -25,13 +26,17 @@ use dubbo_logger::tracing;
use http::{Request, Response, Uri};
use hyper::body::Body;
use tower_service::Service;
+use tokio_rustls::rustls::{Certificate, PrivateKey};
-use crate::{triple::transport::DubboServer, BoxBody};
+use crate::{common::url::Url, triple::transport::DubboServer};
+use crate::{utils, BoxBody};
#[derive(Clone, Default, Debug)]
pub struct ServerBuilder {
pub listener: String,
pub addr: Option<SocketAddr>,
+ pub certs: Vec<Certificate>,
+ pub keys: Vec<PrivateKey>,
pub service_names: Vec<String>,
server: DubboServer,
}
@@ -45,6 +50,26 @@ impl ServerBuilder {
Self { listener, ..self }
}
+ pub fn with_tls(self, certs: &str, keys: &str) -> ServerBuilder {
+ Self {
+ certs: match utils::tls::load_certs(Path::new(certs)) {
+ Ok(v) => v,
+ Err(err) => {
+ tracing::error!("error loading tls certs {:?}", err);
+ Vec::new()
+ }
+ },
+ keys: match utils::tls::load_keys(Path::new(keys)) {
+ Ok(v) => v,
+ Err(err) => {
+ tracing::error!("error loading tls keys {:?}", err);
+ Vec::new()
+ }
+ },
+ ..self
+ }
+ }
+
pub fn with_addr(self, addr: &'static str) -> ServerBuilder {
Self {
addr: addr.to_socket_addrs().unwrap().next(),
@@ -61,6 +86,13 @@ impl ServerBuilder {
pub fn build(self) -> Self {
let mut server = self.server.with_listener(self.listener.clone());
+
+ {
+ if self.certs.len() != 0 && self.keys.len() != 0 {
+ server = server.with_tls(self.certs.clone(),
self.keys.clone());
+ }
+ }
+
{
let lock =
crate::protocol::triple::TRIPLE_SERVICES.read().unwrap();
for name in self.service_names.iter() {
@@ -73,6 +105,8 @@ impl ServerBuilder {
server = server.add_service(name.clone(), svc.clone());
}
}
+
+ {}
Self { server, ..self }
}
@@ -114,6 +148,8 @@ impl From<Url> for ServerBuilder {
addr: authority.to_string().to_socket_addrs().unwrap().next(),
service_names: vec![u.service_name],
server: DubboServer::default(),
+ certs: Vec::new(),
+ keys: Vec::new(),
}
}
}
diff --git a/dubbo/src/triple/transport/service.rs
b/dubbo/src/triple/transport/service.rs
index 8e86100..14afabf 100644
--- a/dubbo/src/triple/transport/service.rs
+++ b/dubbo/src/triple/transport/service.rs
@@ -15,7 +15,9 @@
* limitations under the License.
*/
+use std::io;
use std::net::SocketAddr;
+use std::sync::Arc;
use dubbo_logger::tracing;
use futures_core::Future;
@@ -23,8 +25,12 @@ use http::{Request, Response};
use hyper::body::Body;
use tokio::time::Duration;
use tower_service::Service;
+use tokio_rustls::rustls::{Certificate, PrivateKey};
+use tokio_rustls::{rustls, TlsAcceptor};
-use super::{listener::get_listener, router::DubboRouter};
+use super::listener::get_listener;
+use super::router::DubboRouter;
+use crate::triple::transport::io::BoxIO;
use crate::BoxBody;
#[derive(Default, Clone, Debug)]
@@ -38,6 +44,8 @@ pub struct DubboServer {
http2_keepalive_timeout: Option<Duration>,
router: DubboRouter,
listener: Option<String>,
+ certs: Vec<Certificate>,
+ keys: Vec<PrivateKey>,
}
impl DubboServer {
@@ -93,6 +101,14 @@ impl DubboServer {
..self
}
}
+
+ pub fn with_tls(self, certs: Vec<Certificate>, keys: Vec<PrivateKey>) ->
Self {
+ Self {
+ certs: certs,
+ keys: keys,
+ ..self
+ }
+ }
}
impl DubboServer {
@@ -107,6 +123,8 @@ impl DubboServer {
max_frame_size: None,
router: DubboRouter::new(),
listener: None,
+ certs: Vec::new(),
+ keys: Vec::new(),
}
}
}
@@ -147,10 +165,25 @@ impl DubboServer {
None => {
return Err(Box::new(crate::status::DubboError::new(
"listener name is empty".to_string(),
- )))
+ )));
}
};
+ let acceptor: Option<TlsAcceptor>;
+ if self.certs.len() != 0 && !self.keys.len() != 0 {
+ let mut keys = self.keys;
+
+ let config = rustls::ServerConfig::builder()
+ .with_safe_defaults()
+ .with_no_client_auth()
+ .with_single_cert(self.certs, keys.remove(0))
+ .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput,
err))?;
+
+ acceptor = Some(TlsAcceptor::from(Arc::new(config)));
+ } else {
+ acceptor = None;
+ }
+
let listener = match get_listener(name, addr).await {
Ok(v) => v,
Err(err) => return Err(err),
@@ -166,6 +199,14 @@ impl DubboServer {
match res {
Ok(conn) => {
let (io, local_addr) = conn;
+ let b :BoxIO;
+
+ if !acceptor.is_none() {
+ b =
BoxIO::new(acceptor.as_ref().unwrap().clone().accept(io).await?);
+ } else {
+ b = io;
+ }
+
tracing::debug!("hyper serve, local address:
{:?}", local_addr);
let c = hyper::server::conn::Http::new()
.http2_only(self.accept_http2)
@@ -175,10 +216,9 @@ impl DubboServer {
.http2_keep_alive_interval(self.http2_keepalive_interval)
.http2_keep_alive_timeout(http2_keepalive_timeout)
.http2_max_frame_size(self.max_frame_size)
- .serve_connection(io,
svc.clone()).with_upgrades();
+
.serve_connection(b,svc.clone()).with_upgrades();
tokio::spawn(c);
-
},
Err(err) => tracing::error!("hyper serve, err: {:?}",
err),
}
diff --git a/dubbo/src/utils/mod.rs b/dubbo/src/utils/mod.rs
index f088d72..e885a96 100644
--- a/dubbo/src/utils/mod.rs
+++ b/dubbo/src/utils/mod.rs
@@ -17,3 +17,4 @@
pub mod boxed;
pub mod boxed_clone;
+pub mod tls;
diff --git a/dubbo/src/utils/mod.rs b/dubbo/src/utils/tls.rs
similarity index 52%
copy from dubbo/src/utils/mod.rs
copy to dubbo/src/utils/tls.rs
index f088d72..0072bf2 100644
--- a/dubbo/src/utils/mod.rs
+++ b/dubbo/src/utils/tls.rs
@@ -15,5 +15,22 @@
* limitations under the License.
*/
-pub mod boxed;
-pub mod boxed_clone;
+use rustls_pemfile::{certs, rsa_private_keys};
+use std::{
+ fs::File,
+ io::{self, BufReader},
+ path::Path,
+};
+use tokio_rustls::rustls::{Certificate, PrivateKey};
+
+pub fn load_certs(path: &Path) -> io::Result<Vec<Certificate>> {
+ certs(&mut BufReader::new(File::open(path)?))
+ .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid
cert"))
+ .map(|mut certs| certs.drain(..).map(Certificate).collect())
+}
+
+pub fn load_keys(path: &Path) -> io::Result<Vec<PrivateKey>> {
+ rsa_private_keys(&mut BufReader::new(File::open(path)?))
+ .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid
key"))
+ .map(|mut keys| keys.drain(..).map(PrivateKey).collect())
+}