This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/dev by this push:
     new 94673bd1 chore: fix error message (#1412)
94673bd1 is described below

commit 94673bd18585ce3f5478a07c735558a31eed301a
Author: Jiacai Liu <[email protected]>
AuthorDate: Thu Jan 4 16:24:26 2024 +0800

    chore: fix error message (#1412)
    
    ## Rationale
    Errors returned by the remote engine did not identify which endpoint the
    failing request was sent to, making cluster failures hard to diagnose.
    
    ## Detailed Changes
    - Attach endpoint to remote error
    
    ## Test Plan
    CI
---
 .github/workflows/ci.yml                           |  1 +
 .../cases/env/cluster/ddl/partition_table.result   |  3 +-
 .../cases/env/cluster/ddl/partition_table.sql      |  1 +
 interpreters/src/table_manipulator/meta_based.rs   |  2 +-
 remote_engine_client/src/client.rs                 | 36 ++++++++++++++++------
 remote_engine_client/src/lib.rs                    |  5 ++-
 6 files changed, 36 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 13b4773a..25ea726d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -20,6 +20,7 @@ on:
   push:
     branches:
       - main
+      - dev
     paths-ignore:
       - 'docs/**'
       - 'etc/**'
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result 
b/integration_tests/cases/env/cluster/ddl/partition_table.result
index e8feacbc..5ae20b64 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -99,9 +99,10 @@ ALTER TABLE partition_table_t ADD COLUMN (b string);
 
 affected_rows: 0
 
+-- SQLNESS REPLACE endpoint:(.*?), endpoint:xx,
 INSERT INTO partition_table_t (t, id, name, value) VALUES (1651737067000, 10, 
"ceresdb0", 100);
 
-Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to 
execute plan. Caused by: Internal error, msg:Failed to execute interpreter, 
err:Failed to execute insert, err:Failed to write table, err:Failed to write 
tables in batch, tables:[\"__partition_table_t_1\"], err:Failed to query from 
table in server, table_idents:[TableIdentifier { catalog: \"ceresdb\", schema: 
\"public\", table: \"__partition_table_t_1\" }], code:401, msg:failed to decode 
row group payload. Cause [...]
+Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to 
execute plan. Caused by: Internal error, msg:Failed to execute interpreter, 
err:Failed to execute insert, err:Failed to write table, err:Failed to write 
tables in batch, tables:[\"__partition_table_t_1\"], err:Failed to query from 
table in server, table_idents:[TableIdentifier { catalog: \"ceresdb\", schema: 
\"public\", table: \"__partition_table_t_1\" }], endpoint:xx, code:401, 
msg:failed to decode row group p [...]
 
 ALTER TABLE partition_table_t MODIFY SETTING enable_ttl='true';
 
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql 
b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index 46be8e1b..855e2c9a 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -46,6 +46,7 @@ EXPLAIN ANALYZE SELECT * from partition_table_t where name in 
("ceresdb0", "cere
 
 ALTER TABLE partition_table_t ADD COLUMN (b string);
 
+-- SQLNESS REPLACE endpoint:(.*?), endpoint:xx,
 INSERT INTO partition_table_t (t, id, name, value) VALUES (1651737067000, 10, 
"ceresdb0", 100);
 
 ALTER TABLE partition_table_t MODIFY SETTING enable_ttl='true';
diff --git a/interpreters/src/table_manipulator/meta_based.rs 
b/interpreters/src/table_manipulator/meta_based.rs
index bdecfd3e..2e6f8e69 100644
--- a/interpreters/src/table_manipulator/meta_based.rs
+++ b/interpreters/src/table_manipulator/meta_based.rs
@@ -126,7 +126,7 @@ impl TableManipulator for TableManipulatorImpl {
             .await
             .box_err()
             .context(DropWithCause {
-                msg: format!("failed to create table by meta client, 
req:{req:?}"),
+                msg: format!("failed to drop table by meta client, 
req:{req:?}"),
             })?;
 
         info!(
diff --git a/remote_engine_client/src/client.rs 
b/remote_engine_client/src/client.rs
index 456de914..291c01d5 100644
--- a/remote_engine_client/src/client.rs
+++ b/remote_engine_client/src/client.rs
@@ -37,7 +37,7 @@ use common_types::{record_batch::RecordBatch, 
schema::RecordSchema};
 use futures::{Stream, StreamExt};
 use generic_error::BoxError;
 use logger::{error, info};
-use router::RouterRef;
+use router::{endpoint::Endpoint, RouterRef};
 use runtime::Runtime;
 use snafu::{ensure, OptionExt, ResultExt};
 use table_engine::{
@@ -120,6 +120,7 @@ impl Client {
         // evict cache entry.
         let response = response.into_inner();
         let remote_read_record_batch_stream = ClientReadRecordBatchStream::new(
+            route_context.endpoint,
             table_ident,
             response,
             record_schema,
@@ -135,6 +136,7 @@ impl Client {
 
         // Write to remote.
         let table_ident = request.table.clone();
+        let endpoint = route_context.endpoint.clone();
         let request_pb = request.convert_into_pb().box_err().context(Convert {
             msg: "Failed to convert WriteRequest to pb",
         })?;
@@ -152,6 +154,7 @@ impl Client {
             let response = response.into_inner();
             if let Some(header) = &response.header && 
!status_code::is_ok(header.code) {
                 Server {
+                    endpoint,
                     table_idents: vec![table_ident.clone()],
                     code: header.code,
                     msg: header.error.clone(),
@@ -187,9 +190,9 @@ impl Client {
         }
 
         // Merge according to endpoint.
-        let mut remote_writes = 
Vec::with_capacity(write_batch_contexts_by_endpoint.len());
+        let mut write_handles = 
Vec::with_capacity(write_batch_contexts_by_endpoint.len());
         let mut written_tables = 
Vec::with_capacity(write_batch_contexts_by_endpoint.len());
-        for (_, context) in write_batch_contexts_by_endpoint {
+        for (endpoint, context) in write_batch_contexts_by_endpoint {
             // Write to remote.
             let WriteBatchContext {
                 table_idents,
@@ -204,18 +207,18 @@ impl Client {
                 rpc_client
                     .write_batch(Request::new(batch_request_pb))
                     .await
+                    .map(|v| (v, endpoint.clone()))
                     .box_err()
             });
 
-            remote_writes.push(handle);
+            write_handles.push(handle);
             written_tables.push(table_idents);
         }
 
-        let mut results = Vec::with_capacity(remote_writes.len());
-        for (table_idents, remote_write) in 
written_tables.into_iter().zip(remote_writes) {
-            let batch_result = remote_write.await;
+        let mut results = Vec::with_capacity(write_handles.len());
+        for (table_idents, handle) in 
written_tables.into_iter().zip(write_handles) {
             // If it's runtime error, don't evict entires from route cache.
-            let batch_result = match batch_result.box_err() {
+            let batch_result = match handle.await.box_err() {
                 Ok(result) => result,
                 Err(e) => {
                     results.push(WriteBatchResult {
@@ -227,10 +230,12 @@ impl Client {
             };
 
             // Check remote write result then.
-            let result = batch_result.and_then(|response| {
+            let result = batch_result.and_then(|result| {
+                let (response, endpoint) = result;
                 let response = response.into_inner();
                 if let Some(header) = &response.header && 
!status_code::is_ok(header.code) {
                     Server {
+                        endpoint,
                         table_idents: table_idents.clone(),
                         code: header.code,
                         msg: header.error.clone(),
@@ -260,6 +265,7 @@ impl Client {
         let route_context = 
self.cached_router.route(&request.table_ident).await?;
 
         let table_ident = request.table_ident.clone();
+        let endpoint = route_context.endpoint.clone();
         let request_pb: ceresdbproto::remote_engine::AlterTableSchemaRequest = 
request.into();
         let mut rpc_client = 
RemoteEngineServiceClient::<Channel>::new(route_context.channel);
 
@@ -279,6 +285,7 @@ impl Client {
                 let response = response.into_inner();
                 if let Some(header) = &response.header && 
!status_code::is_ok(header.code) {
                     Server {
+                        endpoint:endpoint.clone(),
                         table_idents: vec![table_ident.clone()],
                         code: header.code,
                         msg: header.error.clone(),
@@ -318,6 +325,7 @@ impl Client {
         let route_context = 
self.cached_router.route(&request.table_ident).await?;
 
         let table_ident = request.table_ident.clone();
+        let endpoint = route_context.endpoint.clone();
         let request_pb: ceresdbproto::remote_engine::AlterTableOptionsRequest 
= request.into();
         let mut rpc_client = 
RemoteEngineServiceClient::<Channel>::new(route_context.channel);
 
@@ -336,6 +344,7 @@ impl Client {
                 let response = response.into_inner();
                 if let Some(header) = &response.header && 
!status_code::is_ok(header.code) {
                     Server {
+                        endpoint:endpoint.clone(),
                         table_idents: vec![table_ident.clone()],
                         code: header.code,
                         msg: header.error.clone(),
@@ -371,6 +380,7 @@ impl Client {
         // Find the channel from router firstly.
         let route_context = self.cached_router.route(&request.table).await?;
         let table_ident = request.table.clone();
+        let endpoint = route_context.endpoint.clone();
         let request_pb = 
ceresdbproto::remote_engine::GetTableInfoRequest::try_from(request)
             .box_err()
             .context(Convert {
@@ -391,6 +401,7 @@ impl Client {
             let response = response.into_inner();
             if let Some(header) = &response.header && 
!status_code::is_ok(header.code) {
                     Server {
+                        endpoint:endpoint.clone(),
                         table_idents: vec![table_ident.clone()],
                         code: header.code,
                         msg: header.error.clone(),
@@ -403,6 +414,7 @@ impl Client {
         match result {
             Ok(response) => {
                 let table_info = response.table_info.context(Server {
+                    endpoint: endpoint.clone(),
                     table_idents: vec![table_ident.clone()],
                     code: status_code::StatusCode::Internal.as_u32(),
                     msg: "Table info is empty",
@@ -423,6 +435,7 @@ impl Client {
                             msg: "Failed to covert table schema",
                         })?
                         .with_context(|| Server {
+                            endpoint,
                             table_idents: vec![table_ident],
                             code: status_code::StatusCode::Internal.as_u32(),
                             msg: "Table schema is empty",
@@ -490,6 +503,7 @@ impl Client {
         // evict cache entry.
         let response = response.into_inner();
         let remote_execute_plan_stream = ClientReadRecordBatchStream::new(
+            route_context.endpoint,
             table_ident,
             response,
             plan_schema,
@@ -509,6 +523,7 @@ impl Client {
 }
 
 pub struct ClientReadRecordBatchStream {
+    endpoint: Endpoint,
     pub table_ident: TableIdentifier,
     pub response_stream: Streaming<remote_engine::ReadResponse>,
     pub record_schema: RecordSchema,
@@ -517,12 +532,14 @@ pub struct ClientReadRecordBatchStream {
 
 impl ClientReadRecordBatchStream {
     pub fn new(
+        endpoint: Endpoint,
         table_ident: TableIdentifier,
         response_stream: Streaming<remote_engine::ReadResponse>,
         record_schema: RecordSchema,
         remote_metrics: Arc<Mutex<Option<String>>>,
     ) -> Self {
         Self {
+            endpoint,
             table_ident,
             response_stream,
             record_schema,
@@ -541,6 +558,7 @@ impl Stream for ClientReadRecordBatchStream {
                 // Check header.
                 if let Some(header) = response.header && 
!status_code::is_ok(header.code) {
                     return Poll::Ready(Some(Server {
+                        endpoint: this.endpoint.clone(),
                         table_idents: vec![this.table_ident.clone()],
                         code: header.code,
                         msg: header.error,
diff --git a/remote_engine_client/src/lib.rs b/remote_engine_client/src/lib.rs
index c88026f3..055f4083 100644
--- a/remote_engine_client/src/lib.rs
+++ b/remote_engine_client/src/lib.rs
@@ -54,6 +54,7 @@ use self::client::{Client, ClientReadRecordBatchStream};
 pub mod error {
     use generic_error::GenericError;
     use macros::define_result;
+    use router::endpoint::Endpoint;
     use snafu::{Backtrace, Snafu};
     use table_engine::remote::model::TableIdentifier;
 
@@ -93,12 +94,14 @@ pub mod error {
         },
 
         #[snafu(display(
-            "Failed to query from table in server, table_idents:{:?}, code:{}, 
msg:{}",
+            "Failed to query from table in server, table_idents:{:?}, 
endpoint:{}, code:{}, msg:{}",
             table_idents,
+            endpoint.to_string(),
             code,
             msg
         ))]
         Server {
+            endpoint: Endpoint,
             table_idents: Vec<TableIdentifier>,
             code: u32,
             msg: String,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to