This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d64fc15 fix: table blacklist not work for write (#1507)
9d64fc15 is described below

commit 9d64fc159bee6affbb18709915ba48be4b4ad8a6
Author: kamille <[email protected]>
AuthorDate: Fri Mar 29 14:51:42 2024 +0800

    fix: table blacklist not work for write (#1507)
    
    ## Rationale
    Table blocking is found disable in grpc write path, that is annoying...
    This pr fix it for making us happy when handling the problematic tables
    in production.
    
    ## Detailed Changes
    - support table blocking in grpc write path.
    - add related its.
    
    ## Test Plan
    CI
---
 integration_tests/config/horaedb-cluster-0.toml |  4 +-
 integration_tests/config/horaedb-cluster-1.toml |  4 +-
 integration_tests/sdk/rust/src/main.rs          | 74 +++++++++++++++++++++++--
 src/proxy/src/limiter.rs                        | 16 ++++--
 src/proxy/src/write.rs                          | 50 ++++++++++++-----
 5 files changed, 118 insertions(+), 30 deletions(-)

diff --git a/integration_tests/config/horaedb-cluster-0.toml 
b/integration_tests/config/horaedb-cluster-0.toml
index 1cb3b722..da6170bc 100644
--- a/integration_tests/config/horaedb-cluster-0.toml
+++ b/integration_tests/config/horaedb-cluster-0.toml
@@ -55,5 +55,5 @@ timeout = "5s"
 server_addrs = ['127.0.0.1:2379']
 
 [limiter]
-write_block_list = ['mytable1']
-read_block_list = ['mytable1']
+write_block_list = ['block_test_table']
+read_block_list = ['block_test_table']
diff --git a/integration_tests/config/horaedb-cluster-1.toml 
b/integration_tests/config/horaedb-cluster-1.toml
index 130fde51..f1f71732 100644
--- a/integration_tests/config/horaedb-cluster-1.toml
+++ b/integration_tests/config/horaedb-cluster-1.toml
@@ -56,5 +56,5 @@ timeout = "5s"
 server_addrs = ['127.0.0.1:2379']
 
 [limiter]
-write_block_list = ['mytable1']
-read_block_list = ['mytable1']
+write_block_list = ['block_test_table']
+read_block_list = ['block_test_table']
diff --git a/integration_tests/sdk/rust/src/main.rs 
b/integration_tests/sdk/rust/src/main.rs
index 5d835113..07698fdb 100644
--- a/integration_tests/sdk/rust/src/main.rs
+++ b/integration_tests/sdk/rust/src/main.rs
@@ -31,6 +31,7 @@ use horaedb_client::{
 };
 
 const ENDPOINT: &str = "127.0.0.1:8831";
+const BLOCKED_TABLE: &str = "block_test_table";
 
 struct TestDatas {
     col_names: Vec<String>,
@@ -83,11 +84,12 @@ async fn main() {
     let now = current_timestamp_ms();
 
     let test_datas = generate_test_datas(now);
-
     test_auto_create_table(&client, &rpc_ctx, now, &test_datas).await;
     test_add_column(&client, &rpc_ctx, now, &test_datas).await;
+    test_block_table(&client, &rpc_ctx, now).await;
 
-    drop_table_if_exists(&client, &rpc_ctx, now).await;
+    drop_test_table_if_exists(&client, &rpc_ctx, now).await;
+    drop_table_if_exists(&client, &rpc_ctx, BLOCKED_TABLE).await;
     print!("Test done")
 }
 
@@ -99,7 +101,7 @@ async fn test_auto_create_table(
 ) {
     println!("Test auto create table");
 
-    drop_table_if_exists(client, rpc_ctx, timestamp).await;
+    drop_test_table_if_exists(client, rpc_ctx, timestamp).await;
 
     write(client, rpc_ctx, timestamp, test_datas, false).await;
     sql_query(client, rpc_ctx, timestamp, test_datas, false).await;
@@ -117,11 +119,71 @@ async fn test_add_column(
     sql_query(client, rpc_ctx, timestamp, test_datas, true).await;
 }
 
-async fn drop_table_if_exists(client: &Arc<dyn DbClient>, rpc_ctx: 
&RpcContext, timestamp: i64) {
+async fn test_block_table(client: &Arc<dyn DbClient>, rpc_ctx: &RpcContext, 
timestamp: i64) {
+    println!("Test auto create table");
+
+    drop_table_if_exists(client, rpc_ctx, BLOCKED_TABLE).await;
+    create_table(client, rpc_ctx, BLOCKED_TABLE).await;
+
+    // try to write, should return table blocked error
+    let mut write_req = WriteRequest::default();
+    let mut points = Vec::new();
+    let builder = PointBuilder::new(BLOCKED_TABLE.to_string())
+        .timestamp(timestamp)
+        .tag("name", Value::String("name1".to_string()))
+        .field("value", Value::Double(0.42));
+    let point = builder.build().unwrap();
+    points.push(point);
+    write_req.add_points(points);
+    if let Err(e) = client.write(rpc_ctx, &write_req).await {
+        let e = e.to_string();
+        assert!(e.contains("Table operation is blocked"));
+    } else {
+        panic!("it should return blocked error");
+    }
+
+    // try to query, should be blocked, too
+    let query_req = SqlQueryRequest {
+        tables: vec![BLOCKED_TABLE.to_string()],
+        sql: format!("SELECT * from {}", BLOCKED_TABLE),
+    };
+    if let Err(e) = client.sql_query(rpc_ctx, &query_req).await {
+        let e = e.to_string();
+        assert!(e.contains("Table operation is blocked"));
+    } else {
+        panic!("it should return blocked error");
+    }
+}
+
+async fn drop_test_table_if_exists(
+    client: &Arc<dyn DbClient>,
+    rpc_ctx: &RpcContext,
+    timestamp: i64,
+) {
     let test_table = format!("test_table_{timestamp}");
+    drop_table_if_exists(client, rpc_ctx, &test_table).await;
+}
+
+async fn drop_table_if_exists(client: &Arc<dyn DbClient>, rpc_ctx: 
&RpcContext, table: &str) {
     let query_req = SqlQueryRequest {
-        tables: vec![test_table.clone()],
-        sql: format!("DROP TABLE IF EXISTS {test_table}"),
+        tables: vec![table.to_string()],
+        sql: format!("DROP TABLE IF EXISTS {table}"),
+    };
+    let _ = client.sql_query(rpc_ctx, &query_req).await.unwrap();
+}
+
+async fn create_table(client: &Arc<dyn DbClient>, rpc_ctx: &RpcContext, table: 
&str) {
+    let query_req = SqlQueryRequest {
+        tables: vec![table.to_string()],
+        sql: format!(
+            "
+        CREATE TABLE {} (
+            name string TAG,
+            value double NOT NULL,
+            t timestamp NOT NULL,
+            timestamp KEY (t))",
+            table
+        ),
     };
     let _ = client.sql_query(rpc_ctx, &query_req).await.unwrap();
 }
diff --git a/src/proxy/src/limiter.rs b/src/proxy/src/limiter.rs
index ee3d9f51..d3a1fc32 100644
--- a/src/proxy/src/limiter.rs
+++ b/src/proxy/src/limiter.rs
@@ -30,11 +30,11 @@ use crate::metrics::BLOCKED_REQUEST_COUNTER_VEC_GLOBAL;
 #[derive(Snafu, Debug)]
 #[snafu(visibility(pub))]
 pub enum Error {
-    #[snafu(display("Queried table is blocked, table:{}", table,))]
-    BlockedTable { table: String },
+    #[snafu(display("Table operation is blocked, table:{}, op:{}", table, op))]
+    BlockedTable { table: String, op: String },
 
-    #[snafu(display("Query is blocked by rule:{:?}", rule))]
-    BlockedByRule { rule: BlockRule },
+    #[snafu(display("Table operation is blocked by rule:{:?}, op:{}", rule, 
op))]
+    BlockedByRule { rule: BlockRule, op: String },
 }
 
 define_result!(Error);
@@ -158,6 +158,7 @@ impl Limiter {
                         {
                             BlockedTable {
                                 table: blocked_table,
+                                op: plan.plan_type(),
                             }
                             .fail()?;
                         }
@@ -174,6 +175,7 @@ impl Limiter {
                 {
                     BlockedTable {
                         table: insert.table.name(),
+                        op: plan.plan_type(),
                     }
                     .fail()?;
                 }
@@ -187,7 +189,11 @@ impl Limiter {
     fn try_limit_by_rules(&self, plan: &Plan) -> Result<()> {
         self.rules.read().unwrap().iter().try_for_each(|rule| {
             if rule.should_limit(plan) {
-                BlockedByRule { rule: *rule }.fail()?;
+                BlockedByRule {
+                    rule: *rule,
+                    op: plan.plan_type(),
+                }
+                .fail()?;
             }
 
             Ok(())
diff --git a/src/proxy/src/write.rs b/src/proxy/src/write.rs
index 36e7faa7..1e5ae356 100644
--- a/src/proxy/src/write.rs
+++ b/src/proxy/src/write.rs
@@ -61,6 +61,11 @@ use crate::{
 
 type WriteResponseFutures<'a> = Vec<BoxFuture<'a, 
runtime::Result<Result<WriteResponse>>>>;
 
+struct PlanWithTable {
+    plan: Plan,
+    table: TableRef,
+}
+
 #[derive(Debug)]
 pub struct WriteContext {
     pub request_id: RequestId,
@@ -501,19 +506,34 @@ impl Proxy {
             auto_create_table: self.auto_create_table,
         };
 
-        let plan_vec = self
+        let plans = self
             .write_request_to_insert_plan(req.table_requests, write_context)
             .await?;
 
         let mut success = 0;
-        for insert_plan in plan_vec {
-            let table = insert_plan.table.clone();
+
+        // TODO: concurrently run the insert plan here
+        for plan_with_table in plans {
+            let PlanWithTable { plan, table } = plan_with_table;
+
+            // check limit first
+            // TODO: if one table is blocked, maybe should not lead to failure 
of whole
+            // batch?
+            self.instance
+                .limiter
+                .try_limit(&plan)
+                .box_err()
+                .context(ErrWithCause {
+                    code: StatusCode::INTERNAL_SERVER_ERROR,
+                    msg: "table is blocked",
+                })?;
+
             match self
                 .execute_insert_plan(
                     request_id.clone(),
                     catalog_name,
                     &schema_name,
-                    insert_plan,
+                    plan,
                     deadline,
                 )
                 .await
@@ -544,8 +564,8 @@ impl Proxy {
         &self,
         table_requests: Vec<WriteTableRequest>,
         write_context: WriteContext,
-    ) -> Result<Vec<InsertPlan>> {
-        let mut plan_vec = Vec::with_capacity(table_requests.len());
+    ) -> Result<Vec<PlanWithTable>> {
+        let mut plans = Vec::with_capacity(table_requests.len());
 
         let WriteContext {
             request_id,
@@ -602,10 +622,16 @@ impl Proxy {
                 }
                 Ok(v) => v,
             };
-            plan_vec.push(plan);
+            let plan = Plan::Insert(plan);
+            let plan_with_table = PlanWithTable {
+                plan,
+                table: table_clone,
+            };
+
+            plans.push(plan_with_table);
         }
 
-        Ok(plan_vec)
+        Ok(plans)
     }
 
     async fn execute_insert_plan(
@@ -613,15 +639,9 @@ impl Proxy {
         request_id: RequestId,
         catalog_name: &str,
         schema_name: &str,
-        insert_plan: InsertPlan,
+        plan: Plan,
         deadline: Option<Instant>,
     ) -> Result<usize> {
-        debug!(
-            "Execute insert plan begin, table:{}, row_num:{}",
-            insert_plan.table.name(),
-            insert_plan.rows.num_rows()
-        );
-        let plan = Plan::Insert(insert_plan);
         let output = self
             .execute_plan(request_id, catalog_name, schema_name, plan, 
deadline)
             .await;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to