This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 916f64c [chore] Implement is_auto_partitioned method in table.rs (#48)
916f64c is described below
commit 916f64cefa465bccf634125048baa3ecfbb5f7b9
Author: Junbo Wang <[email protected]>
AuthorDate: Wed Jan 28 13:28:27 2026 +0800
[chore] Implement is_auto_partitioned method in table.rs (#48)
---
crates/fluss/src/client/table/partition_getter.rs | 3 +-
crates/fluss/src/metadata/table.rs | 188 ++++++++++++++++++++--
crates/fluss/src/util/partition.rs | 4 +-
3 files changed, 180 insertions(+), 15 deletions(-)
diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs
index 887c0a4..1a76106 100644
--- a/crates/fluss/src/client/table/partition_getter.rs
+++ b/crates/fluss/src/client/table/partition_getter.rs
@@ -49,8 +49,7 @@ impl PartitionGetter {
} else {
return Err(IllegalArgument {
message: format!(
- "The partition column {} is not in the row {}.",
- partition_key, row_type
+ "The partition column {partition_key} is not in the row {row_type}."
),
});
};
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index 0c0cdf5..7b93aca 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -712,14 +712,12 @@ impl TablePath {
}
if identifier.len() > MAX_NAME_LENGTH {
return Some(format!(
- "the length of '{}' is longer than the max allowed length {}",
- identifier, MAX_NAME_LENGTH
+ "the length of '{identifier}' is longer than the max allowed length {MAX_NAME_LENGTH}"
));
}
if Self::contains_invalid_pattern(identifier) {
return Some(format!(
- "'{}' contains one or more characters other than ASCII alphanumerics, '_' and '-'",
- identifier
+ "'{identifier}' contains one or more characters other than ASCII alphanumerics, '_' and '-'"
));
}
None
@@ -728,8 +726,7 @@ impl TablePath {
pub fn validate_prefix(identifier: &str) -> Option<String> {
if identifier.starts_with(INTERNAL_NAME_PREFIX) {
return Some(format!(
- "'{}' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server",
- INTERNAL_NAME_PREFIX
+ "'{INTERNAL_NAME_PREFIX}' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server"
));
}
None
@@ -834,6 +831,75 @@ impl TableInfo {
}
}
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct AutoPartitionStrategy {
+ auto_partition_enabled: bool,
+ auto_partition_key: Option<String>,
+ auto_partition_time_unit: String,
+ auto_partition_num_precreate: i32,
+ auto_partition_num_retention: i32,
+ auto_partition_timezone: String,
+}
+
+impl AutoPartitionStrategy {
+ pub fn from(properties: &HashMap<String, String>) -> Self {
+ Self {
+ auto_partition_enabled: properties
+ .get("table.auto-partition.enabled")
+ .and_then(|s| s.parse().ok())
+ .unwrap_or(false),
+ auto_partition_key: properties
+ .get("table.auto-partition.key")
+ .map(|s| s.to_string()),
+ auto_partition_time_unit: properties
+ .get("table.auto-partition.time-unit")
+ .map(|s| s.to_string())
+ .unwrap_or_else(|| "DAY".to_string()),
+ auto_partition_num_precreate: properties
+ .get("table.auto-partition.num-precreate")
+ .and_then(|s| s.parse().ok())
+ .unwrap_or(2),
+ auto_partition_num_retention: properties
+ .get("table.auto-partition.num-retention")
+ .and_then(|s| s.parse().ok())
+ .unwrap_or(7),
+ auto_partition_timezone: properties
+ .get("table.auto-partition.time-zone")
+ .map(|s| s.to_string())
+ .unwrap_or_else(|| {
+ jiff::tz::TimeZone::system()
+ .iana_name()
+ .unwrap_or("UTC")
+ .to_string()
+ }),
+ }
+ }
+
+ pub fn is_auto_partition_enabled(&self) -> bool {
+ self.auto_partition_enabled
+ }
+
+ pub fn key(&self) -> Option<&str> {
+ self.auto_partition_key.as_deref()
+ }
+
+ pub fn time_unit(&self) -> &str {
+ &self.auto_partition_time_unit
+ }
+
+ pub fn num_precreate(&self) -> i32 {
+ self.auto_partition_num_precreate
+ }
+
+ pub fn num_retention(&self) -> i32 {
+ self.auto_partition_num_retention
+ }
+
+ pub fn timezone(&self) -> &str {
+ &self.auto_partition_timezone
+ }
+}
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TableConfig {
pub properties: HashMap<String, String>,
@@ -866,6 +932,10 @@ impl TableConfig {
.unwrap_or(DEFAULT_KV_FORMAT);
kv_format.parse().map_err(Into::into)
}
+
+ pub fn get_auto_partition_strategy(&self) -> AutoPartitionStrategy {
+ AutoPartitionStrategy::from(&self.properties)
+ }
}
impl TableInfo {
@@ -1003,7 +1073,11 @@ impl TableInfo {
}
pub fn is_auto_partitioned(&self) -> bool {
- self.is_partitioned() && todo!()
+ self.is_partitioned()
+ && self
+ .table_config
+ .get_auto_partition_strategy()
+ .is_auto_partition_enabled()
}
pub fn get_partition_keys(&self) -> &[String] {
@@ -1161,6 +1235,7 @@ impl LakeSnapshot {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::metadata::DataTypes;
#[test]
fn test_validate() {
@@ -1195,8 +1270,7 @@ mod tests {
assert_invalid_name(
&invalid_long_name,
&format!(
- "the length of '{}' is longer than the max allowed length {}",
- invalid_long_name, MAX_NAME_LENGTH
+ "the length of '{invalid_long_name}' is longer than the max allowed length {MAX_NAME_LENGTH}"
),
);
}
@@ -1205,8 +1279,7 @@ mod tests {
let result = TablePath::detect_invalid_name(name);
assert!(
result.is_some(),
- "Expected '{}' to be invalid, but it was valid",
- name
+ "Expected '{name}' to be invalid, but it was valid"
);
assert!(
result.as_ref().unwrap().contains(expected_message),
@@ -1215,4 +1288,97 @@ mod tests {
result.unwrap()
);
}
+
+ #[test]
+ fn test_is_auto_partitioned() {
+ let schema = Schema::builder()
+ .column("id", DataTypes::int())
+ .column("name", DataTypes::string())
+ .primary_key(vec!["id".to_string()])
+ .build()
+ .unwrap();
+
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+
+ // 1. Not partitioned, auto partition disabled
+ let mut properties = HashMap::new();
+ let table_info = TableInfo::new(
+ table_path.clone(),
+ 1,
+ 1,
+ schema.clone(),
+ vec!["id".to_string()],
+ vec![], // No partition keys
+ 1,
+ properties.clone(),
+ HashMap::new(),
+ None,
+ 0,
+ 0,
+ );
+ assert!(!table_info.is_auto_partitioned());
+
+ // 2. Not partitioned, auto partition enabled
+ properties.insert(
+ "table.auto-partition.enabled".to_string(),
+ "true".to_string(),
+ );
+ let table_info = TableInfo::new(
+ table_path.clone(),
+ 1,
+ 1,
+ schema.clone(),
+ vec!["id".to_string()],
+ vec![], // No partition keys
+ 1,
+ properties.clone(),
+ HashMap::new(),
+ None,
+ 0,
+ 0,
+ );
+ assert!(!table_info.is_auto_partitioned());
+
+ // 3. Partitioned, auto partition disabled
+ properties.insert(
+ "table.auto-partition.enabled".to_string(),
+ "false".to_string(),
+ );
+ let table_info = TableInfo::new(
+ table_path.clone(),
+ 1,
+ 1,
+ schema.clone(),
+ vec!["id".to_string()],
+ vec!["name".to_string()], // Partition keys
+ 1,
+ properties.clone(),
+ HashMap::new(),
+ None,
+ 0,
+ 0,
+ );
+ assert!(!table_info.is_auto_partitioned());
+
+ // 4. Partitioned, auto partition enabled
+ properties.insert(
+ "table.auto-partition.enabled".to_string(),
+ "true".to_string(),
+ );
+ let table_info = TableInfo::new(
+ table_path.clone(),
+ 1,
+ 1,
+ schema.clone(),
+ vec!["id".to_string()],
+ vec!["name".to_string()], // Partition keys
+ 1,
+ properties.clone(),
+ HashMap::new(),
+ None,
+ 0,
+ 0,
+ );
+ assert!(table_info.is_auto_partitioned());
+ }
}
diff --git a/crates/fluss/src/util/partition.rs b/crates/fluss/src/util/partition.rs
index 036cac4..ccc71a6 100644
--- a/crates/fluss/src/util/partition.rs
+++ b/crates/fluss/src/util/partition.rs
@@ -26,7 +26,7 @@ use std::fmt::Write;
fn hex_string(bytes: &[u8]) -> String {
let mut hex = String::with_capacity(bytes.len() * 2);
for &b in bytes {
- write!(hex, "{:02x}", b).unwrap();
+ write!(hex, "{b:02x}").unwrap();
}
hex
}
@@ -84,7 +84,7 @@ fn milli_to_string(milli: i32) -> String {
.div_euclid(MILLIS_PER_SECOND as i32);
let ms = milli.rem_euclid(MILLIS_PER_SECOND as i32);
- format!("{:02}-{:02}-{:02}_{:03}", hour, min, sec, ms)
+ format!("{hour:02}-{min:02}-{sec:02}_{ms:03}")
}
fn time_to_string(time: Time) -> String {