This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new c1dfcee  chore: Introduce PartitionGetter and Partition utils (#214)
c1dfcee is described below

commit c1dfceec8a388f8aa9677a43cfb80963a74b35ee
Author: Keith Lee <[email protected]>
AuthorDate: Tue Jan 27 13:17:52 2026 +0000

    chore: Introduce PartitionGetter and Partition utils (#214)
---
 crates/fluss/src/client/table/partition_getter.rs | 177 ++++++-
 crates/fluss/src/error.rs                         |   6 +
 crates/fluss/src/metadata/partition.rs            |  62 +--
 crates/fluss/src/metadata/table.rs                | 110 +++++
 crates/fluss/src/row/datum.rs                     |  10 +-
 crates/fluss/src/util/mod.rs                      |   1 +
 crates/fluss/src/util/partition.rs                | 532 ++++++++++++++++++++++
 7 files changed, 847 insertions(+), 51 deletions(-)

diff --git a/crates/fluss/src/client/table/partition_getter.rs 
b/crates/fluss/src/client/table/partition_getter.rs
index 4529d86..887c0a4 100644
--- a/crates/fluss/src/client/table/partition_getter.rs
+++ b/crates/fluss/src/client/table/partition_getter.rs
@@ -17,40 +17,191 @@
 
 use crate::error::Error::IllegalArgument;
 use crate::error::Result;
-use crate::metadata::{DataType, RowType};
+use crate::metadata::{DataType, ResolvedPartitionSpec, RowType};
+use crate::row::InternalRow;
 use crate::row::field_getter::FieldGetter;
+use crate::util::partition;
+use std::sync::Arc;
 
+/// A getter to get partition name from a row.
 #[allow(dead_code)]
-pub struct PartitionGetter<'a> {
-    partitions: Vec<(&'a String, &'a DataType, FieldGetter)>,
+pub struct PartitionGetter {
+    partition_keys: Arc<[String]>,
+    partitions: Vec<(DataType, FieldGetter)>,
 }
 
 #[allow(dead_code)]
-impl<'a> PartitionGetter<'a> {
-    pub fn new(row_type: &'a RowType, partition_keys: &'a Vec<String>) -> 
Result<Self> {
+impl PartitionGetter {
+    pub fn new(row_type: &RowType, partition_keys: Arc<[String]>) -> 
Result<Self> {
         let mut partitions = Vec::with_capacity(partition_keys.len());
 
-        for partition_key in partition_keys {
+        for partition_key in partition_keys.iter() {
             if let Some(partition_col_index) = 
row_type.get_field_index(partition_key.as_str()) {
-                let data_type = &row_type
+                let data_type = row_type
                     .fields()
                     .get(partition_col_index)
                     .unwrap()
-                    .data_type;
-                let field_getter = FieldGetter::create(data_type, 
partition_col_index);
+                    .data_type
+                    .clone();
+                let field_getter = FieldGetter::create(&data_type, 
partition_col_index);
 
-                partitions.push((partition_key, data_type, field_getter));
+                partitions.push((data_type, field_getter));
             } else {
                 return Err(IllegalArgument {
                     message: format!(
-                        "The partition column {partition_key} is not in the 
row {row_type}."
+                        "The partition column {} is not in the row {}.",
+                        partition_key, row_type
                     ),
                 });
             };
         }
 
-        Ok(Self { partitions })
+        Ok(Self {
+            partition_keys,
+            partitions,
+        })
     }
 
-    // TODO Implement get partition
+    pub fn get_partition(&self, row: &dyn InternalRow) -> Result<String> {
+        self.get_partition_spec(row)
+            .map(|ps| ps.get_partition_name())
+    }
+
+    pub fn get_partition_spec(&self, row: &dyn InternalRow) -> 
Result<ResolvedPartitionSpec> {
+        let mut partition_values = Vec::with_capacity(self.partitions.len());
+
+        for (data_type, field_getter) in &self.partitions {
+            let value = field_getter.get_field(row);
+            if value.is_null() {
+                return Err(IllegalArgument {
+                    message: "Partition value shouldn't be null.".to_string(),
+                });
+            }
+            partition_values.push(partition::convert_value_of_type(&value, 
data_type)?);
+        }
+
+        ResolvedPartitionSpec::new(Arc::clone(&self.partition_keys), 
partition_values)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::{DataField, IntType, StringType};
+    use crate::row::{Datum, GenericRow};
+
+    #[test]
+    fn test_partition_getter_single_key() {
+        let row_type = RowType::new(vec![
+            DataField::new("id".to_string(), DataType::Int(IntType::new()), 
None),
+            DataField::new(
+                "region".to_string(),
+                DataType::String(StringType::new()),
+                None,
+            ),
+        ]);
+
+        let getter = PartitionGetter::new(&row_type, 
Arc::from(["region".to_string()]))
+            .expect("should succeed");
+
+        let row = GenericRow::from_data(vec![Datum::Int32(42), 
Datum::from("US")]);
+        let partition_name = getter.get_partition(&row).expect("should 
succeed");
+        assert_eq!(partition_name, "US");
+    }
+
+    #[test]
+    fn test_partition_getter_multiple_keys() {
+        let row_type = RowType::new(vec![
+            DataField::new("id".to_string(), DataType::Int(IntType::new()), 
None),
+            DataField::new(
+                "date".to_string(),
+                DataType::String(StringType::new()),
+                None,
+            ),
+            DataField::new(
+                "region".to_string(),
+                DataType::String(StringType::new()),
+                None,
+            ),
+        ]);
+
+        let getter = PartitionGetter::new(
+            &row_type,
+            Arc::from(["date".to_string(), "region".to_string()]),
+        )
+        .expect("should succeed");
+
+        let row = GenericRow::from_data(vec![
+            Datum::Int32(42),
+            Datum::from("2024-01-15"),
+            Datum::from("US"),
+        ]);
+        let partition_name = getter.get_partition(&row).expect("should 
succeed");
+        assert_eq!(partition_name, "2024-01-15$US");
+    }
+
+    #[test]
+    fn test_partition_getter_invalid_column() {
+        let row_type = RowType::new(vec![DataField::new(
+            "id".to_string(),
+            DataType::Int(IntType::new()),
+            None,
+        )]);
+
+        let result = PartitionGetter::new(&row_type, 
Arc::from(["nonexistent".to_string()]));
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_partition_getter_null_value() {
+        let row_type = RowType::new(vec![
+            DataField::new("id".to_string(), DataType::Int(IntType::new()), 
None),
+            DataField::new(
+                "region".to_string(),
+                DataType::String(StringType::new()),
+                None,
+            ),
+        ]);
+
+        let getter = PartitionGetter::new(&row_type, 
Arc::from(["region".to_string()]))
+            .expect("should succeed");
+
+        let row = GenericRow::from_data(vec![Datum::Int32(42), Datum::Null]);
+        let result = getter.get_partition(&row);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_get_partition_spec() {
+        let row_type = RowType::new(vec![
+            DataField::new("id".to_string(), DataType::Int(IntType::new()), 
None),
+            DataField::new(
+                "date".to_string(),
+                DataType::String(StringType::new()),
+                None,
+            ),
+            DataField::new(
+                "region".to_string(),
+                DataType::String(StringType::new()),
+                None,
+            ),
+        ]);
+
+        let getter = PartitionGetter::new(
+            &row_type,
+            Arc::from(["date".to_string(), "region".to_string()]),
+        )
+        .expect("should succeed");
+
+        let row = GenericRow::from_data(vec![
+            Datum::Int32(42),
+            Datum::from("2024-01-15"),
+            Datum::from("US"),
+        ]);
+        let spec = getter.get_partition_spec(&row).expect("should succeed");
+
+        assert_eq!(spec.get_partition_keys(), &["date", "region"]);
+        assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]);
+        assert_eq!(spec.get_partition_name(), "2024-01-15$US");
+    }
 }
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index 368d8ab..68426d7 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -93,6 +93,12 @@ pub enum Error {
     )]
     IllegalArgument { message: String },
 
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting invalid partition error {}.", message)
+    )]
+    InvalidPartition { message: String },
+
     #[snafu(
         visibility(pub(crate)),
         display("Fluss hitting IO not supported error {}.", message)
diff --git a/crates/fluss/src/metadata/partition.rs 
b/crates/fluss/src/metadata/partition.rs
index e40fbf9..bc1935c 100644
--- a/crates/fluss/src/metadata/partition.rs
+++ b/crates/fluss/src/metadata/partition.rs
@@ -15,11 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::PartitionId;
 use crate::error::{Error, Result};
 use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec};
+use crate::{PartitionId, TableId};
 use std::collections::HashMap;
 use std::fmt::{Display, Formatter};
+use std::sync::Arc;
 
 /// Represents a partition spec in fluss. Partition columns and values are NOT 
of strict order, and
 /// they need to be re-arranged to the correct order by comparing with a list 
of strictly ordered
@@ -72,20 +73,21 @@ impl Display for PartitionSpec {
 /// partition keys.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct ResolvedPartitionSpec {
-    partition_keys: Vec<String>,
+    partition_keys: Arc<[String]>,
     partition_values: Vec<String>,
 }
 
 pub const PARTITION_SPEC_SEPARATOR: &str = "$";
 
 impl ResolvedPartitionSpec {
-    pub fn new(partition_keys: Vec<String>, partition_values: Vec<String>) -> 
Result<Self> {
+    pub fn new(partition_keys: Arc<[String]>, partition_values: Vec<String>) 
-> Result<Self> {
         if partition_keys.len() != partition_values.len() {
             return Err(Error::IllegalArgument {
                 message: "The number of partition keys and partition values 
should be the same."
                     .to_string(),
             });
         }
+
         Ok(Self {
             partition_keys,
             partition_values,
@@ -93,7 +95,7 @@ impl ResolvedPartitionSpec {
     }
 
     pub fn from_partition_spec(
-        partition_keys: Vec<String>,
+        partition_keys: Arc<[String]>,
         partition_spec: &PartitionSpec,
     ) -> Self {
         let partition_values =
@@ -104,14 +106,7 @@ impl ResolvedPartitionSpec {
         }
     }
 
-    pub fn from_partition_value(partition_key: String, partition_value: 
String) -> Self {
-        Self {
-            partition_keys: vec![partition_key],
-            partition_values: vec![partition_value],
-        }
-    }
-
-    pub fn from_partition_name(partition_keys: Vec<String>, partition_name: 
&str) -> Self {
+    pub fn from_partition_name(partition_keys: Arc<[String]>, partition_name: 
&str) -> Self {
         let partition_values: Vec<String> = partition_name
             .split(PARTITION_SPEC_SEPARATOR)
             .map(|s| s.to_string())
@@ -140,7 +135,7 @@ impl ResolvedPartitionSpec {
         }
 
         Ok(Self {
-            partition_keys: keys,
+            partition_keys: Arc::from(keys),
             partition_values: values,
         })
     }
@@ -236,6 +231,7 @@ impl ResolvedPartitionSpec {
             .iter()
             .map(|kv| kv.value.clone())
             .collect();
+
         Self {
             partition_keys,
             partition_values,
@@ -243,7 +239,7 @@ impl ResolvedPartitionSpec {
     }
 
     fn get_reordered_partition_values(
-        partition_keys: &[String],
+        partition_keys: &Arc<[String]>,
         partition_spec: &PartitionSpec,
     ) -> Vec<String> {
         let partition_spec_map = partition_spec.get_spec_map();
@@ -310,7 +306,7 @@ impl PartitionInfo {
 }
 
 impl Display for PartitionInfo {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
         write!(
             f,
             "Partition{{name='{}', id={}}}",
@@ -323,12 +319,12 @@ impl Display for PartitionInfo {
 /// A class to identify a table partition, containing the table id and the 
partition id.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub struct TablePartition {
-    table_id: i64,
+    table_id: TableId,
     partition_id: PartitionId,
 }
 
 impl TablePartition {
-    pub fn new(table_id: i64, partition_id: PartitionId) -> Self {
+    pub fn new(table_id: TableId, partition_id: PartitionId) -> Self {
         Self {
             table_id,
             partition_id,
@@ -361,7 +357,7 @@ mod tests {
     #[test]
     fn test_resolved_partition_spec_name() {
         let spec = ResolvedPartitionSpec::new(
-            vec!["date".to_string(), "region".to_string()],
+            Arc::from(["date".to_string(), "region".to_string()]),
             vec!["2024-01-15".to_string(), "US".to_string()],
         )
         .unwrap();
@@ -376,7 +372,7 @@ mod tests {
     #[test]
     fn test_resolved_partition_spec_from_partition_name() {
         let spec = ResolvedPartitionSpec::from_partition_name(
-            vec!["date".to_string(), "region".to_string()],
+            Arc::from(["date".to_string(), "region".to_string()]),
             "2024-01-15$US",
         );
 
@@ -396,7 +392,7 @@ mod tests {
     #[test]
     fn test_resolved_partition_spec_mismatched_lengths() {
         let result = ResolvedPartitionSpec::new(
-            vec!["date".to_string(), "region".to_string()],
+            Arc::from(["date".to_string(), "region".to_string()]),
             vec!["2024-01-15".to_string()],
         );
 
@@ -405,9 +401,11 @@ mod tests {
 
     #[test]
     fn test_partition_info() {
-        let spec =
-            ResolvedPartitionSpec::new(vec!["date".to_string()], 
vec!["2024-01-15".to_string()])
-                .unwrap();
+        let spec = ResolvedPartitionSpec::new(
+            Arc::from(["date".to_string()]),
+            vec!["2024-01-15".to_string()],
+        )
+        .unwrap();
 
         let info = PartitionInfo::new(42, spec);
         assert_eq!(info.get_partition_id(), 42);
@@ -438,9 +436,11 @@ mod tests {
 
     #[test]
     fn test_partition_info_pb_roundtrip() {
-        let spec =
-            ResolvedPartitionSpec::new(vec!["date".to_string()], 
vec!["2024-01-15".to_string()])
-                .unwrap();
+        let spec = ResolvedPartitionSpec::new(
+            Arc::from(["date".to_string()]),
+            vec!["2024-01-15".to_string()],
+        )
+        .unwrap();
         let info = PartitionInfo::new(42, spec);
 
         let pb = info.to_pb();
@@ -453,14 +453,16 @@ mod tests {
     #[test]
     fn test_contains() {
         let full_spec = ResolvedPartitionSpec::new(
-            vec!["date".to_string(), "region".to_string()],
+            Arc::from(["date".to_string(), "region".to_string()]),
             vec!["2024-01-15".to_string(), "US".to_string()],
         )
         .unwrap();
 
-        let partial_spec =
-            ResolvedPartitionSpec::new(vec!["date".to_string()], 
vec!["2024-01-15".to_string()])
-                .unwrap();
+        let partial_spec = ResolvedPartitionSpec::new(
+            Arc::from(["date".to_string()]),
+            vec!["2024-01-15".to_string()],
+        )
+        .unwrap();
 
         assert!(full_spec.contains(&partial_spec).unwrap());
     }
diff --git a/crates/fluss/src/metadata/table.rs 
b/crates/fluss/src/metadata/table.rs
index 3b9da7d..0c0cdf5 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -678,6 +678,10 @@ impl Display for TablePath {
     }
 }
 
+const MAX_NAME_LENGTH: usize = 200;
+
+const INTERNAL_NAME_PREFIX: &str = "__";
+
 impl TablePath {
     pub fn new(db: String, tbl: String) -> Self {
         TablePath {
@@ -695,6 +699,52 @@ impl TablePath {
     pub fn table(&self) -> &str {
         &self.table
     }
+
+    pub fn detect_invalid_name(identifier: &str) -> Option<String> {
+        if identifier.is_empty() {
+            return Some("the empty string is not allowed".to_string());
+        }
+        if identifier == "." {
+            return Some("'.' is not allowed".to_string());
+        }
+        if identifier == ".." {
+            return Some("'..' is not allowed".to_string());
+        }
+        if identifier.len() > MAX_NAME_LENGTH {
+            return Some(format!(
+                "the length of '{}' is longer than the max allowed length {}",
+                identifier, MAX_NAME_LENGTH
+            ));
+        }
+        if Self::contains_invalid_pattern(identifier) {
+            return Some(format!(
+                "'{}' contains one or more characters other than ASCII 
alphanumerics, '_' and '-'",
+                identifier
+            ));
+        }
+        None
+    }
+
+    pub fn validate_prefix(identifier: &str) -> Option<String> {
+        if identifier.starts_with(INTERNAL_NAME_PREFIX) {
+            return Some(format!(
+                "'{}' is not allowed as prefix, since it is reserved for 
internal databases/internal tables/internal partitions in Fluss server",
+                INTERNAL_NAME_PREFIX
+            ));
+        }
+        None
+    }
+
+    // Valid characters for Fluss table names are the ASCII alphanumerics, '_' 
and '-'.
+    fn contains_invalid_pattern(identifier: &str) -> bool {
+        for c in identifier.chars() {
+            let valid_char = c.is_ascii_alphanumeric() || c == '_' || c == '-';
+            if !valid_char {
+                return true;
+            }
+        }
+        false
+    }
 }
 
 /// A database name, table name and partition name combo. It's used to 
represent the physical path of
@@ -1106,3 +1156,63 @@ impl LakeSnapshot {
         &self.table_buckets_offset
     }
 }
+
+/// Tests for [`TablePath`].
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_validate() {
+        // assert valid name
+        let path = TablePath::new("db_2-abc3".to_string(), 
"table-1_abc_2".to_string());
+        assert!(TablePath::detect_invalid_name(path.database()).is_none());
+        assert!(TablePath::detect_invalid_name(path.table()).is_none());
+        assert_eq!(path.to_string(), "db_2-abc3.table-1_abc_2");
+
+        // assert invalid name prefix
+        assert!(
+            TablePath::validate_prefix("__table-1")
+                .unwrap()
+                .contains("'__' is not allowed as prefix")
+        );
+
+        // check max length
+        let long_name = "a".repeat(200);
+        assert!(TablePath::detect_invalid_name(&long_name).is_none());
+
+        // assert invalid names
+        assert_invalid_name("*abc", "'*abc' contains one or more characters 
other than");
+        assert_invalid_name(
+            "table.abc",
+            "'table.abc' contains one or more characters other than",
+        );
+        assert_invalid_name("", "the empty string is not allowed");
+        assert_invalid_name(" ", "' ' contains one or more characters other 
than");
+        assert_invalid_name(".", "'.' is not allowed");
+        assert_invalid_name("..", "'..' is not allowed");
+        let invalid_long_name = "a".repeat(201);
+        assert_invalid_name(
+            &invalid_long_name,
+            &format!(
+                "the length of '{}' is longer than the max allowed length {}",
+                invalid_long_name, MAX_NAME_LENGTH
+            ),
+        );
+    }
+
+    fn assert_invalid_name(name: &str, expected_message: &str) {
+        let result = TablePath::detect_invalid_name(name);
+        assert!(
+            result.is_some(),
+            "Expected '{}' to be invalid, but it was valid",
+            name
+        );
+        assert!(
+            result.as_ref().unwrap().contains(expected_message),
+            "Expected message containing '{}', but got '{}'",
+            expected_message,
+            result.unwrap()
+        );
+    }
+}
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index b808373..e1b70ad 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -789,10 +789,7 @@ impl TimestampNtz {
         }
     }
 
-    pub fn from_millis_nanos(
-        millisecond: i64,
-        nano_of_millisecond: i32,
-    ) -> crate::error::Result<Self> {
+    pub fn from_millis_nanos(millisecond: i64, nano_of_millisecond: i32) -> 
Result<Self> {
         if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) {
             return Err(crate::error::Error::IllegalArgument {
                 message: format!(
@@ -836,10 +833,7 @@ impl TimestampLtz {
         }
     }
 
-    pub fn from_millis_nanos(
-        epoch_millisecond: i64,
-        nano_of_millisecond: i32,
-    ) -> crate::error::Result<Self> {
+    pub fn from_millis_nanos(epoch_millisecond: i64, nano_of_millisecond: i32) 
-> Result<Self> {
         if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) {
             return Err(crate::error::Error::IllegalArgument {
                 message: format!(
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index 3760487..b987fe2 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 pub mod murmur_hash;
+pub mod partition;
 pub mod varint;
 
 use crate::TableId;
diff --git a/crates/fluss/src/util/partition.rs 
b/crates/fluss/src/util/partition.rs
new file mode 100644
index 0000000..036cac4
--- /dev/null
+++ b/crates/fluss/src/util/partition.rs
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Utils for partition.
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::DataType;
+use crate::row::{Date, Datum, Time, TimestampLtz, TimestampNtz};
+use jiff::ToSpan;
+use std::fmt::Write;
+
+fn hex_string(bytes: &[u8]) -> String {
+    let mut hex = String::with_capacity(bytes.len() * 2);
+    for &b in bytes {
+        write!(hex, "{:02x}", b).unwrap();
+    }
+    hex
+}
+
+fn reformat_float(value: f32) -> String {
+    if value.is_nan() {
+        "NaN".to_string()
+    } else if value.is_infinite() {
+        if value > 0.0 {
+            "Inf".to_string()
+        } else {
+            "-Inf".to_string()
+        }
+    } else {
+        value.to_string().replace('.', "_")
+    }
+}
+
+fn reformat_double(value: f64) -> String {
+    if value.is_nan() {
+        "NaN".to_string()
+    } else if value.is_infinite() {
+        if value > 0.0 {
+            "Inf".to_string()
+        } else {
+            "-Inf".to_string()
+        }
+    } else {
+        value.to_string().replace('.', "_")
+    }
+}
+
+const UNIX_EPOCH_DATE: jiff::civil::Date = jiff::civil::date(1970, 1, 1);
+
+fn day_to_string(days: i32) -> String {
+    let date = UNIX_EPOCH_DATE + days.days();
+    format!("{:04}-{:02}-{:02}", date.year(), date.month(), date.day())
+}
+
+fn date_to_string(date: Date) -> String {
+    day_to_string(date.get_inner())
+}
+
+const MILLIS_PER_SECOND: i64 = 1_000;
+const MILLIS_PER_MINUTE: i64 = 60 * MILLIS_PER_SECOND;
+const MILLIS_PER_HOUR: i64 = 60 * MILLIS_PER_MINUTE;
+
+fn milli_to_string(milli: i32) -> String {
+    let hour = milli.div_euclid(MILLIS_PER_HOUR as i32);
+    let min = milli
+        .rem_euclid(MILLIS_PER_HOUR as i32)
+        .div_euclid(MILLIS_PER_MINUTE as i32);
+    let sec = milli
+        .rem_euclid(MILLIS_PER_MINUTE as i32)
+        .div_euclid(MILLIS_PER_SECOND as i32);
+    let ms = milli.rem_euclid(MILLIS_PER_SECOND as i32);
+
+    format!("{:02}-{:02}-{:02}_{:03}", hour, min, sec, ms)
+}
+
+fn time_to_string(time: Time) -> String {
+    milli_to_string(time.get_inner())
+}
+
+trait Timestamp {
+    fn get_milli(&self) -> i64;
+    fn get_nano_of_milli(&self) -> i32;
+}
+
+impl Timestamp for TimestampNtz {
+    fn get_milli(&self) -> i64 {
+        self.get_millisecond()
+    }
+
+    fn get_nano_of_milli(&self) -> i32 {
+        self.get_nano_of_millisecond()
+    }
+}
+
+impl Timestamp for TimestampLtz {
+    fn get_milli(&self) -> i64 {
+        self.get_epoch_millisecond()
+    }
+
+    fn get_nano_of_milli(&self) -> i32 {
+        self.get_nano_of_millisecond()
+    }
+}
+
+/// This formats date time while adhering to java side behaviour
+///
+fn timestamp_to_string<T: Timestamp>(ts: T) -> String {
+    let millis = ts.get_milli();
+    let nanos = ts.get_nano_of_milli();
+
+    let millis_of_second = millis.rem_euclid(MILLIS_PER_SECOND);
+    let total_secs = millis.div_euclid(MILLIS_PER_SECOND);
+
+    let epoch = jiff::Timestamp::UNIX_EPOCH;
+    let ts_jiff = epoch + jiff::Span::new().seconds(total_secs);
+    let dt = ts_jiff.to_zoned(jiff::tz::TimeZone::UTC).datetime();
+
+    if nanos > 0 {
+        format!(
+            "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}_{:03}{:06}",
+            dt.year(),
+            dt.month(),
+            dt.day(),
+            dt.hour(),
+            dt.minute(),
+            dt.second(),
+            millis_of_second,
+            nanos
+        )
+    } else if millis_of_second > 0 {
+        format!(
+            "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}_{:03}",
+            dt.year(),
+            dt.month(),
+            dt.day(),
+            dt.hour(),
+            dt.minute(),
+            dt.second(),
+            millis_of_second
+        )
+    } else {
+        format!(
+            "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}_",
+            dt.year(),
+            dt.month(),
+            dt.day(),
+            dt.hour(),
+            dt.minute(),
+            dt.second(),
+        )
+    }
+}
+
+/// Converts a Datum value to its string representation for partition naming.
+pub fn convert_value_of_type(value: &Datum, data_type: &DataType) -> 
Result<String> {
+    match (value, data_type) {
+        (Datum::String(s), DataType::Char(_) | DataType::String(_)) => 
Ok(s.to_string()),
+        (Datum::Bool(b), DataType::Boolean(_)) => Ok(b.to_string()),
+        (Datum::Blob(bytes), DataType::Binary(_) | DataType::Bytes(_)) => 
Ok(hex_string(bytes)),
+        (Datum::Int8(v), DataType::TinyInt(_)) => Ok(v.to_string()),
+        (Datum::Int16(v), DataType::SmallInt(_)) => Ok(v.to_string()),
+        (Datum::Int32(v), DataType::Int(_)) => Ok(v.to_string()),
+        (Datum::Int64(v), DataType::BigInt(_)) => Ok(v.to_string()),
+        (Datum::Date(d), DataType::Date(_)) => Ok(date_to_string(*d)),
+        (Datum::Time(t), DataType::Time(_)) => Ok(time_to_string(*t)),
+        (Datum::Float32(f), DataType::Float(_)) => 
Ok(reformat_float(f.into_inner())),
+        (Datum::Float64(f), DataType::Double(_)) => 
Ok(reformat_double(f.into_inner())),
+        (Datum::TimestampLtz(ts), DataType::TimestampLTz(_)) => 
Ok(timestamp_to_string(*ts)),
+        (Datum::TimestampNtz(ts), DataType::Timestamp(_)) => 
Ok(timestamp_to_string(*ts)),
+        _ => Err(IllegalArgument {
+            message: format!(
+                "Unsupported conversion to partition key from data type: 
{data_type:?}, value: {value:?}"
+            ),
+        }),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::{
+        BigIntType, BinaryType, BooleanType, BytesType, CharType, DateType, 
DoubleType, FloatType,
+        IntType, SmallIntType, StringType, TimeType, TimestampLTzType, 
TimestampType, TinyIntType,
+    };
+    use crate::row::{Date, Time, TimestampLtz, TimestampNtz};
+    use std::borrow::Cow;
+
+    use crate::metadata::TablePath;
+
+    #[test]
+    fn test_string() {
+        let datum = Datum::String(Cow::Borrowed("Fluss"));
+
+        let to_string_result = convert_value_of_type(&datum, 
&DataType::String(StringType::new()))
+            .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "Fluss");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+    }
+
+    #[test]
+    fn test_char() {
+        let datum = Datum::String(Cow::Borrowed("F"));
+
+        let to_string_result = convert_value_of_type(&datum, 
&DataType::Char(CharType::new(1)))
+            .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "F");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+    }
+
+    #[test]
+    fn test_boolean() {
+        let datum = Datum::Bool(true);
+
+        let to_string_result =
+            convert_value_of_type(&datum, 
&DataType::Boolean(BooleanType::new()))
+                .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "true");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+    }
+
+    #[test]
+    fn test_byte() {
+        let datum = Datum::Blob(Cow::Borrowed(&[0x10, 0x20, 0x30, 0x40, 0x50, 
0xFF]));
+
+        let to_string_result = convert_value_of_type(&datum, 
&DataType::Bytes(BytesType::new()))
+            .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "1020304050ff");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+    }
+
+    #[test]
+    fn test_binary() {
+        let datum = Datum::Blob(Cow::Borrowed(&[0x10, 0x20, 0x30, 0x40, 0x50, 
0xFF]));
+
+        let to_string_result = convert_value_of_type(&datum, 
&DataType::Binary(BinaryType::new(6)))
+            .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "1020304050ff");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+    }
+
+    #[test]
+    fn test_tiny_int() {
+        let datum = Datum::Int8(100);
+
+        let to_string_result =
+            convert_value_of_type(&datum, 
&DataType::TinyInt(TinyIntType::new()))
+                .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "100");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+    }
+
+    #[test]
+    fn test_small_int() {
+        let datum = Datum::Int16(-32760);
+
+        let to_string_result =
+            convert_value_of_type(&datum, 
&DataType::SmallInt(SmallIntType::new()))
+                .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "-32760");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+    }
+
+    #[test]
+    fn test_int() {
+        let datum = Datum::Int32(299000);
+
+        let to_string_result = convert_value_of_type(&datum, 
&DataType::Int(IntType::new()))
+            .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "299000");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+    }
+
+    #[test]
+    fn test_big_int() {
+        let datum = Datum::Int64(1748662955428);
+
+        let to_string_result = convert_value_of_type(&datum, 
&DataType::BigInt(BigIntType::new()))
+            .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "1748662955428");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+    }
+
+    #[test]
+    fn test_date() {
+        let datum = Datum::Date(Date::new(20235));
+
+        let to_string_result = convert_value_of_type(&datum, 
&DataType::Date(DateType::new()))
+            .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "2025-05-27");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+    }
+
+    #[test]
+    fn test_time() {
+        let datum = Datum::Time(Time::new(5402199));
+
+        let to_string_result =
+            convert_value_of_type(&datum, 
&DataType::Time(TimeType::new(3).unwrap()))
+                .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "01-30-02_199");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+    }
+
+    #[test]
+    fn test_float() {
+        let datum = Datum::Float32(5.73.into());
+
+        let to_string_result = convert_value_of_type(&datum, 
&DataType::Float(FloatType::new()))
+            .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "5_73");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+
+        let datum = Datum::Float32(f32::NAN.into());
+        assert_eq!(
+            convert_value_of_type(&datum, &DataType::Float(FloatType::new()))
+                .expect("datum conversion to partition string failed"),
+            "NaN"
+        );
+
+        let datum = Datum::Float32(f32::INFINITY.into());
+        assert_eq!(
+            convert_value_of_type(&datum, &DataType::Float(FloatType::new()))
+                .expect("datum conversion to partition string failed"),
+            "Inf"
+        );
+
+        let datum = Datum::Float32(f32::NEG_INFINITY.into());
+        assert_eq!(
+            convert_value_of_type(&datum, &DataType::Float(FloatType::new()))
+                .expect("datum conversion to partition string failed"),
+            "-Inf"
+        );
+    }
+
+    #[test]
+    fn test_double() {
+        let datum = Datum::Float64(5.73737.into());
+
+        let to_string_result = convert_value_of_type(&datum, 
&DataType::Double(DoubleType::new()))
+            .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "5_73737");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+
+        let datum = Datum::Float64(f64::NAN.into());
+        assert_eq!(
+            convert_value_of_type(&datum, &DataType::Double(DoubleType::new()))
+                .expect("datum conversion to partition string failed"),
+            "NaN"
+        );
+
+        let datum = Datum::Float64(f64::INFINITY.into());
+        assert_eq!(
+            convert_value_of_type(&datum, &DataType::Double(DoubleType::new()))
+                .expect("datum conversion to partition string failed"),
+            "Inf"
+        );
+
+        let datum = Datum::Float64(f64::NEG_INFINITY.into());
+        assert_eq!(
+            convert_value_of_type(&datum, &DataType::Double(DoubleType::new()))
+                .expect("datum conversion to partition string failed"),
+            "-Inf"
+        );
+    }
+
+    #[test]
+    fn test_timestamp_ntz() {
+        let datum = Datum::TimestampNtz(
+            TimestampNtz::from_millis_nanos(1748662955428, 99988)
+                .expect("TimestampNtz init failed"),
+        );
+
+        let to_string_result =
+            convert_value_of_type(&datum, 
&DataType::Timestamp(TimestampType::new(9).unwrap()))
+                .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "2025-05-31-03-42-35_428099988");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+
+        // Zero nanos of millis
+        let datum = Datum::TimestampNtz(
+            TimestampNtz::from_millis_nanos(1748662955428, 
0).expect("TimestampNtz init failed"),
+        );
+
+        let to_string_result =
+            convert_value_of_type(&datum, 
&DataType::Timestamp(TimestampType::new(9).unwrap()))
+                .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "2025-05-31-03-42-35_428");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+
+        // Zero millis
+        let datum = Datum::TimestampNtz(
+            TimestampNtz::from_millis_nanos(1748662955000, 99988)
+                .expect("TimestampNtz init failed"),
+        );
+
+        let to_string_result =
+            convert_value_of_type(&datum, 
&DataType::Timestamp(TimestampType::new(9).unwrap()))
+                .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "2025-05-31-03-42-35_000099988");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+
+        // Zero millis and zero nanos
+        let datum = Datum::TimestampNtz(
+            TimestampNtz::from_millis_nanos(1748662955000, 
0).expect("TimestampNtz init failed"),
+        );
+
+        let to_string_result =
+            convert_value_of_type(&datum, 
&DataType::Timestamp(TimestampType::new(9).unwrap()))
+                .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "2025-05-31-03-42-35_");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+
+        // Negative millis
+        let datum = Datum::TimestampNtz(
+            TimestampNtz::from_millis_nanos(-1748662955428, 99988)
+                .expect("TimestampNtz init failed"),
+        );
+
+        let to_string_result =
+            convert_value_of_type(&datum, 
&DataType::Timestamp(TimestampType::new(9).unwrap()))
+                .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "1914-08-03-20-17-24_572099988");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+    }
+
+    #[test]
+    fn test_timestamp_ltz() {
+        let datum = Datum::TimestampLtz(
+            TimestampLtz::from_millis_nanos(1748662955428, 99988)
+                .expect("TimestampLtz init failed"),
+        );
+
+        let to_string_result = convert_value_of_type(
+            &datum,
+            &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()),
+        )
+        .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "2025-05-31-03-42-35_428099988");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+
+        // Zero nanos of millis
+        let datum = Datum::TimestampLtz(
+            TimestampLtz::from_millis_nanos(1748662955428, 
0).expect("TimestampLtz init failed"),
+        );
+
+        let to_string_result = convert_value_of_type(
+            &datum,
+            &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()),
+        )
+        .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "2025-05-31-03-42-35_428");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+
+        // Zero millis
+        let datum = Datum::TimestampLtz(
+            TimestampLtz::from_millis_nanos(1748662955000, 99988)
+                .expect("TimestampLtz init failed"),
+        );
+
+        let to_string_result = convert_value_of_type(
+            &datum,
+            &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()),
+        )
+        .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "2025-05-31-03-42-35_000099988");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+
+        // Zero millis and zero nanos
+        let datum = Datum::TimestampLtz(
+            TimestampLtz::from_millis_nanos(1748662955000, 
0).expect("TimestampLtz init failed"),
+        );
+
+        let to_string_result = convert_value_of_type(
+            &datum,
+            &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()),
+        )
+        .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "2025-05-31-03-42-35_");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+
+        // Negative millis
+        let datum = Datum::TimestampLtz(
+            TimestampLtz::from_millis_nanos(-1748662955428, 99988)
+                .expect("TimestampLtz init failed"),
+        );
+
+        let to_string_result = convert_value_of_type(
+            &datum,
+            &DataType::TimestampLTz(TimestampLTzType::new(9).unwrap()),
+        )
+        .expect("datum conversion to partition string failed");
+        assert_eq!(to_string_result, "1914-08-03-20-17-24_572099988");
+        let detect_invalid = TablePath::detect_invalid_name(&to_string_result);
+        assert!(detect_invalid.is_none());
+    }
+}


Reply via email to