Copilot commented on code in PR #77:
URL: https://github.com/apache/fluss-rust/pull/77#discussion_r2616192884


##########
crates/fluss/tests/integration/admin.rs:
##########
@@ -33,11 +33,14 @@ static SHARED_FLUSS_CLUSTER: 
Lazy<Arc<RwLock<Option<FlussTestingCluster>>>> =
 mod admin_test {
     use super::SHARED_FLUSS_CLUSTER;
     use crate::integration::fluss_cluster::{FlussTestingCluster, 
FlussTestingClusterBuilder};
+    use fluss::error::FlussError;
     use fluss::metadata::{
         DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, 
TableDescriptor,
         TablePath,
     };
     use std::sync::Arc;
+    use std::sync::atomic::AtomicUsize;
+    use std::sync::atomic::Ordering;

Review Comment:
   The imports `AtomicUsize` and `Ordering` are not used anywhere in the file 
and should be removed.



##########
crates/fluss/tests/integration/admin.rs:
##########
@@ -250,4 +253,35 @@ mod admin_test {
         // database shouldn't exist now
         assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false);
     }
+
+    #[tokio::test]
+    async fn test_fluss_error_response() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection
+            .get_admin()
+            .await
+            .expect("Failed to get admin client");
+
+        let table_path = TablePath::new("fluss".to_string(), 
"not_exixt".to_string());

Review Comment:
   Typo in table name: "not_exixt" should be "not_exist". This typo also 
appears in the expected error message on line 280, which should also be 
corrected to maintain consistency.



##########
crates/fluss/src/rpc/fluss_api_error.rs:
##########
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::ErrorResponse;
+use std::fmt::{Debug, Display, Formatter};
+
+/// API error response from Fluss server
+pub struct ApiError {
+    pub code: i32,
+    pub message: String,
+}
+
+impl Debug for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ApiError")
+            .field("code", &self.code)
+            .field("message", &self.message)
+            .finish()
+    }
+}
+
+impl Display for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        Debug::fmt(self, f)
+    }
+}
+
+/// Fluss protocol errors. These errors are part of the client-server protocol.
+/// The error codes cannot be changed, but the names can be.
+///
+/// Do not add exceptions that occur only on the client or only on the server 
here.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[repr(i32)]
+pub enum FlussError {
+    /// The server experienced an unexpected error when processing the request.
+    UnknownServerError = -1,
+    /// No error occurred.
+    None = 0,
+    /// The server disconnected before a response was received.
+    NetworkException = 1,
+    /// The version of API is not supported.
+    UnsupportedVersion = 2,
+    /// This message has failed its CRC checksum, exceeds the valid size, has 
a null key for a primary key table, or is otherwise corrupt.
+    CorruptMessage = 3,
+    /// The database does not exist.
+    DatabaseNotExist = 4,
+    /// The database is not empty.
+    DatabaseNotEmpty = 5,
+    /// The database already exists.
+    DatabaseAlreadyExist = 6,
+    /// The table does not exist.
+    TableNotExist = 7,
+    /// The table already exists.
+    TableAlreadyExist = 8,
+    /// The schema does not exist.
+    SchemaNotExist = 9,
+    /// Exception occur while storage data for log in server.
+    LogStorageException = 10,
+    /// Exception occur while storage data for kv in server.
+    KvStorageException = 11,
+    /// Not leader or follower.
+    NotLeaderOrFollower = 12,
+    /// The record is too large.
+    RecordTooLargeException = 13,
+    /// The record is corrupt.
+    CorruptRecordException = 14,
+    /// The client has attempted to perform an operation on an invalid table.
+    InvalidTableException = 15,
+    /// The client has attempted to perform an operation on an invalid 
database.
+    InvalidDatabaseException = 16,
+    /// The replication factor is larger then the number of available tablet 
servers.
+    InvalidReplicationFactor = 17,
+    /// Produce request specified an invalid value for required acks.
+    InvalidRequiredAcks = 18,
+    /// The log offset is out of range.
+    LogOffsetOutOfRangeException = 19,
+    /// The table is not primary key table.
+    NonPrimaryKeyTableException = 20,
+    /// The table or bucket does not exist.
+    UnknownTableOrBucketException = 21,
+    /// The update version is invalid.
+    InvalidUpdateVersionException = 22,
+    /// The coordinator is invalid.
+    InvalidCoordinatorException = 23,
+    /// The leader epoch is invalid.
+    FencedLeaderEpochException = 24,
+    /// The request time out.
+    RequestTimeOut = 25,
+    /// The general storage exception.
+    StorageException = 26,
+    /// The server did not attempt to execute this operation.
+    OperationNotAttemptedException = 27,
+    /// Records are written to the server already, but to fewer in-sync 
replicas than required.
+    NotEnoughReplicasAfterAppendException = 28,
+    /// Messages are rejected since there are fewer in-sync replicas than 
required.
+    NotEnoughReplicasException = 29,
+    /// Get file access security token exception.
+    SecurityTokenException = 30,
+    /// The tablet server received an out of order sequence batch.
+    OutOfOrderSequenceException = 31,
+    /// The tablet server received a duplicate sequence batch.
+    DuplicateSequenceException = 32,
+    /// This exception is raised by the tablet server if it could not locate 
the writer metadata.
+    UnknownWriterIdException = 33,
+    /// The requested column projection is invalid.
+    InvalidColumnProjection = 34,
+    /// The requested target column to write is invalid.
+    InvalidTargetColumn = 35,
+    /// The partition does not exist.
+    PartitionNotExists = 36,
+    /// The table is not partitioned.
+    TableNotPartitionedException = 37,
+    /// The timestamp is invalid.
+    InvalidTimestampException = 38,
+    /// The config is invalid.
+    InvalidConfigException = 39,
+    /// The lake storage is not configured.
+    LakeStorageNotConfiguredException = 40,
+    /// The kv snapshot is not exist.
+    KvSnapshotNotExist = 41,
+    /// The partition already exists.
+    PartitionAlreadyExists = 42,
+    /// The partition spec is invalid.
+    PartitionSpecInvalidException = 43,
+    /// There is no currently available leader for the given partition.
+    LeaderNotAvailableException = 44,
+    /// Exceed the maximum number of partitions.
+    PartitionMaxNumException = 45,
+    /// Authentication failed.
+    AuthenticateException = 46,
+    /// Security is disabled.
+    SecurityDisabledException = 47,
+    /// Authorization failed.
+    AuthorizationException = 48,
+    /// Exceed the maximum number of buckets.
+    BucketMaxNumException = 49,
+    /// The tiering epoch is invalid.
+    FencedTieringEpochException = 50,
+    /// Authentication failed with retriable exception.
+    RetriableAuthenticateException = 51,
+    /// The server rack info is invalid.
+    InvalidServerRackInfoException = 52,
+    /// The lake snapshot is not exist.
+    LakeSnapshotNotExist = 53,
+    /// The lake table already exists.
+    LakeTableAlreadyExist = 54,
+    /// The new ISR contains at least one ineligible replica.
+    IneligibleReplicaException = 55,
+    /// The alter table is invalid.
+    InvalidAlterTableException = 56,
+    /// Deletion operations are disabled on this table.
+    DeletionDisabledException = 57,
+}
+
+impl FlussError {
+    /// Returns the error code for this error.
+    pub fn code(&self) -> i32 {
+        *self as i32
+    }
+
+    /// Returns a friendly description of the error.
+    pub fn message(&self) -> &'static str {
+        match self {
+            FlussError::UnknownServerError => {
+                "The server experienced an unexpected error when processing 
the request."
+            }
+            FlussError::None => "No error",
+            FlussError::NetworkException => {
+                "The server disconnected before a response was received."
+            }
+            FlussError::UnsupportedVersion => "The version of API is not 
supported.",
+            FlussError::CorruptMessage => {
+                "This message has failed its CRC checksum, exceeds the valid 
size, has a null key for a primary key table, or is otherwise corrupt."
+            }
+            FlussError::DatabaseNotExist => "The database does not exist.",
+            FlussError::DatabaseNotEmpty => "The database is not empty.",
+            FlussError::DatabaseAlreadyExist => "The database already exists.",
+            FlussError::TableNotExist => "The table does not exist.",
+            FlussError::TableAlreadyExist => "The table already exists.",
+            FlussError::SchemaNotExist => "The schema does not exist.",
+            FlussError::LogStorageException => {
+                "Exception occur while storage data for log in server."
+            }
+            FlussError::KvStorageException => {
+                "Exception occur while storage data for kv in server."
+            }
+            FlussError::NotLeaderOrFollower => "Not leader or follower.",
+            FlussError::RecordTooLargeException => "The record is too large.",
+            FlussError::CorruptRecordException => "The record is corrupt.",
+            FlussError::InvalidTableException => {
+                "The client has attempted to perform an operation on an 
invalid table."
+            }
+            FlussError::InvalidDatabaseException => {
+                "The client has attempted to perform an operation on an 
invalid database."
+            }
+            FlussError::InvalidReplicationFactor => {
+                "The replication factor is larger then the number of available 
tablet servers."
+            }
+            FlussError::InvalidRequiredAcks => {
+                "Produce request specified an invalid value for required acks."
+            }
+            FlussError::LogOffsetOutOfRangeException => "The log offset is out 
of range.",
+            FlussError::NonPrimaryKeyTableException => "The table is not 
primary key table.",
+            FlussError::UnknownTableOrBucketException => "The table or bucket 
does not exist.",
+            FlussError::InvalidUpdateVersionException => "The update version 
is invalid.",
+            FlussError::InvalidCoordinatorException => "The coordinator is 
invalid.",
+            FlussError::FencedLeaderEpochException => "The leader epoch is 
invalid.",
+            FlussError::RequestTimeOut => "The request time out.",
+            FlussError::StorageException => "The general storage exception.",
+            FlussError::OperationNotAttemptedException => {
+                "The server did not attempt to execute this operation."
+            }
+            FlussError::NotEnoughReplicasAfterAppendException => {
+                "Records are written to the server already, but to fewer 
in-sync replicas than required."
+            }
+            FlussError::NotEnoughReplicasException => {
+                "Messages are rejected since there are fewer in-sync replicas 
than required."
+            }
+            FlussError::SecurityTokenException => "Get file access security 
token exception.",
+            FlussError::OutOfOrderSequenceException => {
+                "The tablet server received an out of order sequence batch."
+            }
+            FlussError::DuplicateSequenceException => {
+                "The tablet server received a duplicate sequence batch."
+            }
+            FlussError::UnknownWriterIdException => {
+                "This exception is raised by the tablet server if it could not 
locate the writer metadata."
+            }
+            FlussError::InvalidColumnProjection => "The requested column 
projection is invalid.",
+            FlussError::InvalidTargetColumn => "The requested target column to 
write is invalid.",
+            FlussError::PartitionNotExists => "The partition does not exist.",
+            FlussError::TableNotPartitionedException => "The table is not 
partitioned.",
+            FlussError::InvalidTimestampException => "The timestamp is 
invalid.",
+            FlussError::InvalidConfigException => "The config is invalid.",
+            FlussError::LakeStorageNotConfiguredException => "The lake storage 
is not configured.",
+            FlussError::KvSnapshotNotExist => "The kv snapshot is not exist.",
+            FlussError::PartitionAlreadyExists => "The partition already 
exists.",
+            FlussError::PartitionSpecInvalidException => "The partition spec 
is invalid.",
+            FlussError::LeaderNotAvailableException => {
+                "There is no currently available leader for the given 
partition."
+            }
+            FlussError::PartitionMaxNumException => "Exceed the maximum number 
of partitions.",
+            FlussError::AuthenticateException => "Authentication failed.",
+            FlussError::SecurityDisabledException => "Security is disabled.",
+            FlussError::AuthorizationException => "Authorization failed",
+            FlussError::BucketMaxNumException => "Exceed the maximum number of 
buckets",

Review Comment:
   The error messages on lines 259 and 260 are missing a trailing period, 
whereas nearly all other error messages in this function end with one. Add 
the period for consistency.
   ```suggestion
               FlussError::AuthorizationException => "Authorization failed.",
               FlussError::BucketMaxNumException => "Exceed the maximum number 
of buckets.",
   ```



##########
crates/fluss/src/rpc/fluss_api_error.rs:
##########
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::ErrorResponse;
+use std::fmt::{Debug, Display, Formatter};
+
+/// API error response from Fluss server
+pub struct ApiError {
+    pub code: i32,
+    pub message: String,
+}
+
+impl Debug for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ApiError")
+            .field("code", &self.code)
+            .field("message", &self.message)
+            .finish()
+    }
+}
+
+impl Display for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        Debug::fmt(self, f)
+    }
+}
+
+/// Fluss protocol errors. These errors are part of the client-server protocol.
+/// The error codes cannot be changed, but the names can be.
+///
+/// Do not add exceptions that occur only on the client or only on the server 
here.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[repr(i32)]
+pub enum FlussError {
+    /// The server experienced an unexpected error when processing the request.
+    UnknownServerError = -1,
+    /// No error occurred.
+    None = 0,
+    /// The server disconnected before a response was received.
+    NetworkException = 1,
+    /// The version of API is not supported.
+    UnsupportedVersion = 2,
+    /// This message has failed its CRC checksum, exceeds the valid size, has 
a null key for a primary key table, or is otherwise corrupt.
+    CorruptMessage = 3,
+    /// The database does not exist.
+    DatabaseNotExist = 4,
+    /// The database is not empty.
+    DatabaseNotEmpty = 5,
+    /// The database already exists.
+    DatabaseAlreadyExist = 6,
+    /// The table does not exist.
+    TableNotExist = 7,
+    /// The table already exists.
+    TableAlreadyExist = 8,
+    /// The schema does not exist.
+    SchemaNotExist = 9,
+    /// Exception occur while storage data for log in server.
+    LogStorageException = 10,
+    /// Exception occur while storage data for kv in server.
+    KvStorageException = 11,
+    /// Not leader or follower.
+    NotLeaderOrFollower = 12,
+    /// The record is too large.
+    RecordTooLargeException = 13,
+    /// The record is corrupt.
+    CorruptRecordException = 14,
+    /// The client has attempted to perform an operation on an invalid table.
+    InvalidTableException = 15,
+    /// The client has attempted to perform an operation on an invalid 
database.
+    InvalidDatabaseException = 16,
+    /// The replication factor is larger then the number of available tablet 
servers.
+    InvalidReplicationFactor = 17,
+    /// Produce request specified an invalid value for required acks.
+    InvalidRequiredAcks = 18,
+    /// The log offset is out of range.
+    LogOffsetOutOfRangeException = 19,
+    /// The table is not primary key table.
+    NonPrimaryKeyTableException = 20,
+    /// The table or bucket does not exist.
+    UnknownTableOrBucketException = 21,
+    /// The update version is invalid.
+    InvalidUpdateVersionException = 22,
+    /// The coordinator is invalid.
+    InvalidCoordinatorException = 23,
+    /// The leader epoch is invalid.
+    FencedLeaderEpochException = 24,
+    /// The request time out.
+    RequestTimeOut = 25,
+    /// The general storage exception.
+    StorageException = 26,
+    /// The server did not attempt to execute this operation.
+    OperationNotAttemptedException = 27,
+    /// Records are written to the server already, but to fewer in-sync 
replicas than required.
+    NotEnoughReplicasAfterAppendException = 28,
+    /// Messages are rejected since there are fewer in-sync replicas than 
required.
+    NotEnoughReplicasException = 29,
+    /// Get file access security token exception.
+    SecurityTokenException = 30,
+    /// The tablet server received an out of order sequence batch.
+    OutOfOrderSequenceException = 31,
+    /// The tablet server received a duplicate sequence batch.
+    DuplicateSequenceException = 32,
+    /// This exception is raised by the tablet server if it could not locate 
the writer metadata.
+    UnknownWriterIdException = 33,
+    /// The requested column projection is invalid.
+    InvalidColumnProjection = 34,
+    /// The requested target column to write is invalid.
+    InvalidTargetColumn = 35,
+    /// The partition does not exist.
+    PartitionNotExists = 36,
+    /// The table is not partitioned.
+    TableNotPartitionedException = 37,
+    /// The timestamp is invalid.
+    InvalidTimestampException = 38,
+    /// The config is invalid.
+    InvalidConfigException = 39,
+    /// The lake storage is not configured.
+    LakeStorageNotConfiguredException = 40,
+    /// The kv snapshot is not exist.
+    KvSnapshotNotExist = 41,
+    /// The partition already exists.
+    PartitionAlreadyExists = 42,
+    /// The partition spec is invalid.
+    PartitionSpecInvalidException = 43,
+    /// There is no currently available leader for the given partition.
+    LeaderNotAvailableException = 44,
+    /// Exceed the maximum number of partitions.
+    PartitionMaxNumException = 45,
+    /// Authentication failed.
+    AuthenticateException = 46,
+    /// Security is disabled.
+    SecurityDisabledException = 47,
+    /// Authorization failed.
+    AuthorizationException = 48,
+    /// Exceed the maximum number of buckets.
+    BucketMaxNumException = 49,
+    /// The tiering epoch is invalid.
+    FencedTieringEpochException = 50,
+    /// Authentication failed with retriable exception.
+    RetriableAuthenticateException = 51,
+    /// The server rack info is invalid.
+    InvalidServerRackInfoException = 52,
+    /// The lake snapshot is not exist.
+    LakeSnapshotNotExist = 53,
+    /// The lake table already exists.
+    LakeTableAlreadyExist = 54,
+    /// The new ISR contains at least one ineligible replica.
+    IneligibleReplicaException = 55,
+    /// The alter table is invalid.
+    InvalidAlterTableException = 56,
+    /// Deletion operations are disabled on this table.
+    DeletionDisabledException = 57,
+}
+
+impl FlussError {
+    /// Returns the error code for this error.
+    pub fn code(&self) -> i32 {
+        *self as i32
+    }
+
+    /// Returns a friendly description of the error.
+    pub fn message(&self) -> &'static str {
+        match self {
+            FlussError::UnknownServerError => {
+                "The server experienced an unexpected error when processing 
the request."
+            }
+            FlussError::None => "No error",
+            FlussError::NetworkException => {
+                "The server disconnected before a response was received."
+            }
+            FlussError::UnsupportedVersion => "The version of API is not 
supported.",
+            FlussError::CorruptMessage => {
+                "This message has failed its CRC checksum, exceeds the valid 
size, has a null key for a primary key table, or is otherwise corrupt."
+            }
+            FlussError::DatabaseNotExist => "The database does not exist.",
+            FlussError::DatabaseNotEmpty => "The database is not empty.",
+            FlussError::DatabaseAlreadyExist => "The database already exists.",
+            FlussError::TableNotExist => "The table does not exist.",
+            FlussError::TableAlreadyExist => "The table already exists.",
+            FlussError::SchemaNotExist => "The schema does not exist.",
+            FlussError::LogStorageException => {
+                "Exception occur while storage data for log in server."
+            }
+            FlussError::KvStorageException => {
+                "Exception occur while storage data for kv in server."
+            }
+            FlussError::NotLeaderOrFollower => "Not leader or follower.",
+            FlussError::RecordTooLargeException => "The record is too large.",
+            FlussError::CorruptRecordException => "The record is corrupt.",
+            FlussError::InvalidTableException => {
+                "The client has attempted to perform an operation on an 
invalid table."
+            }
+            FlussError::InvalidDatabaseException => {
+                "The client has attempted to perform an operation on an 
invalid database."
+            }
+            FlussError::InvalidReplicationFactor => {
+                "The replication factor is larger then the number of available 
tablet servers."
+            }
+            FlussError::InvalidRequiredAcks => {
+                "Produce request specified an invalid value for required acks."
+            }
+            FlussError::LogOffsetOutOfRangeException => "The log offset is out 
of range.",
+            FlussError::NonPrimaryKeyTableException => "The table is not 
primary key table.",
+            FlussError::UnknownTableOrBucketException => "The table or bucket 
does not exist.",
+            FlussError::InvalidUpdateVersionException => "The update version 
is invalid.",
+            FlussError::InvalidCoordinatorException => "The coordinator is 
invalid.",
+            FlussError::FencedLeaderEpochException => "The leader epoch is 
invalid.",
+            FlussError::RequestTimeOut => "The request time out.",
+            FlussError::StorageException => "The general storage exception.",
+            FlussError::OperationNotAttemptedException => {
+                "The server did not attempt to execute this operation."
+            }
+            FlussError::NotEnoughReplicasAfterAppendException => {
+                "Records are written to the server already, but to fewer 
in-sync replicas than required."
+            }
+            FlussError::NotEnoughReplicasException => {
+                "Messages are rejected since there are fewer in-sync replicas 
than required."
+            }
+            FlussError::SecurityTokenException => "Get file access security 
token exception.",
+            FlussError::OutOfOrderSequenceException => {
+                "The tablet server received an out of order sequence batch."
+            }
+            FlussError::DuplicateSequenceException => {
+                "The tablet server received a duplicate sequence batch."
+            }
+            FlussError::UnknownWriterIdException => {
+                "This exception is raised by the tablet server if it could not 
locate the writer metadata."
+            }
+            FlussError::InvalidColumnProjection => "The requested column 
projection is invalid.",
+            FlussError::InvalidTargetColumn => "The requested target column to 
write is invalid.",
+            FlussError::PartitionNotExists => "The partition does not exist.",
+            FlussError::TableNotPartitionedException => "The table is not 
partitioned.",
+            FlussError::InvalidTimestampException => "The timestamp is 
invalid.",
+            FlussError::InvalidConfigException => "The config is invalid.",
+            FlussError::LakeStorageNotConfiguredException => "The lake storage 
is not configured.",
+            FlussError::KvSnapshotNotExist => "The kv snapshot is not exist.",
+            FlussError::PartitionAlreadyExists => "The partition already 
exists.",
+            FlussError::PartitionSpecInvalidException => "The partition spec 
is invalid.",
+            FlussError::LeaderNotAvailableException => {
+                "There is no currently available leader for the given 
partition."
+            }
+            FlussError::PartitionMaxNumException => "Exceed the maximum number 
of partitions.",
+            FlussError::AuthenticateException => "Authentication failed.",
+            FlussError::SecurityDisabledException => "Security is disabled.",
+            FlussError::AuthorizationException => "Authorization failed",
+            FlussError::BucketMaxNumException => "Exceed the maximum number of 
buckets",

Review Comment:
   The error messages on lines 259 and 260 are missing a trailing period, 
whereas nearly all other error messages in this function end with one. Add 
the period for consistency.
   ```suggestion
               FlussError::AuthorizationException => "Authorization failed.",
               FlussError::BucketMaxNumException => "Exceed the maximum number 
of buckets.",
   ```



##########
crates/fluss/src/io/storage.rs:
##########
@@ -39,9 +39,9 @@ impl Storage {
             Scheme::Memory => Ok(Self::Memory),
             #[cfg(feature = "storage-fs")]
             Scheme::Fs => Ok(Self::LocalFs),
-            _ => Err(error::Error::IoUnsupported(
-                "Unsupported storage feature".to_string(),
-            )),
+            _schema => Err(error::Error::IoUnsupported {

Review Comment:
   Variable name `_schema` should be `_scheme` to match the context and the 
`scheme` variable on line 35. The binding represents a URI scheme, not a 
database schema.
   ```suggestion
               _scheme => Err(error::Error::IoUnsupported {
   ```



##########
crates/fluss/src/rpc/fluss_api_error.rs:
##########
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::ErrorResponse;
+use std::fmt::{Debug, Display, Formatter};
+
+/// API error response from Fluss server
+pub struct ApiError {
+    pub code: i32,
+    pub message: String,
+}
+
+impl Debug for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ApiError")
+            .field("code", &self.code)
+            .field("message", &self.message)
+            .finish()
+    }
+}
+
+impl Display for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        Debug::fmt(self, f)
+    }
+}
+
+/// Fluss protocol errors. These errors are part of the client-server protocol.
+/// The error codes cannot be changed, but the names can be.
+///
+/// Do not add exceptions that occur only on the client or only on the server 
here.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[repr(i32)]
+pub enum FlussError {
+    /// The server experienced an unexpected error when processing the request.
+    UnknownServerError = -1,
+    /// No error occurred.
+    None = 0,
+    /// The server disconnected before a response was received.
+    NetworkException = 1,
+    /// The version of API is not supported.
+    UnsupportedVersion = 2,
+    /// This message has failed its CRC checksum, exceeds the valid size, has 
a null key for a primary key table, or is otherwise corrupt.
+    CorruptMessage = 3,
+    /// The database does not exist.
+    DatabaseNotExist = 4,
+    /// The database is not empty.
+    DatabaseNotEmpty = 5,
+    /// The database already exists.
+    DatabaseAlreadyExist = 6,
+    /// The table does not exist.
+    TableNotExist = 7,
+    /// The table already exists.
+    TableAlreadyExist = 8,
+    /// The schema does not exist.
+    SchemaNotExist = 9,
+    /// Exception occur while storage data for log in server.
+    LogStorageException = 10,
+    /// Exception occur while storage data for kv in server.
+    KvStorageException = 11,
+    /// Not leader or follower.
+    NotLeaderOrFollower = 12,
+    /// The record is too large.
+    RecordTooLargeException = 13,
+    /// The record is corrupt.
+    CorruptRecordException = 14,
+    /// The client has attempted to perform an operation on an invalid table.
+    InvalidTableException = 15,
+    /// The client has attempted to perform an operation on an invalid 
database.
+    InvalidDatabaseException = 16,
+    /// The replication factor is larger then the number of available tablet 
servers.
+    InvalidReplicationFactor = 17,
+    /// Produce request specified an invalid value for required acks.
+    InvalidRequiredAcks = 18,
+    /// The log offset is out of range.
+    LogOffsetOutOfRangeException = 19,
+    /// The table is not primary key table.
+    NonPrimaryKeyTableException = 20,
+    /// The table or bucket does not exist.
+    UnknownTableOrBucketException = 21,
+    /// The update version is invalid.
+    InvalidUpdateVersionException = 22,
+    /// The coordinator is invalid.
+    InvalidCoordinatorException = 23,
+    /// The leader epoch is invalid.
+    FencedLeaderEpochException = 24,
+    /// The request time out.
+    RequestTimeOut = 25,
+    /// The general storage exception.
+    StorageException = 26,
+    /// The server did not attempt to execute this operation.
+    OperationNotAttemptedException = 27,
+    /// Records are written to the server already, but to fewer in-sync 
replicas than required.
+    NotEnoughReplicasAfterAppendException = 28,
+    /// Messages are rejected since there are fewer in-sync replicas than 
required.
+    NotEnoughReplicasException = 29,
+    /// Get file access security token exception.
+    SecurityTokenException = 30,
+    /// The tablet server received an out of order sequence batch.
+    OutOfOrderSequenceException = 31,
+    /// The tablet server received a duplicate sequence batch.
+    DuplicateSequenceException = 32,
+    /// This exception is raised by the tablet server if it could not locate 
the writer metadata.
+    UnknownWriterIdException = 33,
+    /// The requested column projection is invalid.
+    InvalidColumnProjection = 34,
+    /// The requested target column to write is invalid.
+    InvalidTargetColumn = 35,
+    /// The partition does not exist.
+    PartitionNotExists = 36,
+    /// The table is not partitioned.
+    TableNotPartitionedException = 37,
+    /// The timestamp is invalid.
+    InvalidTimestampException = 38,
+    /// The config is invalid.
+    InvalidConfigException = 39,
+    /// The lake storage is not configured.
+    LakeStorageNotConfiguredException = 40,
+    /// The kv snapshot is not exist.
+    KvSnapshotNotExist = 41,
+    /// The partition already exists.
+    PartitionAlreadyExists = 42,
+    /// The partition spec is invalid.
+    PartitionSpecInvalidException = 43,
+    /// There is no currently available leader for the given partition.
+    LeaderNotAvailableException = 44,
+    /// Exceed the maximum number of partitions.
+    PartitionMaxNumException = 45,
+    /// Authentication failed.
+    AuthenticateException = 46,
+    /// Security is disabled.
+    SecurityDisabledException = 47,
+    /// Authorization failed.
+    AuthorizationException = 48,
+    /// Exceed the maximum number of buckets.
+    BucketMaxNumException = 49,
+    /// The tiering epoch is invalid.
+    FencedTieringEpochException = 50,
+    /// Authentication failed with retriable exception.
+    RetriableAuthenticateException = 51,
+    /// The server rack info is invalid.
+    InvalidServerRackInfoException = 52,
+    /// The lake snapshot is not exist.
+    LakeSnapshotNotExist = 53,
+    /// The lake table already exists.
+    LakeTableAlreadyExist = 54,
+    /// The new ISR contains at least one ineligible replica.
+    IneligibleReplicaException = 55,
+    /// The alter table is invalid.
+    InvalidAlterTableException = 56,
+    /// Deletion operations are disabled on this table.
+    DeletionDisabledException = 57,
+}
+
+impl FlussError {
+    /// Returns the error code for this error.
+    pub fn code(&self) -> i32 {
+        *self as i32
+    }
+
+    /// Returns a friendly description of the error.
+    pub fn message(&self) -> &'static str {
+        match self {
+            FlussError::UnknownServerError => {
+                "The server experienced an unexpected error when processing 
the request."
+            }
+            FlussError::None => "No error",
+            FlussError::NetworkException => {
+                "The server disconnected before a response was received."
+            }
+            FlussError::UnsupportedVersion => "The version of API is not 
supported.",
+            FlussError::CorruptMessage => {
+                "This message has failed its CRC checksum, exceeds the valid 
size, has a null key for a primary key table, or is otherwise corrupt."
+            }
+            FlussError::DatabaseNotExist => "The database does not exist.",
+            FlussError::DatabaseNotEmpty => "The database is not empty.",
+            FlussError::DatabaseAlreadyExist => "The database already exists.",
+            FlussError::TableNotExist => "The table does not exist.",
+            FlussError::TableAlreadyExist => "The table already exists.",
+            FlussError::SchemaNotExist => "The schema does not exist.",
+            FlussError::LogStorageException => {
+                "Exception occur while storage data for log in server."
+            }
+            FlussError::KvStorageException => {
+                "Exception occur while storage data for kv in server."
+            }
+            FlussError::NotLeaderOrFollower => "Not leader or follower.",
+            FlussError::RecordTooLargeException => "The record is too large.",
+            FlussError::CorruptRecordException => "The record is corrupt.",
+            FlussError::InvalidTableException => {
+                "The client has attempted to perform an operation on an 
invalid table."
+            }
+            FlussError::InvalidDatabaseException => {
+                "The client has attempted to perform an operation on an 
invalid database."
+            }
+            FlussError::InvalidReplicationFactor => {
+                "The replication factor is larger then the number of available 
tablet servers."
+            }
+            FlussError::InvalidRequiredAcks => {
+                "Produce request specified an invalid value for required acks."
+            }
+            FlussError::LogOffsetOutOfRangeException => "The log offset is out 
of range.",
+            FlussError::NonPrimaryKeyTableException => "The table is not 
primary key table.",
+            FlussError::UnknownTableOrBucketException => "The table or bucket 
does not exist.",
+            FlussError::InvalidUpdateVersionException => "The update version 
is invalid.",
+            FlussError::InvalidCoordinatorException => "The coordinator is 
invalid.",
+            FlussError::FencedLeaderEpochException => "The leader epoch is 
invalid.",
+            FlussError::RequestTimeOut => "The request time out.",
+            FlussError::StorageException => "The general storage exception.",
+            FlussError::OperationNotAttemptedException => {
+                "The server did not attempt to execute this operation."
+            }
+            FlussError::NotEnoughReplicasAfterAppendException => {
+                "Records are written to the server already, but to fewer 
in-sync replicas than required."
+            }
+            FlussError::NotEnoughReplicasException => {
+                "Messages are rejected since there are fewer in-sync replicas 
than required."
+            }
+            FlussError::SecurityTokenException => "Get file access security 
token exception.",
+            FlussError::OutOfOrderSequenceException => {
+                "The tablet server received an out of order sequence batch."
+            }
+            FlussError::DuplicateSequenceException => {
+                "The tablet server received a duplicate sequence batch."
+            }
+            FlussError::UnknownWriterIdException => {
+                "This exception is raised by the tablet server if it could not 
locate the writer metadata."
+            }
+            FlussError::InvalidColumnProjection => "The requested column 
projection is invalid.",
+            FlussError::InvalidTargetColumn => "The requested target column to 
write is invalid.",
+            FlussError::PartitionNotExists => "The partition does not exist.",
+            FlussError::TableNotPartitionedException => "The table is not 
partitioned.",
+            FlussError::InvalidTimestampException => "The timestamp is 
invalid.",
+            FlussError::InvalidConfigException => "The config is invalid.",
+            FlussError::LakeStorageNotConfiguredException => "The lake storage 
is not configured.",
+            FlussError::KvSnapshotNotExist => "The kv snapshot is not exist.",

Review Comment:
   Grammatical error: "is not exist" should be "does not exist", matching the
phrasing of the other similar error messages (e.g., lines 250 and 266).
   ```suggestion
               FlussError::KvSnapshotNotExist => "The kv snapshot does not exist.",
   ```



##########
crates/fluss/src/rpc/fluss_api_error.rs:
##########
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::ErrorResponse;
+use std::fmt::{Debug, Display, Formatter};
+
+/// API error response from Fluss server
+pub struct ApiError {
+    pub code: i32,
+    pub message: String,
+}
+
+impl Debug for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ApiError")
+            .field("code", &self.code)
+            .field("message", &self.message)
+            .finish()
+    }
+}
+
+impl Display for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        Debug::fmt(self, f)
+    }
+}
+
+/// Fluss protocol errors. These errors are part of the client-server protocol.
+/// The error codes cannot be changed, but the names can be.
+///
+/// Do not add exceptions that occur only on the client or only on the server 
here.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[repr(i32)]
+pub enum FlussError {
+    /// The server experienced an unexpected error when processing the request.
+    UnknownServerError = -1,
+    /// No error occurred.
+    None = 0,
+    /// The server disconnected before a response was received.
+    NetworkException = 1,
+    /// The version of API is not supported.
+    UnsupportedVersion = 2,
+    /// This message has failed its CRC checksum, exceeds the valid size, has 
a null key for a primary key table, or is otherwise corrupt.
+    CorruptMessage = 3,
+    /// The database does not exist.
+    DatabaseNotExist = 4,
+    /// The database is not empty.
+    DatabaseNotEmpty = 5,
+    /// The database already exists.
+    DatabaseAlreadyExist = 6,
+    /// The table does not exist.
+    TableNotExist = 7,
+    /// The table already exists.
+    TableAlreadyExist = 8,
+    /// The schema does not exist.
+    SchemaNotExist = 9,
+    /// Exception occur while storage data for log in server.
+    LogStorageException = 10,
+    /// Exception occur while storage data for kv in server.
+    KvStorageException = 11,
+    /// Not leader or follower.
+    NotLeaderOrFollower = 12,
+    /// The record is too large.
+    RecordTooLargeException = 13,
+    /// The record is corrupt.
+    CorruptRecordException = 14,
+    /// The client has attempted to perform an operation on an invalid table.
+    InvalidTableException = 15,
+    /// The client has attempted to perform an operation on an invalid 
database.
+    InvalidDatabaseException = 16,
+    /// The replication factor is larger then the number of available tablet 
servers.
+    InvalidReplicationFactor = 17,
+    /// Produce request specified an invalid value for required acks.
+    InvalidRequiredAcks = 18,
+    /// The log offset is out of range.
+    LogOffsetOutOfRangeException = 19,
+    /// The table is not primary key table.
+    NonPrimaryKeyTableException = 20,
+    /// The table or bucket does not exist.
+    UnknownTableOrBucketException = 21,
+    /// The update version is invalid.
+    InvalidUpdateVersionException = 22,
+    /// The coordinator is invalid.
+    InvalidCoordinatorException = 23,
+    /// The leader epoch is invalid.
+    FencedLeaderEpochException = 24,
+    /// The request time out.
+    RequestTimeOut = 25,
+    /// The general storage exception.
+    StorageException = 26,
+    /// The server did not attempt to execute this operation.
+    OperationNotAttemptedException = 27,
+    /// Records are written to the server already, but to fewer in-sync 
replicas than required.
+    NotEnoughReplicasAfterAppendException = 28,
+    /// Messages are rejected since there are fewer in-sync replicas than 
required.
+    NotEnoughReplicasException = 29,
+    /// Get file access security token exception.
+    SecurityTokenException = 30,
+    /// The tablet server received an out of order sequence batch.
+    OutOfOrderSequenceException = 31,
+    /// The tablet server received a duplicate sequence batch.
+    DuplicateSequenceException = 32,
+    /// This exception is raised by the tablet server if it could not locate 
the writer metadata.
+    UnknownWriterIdException = 33,
+    /// The requested column projection is invalid.
+    InvalidColumnProjection = 34,
+    /// The requested target column to write is invalid.
+    InvalidTargetColumn = 35,
+    /// The partition does not exist.
+    PartitionNotExists = 36,
+    /// The table is not partitioned.
+    TableNotPartitionedException = 37,
+    /// The timestamp is invalid.
+    InvalidTimestampException = 38,
+    /// The config is invalid.
+    InvalidConfigException = 39,
+    /// The lake storage is not configured.
+    LakeStorageNotConfiguredException = 40,
+    /// The kv snapshot is not exist.
+    KvSnapshotNotExist = 41,
+    /// The partition already exists.
+    PartitionAlreadyExists = 42,
+    /// The partition spec is invalid.
+    PartitionSpecInvalidException = 43,
+    /// There is no currently available leader for the given partition.
+    LeaderNotAvailableException = 44,
+    /// Exceed the maximum number of partitions.
+    PartitionMaxNumException = 45,
+    /// Authentication failed.
+    AuthenticateException = 46,
+    /// Security is disabled.
+    SecurityDisabledException = 47,
+    /// Authorization failed.
+    AuthorizationException = 48,
+    /// Exceed the maximum number of buckets.
+    BucketMaxNumException = 49,
+    /// The tiering epoch is invalid.
+    FencedTieringEpochException = 50,
+    /// Authentication failed with retriable exception.
+    RetriableAuthenticateException = 51,
+    /// The server rack info is invalid.
+    InvalidServerRackInfoException = 52,
+    /// The lake snapshot is not exist.
+    LakeSnapshotNotExist = 53,
+    /// The lake table already exists.
+    LakeTableAlreadyExist = 54,
+    /// The new ISR contains at least one ineligible replica.
+    IneligibleReplicaException = 55,
+    /// The alter table is invalid.
+    InvalidAlterTableException = 56,
+    /// Deletion operations are disabled on this table.
+    DeletionDisabledException = 57,
+}
+
+impl FlussError {
+    /// Returns the error code for this error.
+    pub fn code(&self) -> i32 {
+        *self as i32
+    }
+
+    /// Returns a friendly description of the error.
+    pub fn message(&self) -> &'static str {
+        match self {
+            FlussError::UnknownServerError => {
+                "The server experienced an unexpected error when processing 
the request."
+            }
+            FlussError::None => "No error",
+            FlussError::NetworkException => {
+                "The server disconnected before a response was received."
+            }
+            FlussError::UnsupportedVersion => "The version of API is not 
supported.",
+            FlussError::CorruptMessage => {
+                "This message has failed its CRC checksum, exceeds the valid 
size, has a null key for a primary key table, or is otherwise corrupt."
+            }
+            FlussError::DatabaseNotExist => "The database does not exist.",
+            FlussError::DatabaseNotEmpty => "The database is not empty.",
+            FlussError::DatabaseAlreadyExist => "The database already exists.",
+            FlussError::TableNotExist => "The table does not exist.",
+            FlussError::TableAlreadyExist => "The table already exists.",
+            FlussError::SchemaNotExist => "The schema does not exist.",
+            FlussError::LogStorageException => {
+                "Exception occur while storage data for log in server."
+            }
+            FlussError::KvStorageException => {
+                "Exception occur while storage data for kv in server."
+            }
+            FlussError::NotLeaderOrFollower => "Not leader or follower.",
+            FlussError::RecordTooLargeException => "The record is too large.",
+            FlussError::CorruptRecordException => "The record is corrupt.",
+            FlussError::InvalidTableException => {
+                "The client has attempted to perform an operation on an 
invalid table."
+            }
+            FlussError::InvalidDatabaseException => {
+                "The client has attempted to perform an operation on an 
invalid database."
+            }
+            FlussError::InvalidReplicationFactor => {
+                "The replication factor is larger then the number of available 
tablet servers."
+            }
+            FlussError::InvalidRequiredAcks => {
+                "Produce request specified an invalid value for required acks."
+            }
+            FlussError::LogOffsetOutOfRangeException => "The log offset is out 
of range.",
+            FlussError::NonPrimaryKeyTableException => "The table is not 
primary key table.",
+            FlussError::UnknownTableOrBucketException => "The table or bucket 
does not exist.",
+            FlussError::InvalidUpdateVersionException => "The update version 
is invalid.",
+            FlussError::InvalidCoordinatorException => "The coordinator is 
invalid.",
+            FlussError::FencedLeaderEpochException => "The leader epoch is 
invalid.",
+            FlussError::RequestTimeOut => "The request time out.",
+            FlussError::StorageException => "The general storage exception.",
+            FlussError::OperationNotAttemptedException => {
+                "The server did not attempt to execute this operation."
+            }
+            FlussError::NotEnoughReplicasAfterAppendException => {
+                "Records are written to the server already, but to fewer 
in-sync replicas than required."
+            }
+            FlussError::NotEnoughReplicasException => {
+                "Messages are rejected since there are fewer in-sync replicas 
than required."
+            }
+            FlussError::SecurityTokenException => "Get file access security 
token exception.",
+            FlussError::OutOfOrderSequenceException => {
+                "The tablet server received an out of order sequence batch."
+            }
+            FlussError::DuplicateSequenceException => {
+                "The tablet server received a duplicate sequence batch."
+            }
+            FlussError::UnknownWriterIdException => {
+                "This exception is raised by the tablet server if it could not 
locate the writer metadata."
+            }
+            FlussError::InvalidColumnProjection => "The requested column 
projection is invalid.",
+            FlussError::InvalidTargetColumn => "The requested target column to 
write is invalid.",
+            FlussError::PartitionNotExists => "The partition does not exist.",
+            FlussError::TableNotPartitionedException => "The table is not 
partitioned.",
+            FlussError::InvalidTimestampException => "The timestamp is 
invalid.",
+            FlussError::InvalidConfigException => "The config is invalid.",
+            FlussError::LakeStorageNotConfiguredException => "The lake storage 
is not configured.",
+            FlussError::KvSnapshotNotExist => "The kv snapshot is not exist.",
+            FlussError::PartitionAlreadyExists => "The partition already 
exists.",
+            FlussError::PartitionSpecInvalidException => "The partition spec 
is invalid.",
+            FlussError::LeaderNotAvailableException => {
+                "There is no currently available leader for the given 
partition."
+            }
+            FlussError::PartitionMaxNumException => "Exceed the maximum number 
of partitions.",
+            FlussError::AuthenticateException => "Authentication failed.",
+            FlussError::SecurityDisabledException => "Security is disabled.",
+            FlussError::AuthorizationException => "Authorization failed",
+            FlussError::BucketMaxNumException => "Exceed the maximum number of 
buckets",
+            FlussError::FencedTieringEpochException => "The tiering epoch is 
invalid.",
+            FlussError::RetriableAuthenticateException => {
+                "Authentication failed with retriable exception. "

Review Comment:
   Extra trailing space at the end of the error message. The message ends with 
a period followed by a space, while other error messages only end with a period.
   ```suggestion
                   "Authentication failed with retriable exception."
   ```



##########
crates/fluss/src/error.rs:
##########
@@ -15,48 +15,137 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::rpc::RpcError;
+pub use crate::rpc::RpcError;
+pub use crate::rpc::{ApiError, FlussError};
+
 use arrow_schema::ArrowError;
+use snafu::Snafu;
 use std::{io, result};
-use thiserror::Error;
 
 pub type Result<T> = result::Result<T, Error>;
 
-#[derive(Debug, Error)]
+#[derive(Debug, Snafu)]
 pub enum Error {
-    #[error(transparent)]
-    Io(#[from] io::Error),
+    #[snafu(
+        whatever,
+        display("Fluss hitting Unexpected error {}: {:?}", message, source)

Review Comment:
   Inconsistent capitalization in error display message. Line 31 uses 
"Unexpected error" (capitalized) while lines 42, 49, and 73 use "unexpected" 
(lowercase). The error display messages should use consistent capitalization 
throughout.
   ```suggestion
           display("Fluss hitting unexpected error {}: {:?}", message, source)
   ```



##########
crates/fluss/src/rpc/fluss_api_error.rs:
##########
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::ErrorResponse;
+use std::fmt::{Debug, Display, Formatter};
+
+/// API error response from Fluss server
+pub struct ApiError {
+    pub code: i32,
+    pub message: String,
+}
+
+impl Debug for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ApiError")
+            .field("code", &self.code)
+            .field("message", &self.message)
+            .finish()
+    }
+}
+
+impl Display for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        Debug::fmt(self, f)
+    }
+}
+
+/// Fluss protocol errors. These errors are part of the client-server protocol.
+/// The error codes cannot be changed, but the names can be.
+///
+/// Do not add exceptions that occur only on the client or only on the server 
here.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[repr(i32)]
+pub enum FlussError {
+    /// The server experienced an unexpected error when processing the request.
+    UnknownServerError = -1,
+    /// No error occurred.
+    None = 0,
+    /// The server disconnected before a response was received.
+    NetworkException = 1,
+    /// The version of API is not supported.
+    UnsupportedVersion = 2,
+    /// This message has failed its CRC checksum, exceeds the valid size, has 
a null key for a primary key table, or is otherwise corrupt.
+    CorruptMessage = 3,
+    /// The database does not exist.
+    DatabaseNotExist = 4,
+    /// The database is not empty.
+    DatabaseNotEmpty = 5,
+    /// The database already exists.
+    DatabaseAlreadyExist = 6,
+    /// The table does not exist.
+    TableNotExist = 7,
+    /// The table already exists.
+    TableAlreadyExist = 8,
+    /// The schema does not exist.
+    SchemaNotExist = 9,
+    /// Exception occur while storage data for log in server.
+    LogStorageException = 10,
+    /// Exception occur while storage data for kv in server.
+    KvStorageException = 11,
+    /// Not leader or follower.
+    NotLeaderOrFollower = 12,
+    /// The record is too large.
+    RecordTooLargeException = 13,
+    /// The record is corrupt.
+    CorruptRecordException = 14,
+    /// The client has attempted to perform an operation on an invalid table.
+    InvalidTableException = 15,
+    /// The client has attempted to perform an operation on an invalid 
database.
+    InvalidDatabaseException = 16,
+    /// The replication factor is larger then the number of available tablet 
servers.
+    InvalidReplicationFactor = 17,
+    /// Produce request specified an invalid value for required acks.
+    InvalidRequiredAcks = 18,
+    /// The log offset is out of range.
+    LogOffsetOutOfRangeException = 19,
+    /// The table is not primary key table.
+    NonPrimaryKeyTableException = 20,
+    /// The table or bucket does not exist.
+    UnknownTableOrBucketException = 21,
+    /// The update version is invalid.
+    InvalidUpdateVersionException = 22,
+    /// The coordinator is invalid.
+    InvalidCoordinatorException = 23,
+    /// The leader epoch is invalid.
+    FencedLeaderEpochException = 24,
+    /// The request time out.
+    RequestTimeOut = 25,
+    /// The general storage exception.
+    StorageException = 26,
+    /// The server did not attempt to execute this operation.
+    OperationNotAttemptedException = 27,
+    /// Records are written to the server already, but to fewer in-sync 
replicas than required.
+    NotEnoughReplicasAfterAppendException = 28,
+    /// Messages are rejected since there are fewer in-sync replicas than 
required.
+    NotEnoughReplicasException = 29,
+    /// Get file access security token exception.
+    SecurityTokenException = 30,
+    /// The tablet server received an out of order sequence batch.
+    OutOfOrderSequenceException = 31,
+    /// The tablet server received a duplicate sequence batch.
+    DuplicateSequenceException = 32,
+    /// This exception is raised by the tablet server if it could not locate 
the writer metadata.
+    UnknownWriterIdException = 33,
+    /// The requested column projection is invalid.
+    InvalidColumnProjection = 34,
+    /// The requested target column to write is invalid.
+    InvalidTargetColumn = 35,
+    /// The partition does not exist.
+    PartitionNotExists = 36,
+    /// The table is not partitioned.
+    TableNotPartitionedException = 37,
+    /// The timestamp is invalid.
+    InvalidTimestampException = 38,
+    /// The config is invalid.
+    InvalidConfigException = 39,
+    /// The lake storage is not configured.
+    LakeStorageNotConfiguredException = 40,
+    /// The kv snapshot is not exist.
+    KvSnapshotNotExist = 41,
+    /// The partition already exists.
+    PartitionAlreadyExists = 42,
+    /// The partition spec is invalid.
+    PartitionSpecInvalidException = 43,
+    /// There is no currently available leader for the given partition.
+    LeaderNotAvailableException = 44,
+    /// Exceed the maximum number of partitions.
+    PartitionMaxNumException = 45,
+    /// Authentication failed.
+    AuthenticateException = 46,
+    /// Security is disabled.
+    SecurityDisabledException = 47,
+    /// Authorization failed.
+    AuthorizationException = 48,
+    /// Exceed the maximum number of buckets.
+    BucketMaxNumException = 49,
+    /// The tiering epoch is invalid.
+    FencedTieringEpochException = 50,
+    /// Authentication failed with retriable exception.
+    RetriableAuthenticateException = 51,
+    /// The server rack info is invalid.
+    InvalidServerRackInfoException = 52,
+    /// The lake snapshot is not exist.
+    LakeSnapshotNotExist = 53,
+    /// The lake table already exists.
+    LakeTableAlreadyExist = 54,
+    /// The new ISR contains at least one ineligible replica.
+    IneligibleReplicaException = 55,
+    /// The alter table is invalid.
+    InvalidAlterTableException = 56,
+    /// Deletion operations are disabled on this table.
+    DeletionDisabledException = 57,
+}
+
+impl FlussError {
+    /// Returns the error code for this error.
+    pub fn code(&self) -> i32 {
+        *self as i32
+    }
+
+    /// Returns a friendly description of the error.
+    pub fn message(&self) -> &'static str {
+        match self {
+            FlussError::UnknownServerError => {
+                "The server experienced an unexpected error when processing 
the request."
+            }
+            FlussError::None => "No error",
+            FlussError::NetworkException => {
+                "The server disconnected before a response was received."
+            }
+            FlussError::UnsupportedVersion => "The version of API is not 
supported.",
+            FlussError::CorruptMessage => {
+                "This message has failed its CRC checksum, exceeds the valid 
size, has a null key for a primary key table, or is otherwise corrupt."
+            }
+            FlussError::DatabaseNotExist => "The database does not exist.",
+            FlussError::DatabaseNotEmpty => "The database is not empty.",
+            FlussError::DatabaseAlreadyExist => "The database already exists.",
+            FlussError::TableNotExist => "The table does not exist.",
+            FlussError::TableAlreadyExist => "The table already exists.",
+            FlussError::SchemaNotExist => "The schema does not exist.",
+            FlussError::LogStorageException => {
+                "Exception occur while storage data for log in server."
+            }
+            FlussError::KvStorageException => {
+                "Exception occur while storage data for kv in server."
+            }
+            FlussError::NotLeaderOrFollower => "Not leader or follower.",
+            FlussError::RecordTooLargeException => "The record is too large.",
+            FlussError::CorruptRecordException => "The record is corrupt.",
+            FlussError::InvalidTableException => {
+                "The client has attempted to perform an operation on an 
invalid table."
+            }
+            FlussError::InvalidDatabaseException => {
+                "The client has attempted to perform an operation on an 
invalid database."
+            }
+            FlussError::InvalidReplicationFactor => {
+                "The replication factor is larger then the number of available 
tablet servers."
+            }
+            FlussError::InvalidRequiredAcks => {
+                "Produce request specified an invalid value for required acks."
+            }
+            FlussError::LogOffsetOutOfRangeException => "The log offset is out 
of range.",
+            FlussError::NonPrimaryKeyTableException => "The table is not 
primary key table.",
+            FlussError::UnknownTableOrBucketException => "The table or bucket 
does not exist.",
+            FlussError::InvalidUpdateVersionException => "The update version 
is invalid.",
+            FlussError::InvalidCoordinatorException => "The coordinator is 
invalid.",
+            FlussError::FencedLeaderEpochException => "The leader epoch is 
invalid.",
+            FlussError::RequestTimeOut => "The request time out.",
+            FlussError::StorageException => "The general storage exception.",
+            FlussError::OperationNotAttemptedException => {
+                "The server did not attempt to execute this operation."
+            }
+            FlussError::NotEnoughReplicasAfterAppendException => {
+                "Records are written to the server already, but to fewer 
in-sync replicas than required."
+            }
+            FlussError::NotEnoughReplicasException => {
+                "Messages are rejected since there are fewer in-sync replicas 
than required."
+            }
+            FlussError::SecurityTokenException => "Get file access security 
token exception.",
+            FlussError::OutOfOrderSequenceException => {
+                "The tablet server received an out of order sequence batch."
+            }
+            FlussError::DuplicateSequenceException => {
+                "The tablet server received a duplicate sequence batch."
+            }
+            FlussError::UnknownWriterIdException => {
+                "This exception is raised by the tablet server if it could not 
locate the writer metadata."
+            }
+            FlussError::InvalidColumnProjection => "The requested column 
projection is invalid.",
+            FlussError::InvalidTargetColumn => "The requested target column to 
write is invalid.",
+            FlussError::PartitionNotExists => "The partition does not exist.",
+            FlussError::TableNotPartitionedException => "The table is not 
partitioned.",
+            FlussError::InvalidTimestampException => "The timestamp is 
invalid.",
+            FlussError::InvalidConfigException => "The config is invalid.",
+            FlussError::LakeStorageNotConfiguredException => "The lake storage 
is not configured.",
+            FlussError::KvSnapshotNotExist => "The kv snapshot is not exist.",
+            FlussError::PartitionAlreadyExists => "The partition already 
exists.",
+            FlussError::PartitionSpecInvalidException => "The partition spec 
is invalid.",
+            FlussError::LeaderNotAvailableException => {
+                "There is no currently available leader for the given 
partition."
+            }
+            FlussError::PartitionMaxNumException => "Exceed the maximum number 
of partitions.",
+            FlussError::AuthenticateException => "Authentication failed.",
+            FlussError::SecurityDisabledException => "Security is disabled.",
+            FlussError::AuthorizationException => "Authorization failed",
+            FlussError::BucketMaxNumException => "Exceed the maximum number of 
buckets",
+            FlussError::FencedTieringEpochException => "The tiering epoch is 
invalid.",
+            FlussError::RetriableAuthenticateException => {
+                "Authentication failed with retriable exception. "
+            }
+            FlussError::InvalidServerRackInfoException => "The server rack 
info is invalid.",
+            FlussError::LakeSnapshotNotExist => "The lake snapshot is not 
exist.",

Review Comment:
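   Grammatical error: "is not exist" should be "does not exist", matching the 
grammar used in other similar error messages (e.g., lines 189, 192, 245, 266). 
The same "is not exist" wording also appears in the `KvSnapshotNotExist` 
message and should be corrected in the same way.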
   ```suggestion
               FlussError::LakeSnapshotNotExist => "The lake snapshot does not exist.",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to