This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new c1dfcee chore: Introduce PartitionGetter and Partition utils (#214)
c1dfcee is described below
commit c1dfceec8a388f8aa9677a43cfb80963a74b35ee
Author: Keith Lee <[email protected]>
AuthorDate: Tue Jan 27 13:17:52 2026 +0000
chore: Introduce PartitionGetter and Partition utils (#214)
---
crates/fluss/src/client/table/partition_getter.rs | 177 ++++++-
crates/fluss/src/error.rs | 6 +
crates/fluss/src/metadata/partition.rs | 62 +--
crates/fluss/src/metadata/table.rs | 110 +++++
crates/fluss/src/row/datum.rs | 10 +-
crates/fluss/src/util/mod.rs | 1 +
crates/fluss/src/util/partition.rs | 532 ++++++++++++++++++++++
7 files changed, 847 insertions(+), 51 deletions(-)
diff --git a/crates/fluss/src/client/table/partition_getter.rs
b/crates/fluss/src/client/table/partition_getter.rs
index 4529d86..887c0a4 100644
--- a/crates/fluss/src/client/table/partition_getter.rs
+++ b/crates/fluss/src/client/table/partition_getter.rs
@@ -17,40 +17,191 @@
use crate::error::Error::IllegalArgument;
use crate::error::Result;
-use crate::metadata::{DataType, RowType};
+use crate::metadata::{DataType, ResolvedPartitionSpec, RowType};
+use crate::row::InternalRow;
use crate::row::field_getter::FieldGetter;
+use crate::util::partition;
+use std::sync::Arc;
+/// Extracts partition information from rows of a partitioned table.
+///
+/// Built once from a [`RowType`] and an ordered list of partition keys; each call
+/// to `get_partition` / `get_partition_spec` reads the partition columns of a row
+/// and renders them as partition values.
+// NOTE(review): `dead_code` is presumably allowed because nothing uses the getter
+// yet — confirm and drop the attribute once it is wired in.
#[allow(dead_code)]
-pub struct PartitionGetter<'a> {
- partitions: Vec<(&'a String, &'a DataType, FieldGetter)>,
+pub struct PartitionGetter {
+ /// Partition key names, in the order given to `new`; cloned (via `Arc`) into every spec built.
+ partition_keys: Arc<[String]>,
+ /// Data type and field getter of each partition column, in the same order as `partition_keys`.
+ partitions: Vec<(DataType, FieldGetter)>,
}
#[allow(dead_code)]
-impl<'a> PartitionGetter<'a> {
- pub fn new(row_type: &'a RowType, partition_keys: &'a Vec<String>) ->
Result<Self> {
+impl PartitionGetter {
+ /// Creates a getter that reads `partition_keys`, in the given order, from rows of
+ /// `row_type`.
+ ///
+ /// # Errors
+ ///
+ /// Returns [`IllegalArgument`] if a partition key is not a field of `row_type`.
+ pub fn new(row_type: &RowType, partition_keys: Arc<[String]>) ->
Result<Self> {
let mut partitions = Vec::with_capacity(partition_keys.len());
- for partition_key in partition_keys {
+ for partition_key in partition_keys.iter() {
if let Some(partition_col_index) =
row_type.get_field_index(partition_key.as_str()) {
- let data_type = &row_type
+ let data_type = row_type
.fields()
.get(partition_col_index)
+ // The index was just returned by `get_field_index` on this row type, so the field should exist.
.unwrap()
- .data_type;
- let field_getter = FieldGetter::create(data_type,
partition_col_index);
+ .data_type
+ .clone();
+ let field_getter = FieldGetter::create(&data_type,
partition_col_index);
- partitions.push((partition_key, data_type, field_getter));
+ partitions.push((data_type, field_getter));
} else {
return Err(IllegalArgument {
message: format!(
- "The partition column {partition_key} is not in the
row {row_type}."
+ "The partition column {} is not in the row {}.",
+ partition_key, row_type
),
});
};
}
- Ok(Self { partitions })
+ Ok(Self {
+ partition_keys,
+ partitions,
+ })
}
- // TODO Implement get partition
+ /// Returns the partition name of `row`, as produced by
+ /// [`ResolvedPartitionSpec::get_partition_name`] (e.g. `2024-01-15$US` for the
+ /// keys `date`, `region`).
+ ///
+ /// # Errors
+ ///
+ /// Same as [`Self::get_partition_spec`].
+ pub fn get_partition(&self, row: &dyn InternalRow) -> Result<String> {
+ self.get_partition_spec(row)
+ .map(|ps| ps.get_partition_name())
+ }
+
+ /// Reads each partition column of `row`, converts its value to the string used in
+ /// partition names, and pairs the values with this getter's partition keys.
+ ///
+ /// # Errors
+ ///
+ /// Returns [`IllegalArgument`] if a partition value is null or its type is not
+ /// supported by [`partition::convert_value_of_type`].
+ pub fn get_partition_spec(&self, row: &dyn InternalRow) ->
Result<ResolvedPartitionSpec> {
+ let mut partition_values = Vec::with_capacity(self.partitions.len());
+
+ for (data_type, field_getter) in &self.partitions {
+ let value = field_getter.get_field(row);
+ if value.is_null() {
+ return Err(IllegalArgument {
+ message: "Partition value shouldn't be null.".to_string(),
+ });
+ }
+ partition_values.push(partition::convert_value_of_type(&value,
data_type)?);
+ }
+
+ ResolvedPartitionSpec::new(Arc::clone(&self.partition_keys),
partition_values)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::{DataField, IntType, StringType};
+    use crate::row::{Datum, GenericRow};
+
+    /// Builds a row type with an `INT` column named `id` followed by one `STRING`
+    /// column per entry of `string_columns`.
+    fn id_plus_string_columns(string_columns: &[&str]) -> RowType {
+        let mut fields = vec![DataField::new(
+            "id".to_string(),
+            DataType::Int(IntType::new()),
+            None,
+        )];
+        fields.extend(string_columns.iter().map(|name| {
+            DataField::new(name.to_string(), DataType::String(StringType::new()), None)
+        }));
+        RowType::new(fields)
+    }
+
+    /// Shorthand for a shared list of partition key names.
+    fn keys(names: &[&str]) -> Arc<[String]> {
+        names.iter().map(|name| name.to_string()).collect()
+    }
+
+    #[test]
+    fn test_partition_getter_single_key() {
+        let row_type = id_plus_string_columns(&["region"]);
+        let getter = PartitionGetter::new(&row_type, keys(&["region"])).expect("should succeed");
+
+        let row = GenericRow::from_data(vec![Datum::Int32(42), Datum::from("US")]);
+        assert_eq!(getter.get_partition(&row).expect("should succeed"), "US");
+    }
+
+    #[test]
+    fn test_partition_getter_multiple_keys() {
+        let row_type = id_plus_string_columns(&["date", "region"]);
+        let getter =
+            PartitionGetter::new(&row_type, keys(&["date", "region"])).expect("should succeed");
+
+        let row = GenericRow::from_data(vec![
+            Datum::Int32(42),
+            Datum::from("2024-01-15"),
+            Datum::from("US"),
+        ]);
+        assert_eq!(
+            getter.get_partition(&row).expect("should succeed"),
+            "2024-01-15$US"
+        );
+    }
+
+    #[test]
+    fn test_partition_getter_invalid_column() {
+        let row_type = id_plus_string_columns(&[]);
+        assert!(PartitionGetter::new(&row_type, keys(&["nonexistent"])).is_err());
+    }
+
+    #[test]
+    fn test_partition_getter_null_value() {
+        let row_type = id_plus_string_columns(&["region"]);
+        let getter = PartitionGetter::new(&row_type, keys(&["region"])).expect("should succeed");
+
+        let row = GenericRow::from_data(vec![Datum::Int32(42), Datum::Null]);
+        assert!(getter.get_partition(&row).is_err());
+    }
+
+    #[test]
+    fn test_get_partition_spec() {
+        let row_type = id_plus_string_columns(&["date", "region"]);
+        let getter =
+            PartitionGetter::new(&row_type, keys(&["date", "region"])).expect("should succeed");
+
+        let row = GenericRow::from_data(vec![
+            Datum::Int32(42),
+            Datum::from("2024-01-15"),
+            Datum::from("US"),
+        ]);
+        let spec = getter.get_partition_spec(&row).expect("should succeed");
+
+        assert_eq!(spec.get_partition_keys(), &["date", "region"]);
+        assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]);
+        assert_eq!(spec.get_partition_name(), "2024-01-15$US");
+    }
}
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index 368d8ab..68426d7 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -93,6 +93,12 @@ pub enum Error {
)]
IllegalArgument { message: String },
+ #[snafu(
+ visibility(pub(crate)),
+ display("Fluss hitting invalid partition error {}.", message)
+ )]
+ InvalidPartition { message: String },
+
#[snafu(
visibility(pub(crate)),
display("Fluss hitting IO not supported error {}.", message)
diff --git a/crates/fluss/src/metadata/partition.rs
b/crates/fluss/src/metadata/partition.rs
index e40fbf9..bc1935c 100644
--- a/crates/fluss/src/metadata/partition.rs
+++ b/crates/fluss/src/metadata/partition.rs
@@ -15,11 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-use crate::PartitionId;
use crate::error::{Error, Result};
use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec};
+use crate::{PartitionId, TableId};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
+use std::sync::Arc;
/// Represents a partition spec in fluss. Partition columns and values are NOT
of strict order, and
/// they need to be re-arranged to the correct order by comparing with a list
of strictly ordered
@@ -72,20 +73,21 @@ impl Display for PartitionSpec {
/// partition keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ResolvedPartitionSpec {
- partition_keys: Vec<String>,
+ /// Partition key names; an `Arc<[String]>` so specs of the same table can share
+ /// one key list without copying it.
+ partition_keys: Arc<[String]>,
+ /// Partition values, positionally matching `partition_keys` (`new` rejects a
+ /// length mismatch).
partition_values: Vec<String>,
}
+/// Separator between partition values in a partition name (e.g. `2024-01-15$US`).
pub const PARTITION_SPEC_SEPARATOR: &str = "$";
impl ResolvedPartitionSpec {
- pub fn new(partition_keys: Vec<String>, partition_values: Vec<String>) ->
Result<Self> {
+ pub fn new(partition_keys: Arc<[String]>, partition_values: Vec<String>)
-> Result<Self> {
if partition_keys.len() != partition_values.len() {
return Err(Error::IllegalArgument {
message: "The number of partition keys and partition values
should be the same."
.to_string(),
});
}
+
Ok(Self {
partition_keys,
partition_values,
@@ -93,7 +95,7 @@ impl ResolvedPartitionSpec {
}
pub fn from_partition_spec(
- partition_keys: Vec<String>,
+ partition_keys: Arc<[String]>,
partition_spec: &PartitionSpec,
) -> Self {
let partition_values =
@@ -104,14 +106,7 @@ impl ResolvedPartitionSpec {
}
}
- pub fn from_partition_value(partition_key: String, partition_value:
String) -> Self {
- Self {
- partition_keys: vec![partition_key],
- partition_values: vec![partition_value],
- }
- }
-
- pub fn from_partition_name(partition_keys: Vec<String>, partition_name:
&str) -> Self {
+ pub fn from_partition_name(partition_keys: Arc<[String]>, partition_name:
&str) -> Self {
let partition_values: Vec<String> = partition_name
.split(PARTITION_SPEC_SEPARATOR)
.map(|s| s.to_string())
@@ -140,7 +135,7 @@ impl ResolvedPartitionSpec {
}
Ok(Self {
- partition_keys: keys,
+ partition_keys: Arc::from(keys),
partition_values: values,
})
}
@@ -236,6 +231,7 @@ impl ResolvedPartitionSpec {
.iter()
.map(|kv| kv.value.clone())
.collect();
+
Self {
partition_keys,
partition_values,
@@ -243,7 +239,7 @@ impl ResolvedPartitionSpec {
}
fn get_reordered_partition_values(
- partition_keys: &[String],
+ partition_keys: &Arc<[String]>,
partition_spec: &PartitionSpec,
) -> Vec<String> {
let partition_spec_map = partition_spec.get_spec_map();
@@ -310,7 +306,7 @@ impl PartitionInfo {
}
impl Display for PartitionInfo {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"Partition{{name='{}', id={}}}",
@@ -323,12 +319,12 @@ impl Display for PartitionInfo {
/// A class to identify a table partition, containing the table id and the
partition id.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TablePartition {
- table_id: i64,
+ /// Id of the table the partition belongs to.
+ table_id: TableId,
+ /// Id of the partition.
partition_id: PartitionId,
}
impl TablePartition {
- pub fn new(table_id: i64, partition_id: PartitionId) -> Self {
+ pub fn new(table_id: TableId, partition_id: PartitionId) -> Self {
Self {
table_id,
partition_id,
@@ -361,7 +357,7 @@ mod tests {
#[test]
fn test_resolved_partition_spec_name() {
let spec = ResolvedPartitionSpec::new(
- vec!["date".to_string(), "region".to_string()],
+ Arc::from(["date".to_string(), "region".to_string()]),
vec!["2024-01-15".to_string(), "US".to_string()],
)
.unwrap();
@@ -376,7 +372,7 @@ mod tests {
#[test]
fn test_resolved_partition_spec_from_partition_name() {
let spec = ResolvedPartitionSpec::from_partition_name(
- vec!["date".to_string(), "region".to_string()],
+ Arc::from(["date".to_string(), "region".to_string()]),
"2024-01-15$US",
);
@@ -396,7 +392,7 @@ mod tests {
#[test]
fn test_resolved_partition_spec_mismatched_lengths() {
let result = ResolvedPartitionSpec::new(
- vec!["date".to_string(), "region".to_string()],
+ Arc::from(["date".to_string(), "region".to_string()]),
vec!["2024-01-15".to_string()],
);
@@ -405,9 +401,11 @@ mod tests {
#[test]
fn test_partition_info() {
- let spec =
- ResolvedPartitionSpec::new(vec!["date".to_string()],
vec!["2024-01-15".to_string()])
- .unwrap();
+ let spec = ResolvedPartitionSpec::new(
+ Arc::from(["date".to_string()]),
+ vec!["2024-01-15".to_string()],
+ )
+ .unwrap();
let info = PartitionInfo::new(42, spec);
assert_eq!(info.get_partition_id(), 42);
@@ -438,9 +436,11 @@ mod tests {
#[test]
fn test_partition_info_pb_roundtrip() {
- let spec =
- ResolvedPartitionSpec::new(vec!["date".to_string()],
vec!["2024-01-15".to_string()])
- .unwrap();
+ let spec = ResolvedPartitionSpec::new(
+ Arc::from(["date".to_string()]),
+ vec!["2024-01-15".to_string()],
+ )
+ .unwrap();
let info = PartitionInfo::new(42, spec);
let pb = info.to_pb();
@@ -453,14 +453,16 @@ mod tests {
#[test]
fn test_contains() {
let full_spec = ResolvedPartitionSpec::new(
- vec!["date".to_string(), "region".to_string()],
+ Arc::from(["date".to_string(), "region".to_string()]),
vec!["2024-01-15".to_string(), "US".to_string()],
)
.unwrap();
- let partial_spec =
- ResolvedPartitionSpec::new(vec!["date".to_string()],
vec!["2024-01-15".to_string()])
- .unwrap();
+ let partial_spec = ResolvedPartitionSpec::new(
+ Arc::from(["date".to_string()]),
+ vec!["2024-01-15".to_string()],
+ )
+ .unwrap();
assert!(full_spec.contains(&partial_spec).unwrap());
}
diff --git a/crates/fluss/src/metadata/table.rs
b/crates/fluss/src/metadata/table.rs
index 3b9da7d..0c0cdf5 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -678,6 +678,10 @@ impl Display for TablePath {
}
}
+/// Maximum length, in bytes, accepted by [`TablePath::detect_invalid_name`].
+const MAX_NAME_LENGTH: usize = 200;
+
+/// Name prefix reserved for internal databases, tables and partitions; rejected by
+/// [`TablePath::validate_prefix`].
+const INTERNAL_NAME_PREFIX: &str = "__";
+
impl TablePath {
pub fn new(db: String, tbl: String) -> Self {
TablePath {
@@ -695,6 +699,52 @@ impl TablePath {
pub fn table(&self) -> &str {
&self.table
}
+
+    /// Checks `identifier` against Fluss' naming rules for databases, tables and
+    /// partitions.
+    ///
+    /// Returns `None` when the name is acceptable, otherwise `Some` human-readable
+    /// reason. Rules, in the order they are checked: not empty, not `.` or `..`, at
+    /// most `MAX_NAME_LENGTH` bytes, and only ASCII alphanumerics, `_` and `-`.
+    pub fn detect_invalid_name(identifier: &str) -> Option<String> {
+        match identifier {
+            "" => return Some("the empty string is not allowed".to_string()),
+            "." => return Some("'.' is not allowed".to_string()),
+            ".." => return Some("'..' is not allowed".to_string()),
+            _ => {}
+        }
+        if identifier.len() > MAX_NAME_LENGTH {
+            Some(format!(
+                "the length of '{}' is longer than the max allowed length {}",
+                identifier, MAX_NAME_LENGTH
+            ))
+        } else if Self::contains_invalid_pattern(identifier) {
+            Some(format!(
+                "'{}' contains one or more characters other than ASCII alphanumerics, '_' and '-'",
+                identifier
+            ))
+        } else {
+            None
+        }
+    }
+
+    /// Rejects identifiers that start with the reserved `INTERNAL_NAME_PREFIX`.
+    ///
+    /// Returns `None` when `identifier` may be used, otherwise `Some` explanation.
+    pub fn validate_prefix(identifier: &str) -> Option<String> {
+        identifier.starts_with(INTERNAL_NAME_PREFIX).then(|| {
+            format!(
+                "'{}' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server",
+                INTERNAL_NAME_PREFIX
+            )
+        })
+    }
+
+    /// Returns `true` if `identifier` has any character other than an ASCII
+    /// alphanumeric, `_` or `-` — the only characters allowed in Fluss names.
+    fn contains_invalid_pattern(identifier: &str) -> bool {
+        !identifier
+            .chars()
+            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-'))
+    }
}
/// A database name, table name and partition name combo. It's used to
represent the physical path of
@@ -1106,3 +1156,63 @@ impl LakeSnapshot {
&self.table_buckets_offset
}
}
+
+/// Tests for [`TablePath`].
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Asserts that `name` is rejected and that the reason mentions `expected_message`.
+    fn assert_invalid_name(name: &str, expected_message: &str) {
+        match TablePath::detect_invalid_name(name) {
+            None => panic!("Expected '{}' to be invalid, but it was valid", name),
+            Some(reason) => assert!(
+                reason.contains(expected_message),
+                "Expected message containing '{}', but got '{}'",
+                expected_message,
+                reason
+            ),
+        }
+    }
+
+    #[test]
+    fn test_validate() {
+        // Valid database and table names.
+        let path = TablePath::new("db_2-abc3".to_string(), "table-1_abc_2".to_string());
+        assert!(TablePath::detect_invalid_name(path.database()).is_none());
+        assert!(TablePath::detect_invalid_name(path.table()).is_none());
+        assert_eq!(path.to_string(), "db_2-abc3.table-1_abc_2");
+
+        // The internal prefix is reserved.
+        let prefix_error = TablePath::validate_prefix("__table-1").unwrap();
+        assert!(prefix_error.contains("'__' is not allowed as prefix"));
+
+        // A name of exactly the maximum length is still allowed.
+        assert!(TablePath::detect_invalid_name(&"a".repeat(200)).is_none());
+
+        // Invalid names and the reason reported for each.
+        let cases = [
+            ("*abc", "'*abc' contains one or more characters other than"),
+            ("table.abc", "'table.abc' contains one or more characters other than"),
+            ("", "the empty string is not allowed"),
+            (" ", "' ' contains one or more characters other than"),
+            (".", "'.' is not allowed"),
+            ("..", "'..' is not allowed"),
+        ];
+        for (name, expected_message) in cases {
+            assert_invalid_name(name, expected_message);
+        }
+
+        let too_long = "a".repeat(201);
+        assert_invalid_name(
+            &too_long,
+            &format!(
+                "the length of '{}' is longer than the max allowed length {}",
+                too_long, MAX_NAME_LENGTH
+            ),
+        );
+    }
+}
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index b808373..e1b70ad 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -789,10 +789,7 @@ impl TimestampNtz {
}
}
- pub fn from_millis_nanos(
- millisecond: i64,
- nano_of_millisecond: i32,
- ) -> crate::error::Result<Self> {
+ pub fn from_millis_nanos(millisecond: i64, nano_of_millisecond: i32) ->
Result<Self> {
if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) {
return Err(crate::error::Error::IllegalArgument {
message: format!(
@@ -836,10 +833,7 @@ impl TimestampLtz {
}
}
- pub fn from_millis_nanos(
- epoch_millisecond: i64,
- nano_of_millisecond: i32,
- ) -> crate::error::Result<Self> {
+ pub fn from_millis_nanos(epoch_millisecond: i64, nano_of_millisecond: i32)
-> Result<Self> {
if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) {
return Err(crate::error::Error::IllegalArgument {
message: format!(
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index 3760487..b987fe2 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -16,6 +16,7 @@
// under the License.
pub mod murmur_hash;
+pub mod partition;
pub mod varint;
use crate::TableId;
diff --git a/crates/fluss/src/util/partition.rs
b/crates/fluss/src/util/partition.rs
new file mode 100644
index 0000000..036cac4
--- /dev/null
+++ b/crates/fluss/src/util/partition.rs
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities for turning partition column values into partition-name strings.
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::DataType;
+use crate::row::{Date, Datum, Time, TimestampLtz, TimestampNtz};
+use jiff::ToSpan;
+use std::fmt::Write;
+
+/// Renders `bytes` as lowercase hexadecimal, two digits per byte
+/// (e.g. `[0x10, 0xFF]` becomes `"10ff"`).
+fn hex_string(bytes: &[u8]) -> String {
+    bytes
+        .iter()
+        .fold(String::with_capacity(bytes.len() * 2), |mut hex, byte| {
+            // Writing into a `String` cannot fail.
+            write!(hex, "{byte:02x}").unwrap();
+            hex
+        })
+}
+
+/// Formats a finite float the way Java's `Float.toString` / `Double.toString` do,
+/// so partition names are intended to agree with those built by the Java client.
+///
+/// `rendered` is Rust's shortest round-trip rendering of the value: its `Display`
+/// output when `scientific` is false, its `LowerExp` (`{:e}`) output otherwise.
+/// Java differs from those renderings in two ways that are patched here: it always
+/// prints a fractional digit (`1.0` where Rust prints `1`), and it marks the
+/// exponent with an upper-case `E` (`1.0E10` where Rust prints `1e10`).
+///
+/// NOTE(review): JDK 19+ prints at least two significant digits, so some subnormal
+/// values may still differ from Java in the last digit — confirm if that matters.
+fn to_java_decimal_string(rendered: &str, scientific: bool) -> String {
+    if !scientific {
+        return if rendered.contains('.') {
+            rendered.to_string()
+        } else {
+            format!("{rendered}.0")
+        };
+    }
+    match rendered.split_once('e') {
+        Some((mantissa, exponent)) if mantissa.contains('.') => format!("{mantissa}E{exponent}"),
+        Some((mantissa, exponent)) => format!("{mantissa}.0E{exponent}"),
+        // `LowerExp` always emits an exponent for finite values; keep the input
+        // unchanged rather than panic if that ever stops being true.
+        None => rendered.to_string(),
+    }
+}
+
+/// Renders an `f32` partition value: `NaN`, `Inf` or `-Inf` for non-finite values,
+/// otherwise Java's `Float.toString` form with `.` replaced by `_`
+/// (`5.73` -> `5_73`, `1.0` -> `1_0`, `1.0E10` -> `1_0E10`).
+fn reformat_float(value: f32) -> String {
+    if value.is_nan() {
+        return "NaN".to_string();
+    }
+    if value.is_infinite() {
+        let name = if value > 0.0 { "Inf" } else { "-Inf" };
+        return name.to_string();
+    }
+    // Java uses plain notation for zero and for magnitudes in [1e-3, 1e7), and
+    // computerized scientific notation otherwise.
+    let magnitude = value.abs();
+    let java_string = if magnitude == 0.0 || (1e-3..1e7).contains(&magnitude) {
+        to_java_decimal_string(&value.to_string(), false)
+    } else {
+        to_java_decimal_string(&format!("{value:e}"), true)
+    };
+    java_string.replace('.', "_")
+}
+
+/// Renders an `f64` partition value: `NaN`, `Inf` or `-Inf` for non-finite values,
+/// otherwise Java's `Double.toString` form with `.` replaced by `_`
+/// (`5.73737` -> `5_73737`, `100.0` -> `100_0`, `1.0E21` -> `1_0E21`).
+fn reformat_double(value: f64) -> String {
+    if value.is_nan() {
+        return "NaN".to_string();
+    }
+    if value.is_infinite() {
+        let name = if value > 0.0 { "Inf" } else { "-Inf" };
+        return name.to_string();
+    }
+    // Same plain/scientific split as `Double.toString`: [1e-3, 1e7) and zero are plain.
+    let magnitude = value.abs();
+    let java_string = if magnitude == 0.0 || (1e-3..1e7).contains(&magnitude) {
+        to_java_decimal_string(&value.to_string(), false)
+    } else {
+        to_java_decimal_string(&format!("{value:e}"), true)
+    };
+    java_string.replace('.', "_")
+}
+
+/// Day zero for `DATE` values, which count days since 1970-01-01.
+const UNIX_EPOCH_DATE: jiff::civil::Date = jiff::civil::date(1970, 1, 1);
+
+/// Formats a count of days since `UNIX_EPOCH_DATE` as `YYYY-MM-DD`.
+///
+/// NOTE(review): jiff's `ToSpan::days` and `Date + Span` panic when the result is
+/// outside jiff's supported range (about years -9999..=9999); confirm callers cannot
+/// supply such values, or switch to jiff's checked APIs.
+fn day_to_string(days: i32) -> String {
+    let offset = days.days();
+    let date = UNIX_EPOCH_DATE + offset;
+    let (year, month, day) = (date.year(), date.month(), date.day());
+    format!("{year:04}-{month:02}-{day:02}")
+}
+
+/// Formats a `DATE` value (days since 1970-01-01) as `YYYY-MM-DD`.
+fn date_to_string(date: Date) -> String {
+    let days_since_epoch = date.get_inner();
+    day_to_string(days_since_epoch)
+}
+
+/// Milliseconds in one second.
+const MILLIS_PER_SECOND: i64 = 1_000;
+/// Milliseconds in one minute.
+const MILLIS_PER_MINUTE: i64 = 60 * MILLIS_PER_SECOND;
+/// Milliseconds in one hour.
+const MILLIS_PER_HOUR: i64 = 60 * MILLIS_PER_MINUTE;
+
+/// Formats a millisecond-of-day count as `HH-mm-ss_SSS` (e.g. `5402199` becomes
+/// `01-30-02_199`); `-` and `_` stand in for `:` and `.`.
+///
+/// Every field is derived with Euclidean division/remainder, so the minute, second
+/// and millisecond fields are never negative.
+fn milli_to_string(milli: i32) -> String {
+    // All three constants fit comfortably in an i32, so these casts are lossless.
+    let (per_hour, per_minute, per_second) = (
+        MILLIS_PER_HOUR as i32,
+        MILLIS_PER_MINUTE as i32,
+        MILLIS_PER_SECOND as i32,
+    );
+
+    let hours = milli.div_euclid(per_hour);
+    let minutes = milli.rem_euclid(per_hour).div_euclid(per_minute);
+    let seconds = milli.rem_euclid(per_minute).div_euclid(per_second);
+    let millis = milli.rem_euclid(per_second);
+
+    format!("{hours:02}-{minutes:02}-{seconds:02}_{millis:03}")
+}
+
+/// Formats a `TIME` value (milliseconds of the day) as `HH-mm-ss_SSS`.
+fn time_to_string(time: Time) -> String {
+    let millis_of_day = time.get_inner();
+    milli_to_string(millis_of_day)
+}
+
+/// Common view over `TimestampNtz` and `TimestampLtz` consumed by
+/// `timestamp_to_string`: a whole-millisecond count plus the nanoseconds within
+/// that millisecond.
+trait Timestamp {
+ /// Whole milliseconds; `timestamp_to_string` treats them as an offset from
+ /// 1970-01-01T00:00:00 and renders them in UTC.
+ fn get_milli(&self) -> i64;
+ /// Nanoseconds within the millisecond returned by `get_milli`.
+ fn get_nano_of_milli(&self) -> i32;
+}
+
+impl Timestamp for TimestampNtz {
+ fn get_milli(&self) -> i64 {
+ self.get_millisecond()
+ }
+
+ fn get_nano_of_milli(&self) -> i32 {
+ self.get_nano_of_millisecond()
+ }
+}
+
+impl Timestamp for TimestampLtz {
+ // LTZ values expose their millisecond count as epoch milliseconds.
+ fn get_milli(&self) -> i64 {
+ self.get_epoch_millisecond()
+ }
+
+ fn get_nano_of_milli(&self) -> i32 {
+ self.get_nano_of_millisecond()
+ }
+}
+
+/// Formats a timestamp as `yyyy-MM-dd-HH-mm-ss_` plus a fractional suffix,
+/// following the Java client's partition-name format:
+///
+/// - nano-of-millisecond > 0: three millisecond digits, then six nanosecond digits
+///   (`..._428099988`);
+/// - otherwise, millisecond-of-second > 0: just the three millisecond digits
+///   (`..._428`);
+/// - otherwise: nothing after the trailing `_`.
+///
+/// Calendar fields are computed in UTC. Euclidean division keeps the
+/// millisecond-of-second non-negative for instants before 1970.
+///
+/// NOTE(review): jiff's `Span::seconds` and `Timestamp + Span` panic outside jiff's
+/// supported range (about years -9999..=9999) — confirm inputs are bounded.
+fn timestamp_to_string<T: Timestamp>(ts: T) -> String {
+    let millis = ts.get_milli();
+    let nanos = ts.get_nano_of_milli();
+
+    let whole_seconds = millis.div_euclid(MILLIS_PER_SECOND);
+    let millis_of_second = millis.rem_euclid(MILLIS_PER_SECOND);
+
+    let instant = jiff::Timestamp::UNIX_EPOCH + jiff::Span::new().seconds(whole_seconds);
+    let dt = instant.to_zoned(jiff::tz::TimeZone::UTC).datetime();
+
+    let mut formatted = format!(
+        "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}_",
+        dt.year(),
+        dt.month(),
+        dt.day(),
+        dt.hour(),
+        dt.minute(),
+        dt.second(),
+    );
+    // Writing into a `String` cannot fail.
+    if nanos > 0 {
+        write!(formatted, "{millis_of_second:03}{nanos:06}").unwrap();
+    } else if millis_of_second > 0 {
+        write!(formatted, "{millis_of_second:03}").unwrap();
+    }
+    formatted
+}
+
+/// Converts a non-null partition value to the string used in partition names.
+///
+/// `value` must be the `Datum` variant that corresponds to `data_type` (for example
+/// `Datum::Int32` for `DataType::Int`). Strings are used verbatim, bytes become
+/// lowercase hex, integers and booleans their usual text form, floats go through
+/// `reformat_float` / `reformat_double`, and dates, times and timestamps use this
+/// module's `-`/`_`-separated formats.
+///
+/// # Errors
+///
+/// Returns `IllegalArgument` for any other value/type pair, including `Datum::Null`
+/// and mismatched pairs such as `Datum::Int64` with `DataType::Int`.
+pub fn convert_value_of_type(value: &Datum, data_type: &DataType) -> Result<String> {
+    let partition_string = match (value, data_type) {
+        (Datum::String(s), DataType::Char(_) | DataType::String(_)) => s.to_string(),
+        (Datum::Blob(bytes), DataType::Binary(_) | DataType::Bytes(_)) => hex_string(bytes),
+        (Datum::Bool(b), DataType::Boolean(_)) => b.to_string(),
+        (Datum::Int8(v), DataType::TinyInt(_)) => v.to_string(),
+        (Datum::Int16(v), DataType::SmallInt(_)) => v.to_string(),
+        (Datum::Int32(v), DataType::Int(_)) => v.to_string(),
+        (Datum::Int64(v), DataType::BigInt(_)) => v.to_string(),
+        (Datum::Float32(f), DataType::Float(_)) => reformat_float(f.into_inner()),
+        (Datum::Float64(f), DataType::Double(_)) => reformat_double(f.into_inner()),
+        (Datum::Date(d), DataType::Date(_)) => date_to_string(*d),
+        (Datum::Time(t), DataType::Time(_)) => time_to_string(*t),
+        (Datum::TimestampNtz(ts), DataType::Timestamp(_)) => timestamp_to_string(*ts),
+        (Datum::TimestampLtz(ts), DataType::TimestampLTz(_)) => timestamp_to_string(*ts),
+        _ => {
+            return Err(IllegalArgument {
+                message: format!(
+                    "Unsupported conversion to partition key from data type: {data_type:?}, value: {value:?}"
+                ),
+            });
+        }
+    };
+    Ok(partition_string)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::{
+        BigIntType, BinaryType, BooleanType, BytesType, CharType, DateType, DoubleType, FloatType,
+        IntType, SmallIntType, StringType, TablePath, TimeType, TimestampLTzType, TimestampType,
+        TinyIntType,
+    };
+    use crate::row::{Date, Time, TimestampLtz, TimestampNtz};
+    use std::borrow::Cow;
+
+    /// `(millisecond, nano_of_millisecond, expected partition string)` cases shared by
+    /// the NTZ and LTZ timestamp tests.
+    const TIMESTAMP_CASES: [(i64, i32, &str); 5] = [
+        (1748662955428, 99988, "2025-05-31-03-42-35_428099988"),
+        // Zero nanos of millis.
+        (1748662955428, 0, "2025-05-31-03-42-35_428"),
+        // Zero millis.
+        (1748662955000, 99988, "2025-05-31-03-42-35_000099988"),
+        // Zero millis and zero nanos.
+        (1748662955000, 0, "2025-05-31-03-42-35_"),
+        // Negative millis.
+        (-1748662955428, 99988, "1914-08-03-20-17-24_572099988"),
+    ];
+
+    /// Converts `datum` as `data_type`, failing the test if the conversion fails.
+    fn partition_string(datum: &Datum, data_type: &DataType) -> String {
+        convert_value_of_type(datum, data_type)
+            .expect("datum conversion to partition string failed")
+    }
+
+    /// Asserts that `datum` converts to `expected` and that the result is a valid
+    /// Fluss name.
+    fn assert_valid_partition_string(datum: &Datum, data_type: &DataType, expected: &str) {
+        let converted = partition_string(datum, data_type);
+        assert_eq!(converted, expected);
+        assert!(TablePath::detect_invalid_name(&converted).is_none());
+    }
+
+    #[test]
+    fn test_string() {
+        assert_valid_partition_string(
+            &Datum::String(Cow::Borrowed("Fluss")),
+            &DataType::String(StringType::new()),
+            "Fluss",
+        );
+    }
+
+    #[test]
+    fn test_char() {
+        assert_valid_partition_string(
+            &Datum::String(Cow::Borrowed("F")),
+            &DataType::Char(CharType::new(1)),
+            "F",
+        );
+    }
+
+    #[test]
+    fn test_boolean() {
+        assert_valid_partition_string(
+            &Datum::Bool(true),
+            &DataType::Boolean(BooleanType::new()),
+            "true",
+        );
+    }
+
+    #[test]
+    fn test_byte() {
+        assert_valid_partition_string(
+            &Datum::Blob(Cow::Borrowed(&[0x10, 0x20, 0x30, 0x40, 0x50, 0xFF])),
+            &DataType::Bytes(BytesType::new()),
+            "1020304050ff",
+        );
+    }
+
+    #[test]
+    fn test_binary() {
+        assert_valid_partition_string(
+            &Datum::Blob(Cow::Borrowed(&[0x10, 0x20, 0x30, 0x40, 0x50, 0xFF])),
+            &DataType::Binary(BinaryType::new(6)),
+            "1020304050ff",
+        );
+    }
+
+    #[test]
+    fn test_tiny_int() {
+        assert_valid_partition_string(
+            &Datum::Int8(100),
+            &DataType::TinyInt(TinyIntType::new()),
+            "100",
+        );
+    }
+
+    #[test]
+    fn test_small_int() {
+        assert_valid_partition_string(
+            &Datum::Int16(-32760),
+            &DataType::SmallInt(SmallIntType::new()),
+            "-32760",
+        );
+    }
+
+    #[test]
+    fn test_int() {
+        assert_valid_partition_string(
+            &Datum::Int32(299000),
+            &DataType::Int(IntType::new()),
+            "299000",
+        );
+    }
+
+    #[test]
+    fn test_big_int() {
+        assert_valid_partition_string(
+            &Datum::Int64(1748662955428),
+            &DataType::BigInt(BigIntType::new()),
+            "1748662955428",
+        );
+    }
+
+    #[test]
+    fn test_date() {
+        assert_valid_partition_string(
+            &Datum::Date(Date::new(20235)),
+            &DataType::Date(DateType::new()),
+            "2025-05-27",
+        );
+    }
+
+    #[test]
+    fn test_time() {
+        assert_valid_partition_string(
+            &Datum::Time(Time::new(5402199)),
+            &DataType::Time(TimeType::new(3).unwrap()),
+            "01-30-02_199",
+        );
+    }
+
+    #[test]
+    fn test_float() {
+        let float_type = DataType::Float(FloatType::new());
+        assert_valid_partition_string(&Datum::Float32(5.73.into()), &float_type, "5_73");
+
+        for (value, expected) in [
+            (f32::NAN, "NaN"),
+            (f32::INFINITY, "Inf"),
+            (f32::NEG_INFINITY, "-Inf"),
+        ] {
+            assert_eq!(partition_string(&Datum::Float32(value.into()), &float_type), expected);
+        }
+    }
+
+    #[test]
+    fn test_double() {
+        let double_type = DataType::Double(DoubleType::new());
+        assert_valid_partition_string(&Datum::Float64(5.73737.into()), &double_type, "5_73737");
+
+        for (value, expected) in [
+            (f64::NAN, "NaN"),
+            (f64::INFINITY, "Inf"),
+            (f64::NEG_INFINITY, "-Inf"),
+        ] {
+            assert_eq!(partition_string(&Datum::Float64(value.into()), &double_type), expected);
+        }
+    }
+
+    #[test]
+    fn test_timestamp_ntz() {
+        let data_type = DataType::Timestamp(TimestampType::new(9).unwrap());
+        for (millis, nanos, expected) in TIMESTAMP_CASES {
+            let ts = TimestampNtz::from_millis_nanos(millis, nanos)
+                .expect("TimestampNtz init failed");
+            assert_valid_partition_string(&Datum::TimestampNtz(ts), &data_type, expected);
+        }
+    }
+
+    #[test]
+    fn test_timestamp_ltz() {
+        let data_type = DataType::TimestampLTz(TimestampLTzType::new(9).unwrap());
+        for (millis, nanos, expected) in TIMESTAMP_CASES {
+            let ts = TimestampLtz::from_millis_nanos(millis, nanos)
+                .expect("TimestampLtz init failed");
+            assert_valid_partition_string(&Datum::TimestampLtz(ts), &data_type, expected);
+        }
+    }
+}