This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0f43550 chore: Rust client returns an error when a non-default
(currently unimplemented) ZSTD compression level is configured (#120)
0f43550 is described below
commit 0f4355076471dae69319920fc8c59b84c948aa7f
Author: Keith Lee <[email protected]>
AuthorDate: Fri Jan 2 08:35:43 2026 +0000
chore: Rust client returns an error when a non-default (currently
unimplemented) ZSTD compression level is configured (#120)
---------
Co-authored-by: luoyuxia <[email protected]>
---
crates/fluss/src/client/table/scanner.rs | 7 ++--
crates/fluss/src/compression/arrow_compression.rs | 41 +++++++++++++++++------
2 files changed, 32 insertions(+), 16 deletions(-)
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index 11bdfa3..bf39839 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -344,10 +344,7 @@ impl LogFetcher {
if let Error::RpcError { source, .. } = &e
&& matches!(source, RpcError::ConnectionError(_) |
RpcError::Poisoned(_))
{
- warn!(
- "Retrying after encountering error while updating
table metadata: {}",
- e
- );
+ warn!("Retrying after encountering error while updating
table metadata: {e}");
Ok(())
} else {
Err(e)
@@ -395,7 +392,7 @@ impl LogFetcher {
let server_node = match cluster.get_tablet_server(leader) {
Some(node) => node,
None => {
- warn!("No server node found for leader {}, retrying",
leader);
+ warn!("No server node found for leader {leader},
retrying");
Self::handle_fetch_failure(metadata, &leader,
&fetch_request).await;
return;
}
diff --git a/crates/fluss/src/compression/arrow_compression.rs
b/crates/fluss/src/compression/arrow_compression.rs
index 32dfadb..8121a51 100644
--- a/crates/fluss/src/compression/arrow_compression.rs
+++ b/crates/fluss/src/compression/arrow_compression.rs
@@ -17,6 +17,7 @@
use crate::error::{Error, Result};
use arrow::ipc::CompressionType;
+use arrow_schema::ArrowError;
use std::collections::HashMap;
pub const TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL: &str =
"table.log.arrow.compression.zstd.level";
@@ -71,21 +72,31 @@ impl ArrowCompressionInfo {
{
Some(Ok(level)) if !(1..=22).contains(&level) =>
Err(Error::IllegalArgument {
message: format!(
- "Invalid ZSTD compression level: {}. Expected a value
between 1 and 22.",
- level
+ "Invalid ZSTD compression level: {level}. Expected a value
between 1 and 22."
),
}),
Some(Err(e)) => Err(Error::IllegalArgument {
message: format!(
- "Invalid ZSTD compression level. Expected a value between
1 and 22. {}",
- e
+ "Invalid ZSTD compression level. Expected a value between
1 and 22. {e}"
),
}),
-
- Some(Ok(level)) => Ok(Self {
- compression_type,
- compression_level: level,
- }),
+ Some(Ok(level)) => {
+ // TODO Remove once non-default ZSTD compression level is
implemented https://github.com/apache/fluss-rust/issues/109
+ if level != DEFAULT_ZSTD_COMPRESSION_LEVEL {
+ return Err(Error::ArrowError {
+ message: format!(
+ "Rust client currently only implements default
ZSTD compression level {DEFAULT_ZSTD_COMPRESSION_LEVEL}. Got: {level}."
+ ),
+ source: ArrowError::NotYetImplemented(format!(
+ "zstd compression level {level}."
+ )),
+ });
+ }
+ Ok(Self {
+ compression_type,
+ compression_level: level,
+ })
+ }
None => Ok(Self {
compression_type,
compression_level: DEFAULT_ZSTD_COMPRESSION_LEVEL,
@@ -171,11 +182,19 @@ mod tests {
"ZSTD",
)]));
assert_eq!(compression_info.unwrap().compression_level, 3);
- let compression_info = ArrowCompressionInfo::from_conf(&mk_map(&[
+ }
+
+ // TODO Remove once non-default ZSTD compression level is implemented
https://github.com/apache/fluss-rust/issues/109
+ #[test]
+ fn test_from_conf_zstd_compression_level_error_when_non_default() {
+ let result = ArrowCompressionInfo::from_conf(&mk_map(&[
("table.log.arrow.compression.type", "ZSTD"),
("table.log.arrow.compression.zstd.level", "1"),
]));
- assert_eq!(compression_info.unwrap().compression_level, 1);
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains(
+ "Rust client currently only implements default ZSTD compression
level 3. Got: 1."
+ ));
}
#[test]