Copilot commented on code in PR #214:
URL: https://github.com/apache/fluss-rust/pull/214#discussion_r2725548048


##########
crates/fluss/src/util/partition.rs:
##########
@@ -0,0 +1,643 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utils for partition.
+
+#![allow(dead_code)]
+
+use crate::error::{Error, Result};
+use crate::metadata::{DataType, PartitionSpec, ResolvedPartitionSpec, 
TablePath};
+use crate::row::{Date, Datum, Time, TimestampLtz, TimestampNtz};
+use jiff::ToSpan;
+use jiff::Zoned;
+use jiff::civil::DateTime;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum AutoPartitionTimeUnit {
+    Year,
+    Quarter,
+    Month,
+    Day,
+    Hour,
+}
+
+pub fn validate_partition_spec(
+    table_path: &TablePath,
+    partition_keys: &[String],
+    partition_spec: &PartitionSpec,
+    is_create: bool,
+) -> Result<()> {
+    let partition_spec_map = partition_spec.get_spec_map();
+    if partition_keys.len() != partition_spec_map.len() {
+        return Err(Error::InvalidPartition {
+            message: format!(
+                "PartitionSpec size is not equal to partition keys size for 
partitioned table {}.",
+                table_path
+            ),
+        });
+    }
+
+    let mut reordered_partition_values: Vec<&str> = 
Vec::with_capacity(partition_keys.len());
+    for partition_key in partition_keys {
+        if let Some(value) = partition_spec_map.get(partition_key) {
+            reordered_partition_values.push(value);
+        } else {
+            return Err(Error::InvalidPartition {
+                message: format!(
+                    "PartitionSpec {} does not contain partition key '{}' for 
partitioned table {}.",
+                    partition_spec, partition_key, table_path
+                ),
+            });
+        }
+    }
+
+    validate_partition_values(&reordered_partition_values, is_create)
+}
+
+fn validate_partition_values(partition_values: &[&str], is_create: bool) -> 
Result<()> {
+    for value in partition_values {
+        let invalid_name_error = TablePath::detect_invalid_name(value);
+        let prefix_error = if is_create {
+            TablePath::validate_prefix(value)
+        } else {
+            None
+        };
+
+        if invalid_name_error.is_some() || prefix_error.is_some() {
+            let error_msg = invalid_name_error.unwrap_or_else(|| 
prefix_error.unwrap());
+            return Err(Error::InvalidPartition {
+                message: format!("The partition value {} is invalid: {}", 
value, error_msg),
+            });
+        }
+    }
+    Ok(())
+}
+
+/// Generate [`ResolvedPartitionSpec`] for auto partition in server. When we 
auto creating a
+/// partition, we need to first generate a [`ResolvedPartitionSpec`].
+///
+/// The value is the formatted time with the specified time unit.
+pub fn generate_auto_partition(
+    partition_keys: Vec<String>,
+    current: &Zoned,
+    offset: i32,
+    time_unit: AutoPartitionTimeUnit,
+) -> ResolvedPartitionSpec {
+    let auto_partition_field_spec = generate_auto_partition_time(current, 
offset, time_unit);
+    ResolvedPartitionSpec::from_partition_name(partition_keys, 
&auto_partition_field_spec)
+}
+
+pub fn generate_auto_partition_time(
+    current: &Zoned,
+    offset: i32,
+    time_unit: AutoPartitionTimeUnit,
+) -> String {
+    match time_unit {
+        AutoPartitionTimeUnit::Year => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().years(offset))
+                .expect("year overflow");
+            format!("{}", adjusted.year())
+        }
+        AutoPartitionTimeUnit::Quarter => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().months(offset * 3))
+                .expect("quarter overflow");
+            let quarter = (adjusted.month() as i32 - 1) / 3 + 1;
+            format!("{}{}", adjusted.year(), quarter)
+        }
+        AutoPartitionTimeUnit::Month => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().months(offset))
+                .expect("month overflow");
+            format!("{}{:02}", adjusted.year(), adjusted.month())
+        }
+        AutoPartitionTimeUnit::Day => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().days(offset))
+                .expect("day overflow");
+            format!(
+                "{}{:02}{:02}",
+                adjusted.year(),
+                adjusted.month(),
+                adjusted.day()
+            )
+        }
+        AutoPartitionTimeUnit::Hour => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().hours(offset))
+                .expect("hour overflow");

Review Comment:
   Calling `expect` on the result of `checked_add` makes this function panic 
when the date/time arithmetic overflows. jiff's `Zoned::checked_add` returns a 
`Result` (an `Err` on overflow, not `None`), and that overflow is a runtime 
error condition that should be handled gracefully rather than causing a panic. 
Consider returning a `Result` from this function and propagating the failure 
with the `?` operator after mapping it to an appropriate `Error` variant.
   
   For example:
   ```rust
   let adjusted = current
       .checked_add(jiff::Span::new().years(offset))
       .map_err(|e| Error::IllegalArgument {
           message: format!("Year offset {offset} would cause overflow: {e}"),
       })?;
   ```
   
   This applies to all time unit cases (year, quarter, month, day, hour).



##########
crates/fluss/src/util/partition.rs:
##########
@@ -0,0 +1,643 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utils for partition.
+
+#![allow(dead_code)]
+
+use crate::error::{Error, Result};
+use crate::metadata::{DataType, PartitionSpec, ResolvedPartitionSpec, 
TablePath};
+use crate::row::{Date, Datum, Time, TimestampLtz, TimestampNtz};
+use jiff::ToSpan;
+use jiff::Zoned;
+use jiff::civil::DateTime;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum AutoPartitionTimeUnit {
+    Year,
+    Quarter,
+    Month,
+    Day,
+    Hour,
+}
+
+pub fn validate_partition_spec(
+    table_path: &TablePath,
+    partition_keys: &[String],
+    partition_spec: &PartitionSpec,
+    is_create: bool,
+) -> Result<()> {
+    let partition_spec_map = partition_spec.get_spec_map();
+    if partition_keys.len() != partition_spec_map.len() {
+        return Err(Error::InvalidPartition {
+            message: format!(
+                "PartitionSpec size is not equal to partition keys size for 
partitioned table {}.",
+                table_path
+            ),
+        });
+    }
+
+    let mut reordered_partition_values: Vec<&str> = 
Vec::with_capacity(partition_keys.len());
+    for partition_key in partition_keys {
+        if let Some(value) = partition_spec_map.get(partition_key) {
+            reordered_partition_values.push(value);
+        } else {
+            return Err(Error::InvalidPartition {
+                message: format!(
+                    "PartitionSpec {} does not contain partition key '{}' for 
partitioned table {}.",
+                    partition_spec, partition_key, table_path
+                ),
+            });
+        }
+    }
+
+    validate_partition_values(&reordered_partition_values, is_create)
+}
+
+fn validate_partition_values(partition_values: &[&str], is_create: bool) -> 
Result<()> {
+    for value in partition_values {
+        let invalid_name_error = TablePath::detect_invalid_name(value);
+        let prefix_error = if is_create {
+            TablePath::validate_prefix(value)
+        } else {
+            None
+        };
+
+        if invalid_name_error.is_some() || prefix_error.is_some() {
+            let error_msg = invalid_name_error.unwrap_or_else(|| 
prefix_error.unwrap());
+            return Err(Error::InvalidPartition {
+                message: format!("The partition value {} is invalid: {}", 
value, error_msg),
+            });
+        }
+    }
+    Ok(())
+}
+
+/// Generate [`ResolvedPartitionSpec`] for auto partition in server. When we 
auto creating a
+/// partition, we need to first generate a [`ResolvedPartitionSpec`].
+///
+/// The value is the formatted time with the specified time unit.
+pub fn generate_auto_partition(
+    partition_keys: Vec<String>,
+    current: &Zoned,
+    offset: i32,
+    time_unit: AutoPartitionTimeUnit,
+) -> ResolvedPartitionSpec {
+    let auto_partition_field_spec = generate_auto_partition_time(current, 
offset, time_unit);
+    ResolvedPartitionSpec::from_partition_name(partition_keys, 
&auto_partition_field_spec)
+}
+
+pub fn generate_auto_partition_time(
+    current: &Zoned,
+    offset: i32,
+    time_unit: AutoPartitionTimeUnit,
+) -> String {
+    match time_unit {
+        AutoPartitionTimeUnit::Year => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().years(offset))
+                .expect("year overflow");
+            format!("{}", adjusted.year())
+        }
+        AutoPartitionTimeUnit::Quarter => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().months(offset * 3))
+                .expect("quarter overflow");
+            let quarter = (adjusted.month() as i32 - 1) / 3 + 1;
+            format!("{}{}", adjusted.year(), quarter)
+        }
+        AutoPartitionTimeUnit::Month => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().months(offset))
+                .expect("month overflow");
+            format!("{}{:02}", adjusted.year(), adjusted.month())
+        }
+        AutoPartitionTimeUnit::Day => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().days(offset))
+                .expect("day overflow");
+            format!(
+                "{}{:02}{:02}",
+                adjusted.year(),
+                adjusted.month(),
+                adjusted.day()
+            )
+        }
+        AutoPartitionTimeUnit::Hour => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().hours(offset))
+                .expect("hour overflow");
+            format!(
+                "{}{:02}{:02}{:02}",
+                adjusted.year(),
+                adjusted.month(),
+                adjusted.day(),
+                adjusted.hour()
+            )
+        }
+    }
+}
+
+fn hex_string(bytes: &[u8]) -> String {
+    let mut hex = String::with_capacity(bytes.len() * 2);
+    for &b in bytes {
+        let h = format!("{:x}", b);
+        if h.len() == 1 {
+            hex.push('0');
+        }
+        hex.push_str(&h);
+    }
+    hex
+}
+
+fn reformat_float(value: f32) -> String {
+    if value.is_nan() {
+        "NaN".to_string()
+    } else if value.is_infinite() {
+        if value > 0.0 {
+            "Inf".to_string()
+        } else {
+            "-Inf".to_string()
+        }
+    } else {
+        value.to_string().replace('.', "_")
+    }
+}
+
+fn reformat_double(value: f64) -> String {
+    if value.is_nan() {
+        "NaN".to_string()
+    } else if value.is_infinite() {
+        if value > 0.0 {
+            "Inf".to_string()
+        } else {
+            "-Inf".to_string()
+        }
+    } else {
+        value.to_string().replace('.', "_")
+    }
+}
+
+const UNIX_EPOCH_DATE: jiff::civil::Date = jiff::civil::date(1970, 1, 1);
+
+fn day_to_string(days: i32) -> String {
+    let date = UNIX_EPOCH_DATE + days.days();
+    format!("{:04}-{:02}-{:02}", date.year(), date.month(), date.day())
+}
+
+fn date_to_string(date: Date) -> String {
+    day_to_string(date.get_inner())
+}
+
+const NANOS_PER_MILLIS: i64 = 1_000_000;
+const MILLIS_PER_SECOND: i64 = 1_000;
+const MILLIS_PER_MINUTE: i64 = 60 * MILLIS_PER_SECOND;
+const MILLIS_PER_HOUR: i64 = 60 * MILLIS_PER_MINUTE;
+
+fn milli_to_string(milli: i32) -> String {
+    let hour = milli.div_euclid(MILLIS_PER_HOUR as i32);
+    let min = milli
+        .rem_euclid(MILLIS_PER_HOUR as i32)
+        .div_euclid(MILLIS_PER_MINUTE as i32);
+    let sec = milli
+        .rem_euclid(MILLIS_PER_MINUTE as i32)
+        .div_euclid(MILLIS_PER_SECOND as i32);
+    let ms = milli.rem_euclid(MILLIS_PER_SECOND as i32);
+
+    format!("{:02}-{:02}-{:02}_{:03}", hour, min, sec, ms)
+}
+
+fn time_to_string(time: Time) -> String {
+    milli_to_string(time.get_inner())
+}
+
+/// Always add nanoseconds whether TimestampNtz and TimestampLtz are compact 
or not.
+fn timestamp_ntz_to_string(ts: TimestampNtz) -> String {
+    let millis = ts.get_millisecond();
+    let nano_of_milli = ts.get_nano_of_millisecond();
+
+    let total_nanos = (millis % MILLIS_PER_SECOND) * NANOS_PER_MILLIS + 
(nano_of_milli as i64);
+    let total_secs = millis / MILLIS_PER_SECOND;
+
+    let epoch = jiff::Timestamp::UNIX_EPOCH;
+    let ts_jiff = epoch + jiff::Span::new().seconds(total_secs);
+    let dt = ts_jiff.to_zoned(jiff::tz::TimeZone::UTC).datetime();
+
+    format_date_time(total_nanos, dt)
+}
+
+fn timestamp_ltz_to_string(ts: TimestampLtz) -> String {
+    let millis = ts.get_epoch_millisecond();
+    let nano_of_milli = ts.get_nano_of_millisecond();
+
+    let total_nanos = (millis % MILLIS_PER_SECOND) * NANOS_PER_MILLIS + 
(nano_of_milli as i64);
+    let total_secs = millis / MILLIS_PER_SECOND;
+
+    let epoch = jiff::Timestamp::UNIX_EPOCH;
+    let ts_jiff = epoch + jiff::Span::new().seconds(total_secs);
+    let dt = ts_jiff.to_zoned(jiff::tz::TimeZone::UTC).datetime();
+
+    format_date_time(total_nanos, dt)
+}
+
+fn format_date_time(total_nanos: i64, dt: DateTime) -> String {
+    if total_nanos > 0 {
+        format!(
+            "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}_{}",
+            dt.year(),
+            dt.month(),
+            dt.day(),
+            dt.hour(),
+            dt.minute(),
+            dt.second(),
+            total_nanos
+        )
+    } else {
+        format!(
+            "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}",
+            dt.year(),
+            dt.month(),
+            dt.day(),
+            dt.hour(),
+            dt.minute(),
+            dt.second(),
+        )
+    }
+}
+
+/// Converts a Datum value to its string representation for partition naming.
+pub fn convert_value_to_string(value: &Datum, data_type: &DataType) -> String {
+    match (value, data_type) {
+        (Datum::String(s), DataType::Char(_) | DataType::String(_)) => 
s.to_string(),
+        (Datum::Bool(b), DataType::Boolean(_)) => b.to_string(),
+        (Datum::Blob(bytes), DataType::Binary(_) | DataType::Bytes(_)) => 
hex_string(bytes),
+        (Datum::Int8(v), DataType::TinyInt(_)) => v.to_string(),
+        (Datum::Int16(v), DataType::SmallInt(_)) => v.to_string(),
+        (Datum::Int32(v), DataType::Int(_)) => v.to_string(),
+        (Datum::Int64(v), DataType::BigInt(_)) => v.to_string(),
+        (Datum::Date(d), DataType::Date(_)) => date_to_string(*d),
+        (Datum::Time(t), DataType::Time(_)) => time_to_string(*t),
+        (Datum::Float32(f), DataType::Float(_)) => 
reformat_float(f.into_inner()),
+        (Datum::Float64(f), DataType::Double(_)) => 
reformat_double(f.into_inner()),
+        (Datum::TimestampLtz(ts), DataType::TimestampLTz(_)) => 
timestamp_ltz_to_string(*ts),
+        (Datum::TimestampNtz(ts), DataType::Timestamp(_)) => 
timestamp_ntz_to_string(*ts),
+        _ => panic!(
+            "Unsupported data type for partition key: {:?}, value: {:?}",
+            data_type, value
+        ),

Review Comment:
   Using `panic!` for unsupported data types in `convert_value_to_string` is 
inconsistent with error handling patterns seen elsewhere in the codebase. Since 
this function is called at runtime with partition values, encountering an 
unsupported type should return an error rather than panic. Consider changing 
the return type to `Result<String>` and returning an `Error::IllegalArgument` 
for unsupported types, similar to the pattern used in other parts of the 
codebase (e.g., `crates/fluss/src/record/arrow.rs:306`).
   ```suggestion
   pub fn convert_value_to_string(value: &Datum, data_type: &DataType) -> 
Result<String> {
       match (value, data_type) {
           (Datum::String(s), DataType::Char(_) | DataType::String(_)) => 
Ok(s.to_string()),
           (Datum::Bool(b), DataType::Boolean(_)) => Ok(b.to_string()),
           (Datum::Blob(bytes), DataType::Binary(_) | DataType::Bytes(_)) => 
Ok(hex_string(bytes)),
           (Datum::Int8(v), DataType::TinyInt(_)) => Ok(v.to_string()),
           (Datum::Int16(v), DataType::SmallInt(_)) => Ok(v.to_string()),
           (Datum::Int32(v), DataType::Int(_)) => Ok(v.to_string()),
           (Datum::Int64(v), DataType::BigInt(_)) => Ok(v.to_string()),
           (Datum::Date(d), DataType::Date(_)) => Ok(date_to_string(*d)),
           (Datum::Time(t), DataType::Time(_)) => Ok(time_to_string(*t)),
           (Datum::Float32(f), DataType::Float(_)) => 
Ok(reformat_float(f.into_inner())),
           (Datum::Float64(f), DataType::Double(_)) => 
Ok(reformat_double(f.into_inner())),
           (Datum::TimestampLtz(ts), DataType::TimestampLTz(_)) => {
               Ok(timestamp_ltz_to_string(*ts))
           }
           (Datum::TimestampNtz(ts), DataType::Timestamp(_)) => {
               Ok(timestamp_ntz_to_string(*ts))
           }
           _ => Err(Error::IllegalArgument {
               message: format!(
                   "Unsupported data type for partition key: {:?}, value: {:?}",
                   data_type, value
               ),
           }),
   ```



##########
crates/fluss/src/client/table/partition_getter.rs:
##########
@@ -17,40 +17,186 @@
 
 use crate::error::Error::IllegalArgument;
 use crate::error::Result;
-use crate::metadata::{DataType, RowType};
+use crate::metadata::{DataType, ResolvedPartitionSpec, RowType};
+use crate::row::InternalRow;
 use crate::row::field_getter::FieldGetter;
+use crate::util::partition;
 
+/// A getter to get partition name from a row.
 #[allow(dead_code)]
-pub struct PartitionGetter<'a> {
-    partitions: Vec<(&'a String, &'a DataType, FieldGetter)>,
+pub struct PartitionGetter {
+    partition_keys: Vec<String>,
+    partitions: Vec<(DataType, FieldGetter)>,
 }
 
 #[allow(dead_code)]
-impl<'a> PartitionGetter<'a> {
-    pub fn new(row_type: &'a RowType, partition_keys: &'a Vec<String>) -> 
Result<Self> {
+impl PartitionGetter {
+    pub fn new(row_type: &RowType, partition_keys: Vec<String>) -> 
Result<Self> {
         let mut partitions = Vec::with_capacity(partition_keys.len());
 
-        for partition_key in partition_keys {
+        for partition_key in &partition_keys {
             if let Some(partition_col_index) = 
row_type.get_field_index(partition_key.as_str()) {
-                let data_type = &row_type
+                let data_type = row_type
                     .fields()
                     .get(partition_col_index)
                     .unwrap()
-                    .data_type;
-                let field_getter = FieldGetter::create(data_type, 
partition_col_index);
+                    .data_type
+                    .clone();
+                let field_getter = FieldGetter::create(&data_type, 
partition_col_index);
 
-                partitions.push((partition_key, data_type, field_getter));
+                partitions.push((data_type, field_getter));
             } else {
                 return Err(IllegalArgument {
                     message: format!(
-                        "The partition column {partition_key} is not in the 
row {row_type}."
+                        "The partition column {} is not in the row {}.",
+                        partition_key, row_type
                     ),
                 });
             };
         }
 
-        Ok(Self { partitions })
+        Ok(Self {
+            partition_keys,
+            partitions,
+        })
     }
 
-    // TODO Implement get partition
+    pub fn get_partition(&self, row: &dyn InternalRow) -> Result<String> {
+        self.get_partition_spec(row)
+            .map(|ps| ps.get_partition_name())
+    }
+
+    pub fn get_partition_spec(&self, row: &dyn InternalRow) -> 
Result<ResolvedPartitionSpec> {
+        let mut partition_values = Vec::with_capacity(self.partitions.len());
+
+        for (data_type, field_getter) in &self.partitions {
+            let value = field_getter.get_field(row);
+            if value.is_null() {
+                return Err(IllegalArgument {
+                    message: "Partition value shouldn't be null.".to_string(),
+                });
+            }
+            partition_values.push(partition::convert_value_to_string(&value, 
data_type));
+        }
+
+        ResolvedPartitionSpec::new(self.partition_keys.clone(), 
partition_values)

Review Comment:
   The `partition_keys` field is cloned in `get_partition_spec` (line 82) every 
time this method is called. Since `partition_keys` is only used for 
constructing `ResolvedPartitionSpec`, consider whether this cloning is 
necessary or if there's a more efficient approach. If 
`ResolvedPartitionSpec::new` could accept a reference, or if the partition_keys 
could be pre-computed once, it would avoid repeated allocations.



##########
crates/fluss/src/util/partition.rs:
##########
@@ -0,0 +1,643 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utils for partition.
+
+#![allow(dead_code)]
+
+use crate::error::{Error, Result};
+use crate::metadata::{DataType, PartitionSpec, ResolvedPartitionSpec, 
TablePath};
+use crate::row::{Date, Datum, Time, TimestampLtz, TimestampNtz};
+use jiff::ToSpan;
+use jiff::Zoned;
+use jiff::civil::DateTime;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum AutoPartitionTimeUnit {
+    Year,
+    Quarter,
+    Month,
+    Day,
+    Hour,
+}
+
+pub fn validate_partition_spec(
+    table_path: &TablePath,
+    partition_keys: &[String],
+    partition_spec: &PartitionSpec,
+    is_create: bool,
+) -> Result<()> {
+    let partition_spec_map = partition_spec.get_spec_map();
+    if partition_keys.len() != partition_spec_map.len() {
+        return Err(Error::InvalidPartition {
+            message: format!(
+                "PartitionSpec size is not equal to partition keys size for 
partitioned table {}.",
+                table_path
+            ),
+        });
+    }
+
+    let mut reordered_partition_values: Vec<&str> = 
Vec::with_capacity(partition_keys.len());
+    for partition_key in partition_keys {
+        if let Some(value) = partition_spec_map.get(partition_key) {
+            reordered_partition_values.push(value);
+        } else {
+            return Err(Error::InvalidPartition {
+                message: format!(
+                    "PartitionSpec {} does not contain partition key '{}' for 
partitioned table {}.",
+                    partition_spec, partition_key, table_path
+                ),
+            });
+        }
+    }
+
+    validate_partition_values(&reordered_partition_values, is_create)
+}
+
+fn validate_partition_values(partition_values: &[&str], is_create: bool) -> 
Result<()> {
+    for value in partition_values {
+        let invalid_name_error = TablePath::detect_invalid_name(value);
+        let prefix_error = if is_create {
+            TablePath::validate_prefix(value)
+        } else {
+            None
+        };
+
+        if invalid_name_error.is_some() || prefix_error.is_some() {
+            let error_msg = invalid_name_error.unwrap_or_else(|| 
prefix_error.unwrap());
+            return Err(Error::InvalidPartition {
+                message: format!("The partition value {} is invalid: {}", 
value, error_msg),
+            });
+        }
+    }
+    Ok(())
+}
+
+/// Generate [`ResolvedPartitionSpec`] for auto partition in server. When we 
auto creating a
+/// partition, we need to first generate a [`ResolvedPartitionSpec`].
+///
+/// The value is the formatted time with the specified time unit.
+pub fn generate_auto_partition(
+    partition_keys: Vec<String>,
+    current: &Zoned,
+    offset: i32,
+    time_unit: AutoPartitionTimeUnit,
+) -> ResolvedPartitionSpec {
+    let auto_partition_field_spec = generate_auto_partition_time(current, 
offset, time_unit);
+    ResolvedPartitionSpec::from_partition_name(partition_keys, 
&auto_partition_field_spec)
+}
+
+pub fn generate_auto_partition_time(
+    current: &Zoned,
+    offset: i32,
+    time_unit: AutoPartitionTimeUnit,
+) -> String {
+    match time_unit {
+        AutoPartitionTimeUnit::Year => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().years(offset))
+                .expect("year overflow");
+            format!("{}", adjusted.year())
+        }
+        AutoPartitionTimeUnit::Quarter => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().months(offset * 3))
+                .expect("quarter overflow");
+            let quarter = (adjusted.month() as i32 - 1) / 3 + 1;
+            format!("{}{}", adjusted.year(), quarter)
+        }
+        AutoPartitionTimeUnit::Month => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().months(offset))
+                .expect("month overflow");
+            format!("{}{:02}", adjusted.year(), adjusted.month())
+        }
+        AutoPartitionTimeUnit::Day => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().days(offset))
+                .expect("day overflow");
+            format!(
+                "{}{:02}{:02}",
+                adjusted.year(),
+                adjusted.month(),
+                adjusted.day()
+            )
+        }
+        AutoPartitionTimeUnit::Hour => {
+            let adjusted = current
+                .checked_add(jiff::Span::new().hours(offset))
+                .expect("hour overflow");
+            format!(
+                "{}{:02}{:02}{:02}",
+                adjusted.year(),
+                adjusted.month(),
+                adjusted.day(),
+                adjusted.hour()
+            )
+        }
+    }
+}
+
+fn hex_string(bytes: &[u8]) -> String {
+    let mut hex = String::with_capacity(bytes.len() * 2);
+    for &b in bytes {
+        let h = format!("{:x}", b);
+        if h.len() == 1 {
+            hex.push('0');
+        }
+        hex.push_str(&h);
+    }
+    hex
+}

Review Comment:
   The `hex_string` function uses `format!("{:x}", b)` followed by manual 
zero-padding. This is less efficient and less idiomatic than using 
`format!("{:02x}", b)` which handles both formatting and zero-padding in one 
call. This would simplify the code and improve performance slightly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to