This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 4e10d63 chore: Improve error (#77)
4e10d63 is described below
commit 4e10d6305712357630f96cc5fa47dc9867fea842
Author: yuxia Luo <[email protected]>
AuthorDate: Sat Dec 20 12:04:20 2025 +0800
chore: Improve error (#77)
---
crates/fluss/Cargo.toml | 1 +
crates/fluss/src/client/admin.rs | 28 +-
crates/fluss/src/client/credentials.rs | 7 +-
crates/fluss/src/client/table/remote_log.rs | 29 +-
crates/fluss/src/client/table/scanner.rs | 30 +-
crates/fluss/src/client/write/mod.rs | 10 +-
crates/fluss/src/client/write/sender.rs | 11 +-
crates/fluss/src/client/write/writer_client.rs | 17 +-
crates/fluss/src/error.rs | 139 ++++++--
crates/fluss/src/io/file_io.rs | 5 +-
crates/fluss/src/io/storage.rs | 6 +-
crates/fluss/src/metadata/database.rs | 22 +-
crates/fluss/src/metadata/json_serde.rs | 179 +++++-----
crates/fluss/src/metadata/table.rs | 88 +++--
crates/fluss/src/proto/fluss_api.proto | 5 +
crates/fluss/src/record/arrow.rs | 24 +-
crates/fluss/src/row/datum.rs | 34 +-
crates/fluss/src/rpc/error.rs | 4 +
crates/fluss/src/rpc/fluss_api_error.rs | 371 +++++++++++++++++++++
crates/fluss/src/rpc/frame.rs | 4 +
crates/fluss/src/rpc/message/create_database.rs | 3 +-
crates/fluss/src/rpc/message/create_table.rs | 3 +-
crates/fluss/src/rpc/message/database_exists.rs | 4 +-
crates/fluss/src/rpc/message/drop_database.rs | 4 +-
crates/fluss/src/rpc/message/drop_table.rs | 4 +-
crates/fluss/src/rpc/message/fetch.rs | 4 +-
crates/fluss/src/rpc/message/get_database_info.rs | 4 +-
.../src/rpc/message/get_latest_lake_snapshot.rs | 4 +-
crates/fluss/src/rpc/message/get_table.rs | 4 +-
crates/fluss/src/rpc/message/header.rs | 18 +-
crates/fluss/src/rpc/message/list_databases.rs | 4 +-
crates/fluss/src/rpc/message/list_offsets.rs | 19 +-
crates/fluss/src/rpc/message/list_tables.rs | 4 +-
crates/fluss/src/rpc/message/mod.rs | 1 +
crates/fluss/src/rpc/message/produce_log.rs | 4 +-
crates/fluss/src/rpc/message/table_exists.rs | 5 +-
crates/fluss/src/rpc/message/update_metadata.rs | 4 +-
crates/fluss/src/rpc/mod.rs | 4 +-
crates/fluss/src/rpc/server_connection.rs | 24 +-
crates/fluss/tests/integration/admin.rs | 32 ++
.../fluss/tests/integration/table_remote_scan.rs | 14 +-
41 files changed, 908 insertions(+), 273 deletions(-)
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index 0cf0364..cdba9de 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -57,6 +57,7 @@ opendal = "0.55.0"
url = "2.5.7"
uuid = { version = "1.10", features = ["v4"] }
tempfile = "3.23.0"
+snafu = "0.8.3"
[target.'cfg(target_arch = "wasm32")'.dependencies]
jiff = { workspace = true, features = ["js"] }
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index fefab43..e185af8 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -29,7 +29,7 @@ use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
use crate::rpc::{RpcClient, ServerConnection};
use crate::BucketId;
-use crate::error::Result;
+use crate::error::{Error, Result};
use crate::proto::GetTableInfoResponse;
use std::collections::HashMap;
use std::slice::from_ref;
@@ -245,10 +245,10 @@ impl FlussAdmin {
let mut results = HashMap::new();
for response_future in response_futures {
- let offsets = response_future.await.map_err(
- // todo: consider use suitable error
- |e| crate::error::Error::WriteError(format!("Fail to get
result: {e}")),
- )?;
+ let offsets = response_future.await.map_err(|e|
Error::UnexpectedError {
+ message: "Fail to get result for list offsets.".to_string(),
+ source: Some(Box::new(e)),
+ })?;
results.extend(offsets?);
}
Ok(results)
@@ -267,10 +267,11 @@ impl FlussAdmin {
for bucket_id in buckets {
let table_bucket = TableBucket::new(table_id, *bucket_id);
let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
- // todo: consider use another suitable error
- crate::error::Error::InvalidTableError(format!(
- "No leader found for table bucket: table_id={table_id},
bucket_id={bucket_id}"
- ))
+ // todo: consider retry?
+ Error::UnexpectedError {
+ message: format!("No leader found for table bucket:
{table_bucket}."),
+ source: None,
+ }
})?;
node_for_bucket_list
@@ -301,10 +302,11 @@ impl FlussAdmin {
let task = tokio::spawn(async move {
let cluster = metadata.get_cluster();
let tablet_server =
cluster.get_tablet_server(leader_id).ok_or_else(|| {
- // todo: consider use more suitable error
- crate::error::Error::InvalidTableError(format!(
- "Tablet server {leader_id} not found"
- ))
+ Error::LeaderNotAvailable {
+ message: format!(
+ "Tablet server {leader_id} is not found in
metadata cache."
+ ),
+ }
})?;
let connection =
rpc_client.get_connection(tablet_server).await?;
let list_offsets_response = connection.request(request).await?;
diff --git a/crates/fluss/src/client/credentials.rs
b/crates/fluss/src/client/credentials.rs
index bd2a477..6b07d08 100644
--- a/crates/fluss/src/client/credentials.rs
+++ b/crates/fluss/src/client/credentials.rs
@@ -134,9 +134,10 @@ impl CredentialsCache {
return Ok(HashMap::new());
}
- let credentials: Credentials =
serde_json::from_slice(&response.token).map_err(|e| {
- Error::JsonSerdeError(format!("Error when parse token from server:
{e}"))
- })?;
+ let credentials: Credentials =
+ serde_json::from_slice(&response.token).map_err(|e|
Error::JsonSerdeError {
+ message: format!("Error when parse token from server: {e}"),
+ })?;
let mut addition_infos = HashMap::new();
for kv in &response.addition_info {
diff --git a/crates/fluss/src/client/table/remote_log.rs
b/crates/fluss/src/client/table/remote_log.rs
index a2561f3..10273dd 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -100,15 +100,14 @@ impl RemoteLogDownloadFuture {
/// Get the downloaded file path
pub async fn get_file_path(&mut self) -> Result<PathBuf> {
- let receiver = self
- .receiver
- .take()
- .ok_or_else(|| Error::Io(io::Error::other("Download future already
consumed")))?;
-
- receiver.await.map_err(|e| {
- Error::Io(io::Error::other(format!(
- "Download future cancelled: {e:?}"
- )))
+ let receiver = self.receiver.take().ok_or_else(||
Error::UnexpectedError {
+ message: "Downloaded file already consumed".to_string(),
+ source: None,
+ })?;
+
+ receiver.await.map_err(|e| Error::UnexpectedError {
+ message: format!("Download future cancelled: {e:?}"),
+ source: None,
})?
}
}
@@ -234,13 +233,13 @@ impl RemoteLogDownloader {
let read_future = op.read_with(relative_path).range(range.clone());
let chunk = tokio::time::timeout(REMOTE_OP_TIMEOUT, read_future)
.await
- .map_err(|_| {
- Error::Io(io::Error::new(
- io::ErrorKind::TimedOut,
- format!(
- "Timeout reading chunk from remote storage:
{remote_path} at offset {offset}"
+ .map_err(|e| {
+ Error::IoUnexpectedError {
+ message: format!(
+ "Timeout reading chunk from remote storage:
{remote_path} at offset {offset}, exception: {e}."
),
- ))
+ source: io::ErrorKind::TimedOut.into(),
+ }
})??;
let bytes = chunk.to_bytes();
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index f66d7d7..1e70649 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -74,18 +74,20 @@ impl<'a> TableScan<'a> {
/// ```
pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
if column_indices.is_empty() {
- return Err(Error::IllegalArgument(
- "Column indices cannot be empty".to_string(),
- ));
+ return Err(Error::IllegalArgument {
+ message: "Column indices cannot be empty".to_string(),
+ });
}
let field_count = self.table_info.row_type().fields().len();
for &idx in column_indices {
if idx >= field_count {
- return Err(Error::IllegalArgument(format!(
- "Column index {} out of range (max: {})",
- idx,
- field_count - 1
- )));
+ return Err(Error::IllegalArgument {
+ message: format!(
+ "Column index {} out of range (max: {})",
+ idx,
+ field_count - 1
+ ),
+ });
}
}
self.projected_fields = Some(column_indices.to_vec());
@@ -106,9 +108,9 @@ impl<'a> TableScan<'a> {
/// ```
pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
if column_names.is_empty() {
- return Err(Error::IllegalArgument(
- "Column names cannot be empty".to_string(),
- ));
+ return Err(Error::IllegalArgument {
+ message: "Column names cannot be empty".to_string(),
+ });
}
let row_type = self.table_info.row_type();
let mut indices = Vec::new();
@@ -118,7 +120,9 @@ impl<'a> TableScan<'a> {
.fields()
.iter()
.position(|f| f.name() == *name)
- .ok_or_else(|| Error::IllegalArgument(format!("Column '{name}'
not found")))?;
+ .ok_or_else(|| Error::IllegalArgument {
+ message: format!("Column '{name}' not found"),
+ })?;
indices.push(idx);
}
@@ -277,7 +281,7 @@ impl LogFetcher {
// Download and process remote log segments
let mut pos_in_log_segment =
remote_fetch_info.first_start_pos;
let mut current_fetch_offset = fetch_offset;
- // todo: make segment download parallelly
+ // todo: make segment download in parallel
for (i, segment) in
remote_fetch_info.remote_log_segments.iter().enumerate()
{
diff --git a/crates/fluss/src/client/write/mod.rs
b/crates/fluss/src/client/write/mod.rs
index e632cde..cd33586 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -74,11 +74,17 @@ impl ResultHandle {
self.receiver
.receive()
.await
- .map_err(|e| Error::WriteError(e.to_string()))
+ .map_err(|e| Error::UnexpectedError {
+ message: format!("Fail to wait write result {e:?}"),
+ source: None,
+ })
}
pub fn result(&self, batch_result: BatchWriteResult) -> Result<(), Error> {
// do nothing, just return empty result
- batch_result.map_err(|e| Error::WriteError(e.to_string()))
+ batch_result.map_err(|e| Error::UnexpectedError {
+ message: format!("Fail to get write result {e:?}"),
+ source: None,
+ })
}
}
diff --git a/crates/fluss/src/client/write/sender.rs
b/crates/fluss/src/client/write/sender.rs
index 27460e3..462a846 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -17,7 +17,7 @@
use crate::client::metadata::Metadata;
use crate::client::{ReadyWriteBatch, RecordAccumulator};
-use crate::error::Error::WriteError;
+use crate::error::Error;
use crate::error::Result;
use crate::metadata::TableBucket;
use crate::proto::ProduceLogResponse;
@@ -150,9 +150,12 @@ impl Sender {
let cluster = self.metadata.get_cluster();
- let destination_node = cluster
- .get_tablet_server(destination)
- .ok_or(WriteError(String::from("destination node not found")))?;
+ let destination_node =
+ cluster
+ .get_tablet_server(destination)
+ .ok_or(Error::LeaderNotAvailable {
+ message: format!("destination node not found in metadata
cache {destination}."),
+ })?;
let connection = self.metadata.get_connection(destination_node).await?;
for (table_id, write_batches) in write_batch_by_table {
diff --git a/crates/fluss/src/client/write/writer_client.rs
b/crates/fluss/src/client/write/writer_client.rs
index 28f5371..042859a 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -78,11 +78,12 @@ impl WriterClient {
fn get_ack(config: &Config) -> Result<i16> {
let acks = config.writer_acks.as_str();
- if acks.eq("all") {
+ if acks.eq_ignore_ascii_case("all") {
Ok(-1)
} else {
- acks.parse::<i16>()
- .map_err(|e| Error::IllegalArgument(e.to_string()))
+ acks.parse::<i16>().map_err(|e| Error::IllegalArgument {
+ message: format!("invalid writer ack '{acks}': {e}"),
+ })
}
}
@@ -133,11 +134,17 @@ impl WriterClient {
self.shutdown_tx
.send(())
.await
- .map_err(|e| Error::WriteError(e.to_string()))?;
+ .map_err(|e| Error::UnexpectedError {
+ message: format!("Failed to close write client: {e:?}"),
+ source: None,
+ })?;
self.sender_join_handle
.await
- .map_err(|e| Error::WriteError(e.to_string()))?;
+ .map_err(|e| Error::UnexpectedError {
+ message: format!("Failed to close write client: {e:?}"),
+ source: None,
+ })?;
Ok(())
}
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index 63438b1..0f4b1b6 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -15,48 +15,137 @@
// specific language governing permissions and limitations
// under the License.
-use crate::rpc::RpcError;
+pub use crate::rpc::RpcError;
+pub use crate::rpc::{ApiError, FlussError};
+
use arrow_schema::ArrowError;
+use snafu::Snafu;
use std::{io, result};
-use thiserror::Error;
pub type Result<T> = result::Result<T, Error>;
-#[derive(Debug, Error)]
+#[derive(Debug, Snafu)]
pub enum Error {
- #[error(transparent)]
- Io(#[from] io::Error),
+ #[snafu(
+ whatever,
+ display("Fluss hitting unexpected error {}: {:?}", message, source)
+ )]
+ UnexpectedError {
+ message: String,
+ /// see https://github.com/shepmaster/snafu/issues/446
+ #[snafu(source(from(Box<dyn std::error::Error + Send + Sync +
'static>, Some)))]
+ source: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
+ },
+
+ #[snafu(
+ visibility(pub(crate)),
+ display("Fluss hitting unexpected io error {}: {:?}", message, source)
+ )]
+ IoUnexpectedError { message: String, source: io::Error },
+
+ #[snafu(
+ visibility(pub(crate)),
+ display(
+ "Fluss hitting remote storage unexpected error {}: {:?}",
+ message,
+ source
+ )
+ )]
+ RemoteStorageUnexpectedError {
+ message: String,
+ source: opendal::Error,
+ },
+
+ #[snafu(
+ visibility(pub(crate)),
+ display("Fluss hitting invalid table error {}.", message)
+ )]
+ InvalidTableError { message: String },
- #[error("Invalid table")]
- InvalidTableError(String),
+ #[snafu(
+ visibility(pub(crate)),
+ display("Fluss hitting json serde error {}.", message)
+ )]
+ JsonSerdeError { message: String },
- #[error("Json serde error")]
- JsonSerdeError(String),
+ #[snafu(
+ visibility(pub(crate)),
+ display("Fluss hitting unexpected rpc error {}: {:?}", message, source)
+ )]
+ RpcError { message: String, source: RpcError },
- #[error("Rpc error")]
- RpcError(#[from] RpcError),
+ #[snafu(
+ visibility(pub(crate)),
+ display("Fluss hitting row convert error {}.", message)
+ )]
+ RowConvertError { message: String },
- #[error("Row convert error")]
- RowConvertError(String),
+ #[snafu(
+ visibility(pub(crate)),
+ display("Fluss hitting Arrow error {}: {:?}.", message, source)
+ )]
+ ArrowError { message: String, source: ArrowError },
- #[error("Arrow error: {0}")]
- ArrowError(#[from] ArrowError),
+ #[snafu(
+ visibility(pub(crate)),
+ display("Fluss hitting illegal argument error {}.", message)
+ )]
+ IllegalArgument { message: String },
- #[error("Write error: {0}")]
- WriteError(String),
+ #[snafu(
+ visibility(pub(crate)),
+ display("Fluss hitting IO not supported error {}.", message)
+ )]
+ IoUnsupported { message: String },
- #[error("Illegal argument error: {0}")]
- IllegalArgument(String),
+ #[snafu(
+ visibility(pub(crate)),
+ display("Fluss hitting leader not available error {}.", message)
+ )]
+ LeaderNotAvailable { message: String },
- #[error("IO not supported error: {0}")]
- IoUnsupported(String),
+ #[snafu(visibility(pub(crate)), display("Fluss API Error: {}.",
api_error))]
+ FlussAPIError { api_error: ApiError },
+}
- #[error("IO operation failed on underlying storage: {0}")]
- IoUnexpected(Box<opendal::Error>),
+impl From<ArrowError> for Error {
+ fn from(value: ArrowError) -> Self {
+ Error::ArrowError {
+ message: format!("{value}"),
+ source: value,
+ }
+ }
+}
+
+impl From<RpcError> for Error {
+ fn from(value: RpcError) -> Self {
+ Error::RpcError {
+ message: format!("{value}"),
+ source: value,
+ }
+ }
+}
+
+impl From<io::Error> for Error {
+ fn from(value: io::Error) -> Self {
+ Error::IoUnexpectedError {
+ message: format!("{value}"),
+ source: value,
+ }
+ }
}
impl From<opendal::Error> for Error {
- fn from(err: opendal::Error) -> Self {
- Error::IoUnexpected(Box::new(err))
+ fn from(value: opendal::Error) -> Self {
+ Error::RemoteStorageUnexpectedError {
+ message: format!("{value}"),
+ source: value,
+ }
+ }
+}
+
+impl From<ApiError> for Error {
+ fn from(value: ApiError) -> Self {
+ Error::FlussAPIError { api_error: value }
}
}
diff --git a/crates/fluss/src/io/file_io.rs b/crates/fluss/src/io/file_io.rs
index ec3b87e..e7b026d 100644
--- a/crates/fluss/src/io/file_io.rs
+++ b/crates/fluss/src/io/file_io.rs
@@ -39,8 +39,9 @@ pub struct FileIO {
impl FileIO {
/// Try to infer file io scheme from path.
pub fn from_url(path: &str) -> Result<FileIOBuilder> {
- let url =
- Url::parse(path).map_err(|_|
Error::IllegalArgument(format!("Invalid URL: {path}")))?;
+ let url = Url::parse(path).map_err(|_| Error::IllegalArgument {
+ message: format!("Invalid URL: {path}"),
+ })?;
Ok(FileIOBuilder::new(url.scheme()))
}
diff --git a/crates/fluss/src/io/storage.rs b/crates/fluss/src/io/storage.rs
index 089670e..d90eaa5 100644
--- a/crates/fluss/src/io/storage.rs
+++ b/crates/fluss/src/io/storage.rs
@@ -44,9 +44,9 @@ impl Storage {
Scheme::Fs => Ok(Self::LocalFs),
#[cfg(feature = "storage-s3")]
Scheme::S3 => Ok(Self::S3 { props }),
- _ => Err(error::Error::IoUnsupported(
- "Unsupported storage feature".to_string(),
- )),
+ _ => Err(error::Error::IoUnsupported {
+ message: format!("Unsupported storage feature {scheme_str}"),
+ }),
}
}
diff --git a/crates/fluss/src/metadata/database.rs
b/crates/fluss/src/metadata/database.rs
index 8eaa4d3..fad1498 100644
--- a/crates/fluss/src/metadata/database.rs
+++ b/crates/fluss/src/metadata/database.rs
@@ -148,8 +148,8 @@ impl JsonSerde for DatabaseDescriptor {
if let Some(comment_node) = node.get(Self::COMMENT_NAME) {
let comment = comment_node
.as_str()
- .ok_or_else(|| {
- JsonSerdeError(format!("{} should be a string",
Self::COMMENT_NAME))
+ .ok_or_else(|| JsonSerdeError {
+ message: format!("{} should be a string",
Self::COMMENT_NAME),
})?
.to_owned();
builder = builder.comment(&comment);
@@ -157,8 +157,8 @@ impl JsonSerde for DatabaseDescriptor {
// Deserialize custom properties directly
let custom_properties = if let Some(props_node) =
node.get(Self::CUSTOM_PROPERTIES_NAME) {
- let obj = props_node.as_object().ok_or_else(|| {
- JsonSerdeError("Custom properties should be an
object".to_string())
+ let obj = props_node.as_object().ok_or_else(|| JsonSerdeError {
+ message: "Custom properties should be an object".to_string(),
})?;
let mut properties = HashMap::with_capacity(obj.len());
@@ -167,8 +167,8 @@ impl JsonSerde for DatabaseDescriptor {
key.clone(),
value
.as_str()
- .ok_or_else(|| {
- JsonSerdeError("Property value should be a
string".to_string())
+ .ok_or_else(|| JsonSerdeError {
+ message: "Property value should be a
string".to_string(),
})?
.to_owned(),
);
@@ -186,16 +186,18 @@ impl JsonSerde for DatabaseDescriptor {
impl DatabaseDescriptor {
/// Create DatabaseDescriptor from JSON bytes (equivalent to Java's
fromJsonBytes)
pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
- let json_value: Value = serde_json::from_slice(bytes)
- .map_err(|e| JsonSerdeError(format!("Failed to parse JSON:
{e}")))?;
+ let json_value: Value = serde_json::from_slice(bytes).map_err(|e|
JsonSerdeError {
+ message: format!("Failed to parse JSON: {e}"),
+ })?;
Self::deserialize_json(&json_value)
}
/// Convert DatabaseDescriptor to JSON bytes
pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
let json_value = self.serialize_json()?;
- serde_json::to_vec(&json_value)
- .map_err(|e| JsonSerdeError(format!("Failed to serialize to JSON:
{e}")))
+ serde_json::to_vec(&json_value).map_err(|e| JsonSerdeError {
+ message: format!("Failed to serialize to JSON: {e}"),
+ })
}
}
diff --git a/crates/fluss/src/metadata/json_serde.rs
b/crates/fluss/src/metadata/json_serde.rs
index 447b0f9..7d94e19 100644
--- a/crates/fluss/src/metadata/json_serde.rs
+++ b/crates/fluss/src/metadata/json_serde.rs
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use crate::error::Error::{InvalidTableError, JsonSerdeError};
-use crate::error::Result;
+use crate::error::Error::JsonSerdeError;
+use crate::error::{Error, Result};
use crate::metadata::datatype::{DataField, DataType, DataTypes};
use crate::metadata::table::{Column, Schema, TableDescriptor};
use serde_json::{Value, json};
@@ -166,11 +166,11 @@ impl JsonSerde for DataType {
let type_root = node
.get(Self::FIELD_NAME_TYPE_NAME)
.and_then(|v| v.as_str())
- .ok_or_else(|| {
- JsonSerdeError(format!(
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: format!(
"Couldn't find field {} while deserializing datatype.",
Self::FIELD_NAME_TYPE_NAME
- ))
+ ),
})?;
let mut data_type = match type_root {
@@ -185,11 +185,8 @@ impl JsonSerde for DataType {
let length = node
.get(Self::FIELD_NAME_LENGTH)
.and_then(|v| v.as_u64())
- .ok_or_else(|| {
- JsonSerdeError(format!(
- "Missing required field: {}",
- Self::FIELD_NAME_LENGTH
- ))
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: format!("Missing required field: {}",
Self::FIELD_NAME_LENGTH),
})? as u32;
DataTypes::char(length)
}
@@ -198,11 +195,8 @@ impl JsonSerde for DataType {
let precision = node
.get(Self::FIELD_NAME_PRECISION)
.and_then(|v| v.as_u64())
- .ok_or_else(|| {
- JsonSerdeError(format!(
- "Missing required field: {}",
- Self::FIELD_NAME_PRECISION
- ))
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: format!("Missing required field: {}",
Self::FIELD_NAME_PRECISION),
})? as u32;
let scale = node
.get(Self::FIELD_NAME_SCALE)
@@ -243,43 +237,46 @@ impl JsonSerde for DataType {
"ARRAY" => {
let element_type_node =
node.get(Self::FIELD_NAME_ELEMENT_TYPE).ok_or_else(|| {
- JsonSerdeError(format!(
- "Missing required field: {}",
- Self::FIELD_NAME_ELEMENT_TYPE
- ))
+ Error::JsonSerdeError {
+ message: format!(
+ "Missing required field: {}",
+ Self::FIELD_NAME_ELEMENT_TYPE
+ ),
+ }
})?;
let element_type =
DataType::deserialize_json(element_type_node)?;
DataTypes::array(element_type)
}
"MAP" => {
- let key_type_node =
node.get(Self::FIELD_NAME_KEY_TYPE).ok_or_else(|| {
- JsonSerdeError(format!(
- "Missing required field: {}",
- Self::FIELD_NAME_KEY_TYPE
- ))
- })?;
+ let key_type_node =
+ node.get(Self::FIELD_NAME_KEY_TYPE)
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: format!(
+ "Missing required field: {}",
+ Self::FIELD_NAME_KEY_TYPE
+ ),
+ })?;
let key_type = DataType::deserialize_json(key_type_node)?;
- let value_type_node =
node.get(Self::FIELD_NAME_VALUE_TYPE).ok_or_else(|| {
- JsonSerdeError(format!(
- "Missing required field: {}",
- Self::FIELD_NAME_VALUE_TYPE
- ))
- })?;
+ let value_type_node =
+ node.get(Self::FIELD_NAME_VALUE_TYPE)
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: format!(
+ "Missing required field: {}",
+ Self::FIELD_NAME_VALUE_TYPE
+ ),
+ })?;
let value_type = DataType::deserialize_json(value_type_node)?;
DataTypes::map(key_type, value_type)
}
"ROW" => {
let fields_node = node
.get(Self::FIELD_NAME_FIELDS)
- .ok_or_else(|| {
- JsonSerdeError(format!(
- "Missing required field: {}",
- Self::FIELD_NAME_FIELDS
- ))
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: format!("Missing required field: {}",
Self::FIELD_NAME_FIELDS),
})?
.as_array()
- .ok_or_else(|| {
- JsonSerdeError(format!("{} must be an array",
Self::FIELD_NAME_FIELDS))
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: format!("{} must be an array",
Self::FIELD_NAME_FIELDS),
})?;
let mut fields = Vec::with_capacity(fields_node.len());
for field_node in fields_node {
@@ -287,7 +284,11 @@ impl JsonSerde for DataType {
}
DataTypes::row(fields)
}
- _ => return Err(JsonSerdeError(format!("Unknown type root:
{type_root}"))),
+ _ => {
+ return Err(Error::JsonSerdeError {
+ message: format!("Unknown type root: {type_root}"),
+ });
+ }
};
if let Some(nullable) = node.get(Self::FIELD_NAME_NULLABLE) {
@@ -327,12 +328,16 @@ impl JsonSerde for DataField {
let name = node
.get(Self::NAME)
.and_then(|v| v.as_str())
- .ok_or_else(|| JsonSerdeError(format!("Missing required field:
{}", Self::NAME)))?
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: format!("Missing required field: {}", Self::NAME),
+ })?
.to_string();
- let field_type_node = node.get(Self::FIELD_TYPE).ok_or_else(|| {
- JsonSerdeError(format!("Missing required field: {}",
Self::FIELD_TYPE))
- })?;
+ let field_type_node = node
+ .get(Self::FIELD_TYPE)
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: format!("Missing required field: {}",
Self::FIELD_TYPE),
+ })?;
let data_type = DataType::deserialize_json(field_type_node)?;
@@ -373,12 +378,16 @@ impl JsonSerde for Column {
let name = node
.get(Self::NAME)
.and_then(|v| v.as_str())
- .ok_or_else(|| JsonSerdeError(format!("Missing required field:
{}", Self::NAME)))?
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: format!("Missing required field: {}", Self::NAME),
+ })?
.to_string();
- let data_type_node = node.get(Self::DATA_TYPE).ok_or_else(|| {
- JsonSerdeError(format!("Missing required field: {}",
Self::DATA_TYPE))
- })?;
+ let data_type_node = node
+ .get(Self::DATA_TYPE)
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: format!("Missing required field: {}",
Self::DATA_TYPE),
+ })?;
let data_type = DataType::deserialize_json(data_type_node)?;
@@ -429,11 +438,13 @@ impl JsonSerde for Schema {
fn deserialize_json(node: &Value) -> Result<Schema> {
let columns_node = node
.get(Self::COLUMNS_NAME)
- .ok_or_else(|| {
- JsonSerdeError(format!("Missing required field: {}",
Self::COLUMNS_NAME))
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: format!("Missing required field: {}",
Self::COLUMNS_NAME),
})?
.as_array()
- .ok_or_else(|| JsonSerdeError(format!("{} must be an array",
Self::COLUMNS_NAME)))?;
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: format!("{} must be an array", Self::COLUMNS_NAME),
+ })?;
let mut columns = Vec::with_capacity(columns_node.len());
for col_node in columns_node {
@@ -443,17 +454,17 @@ impl JsonSerde for Schema {
let mut schema_builder = Schema::builder().with_columns(columns);
if let Some(pk_node) = node.get(Self::PRIMARY_KEY_NAME) {
- let pk_array = pk_node
- .as_array()
- .ok_or_else(|| InvalidTableError("Primary key must be an
array".to_string()))?;
+ let pk_array = pk_node.as_array().ok_or_else(||
Error::InvalidTableError {
+ message: "Primary key must be an array".to_string(),
+ })?;
let mut primary_keys = Vec::with_capacity(pk_array.len());
for name_node in pk_array {
primary_keys.push(
name_node
.as_str()
- .ok_or_else(|| {
- InvalidTableError("Primary key element must be a
string".to_string())
+ .ok_or_else(|| Error::InvalidTableError {
+ message: "Primary key element must be a
string".to_string(),
})?
.to_string(),
);
@@ -478,9 +489,9 @@ impl TableDescriptor {
const VERSION: u32 = 1;
fn deserialize_properties(node: &Value) -> Result<HashMap<String, String>>
{
- let obj = node
- .as_object()
- .ok_or_else(|| JsonSerdeError("Properties must be an
object".to_string()))?;
+ let obj = node.as_object().ok_or_else(|| Error::JsonSerdeError {
+ message: "Properties must be an object".to_string(),
+ })?;
let mut properties = HashMap::with_capacity(obj.len());
for (key, value) in obj {
@@ -488,7 +499,9 @@ impl TableDescriptor {
key.clone(),
value
.as_str()
- .ok_or_else(|| JsonSerdeError("Property value must be a
string".to_string()))?
+ .ok_or_else(|| Error::JsonSerdeError {
+ message: "Property value must be a string".to_string(),
+ })?
.to_owned(),
);
}
@@ -545,8 +558,8 @@ impl JsonSerde for TableDescriptor {
let mut builder = TableDescriptor::builder();
// Deserialize schema
- let schema_node = node.get(Self::SCHEMA_NAME).ok_or_else(|| {
- JsonSerdeError(format!("Missing required field: {}",
Self::SCHEMA_NAME))
+ let schema_node = node.get(Self::SCHEMA_NAME).ok_or_else(||
JsonSerdeError {
+ message: format!("Missing required field: {}", Self::SCHEMA_NAME),
})?;
let schema = Schema::deserialize_json(schema_node)?;
builder = builder.schema(schema);
@@ -555,22 +568,21 @@ impl JsonSerde for TableDescriptor {
if let Some(comment_node) = node.get(Self::COMMENT_NAME) {
let comment = comment_node
.as_str()
- .ok_or_else(|| JsonSerdeError(format!("{} must be a string",
Self::COMMENT_NAME)))?
+ .ok_or_else(|| JsonSerdeError {
+ message: format!("{} must be a string",
Self::COMMENT_NAME),
+ })?
.to_owned();
builder = builder.comment(comment.as_str());
}
let partition_node = node
.get(Self::PARTITION_KEY_NAME)
- .ok_or_else(|| {
- JsonSerdeError(format!(
- "Missing required field: {}",
- Self::PARTITION_KEY_NAME
- ))
+ .ok_or_else(|| JsonSerdeError {
+ message: format!("Missing required field: {}",
Self::PARTITION_KEY_NAME),
})?
.as_array()
- .ok_or_else(|| {
- JsonSerdeError(format!("{} must be an array",
Self::PARTITION_KEY_NAME))
+ .ok_or_else(|| JsonSerdeError {
+ message: format!("{} must be an array",
Self::PARTITION_KEY_NAME),
})?;
let mut partition_keys = Vec::with_capacity(partition_node.len());
@@ -578,11 +590,8 @@ impl JsonSerde for TableDescriptor {
partition_keys.push(
key_node
.as_str()
- .ok_or_else(|| {
- JsonSerdeError(format!(
- "{} element must be a string",
- Self::PARTITION_KEY_NAME
- ))
+ .ok_or_else(|| JsonSerdeError {
+ message: format!("{} element must be a string",
Self::PARTITION_KEY_NAME),
})?
.to_owned(),
);
@@ -592,15 +601,17 @@ impl JsonSerde for TableDescriptor {
let mut bucket_count = None;
let mut bucket_keys = vec![];
if let Some(bucket_key_node) = node.get(Self::BUCKET_KEY_NAME) {
- let bucket_key_node = bucket_key_node.as_array().ok_or_else(|| {
- JsonSerdeError(format!("{} must be an array",
Self::BUCKET_KEY_NAME))
+ let bucket_key_node = bucket_key_node.as_array().ok_or_else(||
JsonSerdeError {
+ message: format!("{} must be an array", Self::BUCKET_KEY_NAME),
})?;
for key_node in bucket_key_node {
bucket_keys.push(
key_node
.as_str()
- .ok_or_else(|| JsonSerdeError("Bucket key must be a
string".to_string()))?
+ .ok_or_else(|| JsonSerdeError {
+ message: "Bucket key must be a string".to_string(),
+ })?
.to_owned(),
);
}
@@ -617,18 +628,18 @@ impl JsonSerde for TableDescriptor {
// Deserialize properties
let properties =
Self::deserialize_properties(node.get(Self::PROPERTIES_NAME).ok_or_else(|| {
- JsonSerdeError(format!("Missing required field: {}",
Self::PROPERTIES_NAME))
+ JsonSerdeError {
+ message: format!("Missing required field: {}",
Self::PROPERTIES_NAME),
+ }
})?)?;
builder = builder.properties(properties);
// Deserialize custom properties
let custom_properties = Self::deserialize_properties(
- node.get(Self::CUSTOM_PROPERTIES_NAME).ok_or_else(|| {
- JsonSerdeError(format!(
- "Missing required field: {}",
- Self::CUSTOM_PROPERTIES_NAME
- ))
- })?,
+ node.get(Self::CUSTOM_PROPERTIES_NAME)
+ .ok_or_else(|| JsonSerdeError {
+ message: format!("Missing required field: {}",
Self::CUSTOM_PROPERTIES_NAME),
+ })?,
)?;
builder = builder.custom_properties(custom_properties);
diff --git a/crates/fluss/src/metadata/table.rs
b/crates/fluss/src/metadata/table.rs
index 751dd6d..770c4f2 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::error::Error::InvalidTableError;
-use crate::error::Result;
+use crate::error::{Error, Result};
use crate::metadata::datatype::{DataField, DataType, RowType};
use core::fmt;
use serde::{Deserialize, Serialize};
@@ -220,9 +220,9 @@ impl SchemaBuilder {
) -> Result<Vec<Column>> {
let names: Vec<_> = columns.iter().map(|c| &c.name).collect();
if let Some(duplicates) = Self::find_duplicates(&names) {
- return Err(InvalidTableError(format!(
- "Duplicate column names found: {duplicates:?}"
- )));
+ return Err(InvalidTableError {
+ message: format!("Duplicate column names found:
{duplicates:?}"),
+ });
}
let Some(pk) = primary_key else {
@@ -232,9 +232,9 @@ impl SchemaBuilder {
let pk_set: HashSet<_> = pk.column_names.iter().collect();
let all_columns: HashSet<_> = columns.iter().map(|c|
&c.name).collect();
if !pk_set.is_subset(&all_columns) {
- return Err(InvalidTableError(format!(
- "Primary key columns {pk_set:?} not found in schema"
- )));
+ return Err(InvalidTableError {
+ message: format!("Primary key columns {pk_set:?} not found in
schema"),
+ });
}
Ok(columns
@@ -441,12 +441,12 @@ impl TableDescriptor {
pub fn replication_factor(&self) -> Result<i32> {
self.properties
.get("table.replication.factor")
- .ok_or(InvalidTableError(
- "Replication factor is not set".to_string(),
- ))?
+ .ok_or_else(|| InvalidTableError {
+ message: "Replication factor is not set".to_string(),
+ })?
.parse()
- .map_err(|_e| {
- InvalidTableError("Replication factor can't be convert into
int".to_string())
+ .map_err(|_e| InvalidTableError {
+ message: "Replication factor can't be convert into
int".to_string(),
})
}
@@ -497,11 +497,13 @@ impl TableDescriptor {
bucket_keys.retain(|k| !partition_keys.contains(k));
if bucket_keys.is_empty() {
- return Err(InvalidTableError(format!(
- "Primary Key constraint {:?} should not be same with partition
fields {:?}.",
- schema.primary_key().unwrap().column_names(),
- partition_keys
- )));
+ return Err(Error::InvalidTableError {
+ message: format!(
+ "Primary Key constraint {:?} should not be same with
partition fields {:?}.",
+ schema.primary_key().unwrap().column_names(),
+ partition_keys
+ ),
+ });
}
Ok(bucket_keys)
@@ -518,10 +520,12 @@ impl TableDescriptor {
.iter()
.any(|k| partition_keys.contains(k))
{
- return Err(InvalidTableError(format!(
- "Bucket key {:?} shouldn't include any column in partition
keys {:?}.",
- distribution.bucket_keys, partition_keys
- )));
+ return Err(InvalidTableError {
+ message: format!(
+ "Bucket key {:?} shouldn't include any column in
partition keys {:?}.",
+ distribution.bucket_keys, partition_keys
+ ),
+ });
}
return if let Some(pk) = schema.primary_key() {
@@ -540,13 +544,15 @@ impl TableDescriptor {
.iter()
.all(|k| pk_columns.contains(k))
{
- return Err(InvalidTableError(format!(
- "Bucket keys must be a subset of primary keys
excluding partition keys for primary-key tables. \
- The primary keys are {:?}, the partition keys are
{:?}, but the user-defined bucket keys are {:?}.",
- pk.column_names(),
- partition_keys,
- distribution.bucket_keys
- )));
+ return Err(InvalidTableError {
+ message: format!(
+ "Bucket keys must be a subset of primary keys
excluding partition keys for primary-key tables. \
+ The primary keys are {:?}, the partition keys
are {:?}, but the user-defined bucket keys are {:?}.",
+ pk.column_names(),
+ partition_keys,
+ distribution.bucket_keys
+ ),
+ });
}
Ok(Some(distribution))
}
@@ -589,7 +595,9 @@ impl LogFormat {
match s.to_uppercase().as_str() {
"ARROW" => Ok(LogFormat::ARROW),
"INDEXED" => Ok(LogFormat::INDEXED),
- _ => Err(InvalidTableError(format!("Unknown log format: {s}"))),
+ _ => Err(InvalidTableError {
+ message: format!("Unknown log format: {s}"),
+ }),
}
}
}
@@ -615,7 +623,9 @@ impl KvFormat {
match s.to_uppercase().as_str() {
"INDEXED" => Ok(KvFormat::INDEXED),
"COMPACTED" => Ok(KvFormat::COMPACTED),
- _ => Err(InvalidTableError(format!("Unknown kv format: {s}"))),
+ _ => Err(Error::InvalidTableError {
+ message: format!("Unknown kv format: {s}"),
+ }),
}
}
}
@@ -961,6 +971,24 @@ impl TableBucket {
}
}
+impl Display for TableBucket {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ if let Some(partition_id) = self.partition_id {
+ write!(
+ f,
+ "TableBucket(table_id={}, partition_id={}, bucket={})",
+ self.table_id, partition_id, self.bucket
+ )
+ } else {
+ write!(
+ f,
+ "TableBucket(table_id={}, bucket={})",
+ self.table_id, self.bucket
+ )
+ }
+ }
+}
+
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LakeSnapshot {
pub snapshot_id: i64,
diff --git a/crates/fluss/src/proto/fluss_api.proto
b/crates/fluss/src/proto/fluss_api.proto
index e59c2d9..dbbb45d 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -19,6 +19,11 @@ syntax = "proto2";
package proto;
+message ErrorResponse {
+ required int32 error_code = 1;
+ optional string error_message = 2;
+}
+
// metadata request and response, request send from client to each server.
message MetadataRequest {
repeated PbTablePath table_path = 1;
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 6e8cb55..9295713 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -844,10 +844,7 @@ impl ReadContext {
}
pub fn record_batch_for_remote_log(&self, data: &[u8]) ->
Result<Option<RecordBatch>> {
- let (batch_metadata, body_buffer, version) = match
parse_ipc_message(data) {
- Some(result) => result,
- None => return Ok(None),
- };
+ let (batch_metadata, body_buffer, version) = parse_ipc_message(data)?;
let record_batch = read_record_batch(
&body_buffer,
@@ -1086,13 +1083,17 @@ mod tests {
let result = parse_ipc_message(empty_body);
assert_eq!(
result.unwrap_err().to_string(),
- String::from("Arrow error: Parser error: Range [0, 4) is out of
bounds.\n\n")
+ String::from(
+ "Fluss hitting Arrow error Parser error: Range [0, 4) is out
of bounds.\n\n: ParseError(\"Range [0, 4) is out of bounds.\\n\\n\")."
+ )
);
let invalid_data = &[];
assert_eq!(
parse_ipc_message(invalid_data).unwrap_err().to_string(),
- String::from("Arrow error: Parser error: Invalid data length: 0")
+ String::from(
+ "Fluss hitting Arrow error Parser error: Invalid data length:
0: ParseError(\"Invalid data length: 0\")."
+ )
);
let data_with_invalid_continuation: &[u8] = &le_bytes(&[0x00000001,
0x00000000]);
@@ -1100,7 +1101,9 @@ mod tests {
parse_ipc_message(data_with_invalid_continuation)
.unwrap_err()
.to_string(),
- String::from("Arrow error: Parser error: Invalid continuation
marker: 1")
+ String::from(
+ "Fluss hitting Arrow error Parser error: Invalid continuation
marker: 1: ParseError(\"Invalid continuation marker: 1\")."
+ )
);
let data_with_invalid_length: &[u8] = &le_bytes(&[0xFFFFFFFF,
0x00000001]);
@@ -1109,8 +1112,7 @@ mod tests {
.unwrap_err()
.to_string(),
String::from(
- "Arrow error: Parser error: Invalid data length. \
- Remaining data length 0 is shorter than specified size 1"
+ "Fluss hitting Arrow error Parser error: Invalid data length.
Remaining data length 0 is shorter than specified size 1: ParseError(\"Invalid
data length. Remaining data length 0 is shorter than specified size 1\")."
)
);
@@ -1119,7 +1121,9 @@ mod tests {
parse_ipc_message(data_with_invalid_length)
.unwrap_err()
.to_string(),
- String::from("Arrow error: Parser error: Not a record batch")
+ String::from(
+ "Fluss hitting Arrow error Parser error: Not a record batch:
ParseError(\"Not a record batch\")."
+ )
);
}
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 6929b57..1ea3933 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -290,18 +290,22 @@ impl Datum<'_> {
Datum::String(v) => append_value_to_arrow!(StringBuilder, *v),
Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder,
v.as_ref()),
Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) |
Datum::TimestampTz(_) => {
- return Err(RowConvertError(format!(
- "Type {:?} is not yet supported for Arrow conversion",
- std::mem::discriminant(self)
- )));
+ return Err(RowConvertError {
+ message: format!(
+ "Type {:?} is not yet supported for Arrow conversion",
+ std::mem::discriminant(self)
+ ),
+ });
}
}
- Err(RowConvertError(format!(
- "Cannot append {:?} to builder of type {}",
- self,
- std::any::type_name_of_val(builder)
- )))
+ Err(RowConvertError {
+ message: format!(
+ "Cannot append {:?} to builder of type {}",
+ self,
+ std::any::type_name_of_val(builder)
+ ),
+ })
}
}
@@ -313,11 +317,13 @@ macro_rules! impl_to_arrow {
b.append_value(*self);
Ok(())
} else {
- Err(RowConvertError(format!(
- "Cannot cast {} to {} builder",
- stringify!($ty),
- stringify!($variant)
- )))
+ Err(RowConvertError {
+ message: format!(
+ "Cannot cast {} to {} builder",
+ stringify!($ty),
+ stringify!($variant)
+ ),
+ })
}
}
}
diff --git a/crates/fluss/src/rpc/error.rs b/crates/fluss/src/rpc/error.rs
index 84b20b1..da3a11e 100644
--- a/crates/fluss/src/rpc/error.rs
+++ b/crates/fluss/src/rpc/error.rs
@@ -17,6 +17,7 @@
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
+use prost::DecodeError;
use std::sync::Arc;
use thiserror::Error;
@@ -29,6 +30,9 @@ pub enum RpcError {
#[error("Cannot read framed message: {0}")]
ReadMessageError(#[from] crate::rpc::frame::ReadError),
+ #[error("Rpc Decode Error: {0}")]
+ RpcDecodeError(#[from] DecodeError),
+
#[error("connection error")]
ConnectionError(String),
diff --git a/crates/fluss/src/rpc/fluss_api_error.rs
b/crates/fluss/src/rpc/fluss_api_error.rs
new file mode 100644
index 0000000..b26eb72
--- /dev/null
+++ b/crates/fluss/src/rpc/fluss_api_error.rs
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::ErrorResponse;
+use std::fmt::{Debug, Display, Formatter};
+
+/// API error response from Fluss server
+pub struct ApiError {
+ pub code: i32,
+ pub message: String,
+}
+
+impl Debug for ApiError {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("ApiError")
+ .field("code", &self.code)
+ .field("message", &self.message)
+ .finish()
+ }
+}
+
+impl Display for ApiError {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ Debug::fmt(self, f)
+ }
+}
+
+/// Fluss protocol errors. These errors are part of the client-server protocol.
+/// The error codes cannot be changed, but the names can be.
+///
+/// Do not add exceptions that occur only on the client or only on the server
here.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[repr(i32)]
+pub enum FlussError {
+ /// The server experienced an unexpected error when processing the request.
+ UnknownServerError = -1,
+ /// No error occurred.
+ None = 0,
+ /// The server disconnected before a response was received.
+ NetworkException = 1,
+ /// The version of API is not supported.
+ UnsupportedVersion = 2,
+ /// This message has failed its CRC checksum, exceeds the valid size, has
a null key for a primary key table, or is otherwise corrupt.
+ CorruptMessage = 3,
+ /// The database does not exist.
+ DatabaseNotExist = 4,
+ /// The database is not empty.
+ DatabaseNotEmpty = 5,
+ /// The database already exists.
+ DatabaseAlreadyExist = 6,
+ /// The table does not exist.
+ TableNotExist = 7,
+ /// The table already exists.
+ TableAlreadyExist = 8,
+ /// The schema does not exist.
+ SchemaNotExist = 9,
+ /// An exception occurred while storing log data in the server.
+ LogStorageException = 10,
+ /// An exception occurred while storing kv data in the server.
+ KvStorageException = 11,
+ /// Not leader or follower.
+ NotLeaderOrFollower = 12,
+ /// The record is too large.
+ RecordTooLargeException = 13,
+ /// The record is corrupt.
+ CorruptRecordException = 14,
+ /// The client has attempted to perform an operation on an invalid table.
+ InvalidTableException = 15,
+ /// The client has attempted to perform an operation on an invalid
database.
+ InvalidDatabaseException = 16,
+ /// The replication factor is larger than the number of available tablet
servers.
+ InvalidReplicationFactor = 17,
+ /// Produce request specified an invalid value for required acks.
+ InvalidRequiredAcks = 18,
+ /// The log offset is out of range.
+ LogOffsetOutOfRangeException = 19,
+ /// The table is not primary key table.
+ NonPrimaryKeyTableException = 20,
+ /// The table or bucket does not exist.
+ UnknownTableOrBucketException = 21,
+ /// The update version is invalid.
+ InvalidUpdateVersionException = 22,
+ /// The coordinator is invalid.
+ InvalidCoordinatorException = 23,
+ /// The leader epoch is invalid.
+ FencedLeaderEpochException = 24,
+ /// The request time out.
+ RequestTimeOut = 25,
+ /// The general storage exception.
+ StorageException = 26,
+ /// The server did not attempt to execute this operation.
+ OperationNotAttemptedException = 27,
+ /// Records are written to the server already, but to fewer in-sync
replicas than required.
+ NotEnoughReplicasAfterAppendException = 28,
+ /// Messages are rejected since there are fewer in-sync replicas than
required.
+ NotEnoughReplicasException = 29,
+ /// Get file access security token exception.
+ SecurityTokenException = 30,
+ /// The tablet server received an out of order sequence batch.
+ OutOfOrderSequenceException = 31,
+ /// The tablet server received a duplicate sequence batch.
+ DuplicateSequenceException = 32,
+ /// This exception is raised by the tablet server if it could not locate
the writer metadata.
+ UnknownWriterIdException = 33,
+ /// The requested column projection is invalid.
+ InvalidColumnProjection = 34,
+ /// The requested target column to write is invalid.
+ InvalidTargetColumn = 35,
+ /// The partition does not exist.
+ PartitionNotExists = 36,
+ /// The table is not partitioned.
+ TableNotPartitionedException = 37,
+ /// The timestamp is invalid.
+ InvalidTimestampException = 38,
+ /// The config is invalid.
+ InvalidConfigException = 39,
+ /// The lake storage is not configured.
+ LakeStorageNotConfiguredException = 40,
+ /// The kv snapshot does not exist.
+ KvSnapshotNotExist = 41,
+ /// The partition already exists.
+ PartitionAlreadyExists = 42,
+ /// The partition spec is invalid.
+ PartitionSpecInvalidException = 43,
+ /// There is no currently available leader for the given partition.
+ LeaderNotAvailableException = 44,
+ /// Exceed the maximum number of partitions.
+ PartitionMaxNumException = 45,
+ /// Authentication failed.
+ AuthenticateException = 46,
+ /// Security is disabled.
+ SecurityDisabledException = 47,
+ /// Authorization failed.
+ AuthorizationException = 48,
+ /// Exceed the maximum number of buckets.
+ BucketMaxNumException = 49,
+ /// The tiering epoch is invalid.
+ FencedTieringEpochException = 50,
+ /// Authentication failed with retriable exception.
+ RetriableAuthenticateException = 51,
+ /// The server rack info is invalid.
+ InvalidServerRackInfoException = 52,
+ /// The lake snapshot does not exist.
+ LakeSnapshotNotExist = 53,
+ /// The lake table already exists.
+ LakeTableAlreadyExist = 54,
+ /// The new ISR contains at least one ineligible replica.
+ IneligibleReplicaException = 55,
+ /// The alter table is invalid.
+ InvalidAlterTableException = 56,
+ /// Deletion operations are disabled on this table.
+ DeletionDisabledException = 57,
+}
+
+impl FlussError {
+ /// Returns the error code for this error.
+ pub fn code(&self) -> i32 {
+ *self as i32
+ }
+
+ /// Returns a friendly description of the error.
+ pub fn message(&self) -> &'static str {
+ match self {
+ FlussError::UnknownServerError => {
+ "The server experienced an unexpected error when processing
the request."
+ }
+ FlussError::None => "No error",
+ FlussError::NetworkException => {
+ "The server disconnected before a response was received."
+ }
+ FlussError::UnsupportedVersion => "The version of API is not
supported.",
+ FlussError::CorruptMessage => {
+ "This message has failed its CRC checksum, exceeds the valid
size, has a null key for a primary key table, or is otherwise corrupt."
+ }
+ FlussError::DatabaseNotExist => "The database does not exist.",
+ FlussError::DatabaseNotEmpty => "The database is not empty.",
+ FlussError::DatabaseAlreadyExist => "The database already exists.",
+ FlussError::TableNotExist => "The table does not exist.",
+ FlussError::TableAlreadyExist => "The table already exists.",
+ FlussError::SchemaNotExist => "The schema does not exist.",
+ FlussError::LogStorageException => {
+ "Exception occur while storage data for log in server."
+ }
+ FlussError::KvStorageException => {
+ "Exception occur while storage data for kv in server."
+ }
+ FlussError::NotLeaderOrFollower => "Not leader or follower.",
+ FlussError::RecordTooLargeException => "The record is too large.",
+ FlussError::CorruptRecordException => "The record is corrupt.",
+ FlussError::InvalidTableException => {
+ "The client has attempted to perform an operation on an
invalid table."
+ }
+ FlussError::InvalidDatabaseException => {
+ "The client has attempted to perform an operation on an
invalid database."
+ }
+ FlussError::InvalidReplicationFactor => {
+ "The replication factor is larger then the number of available
tablet servers."
+ }
+ FlussError::InvalidRequiredAcks => {
+ "Produce request specified an invalid value for required acks."
+ }
+ FlussError::LogOffsetOutOfRangeException => "The log offset is out
of range.",
+ FlussError::NonPrimaryKeyTableException => "The table is not
primary key table.",
+ FlussError::UnknownTableOrBucketException => "The table or bucket
does not exist.",
+ FlussError::InvalidUpdateVersionException => "The update version
is invalid.",
+ FlussError::InvalidCoordinatorException => "The coordinator is
invalid.",
+ FlussError::FencedLeaderEpochException => "The leader epoch is
invalid.",
+ FlussError::RequestTimeOut => "The request time out.",
+ FlussError::StorageException => "The general storage exception.",
+ FlussError::OperationNotAttemptedException => {
+ "The server did not attempt to execute this operation."
+ }
+ FlussError::NotEnoughReplicasAfterAppendException => {
+ "Records are written to the server already, but to fewer
in-sync replicas than required."
+ }
+ FlussError::NotEnoughReplicasException => {
+ "Messages are rejected since there are fewer in-sync replicas
than required."
+ }
+ FlussError::SecurityTokenException => "Get file access security
token exception.",
+ FlussError::OutOfOrderSequenceException => {
+ "The tablet server received an out of order sequence batch."
+ }
+ FlussError::DuplicateSequenceException => {
+ "The tablet server received a duplicate sequence batch."
+ }
+ FlussError::UnknownWriterIdException => {
+ "This exception is raised by the tablet server if it could not
locate the writer metadata."
+ }
+ FlussError::InvalidColumnProjection => "The requested column
projection is invalid.",
+ FlussError::InvalidTargetColumn => "The requested target column to
write is invalid.",
+ FlussError::PartitionNotExists => "The partition does not exist.",
+ FlussError::TableNotPartitionedException => "The table is not
partitioned.",
+ FlussError::InvalidTimestampException => "The timestamp is
invalid.",
+ FlussError::InvalidConfigException => "The config is invalid.",
+ FlussError::LakeStorageNotConfiguredException => "The lake storage
is not configured.",
+ FlussError::KvSnapshotNotExist => "The kv snapshot does not
exist.",
+ FlussError::PartitionAlreadyExists => "The partition already
exists.",
+ FlussError::PartitionSpecInvalidException => "The partition spec
is invalid.",
+ FlussError::LeaderNotAvailableException => {
+ "There is no currently available leader for the given
partition."
+ }
+ FlussError::PartitionMaxNumException => "Exceed the maximum number
of partitions.",
+ FlussError::AuthenticateException => "Authentication failed.",
+ FlussError::SecurityDisabledException => "Security is disabled.",
+ FlussError::AuthorizationException => "Authorization failed.",
+ FlussError::BucketMaxNumException => "Exceed the maximum number of
buckets.",
+ FlussError::FencedTieringEpochException => "The tiering epoch is
invalid.",
+ FlussError::RetriableAuthenticateException => {
+ "Authentication failed with retriable exception."
+ }
+ FlussError::InvalidServerRackInfoException => "The server rack
info is invalid.",
+ FlussError::LakeSnapshotNotExist => "The lake snapshot does not
exist.",
+ FlussError::LakeTableAlreadyExist => "The lake table already
exists.",
+ FlussError::IneligibleReplicaException => {
+ "The new ISR contains at least one ineligible replica."
+ }
+ FlussError::InvalidAlterTableException => "The alter table is
invalid.",
+ FlussError::DeletionDisabledException => {
+ "Deletion operations are disabled on this table."
+ }
+ }
+ }
+
+ /// Create an ApiError from this error, using `message` if provided, or
this error's default message otherwise.
+ pub fn to_api_error(&self, message: Option<String>) -> ApiError {
+ ApiError {
+ code: self.code(),
+ message: message.unwrap_or(self.message().to_string()),
+ }
+ }
+
+ /// Get the FlussError for the given error code.
+ /// Returns `UnknownServerError` if the code is not recognized.
+ pub fn for_code(code: i32) -> Self {
+ match code {
+ -1 => FlussError::UnknownServerError,
+ 0 => FlussError::None,
+ 1 => FlussError::NetworkException,
+ 2 => FlussError::UnsupportedVersion,
+ 3 => FlussError::CorruptMessage,
+ 4 => FlussError::DatabaseNotExist,
+ 5 => FlussError::DatabaseNotEmpty,
+ 6 => FlussError::DatabaseAlreadyExist,
+ 7 => FlussError::TableNotExist,
+ 8 => FlussError::TableAlreadyExist,
+ 9 => FlussError::SchemaNotExist,
+ 10 => FlussError::LogStorageException,
+ 11 => FlussError::KvStorageException,
+ 12 => FlussError::NotLeaderOrFollower,
+ 13 => FlussError::RecordTooLargeException,
+ 14 => FlussError::CorruptRecordException,
+ 15 => FlussError::InvalidTableException,
+ 16 => FlussError::InvalidDatabaseException,
+ 17 => FlussError::InvalidReplicationFactor,
+ 18 => FlussError::InvalidRequiredAcks,
+ 19 => FlussError::LogOffsetOutOfRangeException,
+ 20 => FlussError::NonPrimaryKeyTableException,
+ 21 => FlussError::UnknownTableOrBucketException,
+ 22 => FlussError::InvalidUpdateVersionException,
+ 23 => FlussError::InvalidCoordinatorException,
+ 24 => FlussError::FencedLeaderEpochException,
+ 25 => FlussError::RequestTimeOut,
+ 26 => FlussError::StorageException,
+ 27 => FlussError::OperationNotAttemptedException,
+ 28 => FlussError::NotEnoughReplicasAfterAppendException,
+ 29 => FlussError::NotEnoughReplicasException,
+ 30 => FlussError::SecurityTokenException,
+ 31 => FlussError::OutOfOrderSequenceException,
+ 32 => FlussError::DuplicateSequenceException,
+ 33 => FlussError::UnknownWriterIdException,
+ 34 => FlussError::InvalidColumnProjection,
+ 35 => FlussError::InvalidTargetColumn,
+ 36 => FlussError::PartitionNotExists,
+ 37 => FlussError::TableNotPartitionedException,
+ 38 => FlussError::InvalidTimestampException,
+ 39 => FlussError::InvalidConfigException,
+ 40 => FlussError::LakeStorageNotConfiguredException,
+ 41 => FlussError::KvSnapshotNotExist,
+ 42 => FlussError::PartitionAlreadyExists,
+ 43 => FlussError::PartitionSpecInvalidException,
+ 44 => FlussError::LeaderNotAvailableException,
+ 45 => FlussError::PartitionMaxNumException,
+ 46 => FlussError::AuthenticateException,
+ 47 => FlussError::SecurityDisabledException,
+ 48 => FlussError::AuthorizationException,
+ 49 => FlussError::BucketMaxNumException,
+ 50 => FlussError::FencedTieringEpochException,
+ 51 => FlussError::RetriableAuthenticateException,
+ 52 => FlussError::InvalidServerRackInfoException,
+ 53 => FlussError::LakeSnapshotNotExist,
+ 54 => FlussError::LakeTableAlreadyExist,
+ 55 => FlussError::IneligibleReplicaException,
+ 56 => FlussError::InvalidAlterTableException,
+ 57 => FlussError::DeletionDisabledException,
+ _ => FlussError::UnknownServerError,
+ }
+ }
+}
+
+impl Display for FlussError {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", self.message())
+ }
+}
+
+impl From<ErrorResponse> for ApiError {
+ fn from(error_response: ErrorResponse) -> Self {
+ let fluss_error = FlussError::for_code(error_response.error_code);
+ fluss_error.to_api_error(error_response.error_message)
+ }
+}
+
+impl From<ApiError> for FlussError {
+ fn from(api_error: ApiError) -> Self {
+ FlussError::for_code(api_error.code)
+ }
+}
diff --git a/crates/fluss/src/rpc/frame.rs b/crates/fluss/src/rpc/frame.rs
index 44dadc9..81cc094 100644
--- a/crates/fluss/src/rpc/frame.rs
+++ b/crates/fluss/src/rpc/frame.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use prost::DecodeError;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
@@ -29,6 +30,9 @@ pub enum ReadError {
#[error("Message too large, limit is {limit} bytes but got {actual}
bytes")]
MessageTooLarge { limit: usize, actual: usize },
+
+ #[error("Fail to decode error response: {0}")]
+ ProtoErrorResponseDecodeError(#[from] DecodeError),
}
pub trait AsyncMessageRead {
diff --git a/crates/fluss/src/rpc/message/create_database.rs
b/crates/fluss/src/rpc/message/create_database.rs
index e4052ef..7d24235 100644
--- a/crates/fluss/src/rpc/message/create_database.rs
+++ b/crates/fluss/src/rpc/message/create_database.rs
@@ -22,7 +22,8 @@ use crate::error::Result as FlussResult;
use crate::proto::CreateDatabaseResponse;
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::ReadError;
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/create_table.rs
b/crates/fluss/src/rpc/message/create_table.rs
index 5802e71..69865b8 100644
--- a/crates/fluss/src/rpc/message/create_table.rs
+++ b/crates/fluss/src/rpc/message/create_table.rs
@@ -23,7 +23,8 @@ use crate::proto::CreateTableResponse;
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
use crate::rpc::convert::to_table_path;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::ReadError;
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/database_exists.rs
b/crates/fluss/src/rpc/message/database_exists.rs
index 795eea1..7e717a4 100644
--- a/crates/fluss/src/rpc/message/database_exists.rs
+++ b/crates/fluss/src/rpc/message/database_exists.rs
@@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use crate::rpc::frame::ReadError;
+
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use crate::{impl_read_version_type, impl_write_version_type, proto};
use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/drop_database.rs
b/crates/fluss/src/rpc/message/drop_database.rs
index 49cbfaf..663e970 100644
--- a/crates/fluss/src/rpc/message/drop_database.rs
+++ b/crates/fluss/src/rpc/message/drop_database.rs
@@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use crate::rpc::frame::ReadError;
+
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use crate::{impl_read_version_type, impl_write_version_type, proto};
use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/drop_table.rs
b/crates/fluss/src/rpc/message/drop_table.rs
index 0dbc21b..a2b3f2d 100644
--- a/crates/fluss/src/rpc/message/drop_table.rs
+++ b/crates/fluss/src/rpc/message/drop_table.rs
@@ -19,10 +19,12 @@ use crate::metadata::TablePath;
use crate::{impl_read_version_type, impl_write_version_type, proto};
use crate::proto::DropTableResponse;
+use crate::rpc::frame::ReadError;
+
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
use crate::rpc::convert::to_table_path;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/fetch.rs
b/crates/fluss/src/rpc/message/fetch.rs
index 6ebc5a2..1587606 100644
--- a/crates/fluss/src/rpc/message/fetch.rs
+++ b/crates/fluss/src/rpc/message/fetch.rs
@@ -16,9 +16,11 @@
// under the License.
use crate::proto::FetchLogResponse;
+use crate::rpc::frame::ReadError;
+
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use crate::{impl_read_version_type, impl_write_version_type, proto};
use prost::Message;
diff --git a/crates/fluss/src/rpc/message/get_database_info.rs
b/crates/fluss/src/rpc/message/get_database_info.rs
index 85492a8..6468beb 100644
--- a/crates/fluss/src/rpc/message/get_database_info.rs
+++ b/crates/fluss/src/rpc/message/get_database_info.rs
@@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use crate::rpc::frame::ReadError;
+
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use crate::{impl_read_version_type, impl_write_version_type, proto};
use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
index a0e186e..a632a15 100644
--- a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
+++ b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
@@ -19,10 +19,12 @@ use crate::proto;
use crate::proto::PbTablePath;
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use crate::metadata::TablePath;
+use crate::rpc::frame::ReadError;
+
use crate::{impl_read_version_type, impl_write_version_type};
use bytes::{Buf, BufMut};
use prost::Message;
diff --git a/crates/fluss/src/rpc/message/get_table.rs
b/crates/fluss/src/rpc/message/get_table.rs
index 4f4d6c7..61657f7 100644
--- a/crates/fluss/src/rpc/message/get_table.rs
+++ b/crates/fluss/src/rpc/message/get_table.rs
@@ -18,10 +18,12 @@
use crate::proto::{GetTableInfoRequest, GetTableInfoResponse, PbTablePath};
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use crate::metadata::TablePath;
+use crate::rpc::frame::ReadError;
+
use crate::{impl_read_version_type, impl_write_version_type};
use bytes::{Buf, BufMut};
use prost::Message;
diff --git a/crates/fluss/src/rpc/message/header.rs
b/crates/fluss/src/rpc/message/header.rs
index fe60f8c..77bda7c 100644
--- a/crates/fluss/src/rpc/message/header.rs
+++ b/crates/fluss/src/rpc/message/header.rs
@@ -15,11 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+use crate::proto::ErrorResponse;
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
use crate::rpc::frame::{ReadError, WriteError};
use crate::rpc::message::{ReadVersionedType, WriteVersionedType};
use bytes::{Buf, BufMut};
+use prost::Message;
#[allow(dead_code)]
const REQUEST_HEADER_LENGTH: i32 = 8;
@@ -53,9 +55,10 @@ where
}
}
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq)]
pub struct ResponseHeader {
pub request_id: i32,
+ pub error_response: Option<ErrorResponse>,
}
impl<R> ReadVersionedType<R> for ResponseHeader
@@ -64,10 +67,17 @@ where
{
     fn read_versioned(reader: &mut R, _version: ApiVersion) -> Result<Self, ReadError> {
let resp_type = reader.get_u8();
+ let request_id = reader.get_i32();
if resp_type != SUCCESS_RESPONSE {
- todo!("handle unsuccess response type");
+ let error_response = ErrorResponse::decode(reader)?;
+ return Ok(ResponseHeader {
+ request_id,
+ error_response: Some(error_response),
+ });
}
- let request_id = reader.get_i32();
- Ok(ResponseHeader { request_id })
+ Ok(ResponseHeader {
+ request_id,
+ error_response: None,
+ })
}
}
diff --git a/crates/fluss/src/rpc/message/list_databases.rs b/crates/fluss/src/rpc/message/list_databases.rs
index ce5a091..83226ab 100644
--- a/crates/fluss/src/rpc/message/list_databases.rs
+++ b/crates/fluss/src/rpc/message/list_databases.rs
@@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use crate::rpc::frame::ReadError;
+
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use crate::{impl_read_version_type, impl_write_version_type, proto};
use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/list_offsets.rs b/crates/fluss/src/rpc/message/list_offsets.rs
index 500db33..9ab1f14 100644
--- a/crates/fluss/src/rpc/message/list_offsets.rs
+++ b/crates/fluss/src/rpc/message/list_offsets.rs
@@ -20,9 +20,11 @@ use crate::{impl_read_version_type, impl_write_version_type, proto};
use crate::error::Error;
use crate::error::Result as FlussResult;
use crate::proto::ListOffsetsResponse;
+use crate::rpc::frame::ReadError;
+
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use std::collections::HashMap;
@@ -108,12 +110,15 @@ impl ListOffsetsResponse {
.map(|resp| {
if resp.error_code.is_some() {
// todo: consider use another suitable error
- Err(Error::WriteError(format!(
- "Missing offset, error message: {}",
- resp.error_message
- .as_deref()
- .unwrap_or("unknown server exception")
- )))
+ Err(Error::UnexpectedError {
+ message: format!(
+ "Missing offset, error message: {}",
+ resp.error_message
+ .as_deref()
+ .unwrap_or("unknown server exception")
+ ),
+ source: None,
+ })
} else {
// if no error msg, offset must exists
Ok((resp.bucket_id, resp.offset.unwrap()))
diff --git a/crates/fluss/src/rpc/message/list_tables.rs b/crates/fluss/src/rpc/message/list_tables.rs
index daf57ea..ff2497a 100644
--- a/crates/fluss/src/rpc/message/list_tables.rs
+++ b/crates/fluss/src/rpc/message/list_tables.rs
@@ -18,9 +18,11 @@
use crate::{impl_read_version_type, impl_write_version_type, proto};
use crate::proto::ListTablesResponse;
+use crate::rpc::frame::ReadError;
+
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs
index 0ed5b7c..b619ee4 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -38,6 +38,7 @@ mod produce_log;
mod table_exists;
mod update_metadata;
+pub use crate::rpc::RpcError;
pub use create_database::*;
pub use create_table::*;
pub use database_exists::*;
diff --git a/crates/fluss/src/rpc/message/produce_log.rs b/crates/fluss/src/rpc/message/produce_log.rs
index 7da2b59..39bfb3f 100644
--- a/crates/fluss/src/rpc/message/produce_log.rs
+++ b/crates/fluss/src/rpc/message/produce_log.rs
@@ -17,9 +17,11 @@
use crate::error::Result as FlussResult;
use crate::proto::{PbProduceLogReqForBucket, ProduceLogResponse};
+use crate::rpc::frame::ReadError;
+
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use crate::{impl_read_version_type, impl_write_version_type, proto};
use std::sync::Arc;
diff --git a/crates/fluss/src/rpc/message/table_exists.rs b/crates/fluss/src/rpc/message/table_exists.rs
index 3b71f47..ec98211 100644
--- a/crates/fluss/src/rpc/message/table_exists.rs
+++ b/crates/fluss/src/rpc/message/table_exists.rs
@@ -22,12 +22,13 @@ use crate::proto::TableExistsResponse;
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
use crate::rpc::convert::to_table_path;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::rpc::frame::ReadError;
+
use bytes::{Buf, BufMut};
use prost::Message;
-
#[derive(Debug)]
pub struct TableExistsRequest {
pub inner_request: proto::TableExistsRequest,
diff --git a/crates/fluss/src/rpc/message/update_metadata.rs b/crates/fluss/src/rpc/message/update_metadata.rs
index 0d8ad64..a6e6288 100644
--- a/crates/fluss/src/rpc/message/update_metadata.rs
+++ b/crates/fluss/src/rpc/message/update_metadata.rs
@@ -18,10 +18,12 @@
use crate::proto::{MetadataResponse, PbTablePath};
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
use crate::metadata::TablePath;
+use crate::rpc::frame::ReadError;
+
use crate::{impl_read_version_type, impl_write_version_type, proto};
use bytes::{Buf, BufMut};
use prost::Message;
diff --git a/crates/fluss/src/rpc/mod.rs b/crates/fluss/src/rpc/mod.rs
index b8705a3..86e13b1 100644
--- a/crates/fluss/src/rpc/mod.rs
+++ b/crates/fluss/src/rpc/mod.rs
@@ -17,7 +17,9 @@
mod api_key;
mod api_version;
-mod error;
+pub mod error;
+mod fluss_api_error;
+pub use fluss_api_error::{ApiError, FlussError};
mod frame;
pub mod message;
pub use error::*;
diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs
index c474534..fdeb56f 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -16,6 +16,7 @@
// under the License.
use crate::cluster::ServerNode;
+use crate::error::Error;
use crate::rpc::api_version::ApiVersion;
use crate::rpc::error::RpcError;
use crate::rpc::error::RpcError::ConnectionError;
@@ -230,7 +231,7 @@ where
}
}
- pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, RpcError>
+ pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, Error>
where
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
@@ -249,9 +250,12 @@ where
let mut buf = Vec::new();
// write header
- header.write_versioned(&mut buf, header_version)?;
+ header
+ .write_versioned(&mut buf, header_version)
+ .map_err(RpcError::WriteMessageError)?;
// write message body
- msg.write_versioned(&mut buf, body_api_version)?;
+ msg.write_versioned(&mut buf, body_api_version)
+ .map_err(RpcError::WriteMessageError)?;
let (tx, rx) = channel();
@@ -264,14 +268,21 @@ where
ConnectionState::RequestMap(map) => {
map.insert(request_id, ActiveRequest { channel: tx });
}
-            ConnectionState::Poison(e) => return Err(RpcError::Poisoned(Arc::clone(e))),
+            ConnectionState::Poison(e) => return Err(RpcError::Poisoned(Arc::clone(e)).into()),
}
self.send_message(buf).await?;
_cleanup_on_cancel.message_sent();
let mut response = rx.await.expect("Who closed this channel?!")?;
-        let body = R::ResponseBody::read_versioned(&mut response.data, body_api_version)?;
+ if let Some(error_response) = response.header.error_response {
+ return Err(Error::FlussAPIError {
+ api_error: crate::rpc::ApiError::from(error_response),
+ });
+ }
+
+        let body = R::ResponseBody::read_versioned(&mut response.data, body_api_version)
+ .map_err(RpcError::ReadMessageError)?;
let read_bytes = response.data.position();
let message_bytes = response.data.into_inner().len() as u64;
@@ -281,7 +292,8 @@ where
read: read_bytes,
api_key: R::API_KEY,
api_version: body_api_version,
- });
+ }
+ .into());
}
Ok(body)
}
diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs
index 0086d9c..ccb7172 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -34,6 +34,7 @@ static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
mod admin_test {
use super::SHARED_FLUSS_CLUSTER;
    use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
+ use fluss::error::FlussError;
use fluss::metadata::{
DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema,
TableDescriptor,
TablePath,
@@ -251,4 +252,35 @@ mod admin_test {
// database shouldn't exist now
assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false);
}
+
+ #[tokio::test]
+ async fn test_fluss_error_response() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+ let admin = connection
+ .get_admin()
+ .await
+ .expect("Failed to get admin client");
+
+        let table_path = TablePath::new("fluss".to_string(), "not_exist".to_string());
+
+ let result = admin.get_table(&table_path).await;
+ assert!(result.is_err(), "Expected error but got Ok");
+
+ let error = result.unwrap_err();
+ match error {
+ fluss::error::Error::FlussAPIError { api_error } => {
+ assert_eq!(
+ api_error.code,
+ FlussError::TableNotExist.code(),
+ "Expected error code 7 (TableNotExist)"
+ );
+ assert_eq!(
+                        api_error.message, "Table 'fluss.not_exist' does not exist.",
+ "Expected specific error message"
+ );
+ }
+ other => panic!("Expected FlussAPIError, got {:?}", other),
+ }
+ }
}
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs
index f52d526..ca61ff8 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -38,8 +38,6 @@ mod table_remote_scan_test {
use fluss::row::{GenericRow, InternalRow};
use std::collections::HashMap;
use std::sync::Arc;
- use std::sync::atomic::AtomicUsize;
- use std::sync::atomic::Ordering;
use std::thread;
use std::thread::sleep;
use std::time::Duration;
@@ -89,11 +87,13 @@ mod table_remote_scan_test {
temp_dir.to_string_lossy().to_string(),
);
-        let cluster =
-            FlussTestingClusterBuilder::new_with_cluster_conf("test_table", &cluster_conf)
- .with_remote_data_dir(temp_dir)
- .build()
- .await;
+        let cluster = FlussTestingClusterBuilder::new_with_cluster_conf(
+ "test_table_remote",
+ &cluster_conf,
+ )
+ .with_remote_data_dir(temp_dir)
+ .build()
+ .await;
let mut guard = cluster_guard.write();
*guard = Some(cluster);
});