This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/dev by this push:
new 94673bd1 chore: fix error message (#1412)
94673bd1 is described below
commit 94673bd18585ce3f5478a07c735558a31eed301a
Author: Jiacai Liu <[email protected]>
AuthorDate: Thu Jan 4 16:24:26 2024 +0800
chore: fix error message (#1412)
## Rationale
## Detailed Changes
- Attach endpoint to remote error
## Test Plan
CI
---
.github/workflows/ci.yml | 1 +
.../cases/env/cluster/ddl/partition_table.result | 3 +-
.../cases/env/cluster/ddl/partition_table.sql | 1 +
interpreters/src/table_manipulator/meta_based.rs | 2 +-
remote_engine_client/src/client.rs | 36 ++++++++++++++++------
remote_engine_client/src/lib.rs | 5 ++-
6 files changed, 36 insertions(+), 12 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 13b4773a..25ea726d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -20,6 +20,7 @@ on:
push:
branches:
- main
+ - dev
paths-ignore:
- 'docs/**'
- 'etc/**'
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result b/integration_tests/cases/env/cluster/ddl/partition_table.result
index e8feacbc..5ae20b64 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -99,9 +99,10 @@ ALTER TABLE partition_table_t ADD COLUMN (b string);
affected_rows: 0
+-- SQLNESS REPLACE endpoint:(.*?), endpoint:xx,
INSERT INTO partition_table_t (t, id, name, value) VALUES (1651737067000, 10,
"ceresdb0", 100);
-Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to
execute plan. Caused by: Internal error, msg:Failed to execute interpreter,
err:Failed to execute insert, err:Failed to write table, err:Failed to write
tables in batch, tables:[\"__partition_table_t_1\"], err:Failed to query from
table in server, table_idents:[TableIdentifier { catalog: \"ceresdb\", schema:
\"public\", table: \"__partition_table_t_1\" }], code:401, msg:failed to decode
row group payload. Cause [...]
+Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to
execute plan. Caused by: Internal error, msg:Failed to execute interpreter,
err:Failed to execute insert, err:Failed to write table, err:Failed to write
tables in batch, tables:[\"__partition_table_t_1\"], err:Failed to query from
table in server, table_idents:[TableIdentifier { catalog: \"ceresdb\", schema:
\"public\", table: \"__partition_table_t_1\" }], endpoint:xx, code:401,
msg:failed to decode row group p [...]
ALTER TABLE partition_table_t MODIFY SETTING enable_ttl='true';
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index 46be8e1b..855e2c9a 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -46,6 +46,7 @@ EXPLAIN ANALYZE SELECT * from partition_table_t where name in
("ceresdb0", "cere
ALTER TABLE partition_table_t ADD COLUMN (b string);
+-- SQLNESS REPLACE endpoint:(.*?), endpoint:xx,
INSERT INTO partition_table_t (t, id, name, value) VALUES (1651737067000, 10,
"ceresdb0", 100);
ALTER TABLE partition_table_t MODIFY SETTING enable_ttl='true';
diff --git a/interpreters/src/table_manipulator/meta_based.rs b/interpreters/src/table_manipulator/meta_based.rs
index bdecfd3e..2e6f8e69 100644
--- a/interpreters/src/table_manipulator/meta_based.rs
+++ b/interpreters/src/table_manipulator/meta_based.rs
@@ -126,7 +126,7 @@ impl TableManipulator for TableManipulatorImpl {
.await
.box_err()
.context(DropWithCause {
- msg: format!("failed to create table by meta client,
req:{req:?}"),
+ msg: format!("failed to drop table by meta client,
req:{req:?}"),
})?;
info!(
diff --git a/remote_engine_client/src/client.rs b/remote_engine_client/src/client.rs
index 456de914..291c01d5 100644
--- a/remote_engine_client/src/client.rs
+++ b/remote_engine_client/src/client.rs
@@ -37,7 +37,7 @@ use common_types::{record_batch::RecordBatch,
schema::RecordSchema};
use futures::{Stream, StreamExt};
use generic_error::BoxError;
use logger::{error, info};
-use router::RouterRef;
+use router::{endpoint::Endpoint, RouterRef};
use runtime::Runtime;
use snafu::{ensure, OptionExt, ResultExt};
use table_engine::{
@@ -120,6 +120,7 @@ impl Client {
// evict cache entry.
let response = response.into_inner();
let remote_read_record_batch_stream = ClientReadRecordBatchStream::new(
+ route_context.endpoint,
table_ident,
response,
record_schema,
@@ -135,6 +136,7 @@ impl Client {
// Write to remote.
let table_ident = request.table.clone();
+ let endpoint = route_context.endpoint.clone();
let request_pb = request.convert_into_pb().box_err().context(Convert {
msg: "Failed to convert WriteRequest to pb",
})?;
@@ -152,6 +154,7 @@ impl Client {
let response = response.into_inner();
if let Some(header) = &response.header &&
!status_code::is_ok(header.code) {
Server {
+ endpoint,
table_idents: vec![table_ident.clone()],
code: header.code,
msg: header.error.clone(),
@@ -187,9 +190,9 @@ impl Client {
}
// Merge according to endpoint.
- let mut remote_writes =
Vec::with_capacity(write_batch_contexts_by_endpoint.len());
+ let mut write_handles =
Vec::with_capacity(write_batch_contexts_by_endpoint.len());
let mut written_tables =
Vec::with_capacity(write_batch_contexts_by_endpoint.len());
- for (_, context) in write_batch_contexts_by_endpoint {
+ for (endpoint, context) in write_batch_contexts_by_endpoint {
// Write to remote.
let WriteBatchContext {
table_idents,
@@ -204,18 +207,18 @@ impl Client {
rpc_client
.write_batch(Request::new(batch_request_pb))
.await
+ .map(|v| (v, endpoint.clone()))
.box_err()
});
- remote_writes.push(handle);
+ write_handles.push(handle);
written_tables.push(table_idents);
}
- let mut results = Vec::with_capacity(remote_writes.len());
- for (table_idents, remote_write) in
written_tables.into_iter().zip(remote_writes) {
- let batch_result = remote_write.await;
+ let mut results = Vec::with_capacity(write_handles.len());
+ for (table_idents, handle) in
written_tables.into_iter().zip(write_handles) {
// If it's runtime error, don't evict entires from route cache.
- let batch_result = match batch_result.box_err() {
+ let batch_result = match handle.await.box_err() {
Ok(result) => result,
Err(e) => {
results.push(WriteBatchResult {
@@ -227,10 +230,12 @@ impl Client {
};
// Check remote write result then.
- let result = batch_result.and_then(|response| {
+ let result = batch_result.and_then(|result| {
+ let (response, endpoint) = result;
let response = response.into_inner();
if let Some(header) = &response.header &&
!status_code::is_ok(header.code) {
Server {
+ endpoint,
table_idents: table_idents.clone(),
code: header.code,
msg: header.error.clone(),
@@ -260,6 +265,7 @@ impl Client {
let route_context =
self.cached_router.route(&request.table_ident).await?;
let table_ident = request.table_ident.clone();
+ let endpoint = route_context.endpoint.clone();
let request_pb: ceresdbproto::remote_engine::AlterTableSchemaRequest =
request.into();
let mut rpc_client =
RemoteEngineServiceClient::<Channel>::new(route_context.channel);
@@ -279,6 +285,7 @@ impl Client {
let response = response.into_inner();
if let Some(header) = &response.header &&
!status_code::is_ok(header.code) {
Server {
+ endpoint:endpoint.clone(),
table_idents: vec![table_ident.clone()],
code: header.code,
msg: header.error.clone(),
@@ -318,6 +325,7 @@ impl Client {
let route_context =
self.cached_router.route(&request.table_ident).await?;
let table_ident = request.table_ident.clone();
+ let endpoint = route_context.endpoint.clone();
let request_pb: ceresdbproto::remote_engine::AlterTableOptionsRequest
= request.into();
let mut rpc_client =
RemoteEngineServiceClient::<Channel>::new(route_context.channel);
@@ -336,6 +344,7 @@ impl Client {
let response = response.into_inner();
if let Some(header) = &response.header &&
!status_code::is_ok(header.code) {
Server {
+ endpoint:endpoint.clone(),
table_idents: vec![table_ident.clone()],
code: header.code,
msg: header.error.clone(),
@@ -371,6 +380,7 @@ impl Client {
// Find the channel from router firstly.
let route_context = self.cached_router.route(&request.table).await?;
let table_ident = request.table.clone();
+ let endpoint = route_context.endpoint.clone();
let request_pb =
ceresdbproto::remote_engine::GetTableInfoRequest::try_from(request)
.box_err()
.context(Convert {
@@ -391,6 +401,7 @@ impl Client {
let response = response.into_inner();
if let Some(header) = &response.header &&
!status_code::is_ok(header.code) {
Server {
+ endpoint:endpoint.clone(),
table_idents: vec![table_ident.clone()],
code: header.code,
msg: header.error.clone(),
@@ -403,6 +414,7 @@ impl Client {
match result {
Ok(response) => {
let table_info = response.table_info.context(Server {
+ endpoint: endpoint.clone(),
table_idents: vec![table_ident.clone()],
code: status_code::StatusCode::Internal.as_u32(),
msg: "Table info is empty",
@@ -423,6 +435,7 @@ impl Client {
msg: "Failed to covert table schema",
})?
.with_context(|| Server {
+ endpoint,
table_idents: vec![table_ident],
code: status_code::StatusCode::Internal.as_u32(),
msg: "Table schema is empty",
@@ -490,6 +503,7 @@ impl Client {
// evict cache entry.
let response = response.into_inner();
let remote_execute_plan_stream = ClientReadRecordBatchStream::new(
+ route_context.endpoint,
table_ident,
response,
plan_schema,
@@ -509,6 +523,7 @@ impl Client {
}
pub struct ClientReadRecordBatchStream {
+ endpoint: Endpoint,
pub table_ident: TableIdentifier,
pub response_stream: Streaming<remote_engine::ReadResponse>,
pub record_schema: RecordSchema,
@@ -517,12 +532,14 @@ pub struct ClientReadRecordBatchStream {
impl ClientReadRecordBatchStream {
pub fn new(
+ endpoint: Endpoint,
table_ident: TableIdentifier,
response_stream: Streaming<remote_engine::ReadResponse>,
record_schema: RecordSchema,
remote_metrics: Arc<Mutex<Option<String>>>,
) -> Self {
Self {
+ endpoint,
table_ident,
response_stream,
record_schema,
@@ -541,6 +558,7 @@ impl Stream for ClientReadRecordBatchStream {
// Check header.
if let Some(header) = response.header &&
!status_code::is_ok(header.code) {
return Poll::Ready(Some(Server {
+ endpoint: this.endpoint.clone(),
table_idents: vec![this.table_ident.clone()],
code: header.code,
msg: header.error,
diff --git a/remote_engine_client/src/lib.rs b/remote_engine_client/src/lib.rs
index c88026f3..055f4083 100644
--- a/remote_engine_client/src/lib.rs
+++ b/remote_engine_client/src/lib.rs
@@ -54,6 +54,7 @@ use self::client::{Client, ClientReadRecordBatchStream};
pub mod error {
use generic_error::GenericError;
use macros::define_result;
+ use router::endpoint::Endpoint;
use snafu::{Backtrace, Snafu};
use table_engine::remote::model::TableIdentifier;
@@ -93,12 +94,14 @@ pub mod error {
},
#[snafu(display(
- "Failed to query from table in server, table_idents:{:?}, code:{},
msg:{}",
+ "Failed to query from table in server, table_idents:{:?},
endpoint:{}, code:{}, msg:{}",
table_idents,
+ endpoint.to_string(),
code,
msg
))]
Server {
+ endpoint: Endpoint,
table_idents: Vec<TableIdentifier>,
code: u32,
msg: String,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]