This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f43550  chore: rust client to return error when currently 
unimplemented non-default ZSTD compression is configured (#120)
0f43550 is described below

commit 0f4355076471dae69319920fc8c59b84c948aa7f
Author: Keith Lee <[email protected]>
AuthorDate: Fri Jan 2 08:35:43 2026 +0000

    chore: rust client to return error when currently unimplemented non-default 
ZSTD compression is configured (#120)
    
    ---------
    
    Co-authored-by: luoyuxia <[email protected]>
---
 crates/fluss/src/client/table/scanner.rs          |  7 ++--
 crates/fluss/src/compression/arrow_compression.rs | 41 +++++++++++++++++------
 2 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index 11bdfa3..bf39839 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -344,10 +344,7 @@ impl LogFetcher {
                 if let Error::RpcError { source, .. } = &e
                     && matches!(source, RpcError::ConnectionError(_) | 
RpcError::Poisoned(_))
                 {
-                    warn!(
-                        "Retrying after encountering error while updating 
table metadata: {}",
-                        e
-                    );
+                    warn!("Retrying after encountering error while updating 
table metadata: {e}");
                     Ok(())
                 } else {
                     Err(e)
@@ -395,7 +392,7 @@ impl LogFetcher {
                 let server_node = match cluster.get_tablet_server(leader) {
                     Some(node) => node,
                     None => {
-                        warn!("No server node found for leader {}, retrying", 
leader);
+                        warn!("No server node found for leader {leader}, 
retrying");
                         Self::handle_fetch_failure(metadata, &leader, 
&fetch_request).await;
                         return;
                     }
diff --git a/crates/fluss/src/compression/arrow_compression.rs 
b/crates/fluss/src/compression/arrow_compression.rs
index 32dfadb..8121a51 100644
--- a/crates/fluss/src/compression/arrow_compression.rs
+++ b/crates/fluss/src/compression/arrow_compression.rs
@@ -17,6 +17,7 @@
 
 use crate::error::{Error, Result};
 use arrow::ipc::CompressionType;
+use arrow_schema::ArrowError;
 use std::collections::HashMap;
 
 pub const TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL: &str = 
"table.log.arrow.compression.zstd.level";
@@ -71,21 +72,31 @@ impl ArrowCompressionInfo {
         {
             Some(Ok(level)) if !(1..=22).contains(&level) => 
Err(Error::IllegalArgument {
                 message: format!(
-                    "Invalid ZSTD compression level: {}. Expected a value 
between 1 and 22.",
-                    level
+                    "Invalid ZSTD compression level: {level}. Expected a value 
between 1 and 22."
                 ),
             }),
             Some(Err(e)) => Err(Error::IllegalArgument {
                 message: format!(
-                    "Invalid ZSTD compression level. Expected a value between 
1 and 22. {}",
-                    e
+                    "Invalid ZSTD compression level. Expected a value between 
1 and 22. {e}"
                 ),
             }),
-
-            Some(Ok(level)) => Ok(Self {
-                compression_type,
-                compression_level: level,
-            }),
+            Some(Ok(level)) => {
+                // TODO Remove once non-default ZSTD compression level is 
implemented https://github.com/apache/fluss-rust/issues/109
+                if level != DEFAULT_ZSTD_COMPRESSION_LEVEL {
+                    return Err(Error::ArrowError {
+                        message: format!(
+                            "Rust client currently only implements default 
ZSTD compression level {DEFAULT_ZSTD_COMPRESSION_LEVEL}. Got: {level}."
+                        ),
+                        source: ArrowError::NotYetImplemented(format!(
+                            "zstd compression level {level}."
+                        )),
+                    });
+                }
+                Ok(Self {
+                    compression_type,
+                    compression_level: level,
+                })
+            }
             None => Ok(Self {
                 compression_type,
                 compression_level: DEFAULT_ZSTD_COMPRESSION_LEVEL,
@@ -171,11 +182,19 @@ mod tests {
             "ZSTD",
         )]));
         assert_eq!(compression_info.unwrap().compression_level, 3);
-        let compression_info = ArrowCompressionInfo::from_conf(&mk_map(&[
+    }
+
+    // TODO Remove once non-default ZSTD compression level is implemented 
https://github.com/apache/fluss-rust/issues/109
+    #[test]
+    fn test_from_conf_zstd_compression_level_error_when_non_default() {
+        let result = ArrowCompressionInfo::from_conf(&mk_map(&[
             ("table.log.arrow.compression.type", "ZSTD"),
             ("table.log.arrow.compression.zstd.level", "1"),
         ]));
-        assert_eq!(compression_info.unwrap().compression_level, 1);
+        assert!(result.is_err());
+        assert!(result.unwrap_err().to_string().contains(
+            "Rust client currently only implements default ZSTD compression 
level 3. Got: 1."
+        ));
     }
 
     #[test]

Reply via email to