This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 71d261d5 feat: impl basic auth (#1531)
71d261d5 is described below

commit 71d261d5000b71eaffe19d4ae79f7d5cec1c614f
Author: 鲍金日 <[email protected]>
AuthorDate: Wed May 15 17:04:37 2024 +0800

    feat: impl basic auth (#1531)
    
    ## Rationale
    Close https://github.com/apache/incubator-horaedb/issues/929
    
    ## Detailed Changes
    - Added file authentication
    - Modify the query and write paths, and add authentication
    
    ## Test Plan
    - Existed tests
    - Manual tests
    
    ---------
    
    Co-authored-by: jiacai2050 <[email protected]>
---
 Cargo.lock                                 |   1 +
 src/proxy/Cargo.toml                       |   1 +
 src/proxy/src/auth/mod.rs                  |  37 ++++++++
 src/proxy/src/auth/with_file.rs            | 136 +++++++++++++++++++++++++++++
 src/proxy/src/context.rs                   |   9 ++
 src/proxy/src/forward.rs                   |  13 ++-
 src/proxy/src/grpc/prom_query.rs           |   1 +
 src/proxy/src/grpc/sql_query.rs            |   1 +
 src/proxy/src/http/prom.rs                 |  19 ++--
 src/proxy/src/http/sql.rs                  |   2 +-
 src/proxy/src/influxdb/mod.rs              |   2 +-
 src/proxy/src/lib.rs                       |  20 ++++-
 src/proxy/src/opentsdb/mod.rs              |   2 +-
 src/proxy/src/read.rs                      |   1 +
 src/proxy/src/write.rs                     |   1 +
 src/server/src/config.rs                   |   6 +-
 src/server/src/grpc/mod.rs                 |  18 +++-
 src/server/src/grpc/storage_service/mod.rs |  48 ++++++++--
 src/server/src/http.rs                     |  19 +++-
 src/server/src/server.rs                   |  18 ++++
 20 files changed, 324 insertions(+), 31 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8064a066..04edee33 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5430,6 +5430,7 @@ dependencies = [
  "arrow 49.0.0",
  "arrow_ext",
  "async-trait",
+ "base64 0.13.1",
  "bytes",
  "catalog",
  "clru",
diff --git a/src/proxy/Cargo.toml b/src/proxy/Cargo.toml
index 1f66b131..c6831b36 100644
--- a/src/proxy/Cargo.toml
+++ b/src/proxy/Cargo.toml
@@ -34,6 +34,7 @@ workspace = true
 arrow = { workspace = true }
 arrow_ext = { workspace = true }
 async-trait = { workspace = true }
+base64 = { workspace = true }
 bytes = { workspace = true }
 catalog = { workspace = true }
 clru = { workspace = true }
diff --git a/src/proxy/src/auth/mod.rs b/src/proxy/src/auth/mod.rs
new file mode 100644
index 00000000..b0b52691
--- /dev/null
+++ b/src/proxy/src/auth/mod.rs
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use serde::{Deserialize, Serialize};
+
+pub mod with_file;
+
+/// Header of authorization
+pub const AUTHORIZATION: &str = "authorization";
+
+#[derive(Debug, Clone, Deserialize, Serialize, Default)]
+pub enum AuthType {
+    #[default]
+    #[serde(rename = "file")]
+    File,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize, Default)]
+pub struct Config {
+    pub enable: bool,
+    pub auth_type: AuthType,
+    pub source: String,
+}
diff --git a/src/proxy/src/auth/with_file.rs b/src/proxy/src/auth/with_file.rs
new file mode 100644
index 00000000..116005ce
--- /dev/null
+++ b/src/proxy/src/auth/with_file.rs
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::HashMap,
+    fs::File,
+    io::{self, BufRead},
+    path::Path,
+};
+
+use generic_error::BoxError;
+use snafu::{OptionExt, ResultExt};
+use tonic::service::Interceptor;
+
+use crate::{
+    auth::AUTHORIZATION,
+    error::{Internal, InternalNoCause, Result},
+};
+
+#[derive(Debug, Clone, Default)]
+pub struct AuthWithFile {
+    enable: bool,
+    file_path: String,
+    // name -> password
+    users: HashMap<String, String>,
+}
+
+impl AuthWithFile {
+    pub fn new(enable: bool, file_path: String) -> Self {
+        Self {
+            enable,
+            file_path,
+            users: HashMap::new(),
+        }
+    }
+
+    // Load a csv format config
+    pub fn load_credential(&mut self) -> Result<()> {
+        if !self.enable {
+            return Ok(());
+        }
+
+        let path = Path::new(&self.file_path);
+        if !path.exists() {
+            return InternalNoCause {
+                msg: format!("file not existed: {:?}", path),
+            }
+            .fail();
+        }
+
+        let file = File::open(path).box_err().context(Internal {
+            msg: "failed to open file",
+        })?;
+        let reader = io::BufReader::new(file);
+
+        for line in reader.lines() {
+            let line = line.box_err().context(Internal {
+                msg: "failed to read line",
+            })?;
+            let (username, password) = line.split_once(',').with_context(|| InternalNoCause {
+                msg: format!("invalid line: {:?}", line),
+            })?;
+            self.users
+                .insert(username.to_string(), password.to_string());
+        }
+
+        Ok(())
+    }
+
+    // TODO: currently we only support basic auth
+    // This function should return Result
+    pub fn identify(&self, input: Option<String>) -> bool {
+        if !self.enable {
+            return true;
+        }
+
+        let input = match input {
+            Some(v) => v,
+            None => return false,
+        };
+        let input = match input.split_once("Basic ") {
+            Some((_, encoded)) => match base64::decode(encoded) {
+                Ok(v) => v,
+                Err(_e) => return false,
+            },
+            None => return false,
+        };
+        let input = match std::str::from_utf8(&input) {
+            Ok(v) => v,
+            Err(_e) => return false,
+        };
+        match input.split_once(':') {
+            Some((user, pass)) => self
+                .users
+                .get(user)
+                .map(|expected| expected == pass)
+                .unwrap_or_default(),
+            None => false,
+        }
+    }
+}
+
+pub fn get_authorization<T>(req: &tonic::Request<T>) -> Option<String> {
+    req.metadata()
+        .get(AUTHORIZATION)
+        .and_then(|value| value.to_str().ok().map(String::from))
+}
+
+impl Interceptor for AuthWithFile {
+    fn call(
+        &mut self,
+        request: tonic::Request<()>,
+    ) -> std::result::Result<tonic::Request<()>, tonic::Status> {
+        // TODO: extract username from request
+        let authorization = get_authorization(&request);
+        if self.identify(authorization) {
+            Ok(request)
+        } else {
+            Err(tonic::Status::unauthenticated("unauthenticated"))
+        }
+    }
+}
diff --git a/src/proxy/src/context.rs b/src/proxy/src/context.rs
index cb2fcb67..02aa48c9 100644
--- a/src/proxy/src/context.rs
+++ b/src/proxy/src/context.rs
@@ -56,6 +56,8 @@ pub struct RequestContext {
     pub timeout: Option<Duration>,
     /// Request id
     pub request_id: RequestId,
+    /// authorization
+    pub authorization: Option<String>,
 }
 
 impl RequestContext {
@@ -69,6 +71,7 @@ pub struct Builder {
     catalog: String,
     schema: String,
     timeout: Option<Duration>,
+    authorization: Option<String>,
 }
 
 impl Builder {
@@ -87,6 +90,11 @@ impl Builder {
         self
     }
 
+    pub fn authorization(mut self, authorization: Option<String>) -> Self {
+        self.authorization = authorization;
+        self
+    }
+
     pub fn build(self) -> Result<RequestContext> {
         ensure!(!self.catalog.is_empty(), MissingCatalog);
         ensure!(!self.schema.is_empty(), MissingSchema);
@@ -96,6 +104,7 @@ impl Builder {
             schema: self.schema,
             timeout: self.timeout,
             request_id: RequestId::next_id(),
+            authorization: self.authorization,
         })
     }
 }
diff --git a/src/proxy/src/forward.rs b/src/proxy/src/forward.rs
index 0d8a856f..93d0dae9 100644
--- a/src/proxy/src/forward.rs
+++ b/src/proxy/src/forward.rs
@@ -37,7 +37,7 @@ use tonic::{
     transport::{self, Channel},
 };
 
-use crate::FORWARDED_FROM;
+use crate::{auth::AUTHORIZATION, FORWARDED_FROM};
 
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -206,6 +206,7 @@ pub struct ForwardRequest<Req> {
     pub table: String,
     pub req: tonic::Request<Req>,
     pub forwarded_from: Option<String>,
+    pub authorization: Option<String>,
 }
 
 impl Forwarder<DefaultClientBuilder> {
@@ -283,6 +284,7 @@ impl<B: ClientBuilder> Forwarder<B> {
             table,
             req,
             forwarded_from,
+            authorization,
         } = forward_req;
 
         let req_pb = RouteRequestPb {
@@ -309,7 +311,7 @@ impl<B: ClientBuilder> Forwarder<B> {
             }
         };
 
-        self.forward_with_endpoint(endpoint, req, forwarded_from, do_rpc)
+        self.forward_with_endpoint(endpoint, req, forwarded_from, authorization, do_rpc)
             .await
     }
 
@@ -318,6 +320,7 @@ impl<B: ClientBuilder> Forwarder<B> {
         endpoint: Endpoint,
         mut req: tonic::Request<Req>,
         forwarded_from: Option<String>,
+        authorization: Option<String>,
         do_rpc: F,
     ) -> Result<ForwardResult<Resp, Err>>
     where
@@ -351,6 +354,11 @@ impl<B: ClientBuilder> Forwarder<B> {
             self.local_endpoint.to_string().parse().unwrap(),
         );
 
+        if let Some(authorization) = authorization {
+            req.metadata_mut()
+                .insert(AUTHORIZATION, authorization.parse().unwrap());
+        }
+
         let client = self.get_or_create_client(&endpoint).await?;
         match do_rpc(client, req, &endpoint).await {
             Err(e) => {
@@ -503,6 +511,7 @@ mod tests {
                 table: table.to_string(),
                 req: query_request.into_request(),
                 forwarded_from: None,
+                authorization: None,
             }
         };
 
diff --git a/src/proxy/src/grpc/prom_query.rs b/src/proxy/src/grpc/prom_query.rs
index 673b6131..e596f61a 100644
--- a/src/proxy/src/grpc/prom_query.rs
+++ b/src/proxy/src/grpc/prom_query.rs
@@ -81,6 +81,7 @@ impl Proxy {
             msg: "Missing context",
             code: StatusCode::BAD_REQUEST,
         })?;
+
         let schema = req_ctx.database;
         let catalog = self.instance.catalog_manager.default_catalog_name();
 
diff --git a/src/proxy/src/grpc/sql_query.rs b/src/proxy/src/grpc/sql_query.rs
index 4a2a5d80..16c62017 100644
--- a/src/proxy/src/grpc/sql_query.rs
+++ b/src/proxy/src/grpc/sql_query.rs
@@ -227,6 +227,7 @@ impl Proxy {
             table: req.tables[0].clone(),
             req: req.clone().into_request(),
             forwarded_from: ctx.forwarded_from.clone(),
+            authorization: ctx.authorization.clone(),
         };
         let do_query = |mut client: StorageServiceClient<Channel>,
                         request: tonic::Request<SqlQueryRequest>,
diff --git a/src/proxy/src/http/prom.rs b/src/proxy/src/http/prom.rs
index c113b3e7..847da23d 100644
--- a/src/proxy/src/http/prom.rs
+++ b/src/proxy/src/http/prom.rs
@@ -19,11 +19,7 @@
 //! It converts write request to gRPC write request, and
 //! translates query request to SQL for execution.
 
-use std::{
-    collections::HashMap,
-    result::Result as StdResult,
-    time::{Duration, Instant},
-};
+use std::{collections::HashMap, result::Result as StdResult, time::Instant};
 
 use async_trait::async_trait;
 use catalog::consts::DEFAULT_CATALOG;
@@ -83,7 +79,7 @@ impl Proxy {
             }),
             table_requests: write_table_requests,
         };
-        let ctx = ProxyContext::new(ctx.timeout, None);
+        let ctx = ProxyContext::new(ctx.timeout, None, ctx.authorization);
 
         match self.handle_write_internal(ctx, table_request).await {
             Ok(result) => {
@@ -178,14 +174,14 @@ impl Proxy {
     /// another HoraeDB instance.
     pub async fn handle_prom_grpc_query(
         &self,
-        timeout: Option<Duration>,
+        ctx: ProxyContext,
         req: PrometheusRemoteQueryRequest,
     ) -> Result<PrometheusRemoteQueryResponse> {
-        let ctx = req.context.context(ErrNoCause {
+        let req_ctx = req.context.context(ErrNoCause {
             code: StatusCode::BAD_REQUEST,
             msg: "request context is missing",
         })?;
-        let database = ctx.database.to_string();
+        let database = req_ctx.database.to_string();
         let query = Query::decode(req.query.as_ref())
             .box_err()
             .context(Internal {
@@ -193,7 +189,8 @@ impl Proxy {
             })?;
         let metric = find_metric(&query.matchers)?;
         let builder = RequestContext::builder()
-            .timeout(timeout)
+            .timeout(ctx.timeout)
+            .authorization(ctx.authorization)
             .schema(database)
             // TODO: support different catalog
             .catalog(DEFAULT_CATALOG.to_string());
@@ -235,7 +232,7 @@ impl RemoteStorage for Proxy {
                 query: query.encode_to_vec(),
             };
             if let Some(resp) = self
-                .maybe_forward_prom_remote_query(metric.clone(), remote_req)
+                .maybe_forward_prom_remote_query(ctx, metric.clone(), remote_req)
                 .await
                 .map_err(|e| {
                     error!("Forward prom remote query failed, err:{e}");
diff --git a/src/proxy/src/http/sql.rs b/src/proxy/src/http/sql.rs
index 1b1fffdc..8f61039e 100644
--- a/src/proxy/src/http/sql.rs
+++ b/src/proxy/src/http/sql.rs
@@ -50,7 +50,7 @@ impl Proxy {
         req: Request,
     ) -> Result<Output> {
         let schema = &ctx.schema;
-        let ctx = Context::new(ctx.timeout, None);
+        let ctx = Context::new(ctx.timeout, None, ctx.authorization.clone());
 
         let query_res = self
             .handle_sql(
diff --git a/src/proxy/src/influxdb/mod.rs b/src/proxy/src/influxdb/mod.rs
index 8c8346df..3e244786 100644
--- a/src/proxy/src/influxdb/mod.rs
+++ b/src/proxy/src/influxdb/mod.rs
@@ -81,7 +81,7 @@ impl Proxy {
             }),
             table_requests: write_table_requests,
         };
-        let proxy_context = Context::new(ctx.timeout, None);
+        let proxy_context = Context::new(ctx.timeout, None, ctx.authorization);
 
         match self
             .handle_write_internal(proxy_context, table_request)
diff --git a/src/proxy/src/lib.rs b/src/proxy/src/lib.rs
index de07fc2d..54881640 100644
--- a/src/proxy/src/lib.rs
+++ b/src/proxy/src/lib.rs
@@ -20,6 +20,7 @@
 
 #![feature(trait_alias)]
 
+pub mod auth;
 pub mod context;
 pub mod error;
 mod error_util;
@@ -80,6 +81,8 @@ use table_engine::{
 use tonic::{transport::Channel, IntoRequest};
 
 use crate::{
+    auth::with_file::AuthWithFile,
+    context::RequestContext,
     error::{ErrNoCause, ErrWithCause, Error, Internal, Result},
     forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef},
     hotspot::HotspotRecorder,
@@ -105,6 +108,7 @@ impl Default for SubTableAccessPerm {
 }
 
 pub struct Proxy {
+    auth: AuthWithFile,
     router: Arc<dyn Router + Send + Sync>,
     forwarder: ForwarderRef,
     instance: InstanceRef,
@@ -122,6 +126,7 @@ pub struct Proxy {
 impl Proxy {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
+        auth: AuthWithFile,
         router: Arc<dyn Router + Send + Sync>,
         instance: InstanceRef,
         forward_config: forward::Config,
@@ -143,6 +148,7 @@ impl Proxy {
         ));
 
         Self {
+            auth,
             router,
             instance,
             forwarder,
@@ -168,6 +174,7 @@ impl Proxy {
 
     async fn maybe_forward_prom_remote_query(
         &self,
+        ctx: &RequestContext,
         metric: String,
         req: PrometheusRemoteQueryRequest,
     ) -> Result<Option<ForwardResult<PrometheusRemoteQueryResponse, Error>>> {
@@ -177,6 +184,7 @@ impl Proxy {
             table: metric,
             req: req.into_request(),
             forwarded_from: None,
+            authorization: ctx.authorization.clone(),
         };
         let do_query = |mut client: StorageServiceClient<Channel>,
                         request: tonic::Request<PrometheusRemoteQueryRequest>,
@@ -529,6 +537,10 @@ impl Proxy {
             })
         }
     }
+
+    pub fn check_auth(&self, authorization: Option<String>) -> bool {
+        self.auth.identify(authorization)
+    }
 }
 
 #[derive(Clone, Debug)]
@@ -536,14 +548,20 @@ pub struct Context {
     request_id: RequestId,
     timeout: Option<Duration>,
     forwarded_from: Option<String>,
+    authorization: Option<String>,
 }
 
 impl Context {
-    pub fn new(timeout: Option<Duration>, forwarded_from: Option<String>) -> Self {
+    pub fn new(
+        timeout: Option<Duration>,
+        forwarded_from: Option<String>,
+        authorization: Option<String>,
+    ) -> Self {
         Self {
             request_id: RequestId::next_id(),
             timeout,
             forwarded_from,
+            authorization,
         }
     }
 }
diff --git a/src/proxy/src/opentsdb/mod.rs b/src/proxy/src/opentsdb/mod.rs
index aae4a4a2..80affd03 100644
--- a/src/proxy/src/opentsdb/mod.rs
+++ b/src/proxy/src/opentsdb/mod.rs
@@ -69,7 +69,7 @@ impl Proxy {
             }),
             table_requests: write_table_requests,
         };
-        let proxy_context = Context::new(ctx.timeout, None);
+        let proxy_context = Context::new(ctx.timeout, None, ctx.authorization);
 
         match self
             .handle_write_internal(proxy_context, table_request)
diff --git a/src/proxy/src/read.rs b/src/proxy/src/read.rs
index a34875d1..67e1473d 100644
--- a/src/proxy/src/read.rs
+++ b/src/proxy/src/read.rs
@@ -316,6 +316,7 @@ impl Proxy {
             table: table_name.unwrap(),
             req: sql_request.into_request(),
             forwarded_from: ctx.forwarded_from,
+            authorization: ctx.authorization,
         };
         let do_query = |mut client: StorageServiceClient<Channel>,
                         request: tonic::Request<SqlQueryRequest>,
diff --git a/src/proxy/src/write.rs b/src/proxy/src/write.rs
index 1e5ae356..fd8e54bd 100644
--- a/src/proxy/src/write.rs
+++ b/src/proxy/src/write.rs
@@ -453,6 +453,7 @@ impl Proxy {
                 endpoint,
                 tonic::Request::new(table_write_request),
                 ctx.forwarded_from,
+                ctx.authorization,
                 do_write,
             )
             .await;
diff --git a/src/server/src/config.rs b/src/server/src/config.rs
index 054b1532..b07cef95 100644
--- a/src/server/src/config.rs
+++ b/src/server/src/config.rs
@@ -25,7 +25,7 @@ use std::{
 use cluster::config::SchemaConfig;
 use common_types::schema::TIMESTAMP_COLUMN;
 use meta_client::types::ShardId;
-use proxy::{forward, hotspot, SubTableAccessPerm};
+use proxy::{auth, forward, hotspot, SubTableAccessPerm};
 use router::{
     endpoint::Endpoint,
     rule_based::{ClusterView, RuleList},
@@ -141,6 +141,9 @@ pub struct ServerConfig {
     /// The minimum length of the response body to compress.
     pub resp_compress_min_length: ReadableSize,
 
+    /// Auth config
+    pub auth: auth::Config,
+
     /// Config for forwarding
     pub forward: forward::Config,
 
@@ -178,6 +181,7 @@ impl Default for ServerConfig {
             http_max_body_size: ReadableSize::mb(64),
             grpc_server_cq_count: 20,
             resp_compress_min_length: ReadableSize::mb(4),
+            auth: auth::Config::default(),
             forward: forward::Config::default(),
             auto_create_table: true,
             default_schema_config: Default::default(),
diff --git a/src/server/src/grpc/mod.rs b/src/server/src/grpc/mod.rs
index 1c53f205..7b02a3a2 100644
--- a/src/server/src/grpc/mod.rs
+++ b/src/server/src/grpc/mod.rs
@@ -37,6 +37,7 @@ use logger::{info, warn};
 use macros::define_result;
 use notifier::notifier::RequestNotifiers;
 use proxy::{
+    auth::with_file::AuthWithFile,
     forward,
     hotspot::HotspotRecorder,
     instance::InstanceRef,
@@ -47,7 +48,7 @@ use runtime::{JoinHandle, Runtime};
 use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 use table_engine::engine::EngineRuntimes;
 use tokio::sync::oneshot::{self, Sender};
-use tonic::transport::Server;
+use tonic::{codegen::InterceptedService, transport::Server};
 use wal::manager::OpenedWals;
 
 use self::remote_engine_service::QueryDedup;
@@ -113,6 +114,9 @@ pub enum Error {
     #[snafu(display("Missing HotspotRecorder.\nBacktrace:\n{}", backtrace))]
     MissingHotspotRecorder { backtrace: Backtrace },
 
+    #[snafu(display("Missing auth.\nBacktrace:\n{}", backtrace))]
+    MissingAuth { backtrace: Backtrace },
+
     #[snafu(display("Catalog name is not utf8.\nBacktrace:\n{}", backtrace))]
     ParseCatalogName {
         source: std::string::FromUtf8Error,
@@ -158,7 +162,7 @@ define_result!(Error);
 /// Rpc services manages all grpc services of the server.
 pub struct RpcServices {
     serve_addr: SocketAddr,
-    rpc_server: StorageServiceServer<StorageServiceImpl>,
+    rpc_server: InterceptedService<StorageServiceServer<StorageServiceImpl>, AuthWithFile>,
     meta_rpc_server: Option<MetaEventServiceServer<MetaServiceImpl>>,
     remote_engine_server: RemoteEngineServiceServer<RemoteEngineServiceImpl>,
     runtime: Arc<Runtime>,
@@ -212,6 +216,7 @@ impl RpcServices {
 }
 
 pub struct Builder {
+    auth: Option<AuthWithFile>,
     endpoint: String,
     timeout: Option<Duration>,
     runtimes: Option<Arc<EngineRuntimes>>,
@@ -226,6 +231,7 @@ pub struct Builder {
 impl Builder {
     pub fn new() -> Self {
         Self {
+            auth: None,
             endpoint: "0.0.0.0:8381".to_string(),
             timeout: None,
             runtimes: None,
@@ -238,6 +244,11 @@ impl Builder {
         }
     }
 
+    pub fn auth(mut self, auth: AuthWithFile) -> Self {
+        self.auth = Some(auth);
+        self
+    }
+
     pub fn endpoint(mut self, endpoint: String) -> Self {
         self.endpoint = endpoint;
         self
@@ -287,6 +298,7 @@ impl Builder {
 
 impl Builder {
     pub fn build(self) -> Result<RpcServices> {
+        let auth = self.auth.context(MissingAuth)?;
         let runtimes = self.runtimes.context(MissingRuntimes)?;
         let instance = self.instance.context(MissingInstance)?;
         let opened_wals = self.opened_wals.context(MissingWals)?;
@@ -330,7 +342,7 @@ impl Builder {
             runtimes,
             timeout: self.timeout,
         };
-        let rpc_server = StorageServiceServer::new(storage_service);
+        let rpc_server = StorageServiceServer::with_interceptor(storage_service, auth);
 
         let serve_addr = self.endpoint.parse().context(InvalidRpcServeAddr)?;
 
diff --git a/src/server/src/grpc/storage_service/mod.rs b/src/server/src/grpc/storage_service/mod.rs
index 142c5e23..9cf1fa22 100644
--- a/src/server/src/grpc/storage_service/mod.rs
+++ b/src/server/src/grpc/storage_service/mod.rs
@@ -35,7 +35,7 @@ use horaedbproto::{
     },
 };
 use http::StatusCode;
-use proxy::{Context, Proxy, FORWARDED_FROM};
+use proxy::{auth::with_file::get_authorization, Context, Proxy, FORWARDED_FROM};
 use table_engine::engine::EngineRuntimes;
 use time_ext::InstantExt;
 
@@ -148,7 +148,11 @@ impl StorageService for StorageServiceImpl {
     ) -> Result<tonic::Response<Self::StreamSqlQueryStream>, tonic::Status> {
         let begin_instant = Instant::now();
         let proxy = self.proxy.clone();
-        let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_authorization(&req),
+        );
 
         let stream = self.stream_sql_query_internal(ctx, proxy, req).await;
 
@@ -172,7 +176,11 @@ impl StorageServiceImpl {
         &self,
         req: tonic::Request<RouteRequest>,
     ) -> Result<tonic::Response<RouteResponse>, tonic::Status> {
-        let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_authorization(&req),
+        );
         let req = req.into_inner();
         let proxy = self.proxy.clone();
 
@@ -199,7 +207,11 @@ impl StorageServiceImpl {
         &self,
         req: tonic::Request<WriteRequest>,
     ) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
-        let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_authorization(&req),
+        );
 
         let req = req.into_inner();
         let proxy = self.proxy.clone();
@@ -236,7 +248,11 @@ impl StorageServiceImpl {
         &self,
         req: tonic::Request<SqlQueryRequest>,
     ) -> Result<tonic::Response<SqlQueryResponse>, tonic::Status> {
-        let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_authorization(&req),
+        );
         let proxy = self.proxy.clone();
 
         let join_handle = self
@@ -262,11 +278,17 @@ impl StorageServiceImpl {
         &self,
         req: tonic::Request<PrometheusRemoteQueryRequest>,
     ) -> Result<tonic::Response<PrometheusRemoteQueryResponse>, tonic::Status> {
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_authorization(&req),
+        );
+
         let req = req.into_inner();
         let proxy = self.proxy.clone();
-        let timeout = self.timeout;
+
         let join_handle = self.runtimes.read_runtime.spawn(async move {
-            match proxy.handle_prom_grpc_query(timeout, req).await {
+            match proxy.handle_prom_grpc_query(ctx, req).await {
                 Ok(v) => v,
                 Err(e) => PrometheusRemoteQueryResponse {
                     header: Some(error::build_err_header(
@@ -295,7 +317,11 @@ impl StorageServiceImpl {
         &self,
         req: tonic::Request<PrometheusQueryRequest>,
     ) -> Result<tonic::Response<PrometheusQueryResponse>, tonic::Status> {
-        let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_authorization(&req),
+        );
 
         let req = req.into_inner();
         let proxy = self.proxy.clone();
@@ -331,7 +357,11 @@ impl StorageServiceImpl {
         &self,
         req: tonic::Request<tonic::Streaming<WriteRequest>>,
     ) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
-        let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_authorization(&req),
+        );
         let mut stream = req.into_inner();
         let proxy = self.proxy.clone();
 
diff --git a/src/server/src/http.rs b/src/server/src/http.rs
index 95ce7a18..da74ee84 100644
--- a/src/server/src/http.rs
+++ b/src/server/src/http.rs
@@ -37,6 +37,7 @@ use macros::define_result;
 use profile::Profiler;
 use prom_remote_api::web;
 use proxy::{
+    auth::AUTHORIZATION,
     context::RequestContext,
     handlers::{self},
     http::sql::{convert_output, Request},
@@ -148,6 +149,9 @@ pub enum Error {
 
     #[snafu(display("Querying shards is only supported in cluster mode"))]
     QueryShards {},
+
+    #[snafu(display("unauthenticated.\nBacktrace:\n{}", backtrace))]
+    UnAuthenticated { backtrace: Backtrace },
 }
 
 define_result!(Error);
@@ -729,20 +733,32 @@ impl Service {
             .default_schema_name()
             .to_string();
         let timeout = self.config.timeout;
+        let proxy = self.proxy.clone();
 
         header::optional::<String>(consts::CATALOG_HEADER)
             .and(header::optional::<String>(consts::SCHEMA_HEADER))
             .and(header::optional::<String>(consts::TENANT_HEADER))
+            .and(header::optional::<String>(AUTHORIZATION))
             .and_then(
-                move |catalog: Option<_>, schema: Option<_>, _tenant: Option<_>| {
+                move |catalog: Option<_>,
+                      schema: Option<_>,
+                      _tenant: Option<_>,
+                      authorization: Option<_>| {
                     // Clone the captured variables
                     let default_catalog = default_catalog.clone();
                     let schema = schema.unwrap_or_else(|| default_schema.clone());
+                    let proxy = proxy.clone();
+
                     async move {
+                        if !proxy.check_auth(authorization.clone()) {
+                            return UnAuthenticated.fail().map_err(reject::custom);
+                        }
+
                         RequestContext::builder()
                             .catalog(catalog.unwrap_or(default_catalog))
                             .schema(schema)
                             .timeout(timeout)
+                            .authorization(authorization)
                             .build()
                             .context(CreateContext)
                             .map_err(reject::custom)
@@ -919,6 +935,7 @@ fn error_to_status_code(err: &Error) -> StatusCode {
         | Error::QueryShards { .. } => StatusCode::BAD_REQUEST,
         Error::HandleUpdateLogLevel { .. } => StatusCode::INTERNAL_SERVER_ERROR,
         Error::QueryMaybeExceedTTL { .. } => StatusCode::OK,
+        Error::UnAuthenticated { .. } => StatusCode::UNAUTHORIZED,
     }
 }
 
diff --git a/src/server/src/server.rs b/src/server/src/server.rs
index ddc151c8..f7cd72ec 100644
--- a/src/server/src/server.rs
+++ b/src/server/src/server.rs
@@ -29,6 +29,7 @@ use macros::define_result;
 use notifier::notifier::RequestNotifiers;
 use partition_table_engine::PartitionTableEngine;
 use proxy::{
+    auth::{with_file::AuthWithFile, AuthType},
     hotspot::HotspotRecorder,
     instance::{DynamicConfig, Instance, InstanceRef},
     limiter::Limiter,
@@ -135,6 +136,9 @@ pub enum Error {
 
     #[snafu(display("Failed to build query engine, err:{source}"))]
     BuildQueryEngine { source: query_engine::error::Error },
+
+    #[snafu(display("Failed to load auth credential, err:{source}"))]
+    LoadCredential { source: proxy::error::Error },
 }
 
 define_result!(Error);
@@ -451,7 +455,20 @@ impl Builder {
             .enable
             .then(|| Arc::new(RequestNotifiers::default()));
 
+        // Build auth
+        let mut auth = if self.server_config.auth.enable {
+            match self.server_config.auth.auth_type {
+                AuthType::File => AuthWithFile::new(true, self.server_config.auth.source.clone()),
+            }
+        } else {
+            AuthWithFile::default()
+        };
+
+        // Load auth credential
+        auth.load_credential().context(LoadCredential)?;
+
         let proxy = Arc::new(Proxy::new(
+            auth.clone(),
             router.clone(),
             instance.clone(),
             self.server_config.forward,
@@ -500,6 +517,7 @@ impl Builder {
             .context(BuildPostgresqlService)?;
 
         let rpc_services = grpc::Builder::new()
+            .auth(auth)
             .endpoint(grpc_endpoint.to_string())
             .runtimes(engine_runtimes)
             .instance(instance.clone())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to