This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 71d261d5 feat: impl basic auth (#1531)
71d261d5 is described below
commit 71d261d5000b71eaffe19d4ae79f7d5cec1c614f
Author: 鲍金日 <[email protected]>
AuthorDate: Wed May 15 17:04:37 2024 +0800
feat: impl basic auth (#1531)
## Rationale
Close https://github.com/apache/incubator-horaedb/issues/929
## Detailed Changes
- Added file authentication
- Modify the query and write paths, and add authentication
## Test Plan
- Existing tests
- Manual tests
---------
Co-authored-by: jiacai2050 <[email protected]>
---
Cargo.lock | 1 +
src/proxy/Cargo.toml | 1 +
src/proxy/src/auth/mod.rs | 37 ++++++++
src/proxy/src/auth/with_file.rs | 136 +++++++++++++++++++++++++++++
src/proxy/src/context.rs | 9 ++
src/proxy/src/forward.rs | 13 ++-
src/proxy/src/grpc/prom_query.rs | 1 +
src/proxy/src/grpc/sql_query.rs | 1 +
src/proxy/src/http/prom.rs | 19 ++--
src/proxy/src/http/sql.rs | 2 +-
src/proxy/src/influxdb/mod.rs | 2 +-
src/proxy/src/lib.rs | 20 ++++-
src/proxy/src/opentsdb/mod.rs | 2 +-
src/proxy/src/read.rs | 1 +
src/proxy/src/write.rs | 1 +
src/server/src/config.rs | 6 +-
src/server/src/grpc/mod.rs | 18 +++-
src/server/src/grpc/storage_service/mod.rs | 48 ++++++++--
src/server/src/http.rs | 19 +++-
src/server/src/server.rs | 18 ++++
20 files changed, 324 insertions(+), 31 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 8064a066..04edee33 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5430,6 +5430,7 @@ dependencies = [
"arrow 49.0.0",
"arrow_ext",
"async-trait",
+ "base64 0.13.1",
"bytes",
"catalog",
"clru",
diff --git a/src/proxy/Cargo.toml b/src/proxy/Cargo.toml
index 1f66b131..c6831b36 100644
--- a/src/proxy/Cargo.toml
+++ b/src/proxy/Cargo.toml
@@ -34,6 +34,7 @@ workspace = true
arrow = { workspace = true }
arrow_ext = { workspace = true }
async-trait = { workspace = true }
+base64 = { workspace = true }
bytes = { workspace = true }
catalog = { workspace = true }
clru = { workspace = true }
diff --git a/src/proxy/src/auth/mod.rs b/src/proxy/src/auth/mod.rs
new file mode 100644
index 00000000..b0b52691
--- /dev/null
+++ b/src/proxy/src/auth/mod.rs
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use serde::{Deserialize, Serialize};
+
+pub mod with_file;
+
+/// Header of authorization
+pub const AUTHORIZATION: &str = "authorization";
+
+#[derive(Debug, Clone, Deserialize, Serialize, Default)]
+pub enum AuthType {
+ #[default]
+ #[serde(rename = "file")]
+ File,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize, Default)]
+pub struct Config {
+ pub enable: bool,
+ pub auth_type: AuthType,
+ pub source: String,
+}
diff --git a/src/proxy/src/auth/with_file.rs b/src/proxy/src/auth/with_file.rs
new file mode 100644
index 00000000..116005ce
--- /dev/null
+++ b/src/proxy/src/auth/with_file.rs
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ collections::HashMap,
+ fs::File,
+ io::{self, BufRead},
+ path::Path,
+};
+
+use generic_error::BoxError;
+use snafu::{OptionExt, ResultExt};
+use tonic::service::Interceptor;
+
+use crate::{
+ auth::AUTHORIZATION,
+ error::{Internal, InternalNoCause, Result},
+};
+
+#[derive(Debug, Clone, Default)]
+pub struct AuthWithFile {
+ enable: bool,
+ file_path: String,
+ // name -> password
+ users: HashMap<String, String>,
+}
+
+impl AuthWithFile {
+ pub fn new(enable: bool, file_path: String) -> Self {
+ Self {
+ enable,
+ file_path,
+ users: HashMap::new(),
+ }
+ }
+
+ // Load a csv format config
+ pub fn load_credential(&mut self) -> Result<()> {
+ if !self.enable {
+ return Ok(());
+ }
+
+ let path = Path::new(&self.file_path);
+ if !path.exists() {
+ return InternalNoCause {
+ msg: format!("file not existed: {:?}", path),
+ }
+ .fail();
+ }
+
+ let file = File::open(path).box_err().context(Internal {
+ msg: "failed to open file",
+ })?;
+ let reader = io::BufReader::new(file);
+
+ for line in reader.lines() {
+ let line = line.box_err().context(Internal {
+ msg: "failed to read line",
+ })?;
+ let (username, password) = line.split_once(',').with_context(|| InternalNoCause {
+ msg: format!("invalid line: {:?}", line),
+ })?;
+ self.users
+ .insert(username.to_string(), password.to_string());
+ }
+
+ Ok(())
+ }
+
+ // TODO: currently we only support basic auth
+ // This function should return Result
+ pub fn identify(&self, input: Option<String>) -> bool {
+ if !self.enable {
+ return true;
+ }
+
+ let input = match input {
+ Some(v) => v,
+ None => return false,
+ };
+ let input = match input.split_once("Basic ") {
+ Some((_, encoded)) => match base64::decode(encoded) {
+ Ok(v) => v,
+ Err(_e) => return false,
+ },
+ None => return false,
+ };
+ let input = match std::str::from_utf8(&input) {
+ Ok(v) => v,
+ Err(_e) => return false,
+ };
+ match input.split_once(':') {
+ Some((user, pass)) => self
+ .users
+ .get(user)
+ .map(|expected| expected == pass)
+ .unwrap_or_default(),
+ None => false,
+ }
+ }
+}
+
+pub fn get_authorization<T>(req: &tonic::Request<T>) -> Option<String> {
+ req.metadata()
+ .get(AUTHORIZATION)
+ .and_then(|value| value.to_str().ok().map(String::from))
+}
+
+impl Interceptor for AuthWithFile {
+ fn call(
+ &mut self,
+ request: tonic::Request<()>,
+ ) -> std::result::Result<tonic::Request<()>, tonic::Status> {
+ // TODO: extract username from request
+ let authorization = get_authorization(&request);
+ if self.identify(authorization) {
+ Ok(request)
+ } else {
+ Err(tonic::Status::unauthenticated("unauthenticated"))
+ }
+ }
+}
diff --git a/src/proxy/src/context.rs b/src/proxy/src/context.rs
index cb2fcb67..02aa48c9 100644
--- a/src/proxy/src/context.rs
+++ b/src/proxy/src/context.rs
@@ -56,6 +56,8 @@ pub struct RequestContext {
pub timeout: Option<Duration>,
/// Request id
pub request_id: RequestId,
+ /// authorization
+ pub authorization: Option<String>,
}
impl RequestContext {
@@ -69,6 +71,7 @@ pub struct Builder {
catalog: String,
schema: String,
timeout: Option<Duration>,
+ authorization: Option<String>,
}
impl Builder {
@@ -87,6 +90,11 @@ impl Builder {
self
}
+ pub fn authorization(mut self, authorization: Option<String>) -> Self {
+ self.authorization = authorization;
+ self
+ }
+
pub fn build(self) -> Result<RequestContext> {
ensure!(!self.catalog.is_empty(), MissingCatalog);
ensure!(!self.schema.is_empty(), MissingSchema);
@@ -96,6 +104,7 @@ impl Builder {
schema: self.schema,
timeout: self.timeout,
request_id: RequestId::next_id(),
+ authorization: self.authorization,
})
}
}
diff --git a/src/proxy/src/forward.rs b/src/proxy/src/forward.rs
index 0d8a856f..93d0dae9 100644
--- a/src/proxy/src/forward.rs
+++ b/src/proxy/src/forward.rs
@@ -37,7 +37,7 @@ use tonic::{
transport::{self, Channel},
};
-use crate::FORWARDED_FROM;
+use crate::{auth::AUTHORIZATION, FORWARDED_FROM};
#[derive(Debug, Snafu)]
pub enum Error {
@@ -206,6 +206,7 @@ pub struct ForwardRequest<Req> {
pub table: String,
pub req: tonic::Request<Req>,
pub forwarded_from: Option<String>,
+ pub authorization: Option<String>,
}
impl Forwarder<DefaultClientBuilder> {
@@ -283,6 +284,7 @@ impl<B: ClientBuilder> Forwarder<B> {
table,
req,
forwarded_from,
+ authorization,
} = forward_req;
let req_pb = RouteRequestPb {
@@ -309,7 +311,7 @@ impl<B: ClientBuilder> Forwarder<B> {
}
};
- self.forward_with_endpoint(endpoint, req, forwarded_from, do_rpc)
+ self.forward_with_endpoint(endpoint, req, forwarded_from, authorization, do_rpc)
.await
}
@@ -318,6 +320,7 @@ impl<B: ClientBuilder> Forwarder<B> {
endpoint: Endpoint,
mut req: tonic::Request<Req>,
forwarded_from: Option<String>,
+ authorization: Option<String>,
do_rpc: F,
) -> Result<ForwardResult<Resp, Err>>
where
@@ -351,6 +354,11 @@ impl<B: ClientBuilder> Forwarder<B> {
self.local_endpoint.to_string().parse().unwrap(),
);
+ if let Some(authorization) = authorization {
+ req.metadata_mut()
+ .insert(AUTHORIZATION, authorization.parse().unwrap());
+ }
+
let client = self.get_or_create_client(&endpoint).await?;
match do_rpc(client, req, &endpoint).await {
Err(e) => {
@@ -503,6 +511,7 @@ mod tests {
table: table.to_string(),
req: query_request.into_request(),
forwarded_from: None,
+ authorization: None,
}
};
diff --git a/src/proxy/src/grpc/prom_query.rs b/src/proxy/src/grpc/prom_query.rs
index 673b6131..e596f61a 100644
--- a/src/proxy/src/grpc/prom_query.rs
+++ b/src/proxy/src/grpc/prom_query.rs
@@ -81,6 +81,7 @@ impl Proxy {
msg: "Missing context",
code: StatusCode::BAD_REQUEST,
})?;
+
let schema = req_ctx.database;
let catalog = self.instance.catalog_manager.default_catalog_name();
diff --git a/src/proxy/src/grpc/sql_query.rs b/src/proxy/src/grpc/sql_query.rs
index 4a2a5d80..16c62017 100644
--- a/src/proxy/src/grpc/sql_query.rs
+++ b/src/proxy/src/grpc/sql_query.rs
@@ -227,6 +227,7 @@ impl Proxy {
table: req.tables[0].clone(),
req: req.clone().into_request(),
forwarded_from: ctx.forwarded_from.clone(),
+ authorization: ctx.authorization.clone(),
};
let do_query = |mut client: StorageServiceClient<Channel>,
request: tonic::Request<SqlQueryRequest>,
diff --git a/src/proxy/src/http/prom.rs b/src/proxy/src/http/prom.rs
index c113b3e7..847da23d 100644
--- a/src/proxy/src/http/prom.rs
+++ b/src/proxy/src/http/prom.rs
@@ -19,11 +19,7 @@
//! It converts write request to gRPC write request, and
//! translates query request to SQL for execution.
-use std::{
- collections::HashMap,
- result::Result as StdResult,
- time::{Duration, Instant},
-};
+use std::{collections::HashMap, result::Result as StdResult, time::Instant};
use async_trait::async_trait;
use catalog::consts::DEFAULT_CATALOG;
@@ -83,7 +79,7 @@ impl Proxy {
}),
table_requests: write_table_requests,
};
- let ctx = ProxyContext::new(ctx.timeout, None);
+ let ctx = ProxyContext::new(ctx.timeout, None, ctx.authorization);
match self.handle_write_internal(ctx, table_request).await {
Ok(result) => {
@@ -178,14 +174,14 @@ impl Proxy {
/// another HoraeDB instance.
pub async fn handle_prom_grpc_query(
&self,
- timeout: Option<Duration>,
+ ctx: ProxyContext,
req: PrometheusRemoteQueryRequest,
) -> Result<PrometheusRemoteQueryResponse> {
- let ctx = req.context.context(ErrNoCause {
+ let req_ctx = req.context.context(ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: "request context is missing",
})?;
- let database = ctx.database.to_string();
+ let database = req_ctx.database.to_string();
let query = Query::decode(req.query.as_ref())
.box_err()
.context(Internal {
@@ -193,7 +189,8 @@ impl Proxy {
})?;
let metric = find_metric(&query.matchers)?;
let builder = RequestContext::builder()
- .timeout(timeout)
+ .timeout(ctx.timeout)
+ .authorization(ctx.authorization)
.schema(database)
// TODO: support different catalog
.catalog(DEFAULT_CATALOG.to_string());
@@ -235,7 +232,7 @@ impl RemoteStorage for Proxy {
query: query.encode_to_vec(),
};
if let Some(resp) = self
- .maybe_forward_prom_remote_query(metric.clone(), remote_req)
+ .maybe_forward_prom_remote_query(ctx, metric.clone(), remote_req)
.await
.map_err(|e| {
error!("Forward prom remote query failed, err:{e}");
diff --git a/src/proxy/src/http/sql.rs b/src/proxy/src/http/sql.rs
index 1b1fffdc..8f61039e 100644
--- a/src/proxy/src/http/sql.rs
+++ b/src/proxy/src/http/sql.rs
@@ -50,7 +50,7 @@ impl Proxy {
req: Request,
) -> Result<Output> {
let schema = &ctx.schema;
- let ctx = Context::new(ctx.timeout, None);
+ let ctx = Context::new(ctx.timeout, None, ctx.authorization.clone());
let query_res = self
.handle_sql(
diff --git a/src/proxy/src/influxdb/mod.rs b/src/proxy/src/influxdb/mod.rs
index 8c8346df..3e244786 100644
--- a/src/proxy/src/influxdb/mod.rs
+++ b/src/proxy/src/influxdb/mod.rs
@@ -81,7 +81,7 @@ impl Proxy {
}),
table_requests: write_table_requests,
};
- let proxy_context = Context::new(ctx.timeout, None);
+ let proxy_context = Context::new(ctx.timeout, None, ctx.authorization);
match self
.handle_write_internal(proxy_context, table_request)
diff --git a/src/proxy/src/lib.rs b/src/proxy/src/lib.rs
index de07fc2d..54881640 100644
--- a/src/proxy/src/lib.rs
+++ b/src/proxy/src/lib.rs
@@ -20,6 +20,7 @@
#![feature(trait_alias)]
+pub mod auth;
pub mod context;
pub mod error;
mod error_util;
@@ -80,6 +81,8 @@ use table_engine::{
use tonic::{transport::Channel, IntoRequest};
use crate::{
+ auth::with_file::AuthWithFile,
+ context::RequestContext,
error::{ErrNoCause, ErrWithCause, Error, Internal, Result},
forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef},
hotspot::HotspotRecorder,
@@ -105,6 +108,7 @@ impl Default for SubTableAccessPerm {
}
pub struct Proxy {
+ auth: AuthWithFile,
router: Arc<dyn Router + Send + Sync>,
forwarder: ForwarderRef,
instance: InstanceRef,
@@ -122,6 +126,7 @@ pub struct Proxy {
impl Proxy {
#[allow(clippy::too_many_arguments)]
pub fn new(
+ auth: AuthWithFile,
router: Arc<dyn Router + Send + Sync>,
instance: InstanceRef,
forward_config: forward::Config,
@@ -143,6 +148,7 @@ impl Proxy {
));
Self {
+ auth,
router,
instance,
forwarder,
@@ -168,6 +174,7 @@ impl Proxy {
async fn maybe_forward_prom_remote_query(
&self,
+ ctx: &RequestContext,
metric: String,
req: PrometheusRemoteQueryRequest,
) -> Result<Option<ForwardResult<PrometheusRemoteQueryResponse, Error>>> {
@@ -177,6 +184,7 @@ impl Proxy {
table: metric,
req: req.into_request(),
forwarded_from: None,
+ authorization: ctx.authorization.clone(),
};
let do_query = |mut client: StorageServiceClient<Channel>,
request: tonic::Request<PrometheusRemoteQueryRequest>,
@@ -529,6 +537,10 @@ impl Proxy {
})
}
}
+
+ pub fn check_auth(&self, authorization: Option<String>) -> bool {
+ self.auth.identify(authorization)
+ }
}
#[derive(Clone, Debug)]
@@ -536,14 +548,20 @@ pub struct Context {
request_id: RequestId,
timeout: Option<Duration>,
forwarded_from: Option<String>,
+ authorization: Option<String>,
}
impl Context {
- pub fn new(timeout: Option<Duration>, forwarded_from: Option<String>) -> Self {
+ pub fn new(
+ timeout: Option<Duration>,
+ forwarded_from: Option<String>,
+ authorization: Option<String>,
+ ) -> Self {
Self {
request_id: RequestId::next_id(),
timeout,
forwarded_from,
+ authorization,
}
}
}
diff --git a/src/proxy/src/opentsdb/mod.rs b/src/proxy/src/opentsdb/mod.rs
index aae4a4a2..80affd03 100644
--- a/src/proxy/src/opentsdb/mod.rs
+++ b/src/proxy/src/opentsdb/mod.rs
@@ -69,7 +69,7 @@ impl Proxy {
}),
table_requests: write_table_requests,
};
- let proxy_context = Context::new(ctx.timeout, None);
+ let proxy_context = Context::new(ctx.timeout, None, ctx.authorization);
match self
.handle_write_internal(proxy_context, table_request)
diff --git a/src/proxy/src/read.rs b/src/proxy/src/read.rs
index a34875d1..67e1473d 100644
--- a/src/proxy/src/read.rs
+++ b/src/proxy/src/read.rs
@@ -316,6 +316,7 @@ impl Proxy {
table: table_name.unwrap(),
req: sql_request.into_request(),
forwarded_from: ctx.forwarded_from,
+ authorization: ctx.authorization,
};
let do_query = |mut client: StorageServiceClient<Channel>,
request: tonic::Request<SqlQueryRequest>,
diff --git a/src/proxy/src/write.rs b/src/proxy/src/write.rs
index 1e5ae356..fd8e54bd 100644
--- a/src/proxy/src/write.rs
+++ b/src/proxy/src/write.rs
@@ -453,6 +453,7 @@ impl Proxy {
endpoint,
tonic::Request::new(table_write_request),
ctx.forwarded_from,
+ ctx.authorization,
do_write,
)
.await;
diff --git a/src/server/src/config.rs b/src/server/src/config.rs
index 054b1532..b07cef95 100644
--- a/src/server/src/config.rs
+++ b/src/server/src/config.rs
@@ -25,7 +25,7 @@ use std::{
use cluster::config::SchemaConfig;
use common_types::schema::TIMESTAMP_COLUMN;
use meta_client::types::ShardId;
-use proxy::{forward, hotspot, SubTableAccessPerm};
+use proxy::{auth, forward, hotspot, SubTableAccessPerm};
use router::{
endpoint::Endpoint,
rule_based::{ClusterView, RuleList},
@@ -141,6 +141,9 @@ pub struct ServerConfig {
/// The minimum length of the response body to compress.
pub resp_compress_min_length: ReadableSize,
+ /// Auth config
+ pub auth: auth::Config,
+
/// Config for forwarding
pub forward: forward::Config,
@@ -178,6 +181,7 @@ impl Default for ServerConfig {
http_max_body_size: ReadableSize::mb(64),
grpc_server_cq_count: 20,
resp_compress_min_length: ReadableSize::mb(4),
+ auth: auth::Config::default(),
forward: forward::Config::default(),
auto_create_table: true,
default_schema_config: Default::default(),
diff --git a/src/server/src/grpc/mod.rs b/src/server/src/grpc/mod.rs
index 1c53f205..7b02a3a2 100644
--- a/src/server/src/grpc/mod.rs
+++ b/src/server/src/grpc/mod.rs
@@ -37,6 +37,7 @@ use logger::{info, warn};
use macros::define_result;
use notifier::notifier::RequestNotifiers;
use proxy::{
+ auth::with_file::AuthWithFile,
forward,
hotspot::HotspotRecorder,
instance::InstanceRef,
@@ -47,7 +48,7 @@ use runtime::{JoinHandle, Runtime};
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::engine::EngineRuntimes;
use tokio::sync::oneshot::{self, Sender};
-use tonic::transport::Server;
+use tonic::{codegen::InterceptedService, transport::Server};
use wal::manager::OpenedWals;
use self::remote_engine_service::QueryDedup;
@@ -113,6 +114,9 @@ pub enum Error {
#[snafu(display("Missing HotspotRecorder.\nBacktrace:\n{}", backtrace))]
MissingHotspotRecorder { backtrace: Backtrace },
+ #[snafu(display("Missing auth.\nBacktrace:\n{}", backtrace))]
+ MissingAuth { backtrace: Backtrace },
+
#[snafu(display("Catalog name is not utf8.\nBacktrace:\n{}", backtrace))]
ParseCatalogName {
source: std::string::FromUtf8Error,
@@ -158,7 +162,7 @@ define_result!(Error);
/// Rpc services manages all grpc services of the server.
pub struct RpcServices {
serve_addr: SocketAddr,
- rpc_server: StorageServiceServer<StorageServiceImpl>,
+ rpc_server: InterceptedService<StorageServiceServer<StorageServiceImpl>, AuthWithFile>,
meta_rpc_server: Option<MetaEventServiceServer<MetaServiceImpl>>,
remote_engine_server: RemoteEngineServiceServer<RemoteEngineServiceImpl>,
runtime: Arc<Runtime>,
@@ -212,6 +216,7 @@ impl RpcServices {
}
pub struct Builder {
+ auth: Option<AuthWithFile>,
endpoint: String,
timeout: Option<Duration>,
runtimes: Option<Arc<EngineRuntimes>>,
@@ -226,6 +231,7 @@ pub struct Builder {
impl Builder {
pub fn new() -> Self {
Self {
+ auth: None,
endpoint: "0.0.0.0:8381".to_string(),
timeout: None,
runtimes: None,
@@ -238,6 +244,11 @@ impl Builder {
}
}
+ pub fn auth(mut self, auth: AuthWithFile) -> Self {
+ self.auth = Some(auth);
+ self
+ }
+
pub fn endpoint(mut self, endpoint: String) -> Self {
self.endpoint = endpoint;
self
@@ -287,6 +298,7 @@ impl Builder {
impl Builder {
pub fn build(self) -> Result<RpcServices> {
+ let auth = self.auth.context(MissingAuth)?;
let runtimes = self.runtimes.context(MissingRuntimes)?;
let instance = self.instance.context(MissingInstance)?;
let opened_wals = self.opened_wals.context(MissingWals)?;
@@ -330,7 +342,7 @@ impl Builder {
runtimes,
timeout: self.timeout,
};
- let rpc_server = StorageServiceServer::new(storage_service);
+ let rpc_server = StorageServiceServer::with_interceptor(storage_service, auth);
let serve_addr = self.endpoint.parse().context(InvalidRpcServeAddr)?;
diff --git a/src/server/src/grpc/storage_service/mod.rs b/src/server/src/grpc/storage_service/mod.rs
index 142c5e23..9cf1fa22 100644
--- a/src/server/src/grpc/storage_service/mod.rs
+++ b/src/server/src/grpc/storage_service/mod.rs
@@ -35,7 +35,7 @@ use horaedbproto::{
},
};
use http::StatusCode;
-use proxy::{Context, Proxy, FORWARDED_FROM};
+use proxy::{auth::with_file::get_authorization, Context, Proxy, FORWARDED_FROM};
use table_engine::engine::EngineRuntimes;
use time_ext::InstantExt;
@@ -148,7 +148,11 @@ impl StorageService for StorageServiceImpl {
) -> Result<tonic::Response<Self::StreamSqlQueryStream>, tonic::Status> {
let begin_instant = Instant::now();
let proxy = self.proxy.clone();
- let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+ let ctx = Context::new(
+ self.timeout,
+ get_forwarded_from(&req),
+ get_authorization(&req),
+ );
let stream = self.stream_sql_query_internal(ctx, proxy, req).await;
@@ -172,7 +176,11 @@ impl StorageServiceImpl {
&self,
req: tonic::Request<RouteRequest>,
) -> Result<tonic::Response<RouteResponse>, tonic::Status> {
- let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+ let ctx = Context::new(
+ self.timeout,
+ get_forwarded_from(&req),
+ get_authorization(&req),
+ );
let req = req.into_inner();
let proxy = self.proxy.clone();
@@ -199,7 +207,11 @@ impl StorageServiceImpl {
&self,
req: tonic::Request<WriteRequest>,
) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
- let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+ let ctx = Context::new(
+ self.timeout,
+ get_forwarded_from(&req),
+ get_authorization(&req),
+ );
let req = req.into_inner();
let proxy = self.proxy.clone();
@@ -236,7 +248,11 @@ impl StorageServiceImpl {
&self,
req: tonic::Request<SqlQueryRequest>,
) -> Result<tonic::Response<SqlQueryResponse>, tonic::Status> {
- let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+ let ctx = Context::new(
+ self.timeout,
+ get_forwarded_from(&req),
+ get_authorization(&req),
+ );
let proxy = self.proxy.clone();
let join_handle = self
@@ -262,11 +278,17 @@ impl StorageServiceImpl {
&self,
req: tonic::Request<PrometheusRemoteQueryRequest>,
) -> Result<tonic::Response<PrometheusRemoteQueryResponse>, tonic::Status> {
+ let ctx = Context::new(
+ self.timeout,
+ get_forwarded_from(&req),
+ get_authorization(&req),
+ );
+
let req = req.into_inner();
let proxy = self.proxy.clone();
- let timeout = self.timeout;
+
let join_handle = self.runtimes.read_runtime.spawn(async move {
- match proxy.handle_prom_grpc_query(timeout, req).await {
+ match proxy.handle_prom_grpc_query(ctx, req).await {
Ok(v) => v,
Err(e) => PrometheusRemoteQueryResponse {
header: Some(error::build_err_header(
@@ -295,7 +317,11 @@ impl StorageServiceImpl {
&self,
req: tonic::Request<PrometheusQueryRequest>,
) -> Result<tonic::Response<PrometheusQueryResponse>, tonic::Status> {
- let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+ let ctx = Context::new(
+ self.timeout,
+ get_forwarded_from(&req),
+ get_authorization(&req),
+ );
let req = req.into_inner();
let proxy = self.proxy.clone();
@@ -331,7 +357,11 @@ impl StorageServiceImpl {
&self,
req: tonic::Request<tonic::Streaming<WriteRequest>>,
) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
- let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+ let ctx = Context::new(
+ self.timeout,
+ get_forwarded_from(&req),
+ get_authorization(&req),
+ );
let mut stream = req.into_inner();
let proxy = self.proxy.clone();
diff --git a/src/server/src/http.rs b/src/server/src/http.rs
index 95ce7a18..da74ee84 100644
--- a/src/server/src/http.rs
+++ b/src/server/src/http.rs
@@ -37,6 +37,7 @@ use macros::define_result;
use profile::Profiler;
use prom_remote_api::web;
use proxy::{
+ auth::AUTHORIZATION,
context::RequestContext,
handlers::{self},
http::sql::{convert_output, Request},
@@ -148,6 +149,9 @@ pub enum Error {
#[snafu(display("Querying shards is only supported in cluster mode"))]
QueryShards {},
+
+ #[snafu(display("unauthenticated.\nBacktrace:\n{}", backtrace))]
+ UnAuthenticated { backtrace: Backtrace },
}
define_result!(Error);
@@ -729,20 +733,32 @@ impl Service {
.default_schema_name()
.to_string();
let timeout = self.config.timeout;
+ let proxy = self.proxy.clone();
header::optional::<String>(consts::CATALOG_HEADER)
.and(header::optional::<String>(consts::SCHEMA_HEADER))
.and(header::optional::<String>(consts::TENANT_HEADER))
+ .and(header::optional::<String>(AUTHORIZATION))
.and_then(
- move |catalog: Option<_>, schema: Option<_>, _tenant: Option<_>| {
+ move |catalog: Option<_>,
+ schema: Option<_>,
+ _tenant: Option<_>,
+ authorization: Option<_>| {
// Clone the captured variables
let default_catalog = default_catalog.clone();
let schema = schema.unwrap_or_else(|| default_schema.clone());
+ let proxy = proxy.clone();
+
async move {
+ if !proxy.check_auth(authorization.clone()) {
+ return UnAuthenticated.fail().map_err(reject::custom);
+ }
+
RequestContext::builder()
.catalog(catalog.unwrap_or(default_catalog))
.schema(schema)
.timeout(timeout)
+ .authorization(authorization)
.build()
.context(CreateContext)
.map_err(reject::custom)
@@ -919,6 +935,7 @@ fn error_to_status_code(err: &Error) -> StatusCode {
| Error::QueryShards { .. } => StatusCode::BAD_REQUEST,
Error::HandleUpdateLogLevel { .. } => StatusCode::INTERNAL_SERVER_ERROR,
Error::QueryMaybeExceedTTL { .. } => StatusCode::OK,
+ Error::UnAuthenticated { .. } => StatusCode::UNAUTHORIZED,
}
}
diff --git a/src/server/src/server.rs b/src/server/src/server.rs
index ddc151c8..f7cd72ec 100644
--- a/src/server/src/server.rs
+++ b/src/server/src/server.rs
@@ -29,6 +29,7 @@ use macros::define_result;
use notifier::notifier::RequestNotifiers;
use partition_table_engine::PartitionTableEngine;
use proxy::{
+ auth::{with_file::AuthWithFile, AuthType},
hotspot::HotspotRecorder,
instance::{DynamicConfig, Instance, InstanceRef},
limiter::Limiter,
@@ -135,6 +136,9 @@ pub enum Error {
#[snafu(display("Failed to build query engine, err:{source}"))]
BuildQueryEngine { source: query_engine::error::Error },
+
+ #[snafu(display("Failed to load auth credential, err:{source}"))]
+ LoadCredential { source: proxy::error::Error },
}
define_result!(Error);
@@ -451,7 +455,20 @@ impl Builder {
.enable
.then(|| Arc::new(RequestNotifiers::default()));
+ // Build auth
+ let mut auth = if self.server_config.auth.enable {
+ match self.server_config.auth.auth_type {
+ AuthType::File => AuthWithFile::new(true, self.server_config.auth.source.clone()),
+ }
+ } else {
+ AuthWithFile::default()
+ };
+
+ // Load auth credential
+ auth.load_credential().context(LoadCredential)?;
+
let proxy = Arc::new(Proxy::new(
+ auth.clone(),
router.clone(),
instance.clone(),
self.server_config.forward,
@@ -500,6 +517,7 @@ impl Builder {
.context(BuildPostgresqlService)?;
let rpc_services = grpc::Builder::new()
+ .auth(auth)
.endpoint(grpc_endpoint.to_string())
.runtimes(engine_runtimes)
.instance(instance.clone())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]