This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ea9c57a chore: Fix API error not being propagated on python side.
(#340)
ea9c57a is described below
commit ea9c57aefbe96522593039b1f7d2201de73ecd78
Author: Keith Lee <[email protected]>
AuthorDate: Mon Feb 16 00:37:16 2026 +0000
chore: Fix API error not being propagated on python side. (#340)
---
bindings/python/src/admin.rs | 12 +++++-------
crates/fluss/src/client/connection.rs | 15 +++++++++++++--
crates/fluss/src/error.rs | 9 +++++++++
3 files changed, 27 insertions(+), 9 deletions(-)
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
index 9a96bea..30db375 100644
--- a/bindings/python/src/admin.rs
+++ b/bindings/python/src/admin.rs
@@ -345,7 +345,7 @@ impl FlussAdmin {
admin
.drop_table(&core_table_path, ignore_if_not_exists)
.await
- .map_err(|e| FlussError::new_err(format!("Failed to drop
table: {e}")))?;
+ .map_err(|e| FlussError::from_core_error(&e))?;
Python::attach(|py| Ok(py.None()))
})
@@ -378,7 +378,7 @@ impl FlussAdmin {
let offsets = admin
.list_offsets(&core_table_path, &bucket_ids, offset_spec)
.await
- .map_err(|e| FlussError::new_err(format!("Failed to list
offsets: {e}")))?;
+ .map_err(|e| FlussError::from_core_error(&e))?;
Python::attach(|py| {
let dict = pyo3::types::PyDict::new(py);
@@ -420,9 +420,7 @@ impl FlussAdmin {
let offsets = admin
.list_partition_offsets(&core_table_path, &partition_name,
&bucket_ids, offset_spec)
.await
- .map_err(|e| {
- FlussError::new_err(format!("Failed to list partition
offsets: {e}"))
- })?;
+ .map_err(|e| FlussError::from_core_error(&e))?;
Python::attach(|py| {
let dict = pyo3::types::PyDict::new(py);
@@ -459,7 +457,7 @@ impl FlussAdmin {
admin
.create_partition(&core_table_path, &core_partition_spec,
ignore_if_exists)
.await
- .map_err(|e| FlussError::new_err(format!("Failed to create
partition: {e}")))?;
+ .map_err(|e| FlussError::from_core_error(&e))?;
Python::attach(|py| Ok(py.None()))
})
@@ -490,7 +488,7 @@ impl FlussAdmin {
let partition_infos = admin
.list_partition_infos_with_spec(&core_table_path,
core_partition_spec.as_ref())
.await
- .map_err(|e| FlussError::new_err(format!("Failed to list
partitions: {e}")))?;
+ .map_err(|e| FlussError::from_core_error(&e))?;
Python::attach(|py| {
let py_list = pyo3::types::PyList::empty(py);
diff --git a/crates/fluss/src/client/connection.rs
b/crates/fluss/src/client/connection.rs
index b370682..a17e57f 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -24,7 +24,7 @@ use crate::rpc::RpcClient;
use parking_lot::RwLock;
use std::sync::Arc;
-use crate::error::Result;
+use crate::error::{Error, FlussError, Result};
use crate::metadata::TablePath;
pub struct FlussConnection {
@@ -88,7 +88,18 @@ impl FlussConnection {
pub async fn get_table(&self, table_path: &TablePath) ->
Result<FlussTable<'_>> {
self.metadata.update_table_metadata(table_path).await?;
- let table_info =
self.metadata.get_cluster().get_table(table_path)?.clone();
+ let table_info = self
+ .metadata
+ .get_cluster()
+ .get_table(table_path)
+ .map_err(|e| {
+ if e.api_error() == Some(FlussError::InvalidTableException) {
+ Error::table_not_exist(format!("Table not found:
{table_path}"))
+ } else {
+ e
+ }
+ })?
+ .clone();
Ok(FlussTable::new(self, self.metadata.clone(), table_info))
}
}
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index af9f274..59524a6 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -112,6 +112,15 @@ pub enum Error {
/// These create `FlussAPIError` with the correct protocol error code,
/// consistent with Java where e.g. `InvalidTableException` always carries
code 15.
impl Error {
+ pub fn table_not_exist(message: impl Into<String>) -> Self {
+ Error::FlussAPIError {
+ api_error: ApiError {
+ code: FlussError::TableNotExist.code(),
+ message: message.into(),
+ },
+ }
+ }
+
pub fn invalid_table(message: impl Into<String>) -> Self {
Error::FlussAPIError {
api_error: ApiError {