This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e10d63  chore: Improve error (#77)
4e10d63 is described below

commit 4e10d6305712357630f96cc5fa47dc9867fea842
Author: yuxia Luo <[email protected]>
AuthorDate: Sat Dec 20 12:04:20 2025 +0800

    chore: Improve error (#77)
---
 crates/fluss/Cargo.toml                            |   1 +
 crates/fluss/src/client/admin.rs                   |  28 +-
 crates/fluss/src/client/credentials.rs             |   7 +-
 crates/fluss/src/client/table/remote_log.rs        |  29 +-
 crates/fluss/src/client/table/scanner.rs           |  30 +-
 crates/fluss/src/client/write/mod.rs               |  10 +-
 crates/fluss/src/client/write/sender.rs            |  11 +-
 crates/fluss/src/client/write/writer_client.rs     |  17 +-
 crates/fluss/src/error.rs                          | 139 ++++++--
 crates/fluss/src/io/file_io.rs                     |   5 +-
 crates/fluss/src/io/storage.rs                     |   6 +-
 crates/fluss/src/metadata/database.rs              |  22 +-
 crates/fluss/src/metadata/json_serde.rs            | 179 +++++-----
 crates/fluss/src/metadata/table.rs                 |  88 +++--
 crates/fluss/src/proto/fluss_api.proto             |   5 +
 crates/fluss/src/record/arrow.rs                   |  24 +-
 crates/fluss/src/row/datum.rs                      |  34 +-
 crates/fluss/src/rpc/error.rs                      |   4 +
 crates/fluss/src/rpc/fluss_api_error.rs            | 371 +++++++++++++++++++++
 crates/fluss/src/rpc/frame.rs                      |   4 +
 crates/fluss/src/rpc/message/create_database.rs    |   3 +-
 crates/fluss/src/rpc/message/create_table.rs       |   3 +-
 crates/fluss/src/rpc/message/database_exists.rs    |   4 +-
 crates/fluss/src/rpc/message/drop_database.rs      |   4 +-
 crates/fluss/src/rpc/message/drop_table.rs         |   4 +-
 crates/fluss/src/rpc/message/fetch.rs              |   4 +-
 crates/fluss/src/rpc/message/get_database_info.rs  |   4 +-
 .../src/rpc/message/get_latest_lake_snapshot.rs    |   4 +-
 crates/fluss/src/rpc/message/get_table.rs          |   4 +-
 crates/fluss/src/rpc/message/header.rs             |  18 +-
 crates/fluss/src/rpc/message/list_databases.rs     |   4 +-
 crates/fluss/src/rpc/message/list_offsets.rs       |  19 +-
 crates/fluss/src/rpc/message/list_tables.rs        |   4 +-
 crates/fluss/src/rpc/message/mod.rs                |   1 +
 crates/fluss/src/rpc/message/produce_log.rs        |   4 +-
 crates/fluss/src/rpc/message/table_exists.rs       |   5 +-
 crates/fluss/src/rpc/message/update_metadata.rs    |   4 +-
 crates/fluss/src/rpc/mod.rs                        |   4 +-
 crates/fluss/src/rpc/server_connection.rs          |  24 +-
 crates/fluss/tests/integration/admin.rs            |  32 ++
 .../fluss/tests/integration/table_remote_scan.rs   |  14 +-
 41 files changed, 908 insertions(+), 273 deletions(-)

diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index 0cf0364..cdba9de 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -57,6 +57,7 @@ opendal = "0.55.0"
 url = "2.5.7"
 uuid = { version = "1.10", features = ["v4"] }
 tempfile = "3.23.0"
+snafu = "0.8.3"
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 jiff = { workspace = true, features = ["js"] }
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index fefab43..e185af8 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -29,7 +29,7 @@ use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
 use crate::rpc::{RpcClient, ServerConnection};
 
 use crate::BucketId;
-use crate::error::Result;
+use crate::error::{Error, Result};
 use crate::proto::GetTableInfoResponse;
 use std::collections::HashMap;
 use std::slice::from_ref;
@@ -245,10 +245,10 @@ impl FlussAdmin {
         let mut results = HashMap::new();
 
         for response_future in response_futures {
-            let offsets = response_future.await.map_err(
-                // todo: consider use suitable error
-                |e| crate::error::Error::WriteError(format!("Fail to get 
result: {e}")),
-            )?;
+            let offsets = response_future.await.map_err(|e| 
Error::UnexpectedError {
+                message: "Fail to get result for list offsets.".to_string(),
+                source: Some(Box::new(e)),
+            })?;
             results.extend(offsets?);
         }
         Ok(results)
@@ -267,10 +267,11 @@ impl FlussAdmin {
         for bucket_id in buckets {
             let table_bucket = TableBucket::new(table_id, *bucket_id);
             let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
-                // todo: consider use another suitable error
-                crate::error::Error::InvalidTableError(format!(
-                    "No leader found for table bucket: table_id={table_id}, 
bucket_id={bucket_id}"
-                ))
+                // todo: consider retry?
+                Error::UnexpectedError {
+                    message: format!("No leader found for table bucket: 
{table_bucket}."),
+                    source: None,
+                }
             })?;
 
             node_for_bucket_list
@@ -301,10 +302,11 @@ impl FlussAdmin {
             let task = tokio::spawn(async move {
                 let cluster = metadata.get_cluster();
                 let tablet_server = 
cluster.get_tablet_server(leader_id).ok_or_else(|| {
-                    // todo: consider use more suitable error
-                    crate::error::Error::InvalidTableError(format!(
-                        "Tablet server {leader_id} not found"
-                    ))
+                    Error::LeaderNotAvailable {
+                        message: format!(
+                            "Tablet server {leader_id} is not found in 
metadata cache."
+                        ),
+                    }
                 })?;
                 let connection = 
rpc_client.get_connection(tablet_server).await?;
                 let list_offsets_response = connection.request(request).await?;
diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs
index bd2a477..6b07d08 100644
--- a/crates/fluss/src/client/credentials.rs
+++ b/crates/fluss/src/client/credentials.rs
@@ -134,9 +134,10 @@ impl CredentialsCache {
             return Ok(HashMap::new());
         }
 
-        let credentials: Credentials = 
serde_json::from_slice(&response.token).map_err(|e| {
-            Error::JsonSerdeError(format!("Error when parse token from server: 
{e}"))
-        })?;
+        let credentials: Credentials =
+            serde_json::from_slice(&response.token).map_err(|e| 
Error::JsonSerdeError {
+                message: format!("Error when parse token from server: {e}"),
+            })?;
 
         let mut addition_infos = HashMap::new();
         for kv in &response.addition_info {
diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs
index a2561f3..10273dd 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -100,15 +100,14 @@ impl RemoteLogDownloadFuture {
 
     /// Get the downloaded file path
     pub async fn get_file_path(&mut self) -> Result<PathBuf> {
-        let receiver = self
-            .receiver
-            .take()
-            .ok_or_else(|| Error::Io(io::Error::other("Download future already 
consumed")))?;
-
-        receiver.await.map_err(|e| {
-            Error::Io(io::Error::other(format!(
-                "Download future cancelled: {e:?}"
-            )))
+        let receiver = self.receiver.take().ok_or_else(|| 
Error::UnexpectedError {
+            message: "Downloaded file already consumed".to_string(),
+            source: None,
+        })?;
+
+        receiver.await.map_err(|e| Error::UnexpectedError {
+            message: format!("Download future cancelled: {e:?}"),
+            source: None,
         })?
     }
 }
@@ -234,13 +233,13 @@ impl RemoteLogDownloader {
             let read_future = op.read_with(relative_path).range(range.clone());
             let chunk = tokio::time::timeout(REMOTE_OP_TIMEOUT, read_future)
                 .await
-                .map_err(|_| {
-                    Error::Io(io::Error::new(
-                        io::ErrorKind::TimedOut,
-                        format!(
-                            "Timeout reading chunk from remote storage: 
{remote_path} at offset {offset}"
+                .map_err(|e| {
+                    Error::IoUnexpectedError {
+                        message: format!(
+                            "Timeout reading chunk from remote storage: 
{remote_path} at offset {offset}, exception: {e}."
                         ),
-                    ))
+                        source: io::ErrorKind::TimedOut.into(),
+                    }
                 })??;
             let bytes = chunk.to_bytes();
 
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index f66d7d7..1e70649 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -74,18 +74,20 @@ impl<'a> TableScan<'a> {
     /// ```
     pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
         if column_indices.is_empty() {
-            return Err(Error::IllegalArgument(
-                "Column indices cannot be empty".to_string(),
-            ));
+            return Err(Error::IllegalArgument {
+                message: "Column indices cannot be empty".to_string(),
+            });
         }
         let field_count = self.table_info.row_type().fields().len();
         for &idx in column_indices {
             if idx >= field_count {
-                return Err(Error::IllegalArgument(format!(
-                    "Column index {} out of range (max: {})",
-                    idx,
-                    field_count - 1
-                )));
+                return Err(Error::IllegalArgument {
+                    message: format!(
+                        "Column index {} out of range (max: {})",
+                        idx,
+                        field_count - 1
+                    ),
+                });
             }
         }
         self.projected_fields = Some(column_indices.to_vec());
@@ -106,9 +108,9 @@ impl<'a> TableScan<'a> {
     /// ```
     pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
         if column_names.is_empty() {
-            return Err(Error::IllegalArgument(
-                "Column names cannot be empty".to_string(),
-            ));
+            return Err(Error::IllegalArgument {
+                message: "Column names cannot be empty".to_string(),
+            });
         }
         let row_type = self.table_info.row_type();
         let mut indices = Vec::new();
@@ -118,7 +120,9 @@ impl<'a> TableScan<'a> {
                 .fields()
                 .iter()
                 .position(|f| f.name() == *name)
-                .ok_or_else(|| Error::IllegalArgument(format!("Column '{name}' 
not found")))?;
+                .ok_or_else(|| Error::IllegalArgument {
+                    message: format!("Column '{name}' not found"),
+                })?;
             indices.push(idx);
         }
 
@@ -277,7 +281,7 @@ impl LogFetcher {
                             // Download and process remote log segments
                             let mut pos_in_log_segment = 
remote_fetch_info.first_start_pos;
                             let mut current_fetch_offset = fetch_offset;
-                            // todo: make segment download parallelly
+                            // todo: make segment download in parallel
                             for (i, segment) in
                                 
remote_fetch_info.remote_log_segments.iter().enumerate()
                             {
diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs
index e632cde..cd33586 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -74,11 +74,17 @@ impl ResultHandle {
         self.receiver
             .receive()
             .await
-            .map_err(|e| Error::WriteError(e.to_string()))
+            .map_err(|e| Error::UnexpectedError {
+                message: format!("Fail to wait write result {e:?}"),
+                source: None,
+            })
     }
 
     pub fn result(&self, batch_result: BatchWriteResult) -> Result<(), Error> {
         // do nothing, just return empty result
-        batch_result.map_err(|e| Error::WriteError(e.to_string()))
+        batch_result.map_err(|e| Error::UnexpectedError {
+            message: format!("Fail to get write result {e:?}"),
+            source: None,
+        })
     }
 }
diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs
index 27460e3..462a846 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -17,7 +17,7 @@
 
 use crate::client::metadata::Metadata;
 use crate::client::{ReadyWriteBatch, RecordAccumulator};
-use crate::error::Error::WriteError;
+use crate::error::Error;
 use crate::error::Result;
 use crate::metadata::TableBucket;
 use crate::proto::ProduceLogResponse;
@@ -150,9 +150,12 @@ impl Sender {
 
         let cluster = self.metadata.get_cluster();
 
-        let destination_node = cluster
-            .get_tablet_server(destination)
-            .ok_or(WriteError(String::from("destination node not found")))?;
+        let destination_node =
+            cluster
+                .get_tablet_server(destination)
+                .ok_or(Error::LeaderNotAvailable {
+                    message: format!("destination node not found in metadata 
cache {destination}."),
+                })?;
         let connection = self.metadata.get_connection(destination_node).await?;
 
         for (table_id, write_batches) in write_batch_by_table {
diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs
index 28f5371..042859a 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -78,11 +78,12 @@ impl WriterClient {
 
     fn get_ack(config: &Config) -> Result<i16> {
         let acks = config.writer_acks.as_str();
-        if acks.eq("all") {
+        if acks.eq_ignore_ascii_case("all") {
             Ok(-1)
         } else {
-            acks.parse::<i16>()
-                .map_err(|e| Error::IllegalArgument(e.to_string()))
+            acks.parse::<i16>().map_err(|e| Error::IllegalArgument {
+                message: format!("invalid writer ack '{acks}': {e}"),
+            })
         }
     }
 
@@ -133,11 +134,17 @@ impl WriterClient {
         self.shutdown_tx
             .send(())
             .await
-            .map_err(|e| Error::WriteError(e.to_string()))?;
+            .map_err(|e| Error::UnexpectedError {
+                message: format!("Failed to close write client: {e:?}"),
+                source: None,
+            })?;
 
         self.sender_join_handle
             .await
-            .map_err(|e| Error::WriteError(e.to_string()))?;
+            .map_err(|e| Error::UnexpectedError {
+                message: format!("Failed to close write client: {e:?}"),
+                source: None,
+            })?;
         Ok(())
     }
 
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index 63438b1..0f4b1b6 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -15,48 +15,137 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::rpc::RpcError;
+pub use crate::rpc::RpcError;
+pub use crate::rpc::{ApiError, FlussError};
+
 use arrow_schema::ArrowError;
+use snafu::Snafu;
 use std::{io, result};
-use thiserror::Error;
 
 pub type Result<T> = result::Result<T, Error>;
 
-#[derive(Debug, Error)]
+#[derive(Debug, Snafu)]
 pub enum Error {
-    #[error(transparent)]
-    Io(#[from] io::Error),
+    #[snafu(
+        whatever,
+        display("Fluss hitting unexpected error {}: {:?}", message, source)
+    )]
+    UnexpectedError {
+        message: String,
+        /// see https://github.com/shepmaster/snafu/issues/446
+        #[snafu(source(from(Box<dyn std::error::Error + Send + Sync + 
'static>, Some)))]
+        source: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
+    },
+
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting unexpected io error {}: {:?}", message, source)
+    )]
+    IoUnexpectedError { message: String, source: io::Error },
+
+    #[snafu(
+        visibility(pub(crate)),
+        display(
+            "Fluss hitting remote storage unexpected error {}: {:?}",
+            message,
+            source
+        )
+    )]
+    RemoteStorageUnexpectedError {
+        message: String,
+        source: opendal::Error,
+    },
+
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting invalid table error {}.", message)
+    )]
+    InvalidTableError { message: String },
 
-    #[error("Invalid table")]
-    InvalidTableError(String),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting json serde error {}.", message)
+    )]
+    JsonSerdeError { message: String },
 
-    #[error("Json serde error")]
-    JsonSerdeError(String),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting unexpected rpc error {}: {:?}", message, source)
+    )]
+    RpcError { message: String, source: RpcError },
 
-    #[error("Rpc error")]
-    RpcError(#[from] RpcError),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting row convert error {}.", message)
+    )]
+    RowConvertError { message: String },
 
-    #[error("Row convert error")]
-    RowConvertError(String),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting Arrow error {}: {:?}.", message, source)
+    )]
+    ArrowError { message: String, source: ArrowError },
 
-    #[error("Arrow error: {0}")]
-    ArrowError(#[from] ArrowError),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting illegal argument error {}.", message)
+    )]
+    IllegalArgument { message: String },
 
-    #[error("Write error: {0}")]
-    WriteError(String),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting IO not supported error {}.", message)
+    )]
+    IoUnsupported { message: String },
 
-    #[error("Illegal argument error: {0}")]
-    IllegalArgument(String),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting leader not available error {}.", message)
+    )]
+    LeaderNotAvailable { message: String },
 
-    #[error("IO not supported error: {0}")]
-    IoUnsupported(String),
+    #[snafu(visibility(pub(crate)), display("Fluss API Error: {}.", 
api_error))]
+    FlussAPIError { api_error: ApiError },
+}
 
-    #[error("IO operation failed on underlying storage: {0}")]
-    IoUnexpected(Box<opendal::Error>),
+impl From<ArrowError> for Error {
+    fn from(value: ArrowError) -> Self {
+        Error::ArrowError {
+            message: format!("{value}"),
+            source: value,
+        }
+    }
+}
+
+impl From<RpcError> for Error {
+    fn from(value: RpcError) -> Self {
+        Error::RpcError {
+            message: format!("{value}"),
+            source: value,
+        }
+    }
+}
+
+impl From<io::Error> for Error {
+    fn from(value: io::Error) -> Self {
+        Error::IoUnexpectedError {
+            message: format!("{value}"),
+            source: value,
+        }
+    }
 }
 
 impl From<opendal::Error> for Error {
-    fn from(err: opendal::Error) -> Self {
-        Error::IoUnexpected(Box::new(err))
+    fn from(value: opendal::Error) -> Self {
+        Error::RemoteStorageUnexpectedError {
+            message: format!("{value}"),
+            source: value,
+        }
+    }
+}
+
+impl From<ApiError> for Error {
+    fn from(value: ApiError) -> Self {
+        Error::FlussAPIError { api_error: value }
     }
 }
diff --git a/crates/fluss/src/io/file_io.rs b/crates/fluss/src/io/file_io.rs
index ec3b87e..e7b026d 100644
--- a/crates/fluss/src/io/file_io.rs
+++ b/crates/fluss/src/io/file_io.rs
@@ -39,8 +39,9 @@ pub struct FileIO {
 impl FileIO {
     /// Try to infer file io scheme from path.
     pub fn from_url(path: &str) -> Result<FileIOBuilder> {
-        let url =
-            Url::parse(path).map_err(|_| 
Error::IllegalArgument(format!("Invalid URL: {path}")))?;
+        let url = Url::parse(path).map_err(|_| Error::IllegalArgument {
+            message: format!("Invalid URL: {path}"),
+        })?;
         Ok(FileIOBuilder::new(url.scheme()))
     }
 
diff --git a/crates/fluss/src/io/storage.rs b/crates/fluss/src/io/storage.rs
index 089670e..d90eaa5 100644
--- a/crates/fluss/src/io/storage.rs
+++ b/crates/fluss/src/io/storage.rs
@@ -44,9 +44,9 @@ impl Storage {
             Scheme::Fs => Ok(Self::LocalFs),
             #[cfg(feature = "storage-s3")]
             Scheme::S3 => Ok(Self::S3 { props }),
-            _ => Err(error::Error::IoUnsupported(
-                "Unsupported storage feature".to_string(),
-            )),
+            _ => Err(error::Error::IoUnsupported {
+                message: format!("Unsupported storage feature {scheme_str}"),
+            }),
         }
     }
 
diff --git a/crates/fluss/src/metadata/database.rs b/crates/fluss/src/metadata/database.rs
index 8eaa4d3..fad1498 100644
--- a/crates/fluss/src/metadata/database.rs
+++ b/crates/fluss/src/metadata/database.rs
@@ -148,8 +148,8 @@ impl JsonSerde for DatabaseDescriptor {
         if let Some(comment_node) = node.get(Self::COMMENT_NAME) {
             let comment = comment_node
                 .as_str()
-                .ok_or_else(|| {
-                    JsonSerdeError(format!("{} should be a string", 
Self::COMMENT_NAME))
+                .ok_or_else(|| JsonSerdeError {
+                    message: format!("{} should be a string", 
Self::COMMENT_NAME),
                 })?
                 .to_owned();
             builder = builder.comment(&comment);
@@ -157,8 +157,8 @@ impl JsonSerde for DatabaseDescriptor {
 
         // Deserialize custom properties directly
         let custom_properties = if let Some(props_node) = 
node.get(Self::CUSTOM_PROPERTIES_NAME) {
-            let obj = props_node.as_object().ok_or_else(|| {
-                JsonSerdeError("Custom properties should be an 
object".to_string())
+            let obj = props_node.as_object().ok_or_else(|| JsonSerdeError {
+                message: "Custom properties should be an object".to_string(),
             })?;
 
             let mut properties = HashMap::with_capacity(obj.len());
@@ -167,8 +167,8 @@ impl JsonSerde for DatabaseDescriptor {
                     key.clone(),
                     value
                         .as_str()
-                        .ok_or_else(|| {
-                            JsonSerdeError("Property value should be a 
string".to_string())
+                        .ok_or_else(|| JsonSerdeError {
+                            message: "Property value should be a 
string".to_string(),
                         })?
                         .to_owned(),
                 );
@@ -186,16 +186,18 @@ impl JsonSerde for DatabaseDescriptor {
 impl DatabaseDescriptor {
     /// Create DatabaseDescriptor from JSON bytes (equivalent to Java's 
fromJsonBytes)
     pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
-        let json_value: Value = serde_json::from_slice(bytes)
-            .map_err(|e| JsonSerdeError(format!("Failed to parse JSON: 
{e}")))?;
+        let json_value: Value = serde_json::from_slice(bytes).map_err(|e| 
JsonSerdeError {
+            message: format!("Failed to parse JSON: {e}"),
+        })?;
         Self::deserialize_json(&json_value)
     }
 
     /// Convert DatabaseDescriptor to JSON bytes
     pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
         let json_value = self.serialize_json()?;
-        serde_json::to_vec(&json_value)
-            .map_err(|e| JsonSerdeError(format!("Failed to serialize to JSON: 
{e}")))
+        serde_json::to_vec(&json_value).map_err(|e| JsonSerdeError {
+            message: format!("Failed to serialize to JSON: {e}"),
+        })
     }
 }
 
diff --git a/crates/fluss/src/metadata/json_serde.rs b/crates/fluss/src/metadata/json_serde.rs
index 447b0f9..7d94e19 100644
--- a/crates/fluss/src/metadata/json_serde.rs
+++ b/crates/fluss/src/metadata/json_serde.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::error::Error::{InvalidTableError, JsonSerdeError};
-use crate::error::Result;
+use crate::error::Error::JsonSerdeError;
+use crate::error::{Error, Result};
 use crate::metadata::datatype::{DataField, DataType, DataTypes};
 use crate::metadata::table::{Column, Schema, TableDescriptor};
 use serde_json::{Value, json};
@@ -166,11 +166,11 @@ impl JsonSerde for DataType {
         let type_root = node
             .get(Self::FIELD_NAME_TYPE_NAME)
             .and_then(|v| v.as_str())
-            .ok_or_else(|| {
-                JsonSerdeError(format!(
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!(
                     "Couldn't find field {} while deserializing datatype.",
                     Self::FIELD_NAME_TYPE_NAME
-                ))
+                ),
             })?;
 
         let mut data_type = match type_root {
@@ -185,11 +185,8 @@ impl JsonSerde for DataType {
                 let length = node
                     .get(Self::FIELD_NAME_LENGTH)
                     .and_then(|v| v.as_u64())
-                    .ok_or_else(|| {
-                        JsonSerdeError(format!(
-                            "Missing required field: {}",
-                            Self::FIELD_NAME_LENGTH
-                        ))
+                    .ok_or_else(|| Error::JsonSerdeError {
+                        message: format!("Missing required field: {}", 
Self::FIELD_NAME_LENGTH),
                     })? as u32;
                 DataTypes::char(length)
             }
@@ -198,11 +195,8 @@ impl JsonSerde for DataType {
                 let precision = node
                     .get(Self::FIELD_NAME_PRECISION)
                     .and_then(|v| v.as_u64())
-                    .ok_or_else(|| {
-                        JsonSerdeError(format!(
-                            "Missing required field: {}",
-                            Self::FIELD_NAME_PRECISION
-                        ))
+                    .ok_or_else(|| Error::JsonSerdeError {
+                        message: format!("Missing required field: {}", 
Self::FIELD_NAME_PRECISION),
                     })? as u32;
                 let scale = node
                     .get(Self::FIELD_NAME_SCALE)
@@ -243,43 +237,46 @@ impl JsonSerde for DataType {
             "ARRAY" => {
                 let element_type_node =
                     node.get(Self::FIELD_NAME_ELEMENT_TYPE).ok_or_else(|| {
-                        JsonSerdeError(format!(
-                            "Missing required field: {}",
-                            Self::FIELD_NAME_ELEMENT_TYPE
-                        ))
+                        Error::JsonSerdeError {
+                            message: format!(
+                                "Missing required field: {}",
+                                Self::FIELD_NAME_ELEMENT_TYPE
+                            ),
+                        }
                     })?;
                 let element_type = 
DataType::deserialize_json(element_type_node)?;
                 DataTypes::array(element_type)
             }
             "MAP" => {
-                let key_type_node = 
node.get(Self::FIELD_NAME_KEY_TYPE).ok_or_else(|| {
-                    JsonSerdeError(format!(
-                        "Missing required field: {}",
-                        Self::FIELD_NAME_KEY_TYPE
-                    ))
-                })?;
+                let key_type_node =
+                    node.get(Self::FIELD_NAME_KEY_TYPE)
+                        .ok_or_else(|| Error::JsonSerdeError {
+                            message: format!(
+                                "Missing required field: {}",
+                                Self::FIELD_NAME_KEY_TYPE
+                            ),
+                        })?;
                 let key_type = DataType::deserialize_json(key_type_node)?;
-                let value_type_node = 
node.get(Self::FIELD_NAME_VALUE_TYPE).ok_or_else(|| {
-                    JsonSerdeError(format!(
-                        "Missing required field: {}",
-                        Self::FIELD_NAME_VALUE_TYPE
-                    ))
-                })?;
+                let value_type_node =
+                    node.get(Self::FIELD_NAME_VALUE_TYPE)
+                        .ok_or_else(|| Error::JsonSerdeError {
+                            message: format!(
+                                "Missing required field: {}",
+                                Self::FIELD_NAME_VALUE_TYPE
+                            ),
+                        })?;
                 let value_type = DataType::deserialize_json(value_type_node)?;
                 DataTypes::map(key_type, value_type)
             }
             "ROW" => {
                 let fields_node = node
                     .get(Self::FIELD_NAME_FIELDS)
-                    .ok_or_else(|| {
-                        JsonSerdeError(format!(
-                            "Missing required field: {}",
-                            Self::FIELD_NAME_FIELDS
-                        ))
+                    .ok_or_else(|| Error::JsonSerdeError {
+                        message: format!("Missing required field: {}", 
Self::FIELD_NAME_FIELDS),
                     })?
                     .as_array()
-                    .ok_or_else(|| {
-                        JsonSerdeError(format!("{} must be an array", 
Self::FIELD_NAME_FIELDS))
+                    .ok_or_else(|| Error::JsonSerdeError {
+                        message: format!("{} must be an array", 
Self::FIELD_NAME_FIELDS),
                     })?;
                 let mut fields = Vec::with_capacity(fields_node.len());
                 for field_node in fields_node {
@@ -287,7 +284,11 @@ impl JsonSerde for DataType {
                 }
                 DataTypes::row(fields)
             }
-            _ => return Err(JsonSerdeError(format!("Unknown type root: 
{type_root}"))),
+            _ => {
+                return Err(Error::JsonSerdeError {
+                    message: format!("Unknown type root: {type_root}"),
+                });
+            }
         };
 
         if let Some(nullable) = node.get(Self::FIELD_NAME_NULLABLE) {
@@ -327,12 +328,16 @@ impl JsonSerde for DataField {
         let name = node
             .get(Self::NAME)
             .and_then(|v| v.as_str())
-            .ok_or_else(|| JsonSerdeError(format!("Missing required field: 
{}", Self::NAME)))?
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!("Missing required field: {}", Self::NAME),
+            })?
             .to_string();
 
-        let field_type_node = node.get(Self::FIELD_TYPE).ok_or_else(|| {
-            JsonSerdeError(format!("Missing required field: {}", 
Self::FIELD_TYPE))
-        })?;
+        let field_type_node = node
+            .get(Self::FIELD_TYPE)
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!("Missing required field: {}", 
Self::FIELD_TYPE),
+            })?;
 
         let data_type = DataType::deserialize_json(field_type_node)?;
 
@@ -373,12 +378,16 @@ impl JsonSerde for Column {
         let name = node
             .get(Self::NAME)
             .and_then(|v| v.as_str())
-            .ok_or_else(|| JsonSerdeError(format!("Missing required field: 
{}", Self::NAME)))?
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!("Missing required field: {}", Self::NAME),
+            })?
             .to_string();
 
-        let data_type_node = node.get(Self::DATA_TYPE).ok_or_else(|| {
-            JsonSerdeError(format!("Missing required field: {}", 
Self::DATA_TYPE))
-        })?;
+        let data_type_node = node
+            .get(Self::DATA_TYPE)
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!("Missing required field: {}", 
Self::DATA_TYPE),
+            })?;
 
         let data_type = DataType::deserialize_json(data_type_node)?;
 
@@ -429,11 +438,13 @@ impl JsonSerde for Schema {
     fn deserialize_json(node: &Value) -> Result<Schema> {
         let columns_node = node
             .get(Self::COLUMNS_NAME)
-            .ok_or_else(|| {
-                JsonSerdeError(format!("Missing required field: {}", 
Self::COLUMNS_NAME))
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!("Missing required field: {}", 
Self::COLUMNS_NAME),
             })?
             .as_array()
-            .ok_or_else(|| JsonSerdeError(format!("{} must be an array", 
Self::COLUMNS_NAME)))?;
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!("{} must be an array", Self::COLUMNS_NAME),
+            })?;
 
         let mut columns = Vec::with_capacity(columns_node.len());
         for col_node in columns_node {
@@ -443,17 +454,17 @@ impl JsonSerde for Schema {
         let mut schema_builder = Schema::builder().with_columns(columns);
 
         if let Some(pk_node) = node.get(Self::PRIMARY_KEY_NAME) {
-            let pk_array = pk_node
-                .as_array()
-                .ok_or_else(|| InvalidTableError("Primary key must be an 
array".to_string()))?;
+            let pk_array = pk_node.as_array().ok_or_else(|| 
Error::InvalidTableError {
+                message: "Primary key must be an array".to_string(),
+            })?;
 
             let mut primary_keys = Vec::with_capacity(pk_array.len());
             for name_node in pk_array {
                 primary_keys.push(
                     name_node
                         .as_str()
-                        .ok_or_else(|| {
-                            InvalidTableError("Primary key element must be a 
string".to_string())
+                        .ok_or_else(|| Error::InvalidTableError {
+                            message: "Primary key element must be a 
string".to_string(),
                         })?
                         .to_string(),
                 );
@@ -478,9 +489,9 @@ impl TableDescriptor {
     const VERSION: u32 = 1;
 
     fn deserialize_properties(node: &Value) -> Result<HashMap<String, String>> 
{
-        let obj = node
-            .as_object()
-            .ok_or_else(|| JsonSerdeError("Properties must be an 
object".to_string()))?;
+        let obj = node.as_object().ok_or_else(|| Error::JsonSerdeError {
+            message: "Properties must be an object".to_string(),
+        })?;
 
         let mut properties = HashMap::with_capacity(obj.len());
         for (key, value) in obj {
@@ -488,7 +499,9 @@ impl TableDescriptor {
                 key.clone(),
                 value
                     .as_str()
-                    .ok_or_else(|| JsonSerdeError("Property value must be a 
string".to_string()))?
+                    .ok_or_else(|| Error::JsonSerdeError {
+                        message: "Property value must be a string".to_string(),
+                    })?
                     .to_owned(),
             );
         }
@@ -545,8 +558,8 @@ impl JsonSerde for TableDescriptor {
         let mut builder = TableDescriptor::builder();
 
         // Deserialize schema
-        let schema_node = node.get(Self::SCHEMA_NAME).ok_or_else(|| {
-            JsonSerdeError(format!("Missing required field: {}", 
Self::SCHEMA_NAME))
+        let schema_node = node.get(Self::SCHEMA_NAME).ok_or_else(|| 
JsonSerdeError {
+            message: format!("Missing required field: {}", Self::SCHEMA_NAME),
         })?;
         let schema = Schema::deserialize_json(schema_node)?;
         builder = builder.schema(schema);
@@ -555,22 +568,21 @@ impl JsonSerde for TableDescriptor {
         if let Some(comment_node) = node.get(Self::COMMENT_NAME) {
             let comment = comment_node
                 .as_str()
-                .ok_or_else(|| JsonSerdeError(format!("{} must be a string", 
Self::COMMENT_NAME)))?
+                .ok_or_else(|| JsonSerdeError {
+                    message: format!("{} must be a string", 
Self::COMMENT_NAME),
+                })?
                 .to_owned();
             builder = builder.comment(comment.as_str());
         }
 
         let partition_node = node
             .get(Self::PARTITION_KEY_NAME)
-            .ok_or_else(|| {
-                JsonSerdeError(format!(
-                    "Missing required field: {}",
-                    Self::PARTITION_KEY_NAME
-                ))
+            .ok_or_else(|| JsonSerdeError {
+                message: format!("Missing required field: {}", 
Self::PARTITION_KEY_NAME),
             })?
             .as_array()
-            .ok_or_else(|| {
-                JsonSerdeError(format!("{} must be an array", 
Self::PARTITION_KEY_NAME))
+            .ok_or_else(|| JsonSerdeError {
+                message: format!("{} must be an array", 
Self::PARTITION_KEY_NAME),
             })?;
 
         let mut partition_keys = Vec::with_capacity(partition_node.len());
@@ -578,11 +590,8 @@ impl JsonSerde for TableDescriptor {
             partition_keys.push(
                 key_node
                     .as_str()
-                    .ok_or_else(|| {
-                        JsonSerdeError(format!(
-                            "{} element must be a string",
-                            Self::PARTITION_KEY_NAME
-                        ))
+                    .ok_or_else(|| JsonSerdeError {
+                        message: format!("{} element must be a string", 
Self::PARTITION_KEY_NAME),
                     })?
                     .to_owned(),
             );
@@ -592,15 +601,17 @@ impl JsonSerde for TableDescriptor {
         let mut bucket_count = None;
         let mut bucket_keys = vec![];
         if let Some(bucket_key_node) = node.get(Self::BUCKET_KEY_NAME) {
-            let bucket_key_node = bucket_key_node.as_array().ok_or_else(|| {
-                JsonSerdeError(format!("{} must be an array", 
Self::BUCKET_KEY_NAME))
+            let bucket_key_node = bucket_key_node.as_array().ok_or_else(|| 
JsonSerdeError {
+                message: format!("{} must be an array", Self::BUCKET_KEY_NAME),
             })?;
 
             for key_node in bucket_key_node {
                 bucket_keys.push(
                     key_node
                         .as_str()
-                        .ok_or_else(|| JsonSerdeError("Bucket key must be a 
string".to_string()))?
+                        .ok_or_else(|| JsonSerdeError {
+                            message: "Bucket key must be a string".to_string(),
+                        })?
                         .to_owned(),
                 );
             }
@@ -617,18 +628,18 @@ impl JsonSerde for TableDescriptor {
         // Deserialize properties
         let properties =
             
Self::deserialize_properties(node.get(Self::PROPERTIES_NAME).ok_or_else(|| {
-                JsonSerdeError(format!("Missing required field: {}", 
Self::PROPERTIES_NAME))
+                JsonSerdeError {
+                    message: format!("Missing required field: {}", 
Self::PROPERTIES_NAME),
+                }
             })?)?;
         builder = builder.properties(properties);
 
         // Deserialize custom properties
         let custom_properties = Self::deserialize_properties(
-            node.get(Self::CUSTOM_PROPERTIES_NAME).ok_or_else(|| {
-                JsonSerdeError(format!(
-                    "Missing required field: {}",
-                    Self::CUSTOM_PROPERTIES_NAME
-                ))
-            })?,
+            node.get(Self::CUSTOM_PROPERTIES_NAME)
+                .ok_or_else(|| JsonSerdeError {
+                    message: format!("Missing required field: {}", 
Self::CUSTOM_PROPERTIES_NAME),
+                })?,
         )?;
         builder = builder.custom_properties(custom_properties);
 
diff --git a/crates/fluss/src/metadata/table.rs 
b/crates/fluss/src/metadata/table.rs
index 751dd6d..770c4f2 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::error::Error::InvalidTableError;
-use crate::error::Result;
+use crate::error::{Error, Result};
 use crate::metadata::datatype::{DataField, DataType, RowType};
 use core::fmt;
 use serde::{Deserialize, Serialize};
@@ -220,9 +220,9 @@ impl SchemaBuilder {
     ) -> Result<Vec<Column>> {
         let names: Vec<_> = columns.iter().map(|c| &c.name).collect();
         if let Some(duplicates) = Self::find_duplicates(&names) {
-            return Err(InvalidTableError(format!(
-                "Duplicate column names found: {duplicates:?}"
-            )));
+            return Err(InvalidTableError {
+                message: format!("Duplicate column names found: 
{duplicates:?}"),
+            });
         }
 
         let Some(pk) = primary_key else {
@@ -232,9 +232,9 @@ impl SchemaBuilder {
         let pk_set: HashSet<_> = pk.column_names.iter().collect();
         let all_columns: HashSet<_> = columns.iter().map(|c| 
&c.name).collect();
         if !pk_set.is_subset(&all_columns) {
-            return Err(InvalidTableError(format!(
-                "Primary key columns {pk_set:?} not found in schema"
-            )));
+            return Err(InvalidTableError {
+                message: format!("Primary key columns {pk_set:?} not found in 
schema"),
+            });
         }
 
         Ok(columns
@@ -441,12 +441,12 @@ impl TableDescriptor {
     pub fn replication_factor(&self) -> Result<i32> {
         self.properties
             .get("table.replication.factor")
-            .ok_or(InvalidTableError(
-                "Replication factor is not set".to_string(),
-            ))?
+            .ok_or_else(|| InvalidTableError {
+                message: "Replication factor is not set".to_string(),
+            })?
             .parse()
-            .map_err(|_e| {
-                InvalidTableError("Replication factor can't be convert into 
int".to_string())
+            .map_err(|_e| InvalidTableError {
+                message: "Replication factor can't be convert into 
int".to_string(),
             })
     }
 
@@ -497,11 +497,13 @@ impl TableDescriptor {
         bucket_keys.retain(|k| !partition_keys.contains(k));
 
         if bucket_keys.is_empty() {
-            return Err(InvalidTableError(format!(
-                "Primary Key constraint {:?} should not be same with partition 
fields {:?}.",
-                schema.primary_key().unwrap().column_names(),
-                partition_keys
-            )));
+            return Err(Error::InvalidTableError {
+                message: format!(
+                    "Primary Key constraint {:?} should not be same with 
partition fields {:?}.",
+                    schema.primary_key().unwrap().column_names(),
+                    partition_keys
+                ),
+            });
         }
 
         Ok(bucket_keys)
@@ -518,10 +520,12 @@ impl TableDescriptor {
                 .iter()
                 .any(|k| partition_keys.contains(k))
             {
-                return Err(InvalidTableError(format!(
-                    "Bucket key {:?} shouldn't include any column in partition 
keys {:?}.",
-                    distribution.bucket_keys, partition_keys
-                )));
+                return Err(InvalidTableError {
+                    message: format!(
+                        "Bucket key {:?} shouldn't include any column in 
partition keys {:?}.",
+                        distribution.bucket_keys, partition_keys
+                    ),
+                });
             }
 
             return if let Some(pk) = schema.primary_key() {
@@ -540,13 +544,15 @@ impl TableDescriptor {
                         .iter()
                         .all(|k| pk_columns.contains(k))
                     {
-                        return Err(InvalidTableError(format!(
-                            "Bucket keys must be a subset of primary keys 
excluding partition keys for primary-key tables. \
-                            The primary keys are {:?}, the partition keys are 
{:?}, but the user-defined bucket keys are {:?}.",
-                            pk.column_names(),
-                            partition_keys,
-                            distribution.bucket_keys
-                        )));
+                        return Err(InvalidTableError {
+                            message: format!(
+                                "Bucket keys must be a subset of primary keys 
excluding partition keys for primary-key tables. \
+                                The primary keys are {:?}, the partition keys 
are {:?}, but the user-defined bucket keys are {:?}.",
+                                pk.column_names(),
+                                partition_keys,
+                                distribution.bucket_keys
+                            ),
+                        });
                     }
                     Ok(Some(distribution))
                 }
@@ -589,7 +595,9 @@ impl LogFormat {
         match s.to_uppercase().as_str() {
             "ARROW" => Ok(LogFormat::ARROW),
             "INDEXED" => Ok(LogFormat::INDEXED),
-            _ => Err(InvalidTableError(format!("Unknown log format: {s}"))),
+            _ => Err(InvalidTableError {
+                message: format!("Unknown log format: {s}"),
+            }),
         }
     }
 }
@@ -615,7 +623,9 @@ impl KvFormat {
         match s.to_uppercase().as_str() {
             "INDEXED" => Ok(KvFormat::INDEXED),
             "COMPACTED" => Ok(KvFormat::COMPACTED),
-            _ => Err(InvalidTableError(format!("Unknown kv format: {s}"))),
+            _ => Err(Error::InvalidTableError {
+                message: format!("Unknown kv format: {s}"),
+            }),
         }
     }
 }
@@ -961,6 +971,24 @@ impl TableBucket {
     }
 }
 
+impl Display for TableBucket {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        if let Some(partition_id) = self.partition_id {
+            write!(
+                f,
+                "TableBucket(table_id={}, partition_id={}, bucket={})",
+                self.table_id, partition_id, self.bucket
+            )
+        } else {
+            write!(
+                f,
+                "TableBucket(table_id={}, bucket={})",
+                self.table_id, self.bucket
+            )
+        }
+    }
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct LakeSnapshot {
     pub snapshot_id: i64,
diff --git a/crates/fluss/src/proto/fluss_api.proto 
b/crates/fluss/src/proto/fluss_api.proto
index e59c2d9..dbbb45d 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -19,6 +19,11 @@ syntax = "proto2";
 
 package proto;
 
+message ErrorResponse {
+  required int32 error_code = 1;
+  optional string error_message = 2;
+}
+
 // metadata request and response, request send from client to each server.
 message MetadataRequest {
   repeated PbTablePath table_path = 1;
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 6e8cb55..9295713 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -844,10 +844,7 @@ impl ReadContext {
     }
 
     pub fn record_batch_for_remote_log(&self, data: &[u8]) -> 
Result<Option<RecordBatch>> {
-        let (batch_metadata, body_buffer, version) = match 
parse_ipc_message(data) {
-            Some(result) => result,
-            None => return Ok(None),
-        };
+        let (batch_metadata, body_buffer, version) = parse_ipc_message(data)?;
 
         let record_batch = read_record_batch(
             &body_buffer,
@@ -1086,13 +1083,17 @@ mod tests {
         let result = parse_ipc_message(empty_body);
         assert_eq!(
             result.unwrap_err().to_string(),
-            String::from("Arrow error: Parser error: Range [0, 4) is out of 
bounds.\n\n")
+            String::from(
+                "Fluss hitting Arrow error Parser error: Range [0, 4) is out 
of bounds.\n\n: ParseError(\"Range [0, 4) is out of bounds.\\n\\n\")."
+            )
         );
 
         let invalid_data = &[];
         assert_eq!(
             parse_ipc_message(invalid_data).unwrap_err().to_string(),
-            String::from("Arrow error: Parser error: Invalid data length: 0")
+            String::from(
+                "Fluss hitting Arrow error Parser error: Invalid data length: 
0: ParseError(\"Invalid data length: 0\")."
+            )
         );
 
         let data_with_invalid_continuation: &[u8] = &le_bytes(&[0x00000001, 
0x00000000]);
@@ -1100,7 +1101,9 @@ mod tests {
             parse_ipc_message(data_with_invalid_continuation)
                 .unwrap_err()
                 .to_string(),
-            String::from("Arrow error: Parser error: Invalid continuation 
marker: 1")
+            String::from(
+                "Fluss hitting Arrow error Parser error: Invalid continuation 
marker: 1: ParseError(\"Invalid continuation marker: 1\")."
+            )
         );
 
         let data_with_invalid_length: &[u8] = &le_bytes(&[0xFFFFFFFF, 
0x00000001]);
@@ -1109,8 +1112,7 @@ mod tests {
                 .unwrap_err()
                 .to_string(),
             String::from(
-                "Arrow error: Parser error: Invalid data length. \
-                   Remaining data length 0 is shorter than specified size 1"
+                "Fluss hitting Arrow error Parser error: Invalid data length. 
Remaining data length 0 is shorter than specified size 1: ParseError(\"Invalid 
data length. Remaining data length 0 is shorter than specified size 1\")."
             )
         );
 
@@ -1119,7 +1121,9 @@ mod tests {
             parse_ipc_message(data_with_invalid_length)
                 .unwrap_err()
                 .to_string(),
-            String::from("Arrow error: Parser error: Not a record batch")
+            String::from(
+                "Fluss hitting Arrow error Parser error: Not a record batch: 
ParseError(\"Not a record batch\")."
+            )
         );
     }
 
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 6929b57..1ea3933 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -290,18 +290,22 @@ impl Datum<'_> {
             Datum::String(v) => append_value_to_arrow!(StringBuilder, *v),
             Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, 
v.as_ref()),
             Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) | 
Datum::TimestampTz(_) => {
-                return Err(RowConvertError(format!(
-                    "Type {:?} is not yet supported for Arrow conversion",
-                    std::mem::discriminant(self)
-                )));
+                return Err(RowConvertError {
+                    message: format!(
+                        "Type {:?} is not yet supported for Arrow conversion",
+                        std::mem::discriminant(self)
+                    ),
+                });
             }
         }
 
-        Err(RowConvertError(format!(
-            "Cannot append {:?} to builder of type {}",
-            self,
-            std::any::type_name_of_val(builder)
-        )))
+        Err(RowConvertError {
+            message: format!(
+                "Cannot append {:?} to builder of type {}",
+                self,
+                std::any::type_name_of_val(builder)
+            ),
+        })
     }
 }
 
@@ -313,11 +317,13 @@ macro_rules! impl_to_arrow {
                     b.append_value(*self);
                     Ok(())
                 } else {
-                    Err(RowConvertError(format!(
-                        "Cannot cast {} to {} builder",
-                        stringify!($ty),
-                        stringify!($variant)
-                    )))
+                    Err(RowConvertError {
+                        message: format!(
+                            "Cannot cast {} to {} builder",
+                            stringify!($ty),
+                            stringify!($variant)
+                        ),
+                    })
                 }
             }
         }
diff --git a/crates/fluss/src/rpc/error.rs b/crates/fluss/src/rpc/error.rs
index 84b20b1..da3a11e 100644
--- a/crates/fluss/src/rpc/error.rs
+++ b/crates/fluss/src/rpc/error.rs
@@ -17,6 +17,7 @@
 
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
+use prost::DecodeError;
 use std::sync::Arc;
 use thiserror::Error;
 
@@ -29,6 +30,9 @@ pub enum RpcError {
     #[error("Cannot read framed message: {0}")]
     ReadMessageError(#[from] crate::rpc::frame::ReadError),
 
+    #[error("Rpc Decode Error: {0}")]
+    RpcDecodeError(#[from] DecodeError),
+
     #[error("connection error")]
     ConnectionError(String),
 
diff --git a/crates/fluss/src/rpc/fluss_api_error.rs 
b/crates/fluss/src/rpc/fluss_api_error.rs
new file mode 100644
index 0000000..b26eb72
--- /dev/null
+++ b/crates/fluss/src/rpc/fluss_api_error.rs
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::ErrorResponse;
+use std::fmt::{Debug, Display, Formatter};
+
+/// API error response from Fluss server
+pub struct ApiError {
+    pub code: i32,
+    pub message: String,
+}
+
+impl Debug for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ApiError")
+            .field("code", &self.code)
+            .field("message", &self.message)
+            .finish()
+    }
+}
+
+impl Display for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        Debug::fmt(self, f)
+    }
+}
+
+/// Fluss protocol errors. These errors are part of the client-server protocol.
+/// The error codes cannot be changed, but the names can be.
+///
+/// Do not add exceptions that occur only on the client or only on the server 
here.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[repr(i32)]
+pub enum FlussError {
+    /// The server experienced an unexpected error when processing the request.
+    UnknownServerError = -1,
+    /// No error occurred.
+    None = 0,
+    /// The server disconnected before a response was received.
+    NetworkException = 1,
+    /// The version of API is not supported.
+    UnsupportedVersion = 2,
+    /// This message has failed its CRC checksum, exceeds the valid size, has 
a null key for a primary key table, or is otherwise corrupt.
+    CorruptMessage = 3,
+    /// The database does not exist.
+    DatabaseNotExist = 4,
+    /// The database is not empty.
+    DatabaseNotEmpty = 5,
+    /// The database already exists.
+    DatabaseAlreadyExist = 6,
+    /// The table does not exist.
+    TableNotExist = 7,
+    /// The table already exists.
+    TableAlreadyExist = 8,
+    /// The schema does not exist.
+    SchemaNotExist = 9,
+    /// An exception occurred while storing log data on the server.
+    LogStorageException = 10,
+    /// An exception occurred while storing kv data on the server.
+    KvStorageException = 11,
+    /// Not leader or follower.
+    NotLeaderOrFollower = 12,
+    /// The record is too large.
+    RecordTooLargeException = 13,
+    /// The record is corrupt.
+    CorruptRecordException = 14,
+    /// The client has attempted to perform an operation on an invalid table.
+    InvalidTableException = 15,
+    /// The client has attempted to perform an operation on an invalid 
database.
+    InvalidDatabaseException = 16,
+    /// The replication factor is larger than the number of available tablet 
servers.
+    InvalidReplicationFactor = 17,
+    /// Produce request specified an invalid value for required acks.
+    InvalidRequiredAcks = 18,
+    /// The log offset is out of range.
+    LogOffsetOutOfRangeException = 19,
+    /// The table is not a primary key table.
+    NonPrimaryKeyTableException = 20,
+    /// The table or bucket does not exist.
+    UnknownTableOrBucketException = 21,
+    /// The update version is invalid.
+    InvalidUpdateVersionException = 22,
+    /// The coordinator is invalid.
+    InvalidCoordinatorException = 23,
+    /// The leader epoch is invalid.
+    FencedLeaderEpochException = 24,
+    /// The request timed out.
+    RequestTimeOut = 25,
+    /// The general storage exception.
+    StorageException = 26,
+    /// The server did not attempt to execute this operation.
+    OperationNotAttemptedException = 27,
+    /// Records are written to the server already, but to fewer in-sync 
replicas than required.
+    NotEnoughReplicasAfterAppendException = 28,
+    /// Messages are rejected since there are fewer in-sync replicas than 
required.
+    NotEnoughReplicasException = 29,
+    /// Get file access security token exception.
+    SecurityTokenException = 30,
+    /// The tablet server received an out of order sequence batch.
+    OutOfOrderSequenceException = 31,
+    /// The tablet server received a duplicate sequence batch.
+    DuplicateSequenceException = 32,
+    /// This exception is raised by the tablet server if it could not locate 
the writer metadata.
+    UnknownWriterIdException = 33,
+    /// The requested column projection is invalid.
+    InvalidColumnProjection = 34,
+    /// The requested target column to write is invalid.
+    InvalidTargetColumn = 35,
+    /// The partition does not exist.
+    PartitionNotExists = 36,
+    /// The table is not partitioned.
+    TableNotPartitionedException = 37,
+    /// The timestamp is invalid.
+    InvalidTimestampException = 38,
+    /// The config is invalid.
+    InvalidConfigException = 39,
+    /// The lake storage is not configured.
+    LakeStorageNotConfiguredException = 40,
+    /// The kv snapshot does not exist.
+    KvSnapshotNotExist = 41,
+    /// The partition already exists.
+    PartitionAlreadyExists = 42,
+    /// The partition spec is invalid.
+    PartitionSpecInvalidException = 43,
+    /// There is no currently available leader for the given partition.
+    LeaderNotAvailableException = 44,
+    /// Exceed the maximum number of partitions.
+    PartitionMaxNumException = 45,
+    /// Authentication failed.
+    AuthenticateException = 46,
+    /// Security is disabled.
+    SecurityDisabledException = 47,
+    /// Authorization failed.
+    AuthorizationException = 48,
+    /// Exceed the maximum number of buckets.
+    BucketMaxNumException = 49,
+    /// The tiering epoch is invalid.
+    FencedTieringEpochException = 50,
+    /// Authentication failed with retriable exception.
+    RetriableAuthenticateException = 51,
+    /// The server rack info is invalid.
+    InvalidServerRackInfoException = 52,
+    /// The lake snapshot does not exist.
+    LakeSnapshotNotExist = 53,
+    /// The lake table already exists.
+    LakeTableAlreadyExist = 54,
+    /// The new ISR contains at least one ineligible replica.
+    IneligibleReplicaException = 55,
+    /// The alter table is invalid.
+    InvalidAlterTableException = 56,
+    /// Deletion operations are disabled on this table.
+    DeletionDisabledException = 57,
+}
+
+impl FlussError {
+    /// Returns the error code for this error.
+    pub fn code(&self) -> i32 {
+        *self as i32
+    }
+
+    /// Returns a friendly description of the error.
+    pub fn message(&self) -> &'static str {
+        match self {
+            FlussError::UnknownServerError => {
+                "The server experienced an unexpected error when processing 
the request."
+            }
+            FlussError::None => "No error",
+            FlussError::NetworkException => {
+                "The server disconnected before a response was received."
+            }
+            FlussError::UnsupportedVersion => "The version of API is not 
supported.",
+            FlussError::CorruptMessage => {
+                "This message has failed its CRC checksum, exceeds the valid 
size, has a null key for a primary key table, or is otherwise corrupt."
+            }
+            FlussError::DatabaseNotExist => "The database does not exist.",
+            FlussError::DatabaseNotEmpty => "The database is not empty.",
+            FlussError::DatabaseAlreadyExist => "The database already exists.",
+            FlussError::TableNotExist => "The table does not exist.",
+            FlussError::TableAlreadyExist => "The table already exists.",
+            FlussError::SchemaNotExist => "The schema does not exist.",
+            FlussError::LogStorageException => {
+                "Exception occur while storage data for log in server."
+            }
+            FlussError::KvStorageException => {
+                "Exception occur while storage data for kv in server."
+            }
+            FlussError::NotLeaderOrFollower => "Not leader or follower.",
+            FlussError::RecordTooLargeException => "The record is too large.",
+            FlussError::CorruptRecordException => "The record is corrupt.",
+            FlussError::InvalidTableException => {
+                "The client has attempted to perform an operation on an 
invalid table."
+            }
+            FlussError::InvalidDatabaseException => {
+                "The client has attempted to perform an operation on an 
invalid database."
+            }
+            FlussError::InvalidReplicationFactor => {
+                "The replication factor is larger then the number of available 
tablet servers."
+            }
+            FlussError::InvalidRequiredAcks => {
+                "Produce request specified an invalid value for required acks."
+            }
+            FlussError::LogOffsetOutOfRangeException => "The log offset is out 
of range.",
+            FlussError::NonPrimaryKeyTableException => "The table is not 
primary key table.",
+            FlussError::UnknownTableOrBucketException => "The table or bucket 
does not exist.",
+            FlussError::InvalidUpdateVersionException => "The update version 
is invalid.",
+            FlussError::InvalidCoordinatorException => "The coordinator is 
invalid.",
+            FlussError::FencedLeaderEpochException => "The leader epoch is 
invalid.",
+            FlussError::RequestTimeOut => "The request time out.",
+            FlussError::StorageException => "The general storage exception.",
+            FlussError::OperationNotAttemptedException => {
+                "The server did not attempt to execute this operation."
+            }
+            FlussError::NotEnoughReplicasAfterAppendException => {
+                "Records are written to the server already, but to fewer 
in-sync replicas than required."
+            }
+            FlussError::NotEnoughReplicasException => {
+                "Messages are rejected since there are fewer in-sync replicas 
than required."
+            }
+            FlussError::SecurityTokenException => "Get file access security 
token exception.",
+            FlussError::OutOfOrderSequenceException => {
+                "The tablet server received an out of order sequence batch."
+            }
+            FlussError::DuplicateSequenceException => {
+                "The tablet server received a duplicate sequence batch."
+            }
+            FlussError::UnknownWriterIdException => {
+                "This exception is raised by the tablet server if it could not 
locate the writer metadata."
+            }
+            FlussError::InvalidColumnProjection => "The requested column 
projection is invalid.",
+            FlussError::InvalidTargetColumn => "The requested target column to 
write is invalid.",
+            FlussError::PartitionNotExists => "The partition does not exist.",
+            FlussError::TableNotPartitionedException => "The table is not 
partitioned.",
+            FlussError::InvalidTimestampException => "The timestamp is 
invalid.",
+            FlussError::InvalidConfigException => "The config is invalid.",
+            FlussError::LakeStorageNotConfiguredException => "The lake storage 
is not configured.",
+            FlussError::KvSnapshotNotExist => "The kv snapshot does not 
exist.",
+            FlussError::PartitionAlreadyExists => "The partition already 
exists.",
+            FlussError::PartitionSpecInvalidException => "The partition spec 
is invalid.",
+            FlussError::LeaderNotAvailableException => {
+                "There is no currently available leader for the given 
partition."
+            }
+            FlussError::PartitionMaxNumException => "Exceed the maximum number 
of partitions.",
+            FlussError::AuthenticateException => "Authentication failed.",
+            FlussError::SecurityDisabledException => "Security is disabled.",
+            FlussError::AuthorizationException => "Authorization failed.",
+            FlussError::BucketMaxNumException => "Exceed the maximum number of 
buckets.",
+            FlussError::FencedTieringEpochException => "The tiering epoch is 
invalid.",
+            FlussError::RetriableAuthenticateException => {
+                "Authentication failed with retriable exception."
+            }
+            FlussError::InvalidServerRackInfoException => "The server rack 
info is invalid.",
+            FlussError::LakeSnapshotNotExist => "The lake snapshot does not 
exist.",
+            FlussError::LakeTableAlreadyExist => "The lake table already 
exists.",
+            FlussError::IneligibleReplicaException => {
+                "The new ISR contains at least one ineligible replica."
+            }
+            FlussError::InvalidAlterTableException => "The alter table is 
invalid.",
+            FlussError::DeletionDisabledException => {
+                "Deletion operations are disabled on this table."
+            }
+        }
+    }
+
+    /// Builds an `ApiError` carrying this error's code.
+    ///
+    /// `message` is used verbatim when it is `Some`; otherwise the error's
+    /// default text from `self.message()` is used.
+    pub fn to_api_error(&self, message: Option<String>) -> ApiError {
+        ApiError {
+            code: self.code(),
+            // Build the fallback lazily so no `String` is allocated when the
+            // caller already supplied a message.
+            message: message.unwrap_or_else(|| self.message().to_string()),
+        }
+    }
+
+    /// Get the FlussError for the given error code.
+    ///
+    /// Codes `0..=57` each map to a distinct variant, and `-1` is the explicit
+    /// code for `UnknownServerError`.
+    /// Returns `UnknownServerError` if the code is not recognized.
+    pub fn for_code(code: i32) -> Self {
+        match code {
+            -1 => FlussError::UnknownServerError,
+            0 => FlussError::None,
+            1 => FlussError::NetworkException,
+            2 => FlussError::UnsupportedVersion,
+            3 => FlussError::CorruptMessage,
+            4 => FlussError::DatabaseNotExist,
+            5 => FlussError::DatabaseNotEmpty,
+            6 => FlussError::DatabaseAlreadyExist,
+            7 => FlussError::TableNotExist,
+            8 => FlussError::TableAlreadyExist,
+            9 => FlussError::SchemaNotExist,
+            10 => FlussError::LogStorageException,
+            11 => FlussError::KvStorageException,
+            12 => FlussError::NotLeaderOrFollower,
+            13 => FlussError::RecordTooLargeException,
+            14 => FlussError::CorruptRecordException,
+            15 => FlussError::InvalidTableException,
+            16 => FlussError::InvalidDatabaseException,
+            17 => FlussError::InvalidReplicationFactor,
+            18 => FlussError::InvalidRequiredAcks,
+            19 => FlussError::LogOffsetOutOfRangeException,
+            20 => FlussError::NonPrimaryKeyTableException,
+            21 => FlussError::UnknownTableOrBucketException,
+            22 => FlussError::InvalidUpdateVersionException,
+            23 => FlussError::InvalidCoordinatorException,
+            24 => FlussError::FencedLeaderEpochException,
+            25 => FlussError::RequestTimeOut,
+            26 => FlussError::StorageException,
+            27 => FlussError::OperationNotAttemptedException,
+            28 => FlussError::NotEnoughReplicasAfterAppendException,
+            29 => FlussError::NotEnoughReplicasException,
+            30 => FlussError::SecurityTokenException,
+            31 => FlussError::OutOfOrderSequenceException,
+            32 => FlussError::DuplicateSequenceException,
+            33 => FlussError::UnknownWriterIdException,
+            34 => FlussError::InvalidColumnProjection,
+            35 => FlussError::InvalidTargetColumn,
+            36 => FlussError::PartitionNotExists,
+            37 => FlussError::TableNotPartitionedException,
+            38 => FlussError::InvalidTimestampException,
+            39 => FlussError::InvalidConfigException,
+            40 => FlussError::LakeStorageNotConfiguredException,
+            41 => FlussError::KvSnapshotNotExist,
+            42 => FlussError::PartitionAlreadyExists,
+            43 => FlussError::PartitionSpecInvalidException,
+            44 => FlussError::LeaderNotAvailableException,
+            45 => FlussError::PartitionMaxNumException,
+            46 => FlussError::AuthenticateException,
+            47 => FlussError::SecurityDisabledException,
+            48 => FlussError::AuthorizationException,
+            49 => FlussError::BucketMaxNumException,
+            50 => FlussError::FencedTieringEpochException,
+            51 => FlussError::RetriableAuthenticateException,
+            52 => FlussError::InvalidServerRackInfoException,
+            53 => FlussError::LakeSnapshotNotExist,
+            54 => FlussError::LakeTableAlreadyExist,
+            55 => FlussError::IneligibleReplicaException,
+            56 => FlussError::InvalidAlterTableException,
+            57 => FlussError::DeletionDisabledException,
+            // Any code without an arm above falls back to the unknown-server-error variant.
+            _ => FlussError::UnknownServerError,
+        }
+    }
+}
+
+impl Display for FlussError {
+    /// Writes the text returned by `self.message()` to the formatter.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str(self.message())
+    }
+}
+
+impl From<ErrorResponse> for ApiError {
+    /// Converts an `ErrorResponse` into an `ApiError`: the error code is
+    /// resolved through `FlussError::for_code`, and the response's message,
+    /// when present, is used instead of the default message.
+    fn from(resp: ErrorResponse) -> Self {
+        FlussError::for_code(resp.error_code).to_api_error(resp.error_message)
+    }
+}
+
+impl From<ApiError> for FlussError {
+    /// Recovers the `FlussError` variant from an `ApiError` by its code; the
+    /// message is not used.
+    fn from(err: ApiError) -> Self {
+        Self::for_code(err.code)
+    }
+}
diff --git a/crates/fluss/src/rpc/frame.rs b/crates/fluss/src/rpc/frame.rs
index 44dadc9..81cc094 100644
--- a/crates/fluss/src/rpc/frame.rs
+++ b/crates/fluss/src/rpc/frame.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use prost::DecodeError;
 use thiserror::Error;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 
@@ -29,6 +30,9 @@ pub enum ReadError {
 
     #[error("Message too large, limit is {limit} bytes but got {actual} 
bytes")]
     MessageTooLarge { limit: usize, actual: usize },
+
+    #[error("Fail to decode error response: {0}")]
+    ProtoErrorResponseDecodeError(#[from] DecodeError),
 }
 
 pub trait AsyncMessageRead {
diff --git a/crates/fluss/src/rpc/message/create_database.rs 
b/crates/fluss/src/rpc/message/create_database.rs
index e4052ef..7d24235 100644
--- a/crates/fluss/src/rpc/message/create_database.rs
+++ b/crates/fluss/src/rpc/message/create_database.rs
@@ -22,7 +22,8 @@ use crate::error::Result as FlussResult;
 use crate::proto::CreateDatabaseResponse;
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::ReadError;
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/create_table.rs 
b/crates/fluss/src/rpc/message/create_table.rs
index 5802e71..69865b8 100644
--- a/crates/fluss/src/rpc/message/create_table.rs
+++ b/crates/fluss/src/rpc/message/create_table.rs
@@ -23,7 +23,8 @@ use crate::proto::CreateTableResponse;
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
 use crate::rpc::convert::to_table_path;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::ReadError;
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/database_exists.rs 
b/crates/fluss/src/rpc/message/database_exists.rs
index 795eea1..7e717a4 100644
--- a/crates/fluss/src/rpc/message/database_exists.rs
+++ b/crates/fluss/src/rpc/message/database_exists.rs
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/drop_database.rs 
b/crates/fluss/src/rpc/message/drop_database.rs
index 49cbfaf..663e970 100644
--- a/crates/fluss/src/rpc/message/drop_database.rs
+++ b/crates/fluss/src/rpc/message/drop_database.rs
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/drop_table.rs 
b/crates/fluss/src/rpc/message/drop_table.rs
index 0dbc21b..a2b3f2d 100644
--- a/crates/fluss/src/rpc/message/drop_table.rs
+++ b/crates/fluss/src/rpc/message/drop_table.rs
@@ -19,10 +19,12 @@ use crate::metadata::TablePath;
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 
 use crate::proto::DropTableResponse;
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
 use crate::rpc::convert::to_table_path;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/fetch.rs 
b/crates/fluss/src/rpc/message/fetch.rs
index 6ebc5a2..1587606 100644
--- a/crates/fluss/src/rpc/message/fetch.rs
+++ b/crates/fluss/src/rpc/message/fetch.rs
@@ -16,9 +16,11 @@
 // under the License.
 
 use crate::proto::FetchLogResponse;
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use prost::Message;
diff --git a/crates/fluss/src/rpc/message/get_database_info.rs 
b/crates/fluss/src/rpc/message/get_database_info.rs
index 85492a8..6468beb 100644
--- a/crates/fluss/src/rpc/message/get_database_info.rs
+++ b/crates/fluss/src/rpc/message/get_database_info.rs
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs 
b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
index a0e186e..a632a15 100644
--- a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
+++ b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
@@ -19,10 +19,12 @@ use crate::proto;
 use crate::proto::PbTablePath;
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 
 use crate::metadata::TablePath;
+use crate::rpc::frame::ReadError;
+
 use crate::{impl_read_version_type, impl_write_version_type};
 use bytes::{Buf, BufMut};
 use prost::Message;
diff --git a/crates/fluss/src/rpc/message/get_table.rs 
b/crates/fluss/src/rpc/message/get_table.rs
index 4f4d6c7..61657f7 100644
--- a/crates/fluss/src/rpc/message/get_table.rs
+++ b/crates/fluss/src/rpc/message/get_table.rs
@@ -18,10 +18,12 @@
 use crate::proto::{GetTableInfoRequest, GetTableInfoResponse, PbTablePath};
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 
 use crate::metadata::TablePath;
+use crate::rpc::frame::ReadError;
+
 use crate::{impl_read_version_type, impl_write_version_type};
 use bytes::{Buf, BufMut};
 use prost::Message;
diff --git a/crates/fluss/src/rpc/message/header.rs 
b/crates/fluss/src/rpc/message/header.rs
index fe60f8c..77bda7c 100644
--- a/crates/fluss/src/rpc/message/header.rs
+++ b/crates/fluss/src/rpc/message/header.rs
@@ -15,11 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::proto::ErrorResponse;
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
 use crate::rpc::frame::{ReadError, WriteError};
 use crate::rpc::message::{ReadVersionedType, WriteVersionedType};
 use bytes::{Buf, BufMut};
+use prost::Message;
 
 #[allow(dead_code)]
 const REQUEST_HEADER_LENGTH: i32 = 8;
@@ -53,9 +55,10 @@ where
     }
 }
 
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq)]
 pub struct ResponseHeader {
     pub request_id: i32,
+    pub error_response: Option<ErrorResponse>,
 }
 
 impl<R> ReadVersionedType<R> for ResponseHeader
@@ -64,10 +67,17 @@ where
 {
     fn read_versioned(reader: &mut R, _version: ApiVersion) -> Result<Self, 
ReadError> {
         let resp_type = reader.get_u8();
+        let request_id = reader.get_i32();
         if resp_type != SUCCESS_RESPONSE {
-            todo!("handle unsuccess response type");
+            let error_response = ErrorResponse::decode(reader)?;
+            return Ok(ResponseHeader {
+                request_id,
+                error_response: Some(error_response),
+            });
         }
-        let request_id = reader.get_i32();
-        Ok(ResponseHeader { request_id })
+        Ok(ResponseHeader {
+            request_id,
+            error_response: None,
+        })
     }
 }
diff --git a/crates/fluss/src/rpc/message/list_databases.rs 
b/crates/fluss/src/rpc/message/list_databases.rs
index ce5a091..83226ab 100644
--- a/crates/fluss/src/rpc/message/list_databases.rs
+++ b/crates/fluss/src/rpc/message/list_databases.rs
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/list_offsets.rs 
b/crates/fluss/src/rpc/message/list_offsets.rs
index 500db33..9ab1f14 100644
--- a/crates/fluss/src/rpc/message/list_offsets.rs
+++ b/crates/fluss/src/rpc/message/list_offsets.rs
@@ -20,9 +20,11 @@ use crate::{impl_read_version_type, impl_write_version_type, 
proto};
 use crate::error::Error;
 use crate::error::Result as FlussResult;
 use crate::proto::ListOffsetsResponse;
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use std::collections::HashMap;
 
@@ -108,12 +110,15 @@ impl ListOffsetsResponse {
             .map(|resp| {
                 if resp.error_code.is_some() {
                     // todo: consider use another suitable error
-                    Err(Error::WriteError(format!(
-                        "Missing offset, error message: {}",
-                        resp.error_message
-                            .as_deref()
-                            .unwrap_or("unknown server exception")
-                    )))
+                    Err(Error::UnexpectedError {
+                        message: format!(
+                            "Missing offset, error message: {}",
+                            resp.error_message
+                                .as_deref()
+                                .unwrap_or("unknown server exception")
+                        ),
+                        source: None,
+                    })
                 } else {
                     // if no error msg, offset must exists
                     Ok((resp.bucket_id, resp.offset.unwrap()))
diff --git a/crates/fluss/src/rpc/message/list_tables.rs 
b/crates/fluss/src/rpc/message/list_tables.rs
index daf57ea..ff2497a 100644
--- a/crates/fluss/src/rpc/message/list_tables.rs
+++ b/crates/fluss/src/rpc/message/list_tables.rs
@@ -18,9 +18,11 @@
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 
 use crate::proto::ListTablesResponse;
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/mod.rs 
b/crates/fluss/src/rpc/message/mod.rs
index 0ed5b7c..b619ee4 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -38,6 +38,7 @@ mod produce_log;
 mod table_exists;
 mod update_metadata;
 
+pub use crate::rpc::RpcError;
 pub use create_database::*;
 pub use create_table::*;
 pub use database_exists::*;
diff --git a/crates/fluss/src/rpc/message/produce_log.rs 
b/crates/fluss/src/rpc/message/produce_log.rs
index 7da2b59..39bfb3f 100644
--- a/crates/fluss/src/rpc/message/produce_log.rs
+++ b/crates/fluss/src/rpc/message/produce_log.rs
@@ -17,9 +17,11 @@
 
 use crate::error::Result as FlussResult;
 use crate::proto::{PbProduceLogReqForBucket, ProduceLogResponse};
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use std::sync::Arc;
diff --git a/crates/fluss/src/rpc/message/table_exists.rs 
b/crates/fluss/src/rpc/message/table_exists.rs
index 3b71f47..ec98211 100644
--- a/crates/fluss/src/rpc/message/table_exists.rs
+++ b/crates/fluss/src/rpc/message/table_exists.rs
@@ -22,12 +22,13 @@ use crate::proto::TableExistsResponse;
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
 use crate::rpc::convert::to_table_path;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 
+use crate::rpc::frame::ReadError;
+
 use bytes::{Buf, BufMut};
 use prost::Message;
-
 #[derive(Debug)]
 pub struct TableExistsRequest {
     pub inner_request: proto::TableExistsRequest,
diff --git a/crates/fluss/src/rpc/message/update_metadata.rs 
b/crates/fluss/src/rpc/message/update_metadata.rs
index 0d8ad64..a6e6288 100644
--- a/crates/fluss/src/rpc/message/update_metadata.rs
+++ b/crates/fluss/src/rpc/message/update_metadata.rs
@@ -18,10 +18,12 @@
 use crate::proto::{MetadataResponse, PbTablePath};
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 
 use crate::metadata::TablePath;
+use crate::rpc::frame::ReadError;
+
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use bytes::{Buf, BufMut};
 use prost::Message;
diff --git a/crates/fluss/src/rpc/mod.rs b/crates/fluss/src/rpc/mod.rs
index b8705a3..86e13b1 100644
--- a/crates/fluss/src/rpc/mod.rs
+++ b/crates/fluss/src/rpc/mod.rs
@@ -17,7 +17,9 @@
 
 mod api_key;
 mod api_version;
-mod error;
+pub mod error;
+mod fluss_api_error;
+pub use fluss_api_error::{ApiError, FlussError};
 mod frame;
 pub mod message;
 pub use error::*;
diff --git a/crates/fluss/src/rpc/server_connection.rs 
b/crates/fluss/src/rpc/server_connection.rs
index c474534..fdeb56f 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::cluster::ServerNode;
+use crate::error::Error;
 use crate::rpc::api_version::ApiVersion;
 use crate::rpc::error::RpcError;
 use crate::rpc::error::RpcError::ConnectionError;
@@ -230,7 +231,7 @@ where
         }
     }
 
-    pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, RpcError>
+    pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, Error>
     where
         R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
         R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
@@ -249,9 +250,12 @@ where
 
         let mut buf = Vec::new();
         // write header
-        header.write_versioned(&mut buf, header_version)?;
+        header
+            .write_versioned(&mut buf, header_version)
+            .map_err(RpcError::WriteMessageError)?;
         // write message body
-        msg.write_versioned(&mut buf, body_api_version)?;
+        msg.write_versioned(&mut buf, body_api_version)
+            .map_err(RpcError::WriteMessageError)?;
 
         let (tx, rx) = channel();
 
@@ -264,14 +268,21 @@ where
             ConnectionState::RequestMap(map) => {
                 map.insert(request_id, ActiveRequest { channel: tx });
             }
-            ConnectionState::Poison(e) => return 
Err(RpcError::Poisoned(Arc::clone(e))),
+            ConnectionState::Poison(e) => return 
Err(RpcError::Poisoned(Arc::clone(e)).into()),
         }
 
         self.send_message(buf).await?;
         _cleanup_on_cancel.message_sent();
         let mut response = rx.await.expect("Who closed this channel?!")?;
 
-        let body = R::ResponseBody::read_versioned(&mut response.data, 
body_api_version)?;
+        if let Some(error_response) = response.header.error_response {
+            return Err(Error::FlussAPIError {
+                api_error: crate::rpc::ApiError::from(error_response),
+            });
+        }
+
+        let body = R::ResponseBody::read_versioned(&mut response.data, 
body_api_version)
+            .map_err(RpcError::ReadMessageError)?;
 
         let read_bytes = response.data.position();
         let message_bytes = response.data.into_inner().len() as u64;
@@ -281,7 +292,8 @@ where
                 read: read_bytes,
                 api_key: R::API_KEY,
                 api_version: body_api_version,
-            });
+            }
+            .into());
         }
         Ok(body)
     }
diff --git a/crates/fluss/tests/integration/admin.rs 
b/crates/fluss/tests/integration/admin.rs
index 0086d9c..ccb7172 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -34,6 +34,7 @@ static SHARED_FLUSS_CLUSTER: 
LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
 mod admin_test {
     use super::SHARED_FLUSS_CLUSTER;
     use crate::integration::fluss_cluster::{FlussTestingCluster, 
FlussTestingClusterBuilder};
+    use fluss::error::FlussError;
     use fluss::metadata::{
         DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, 
TableDescriptor,
         TablePath,
@@ -251,4 +252,35 @@ mod admin_test {
         // database shouldn't exist now
         assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false);
     }
+
+    // Requests a table that does not exist (`fluss.not_exist`) and checks
+    // that the failure is returned as `Error::FlussAPIError` with the
+    // `TableNotExist` code and the expected message.
+    #[tokio::test]
+    async fn test_fluss_error_response() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection
+            .get_admin()
+            .await
+            .expect("Failed to get admin client");
+
+        let table_path = TablePath::new("fluss".to_string(), 
"not_exist".to_string());
+
+        let result = admin.get_table(&table_path).await;
+        assert!(result.is_err(), "Expected error but got Ok");
+
+        let error = result.unwrap_err();
+        match error {
+            fluss::error::Error::FlussAPIError { api_error } => {
+                assert_eq!(
+                    api_error.code,
+                    FlussError::TableNotExist.code(),
+                    "Expected error code 7 (TableNotExist)"
+                );
+                assert_eq!(
+                    api_error.message, "Table 'fluss.not_exist' does not 
exist.",
+                    "Expected specific error message"
+                );
+            }
+            // Any other error variant means the failure was not reported as a Fluss API error.
+            other => panic!("Expected FlussAPIError, got {:?}", other),
+        }
+    }
 }
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs 
b/crates/fluss/tests/integration/table_remote_scan.rs
index f52d526..ca61ff8 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -38,8 +38,6 @@ mod table_remote_scan_test {
     use fluss::row::{GenericRow, InternalRow};
     use std::collections::HashMap;
     use std::sync::Arc;
-    use std::sync::atomic::AtomicUsize;
-    use std::sync::atomic::Ordering;
     use std::thread;
     use std::thread::sleep;
     use std::time::Duration;
@@ -89,11 +87,13 @@ mod table_remote_scan_test {
                     temp_dir.to_string_lossy().to_string(),
                 );
 
-                let cluster =
-                    
FlussTestingClusterBuilder::new_with_cluster_conf("test_table", &cluster_conf)
-                        .with_remote_data_dir(temp_dir)
-                        .build()
-                        .await;
+                let cluster = 
FlussTestingClusterBuilder::new_with_cluster_conf(
+                    "test_table_remote",
+                    &cluster_conf,
+                )
+                .with_remote_data_dir(temp_dir)
+                .build()
+                .await;
                 let mut guard = cluster_guard.write();
                 *guard = Some(cluster);
             });


Reply via email to