This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 9d64fc15 fix: table blacklist not work for write (#1507)
9d64fc15 is described below
commit 9d64fc159bee6affbb18709915ba48be4b4ad8a6
Author: kamille <[email protected]>
AuthorDate: Fri Mar 29 14:51:42 2024 +0800
fix: table blacklist not work for write (#1507)
## Rationale
Table blocking is found disable in grpc write path, that is annoying...
This pr fix it for making us happy when handling the problematic tables
in production.
## Detailed Changes
- support table blocking in grpc write path.
- add related its.
## Test Plan
CI
---
integration_tests/config/horaedb-cluster-0.toml | 4 +-
integration_tests/config/horaedb-cluster-1.toml | 4 +-
integration_tests/sdk/rust/src/main.rs | 74 +++++++++++++++++++++++--
src/proxy/src/limiter.rs | 16 ++++--
src/proxy/src/write.rs | 50 ++++++++++++-----
5 files changed, 118 insertions(+), 30 deletions(-)
diff --git a/integration_tests/config/horaedb-cluster-0.toml
b/integration_tests/config/horaedb-cluster-0.toml
index 1cb3b722..da6170bc 100644
--- a/integration_tests/config/horaedb-cluster-0.toml
+++ b/integration_tests/config/horaedb-cluster-0.toml
@@ -55,5 +55,5 @@ timeout = "5s"
server_addrs = ['127.0.0.1:2379']
[limiter]
-write_block_list = ['mytable1']
-read_block_list = ['mytable1']
+write_block_list = ['block_test_table']
+read_block_list = ['block_test_table']
diff --git a/integration_tests/config/horaedb-cluster-1.toml
b/integration_tests/config/horaedb-cluster-1.toml
index 130fde51..f1f71732 100644
--- a/integration_tests/config/horaedb-cluster-1.toml
+++ b/integration_tests/config/horaedb-cluster-1.toml
@@ -56,5 +56,5 @@ timeout = "5s"
server_addrs = ['127.0.0.1:2379']
[limiter]
-write_block_list = ['mytable1']
-read_block_list = ['mytable1']
+write_block_list = ['block_test_table']
+read_block_list = ['block_test_table']
diff --git a/integration_tests/sdk/rust/src/main.rs
b/integration_tests/sdk/rust/src/main.rs
index 5d835113..07698fdb 100644
--- a/integration_tests/sdk/rust/src/main.rs
+++ b/integration_tests/sdk/rust/src/main.rs
@@ -31,6 +31,7 @@ use horaedb_client::{
};
const ENDPOINT: &str = "127.0.0.1:8831";
+const BLOCKED_TABLE: &str = "block_test_table";
struct TestDatas {
col_names: Vec<String>,
@@ -83,11 +84,12 @@ async fn main() {
let now = current_timestamp_ms();
let test_datas = generate_test_datas(now);
-
test_auto_create_table(&client, &rpc_ctx, now, &test_datas).await;
test_add_column(&client, &rpc_ctx, now, &test_datas).await;
+ test_block_table(&client, &rpc_ctx, now).await;
- drop_table_if_exists(&client, &rpc_ctx, now).await;
+ drop_test_table_if_exists(&client, &rpc_ctx, now).await;
+ drop_table_if_exists(&client, &rpc_ctx, BLOCKED_TABLE).await;
print!("Test done")
}
@@ -99,7 +101,7 @@ async fn test_auto_create_table(
) {
println!("Test auto create table");
- drop_table_if_exists(client, rpc_ctx, timestamp).await;
+ drop_test_table_if_exists(client, rpc_ctx, timestamp).await;
write(client, rpc_ctx, timestamp, test_datas, false).await;
sql_query(client, rpc_ctx, timestamp, test_datas, false).await;
@@ -117,11 +119,71 @@ async fn test_add_column(
sql_query(client, rpc_ctx, timestamp, test_datas, true).await;
}
-async fn drop_table_if_exists(client: &Arc<dyn DbClient>, rpc_ctx:
&RpcContext, timestamp: i64) {
+async fn test_block_table(client: &Arc<dyn DbClient>, rpc_ctx: &RpcContext,
timestamp: i64) {
+ println!("Test auto create table");
+
+ drop_table_if_exists(client, rpc_ctx, BLOCKED_TABLE).await;
+ create_table(client, rpc_ctx, BLOCKED_TABLE).await;
+
+ // try to write, should return table blocked error
+ let mut write_req = WriteRequest::default();
+ let mut points = Vec::new();
+ let builder = PointBuilder::new(BLOCKED_TABLE.to_string())
+ .timestamp(timestamp)
+ .tag("name", Value::String("name1".to_string()))
+ .field("value", Value::Double(0.42));
+ let point = builder.build().unwrap();
+ points.push(point);
+ write_req.add_points(points);
+ if let Err(e) = client.write(rpc_ctx, &write_req).await {
+ let e = e.to_string();
+ assert!(e.contains("Table operation is blocked"));
+ } else {
+ panic!("it should return blocked error");
+ }
+
+ // try to query, should be blocked, too
+ let query_req = SqlQueryRequest {
+ tables: vec![BLOCKED_TABLE.to_string()],
+ sql: format!("SELECT * from {}", BLOCKED_TABLE),
+ };
+ if let Err(e) = client.sql_query(rpc_ctx, &query_req).await {
+ let e = e.to_string();
+ assert!(e.contains("Table operation is blocked"));
+ } else {
+ panic!("it should return blocked error");
+ }
+}
+
+async fn drop_test_table_if_exists(
+ client: &Arc<dyn DbClient>,
+ rpc_ctx: &RpcContext,
+ timestamp: i64,
+) {
let test_table = format!("test_table_{timestamp}");
+ drop_table_if_exists(client, rpc_ctx, &test_table).await;
+}
+
+async fn drop_table_if_exists(client: &Arc<dyn DbClient>, rpc_ctx:
&RpcContext, table: &str) {
let query_req = SqlQueryRequest {
- tables: vec![test_table.clone()],
- sql: format!("DROP TABLE IF EXISTS {test_table}"),
+ tables: vec![table.to_string()],
+ sql: format!("DROP TABLE IF EXISTS {table}"),
+ };
+ let _ = client.sql_query(rpc_ctx, &query_req).await.unwrap();
+}
+
+async fn create_table(client: &Arc<dyn DbClient>, rpc_ctx: &RpcContext, table:
&str) {
+ let query_req = SqlQueryRequest {
+ tables: vec![table.to_string()],
+ sql: format!(
+ "
+ CREATE TABLE {} (
+ name string TAG,
+ value double NOT NULL,
+ t timestamp NOT NULL,
+ timestamp KEY (t))",
+ table
+ ),
};
let _ = client.sql_query(rpc_ctx, &query_req).await.unwrap();
}
diff --git a/src/proxy/src/limiter.rs b/src/proxy/src/limiter.rs
index ee3d9f51..d3a1fc32 100644
--- a/src/proxy/src/limiter.rs
+++ b/src/proxy/src/limiter.rs
@@ -30,11 +30,11 @@ use crate::metrics::BLOCKED_REQUEST_COUNTER_VEC_GLOBAL;
#[derive(Snafu, Debug)]
#[snafu(visibility(pub))]
pub enum Error {
- #[snafu(display("Queried table is blocked, table:{}", table,))]
- BlockedTable { table: String },
+ #[snafu(display("Table operation is blocked, table:{}, op:{}", table, op))]
+ BlockedTable { table: String, op: String },
- #[snafu(display("Query is blocked by rule:{:?}", rule))]
- BlockedByRule { rule: BlockRule },
+ #[snafu(display("Table operation is blocked by rule:{:?}, op:{}", rule,
op))]
+ BlockedByRule { rule: BlockRule, op: String },
}
define_result!(Error);
@@ -158,6 +158,7 @@ impl Limiter {
{
BlockedTable {
table: blocked_table,
+ op: plan.plan_type(),
}
.fail()?;
}
@@ -174,6 +175,7 @@ impl Limiter {
{
BlockedTable {
table: insert.table.name(),
+ op: plan.plan_type(),
}
.fail()?;
}
@@ -187,7 +189,11 @@ impl Limiter {
fn try_limit_by_rules(&self, plan: &Plan) -> Result<()> {
self.rules.read().unwrap().iter().try_for_each(|rule| {
if rule.should_limit(plan) {
- BlockedByRule { rule: *rule }.fail()?;
+ BlockedByRule {
+ rule: *rule,
+ op: plan.plan_type(),
+ }
+ .fail()?;
}
Ok(())
diff --git a/src/proxy/src/write.rs b/src/proxy/src/write.rs
index 36e7faa7..1e5ae356 100644
--- a/src/proxy/src/write.rs
+++ b/src/proxy/src/write.rs
@@ -61,6 +61,11 @@ use crate::{
type WriteResponseFutures<'a> = Vec<BoxFuture<'a,
runtime::Result<Result<WriteResponse>>>>;
+struct PlanWithTable {
+ plan: Plan,
+ table: TableRef,
+}
+
#[derive(Debug)]
pub struct WriteContext {
pub request_id: RequestId,
@@ -501,19 +506,34 @@ impl Proxy {
auto_create_table: self.auto_create_table,
};
- let plan_vec = self
+ let plans = self
.write_request_to_insert_plan(req.table_requests, write_context)
.await?;
let mut success = 0;
- for insert_plan in plan_vec {
- let table = insert_plan.table.clone();
+
+ // TODO: concurrently run the insert plan here
+ for plan_with_table in plans {
+ let PlanWithTable { plan, table } = plan_with_table;
+
+ // check limit first
+ // TODO: if one table is blocked, maybe should not lead to failure
of whole
+ // batch?
+ self.instance
+ .limiter
+ .try_limit(&plan)
+ .box_err()
+ .context(ErrWithCause {
+ code: StatusCode::INTERNAL_SERVER_ERROR,
+ msg: "table is blocked",
+ })?;
+
match self
.execute_insert_plan(
request_id.clone(),
catalog_name,
&schema_name,
- insert_plan,
+ plan,
deadline,
)
.await
@@ -544,8 +564,8 @@ impl Proxy {
&self,
table_requests: Vec<WriteTableRequest>,
write_context: WriteContext,
- ) -> Result<Vec<InsertPlan>> {
- let mut plan_vec = Vec::with_capacity(table_requests.len());
+ ) -> Result<Vec<PlanWithTable>> {
+ let mut plans = Vec::with_capacity(table_requests.len());
let WriteContext {
request_id,
@@ -602,10 +622,16 @@ impl Proxy {
}
Ok(v) => v,
};
- plan_vec.push(plan);
+ let plan = Plan::Insert(plan);
+ let plan_with_table = PlanWithTable {
+ plan,
+ table: table_clone,
+ };
+
+ plans.push(plan_with_table);
}
- Ok(plan_vec)
+ Ok(plans)
}
async fn execute_insert_plan(
@@ -613,15 +639,9 @@ impl Proxy {
request_id: RequestId,
catalog_name: &str,
schema_name: &str,
- insert_plan: InsertPlan,
+ plan: Plan,
deadline: Option<Instant>,
) -> Result<usize> {
- debug!(
- "Execute insert plan begin, table:{}, row_num:{}",
- insert_plan.table.name(),
- insert_plan.rows.num_rows()
- );
- let plan = Plan::Insert(insert_plan);
let output = self
.execute_plan(request_id, catalog_name, schema_name, plan,
deadline)
.await;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]