Copilot commented on code in PR #208: URL: https://github.com/apache/fluss-rust/pull/208#discussion_r2724255613
########## crates/fluss/src/metadata/partition.rs: ########## @@ -0,0 +1,467 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::PartitionId; +use crate::error::{Error, Result}; +use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec}; +use std::collections::HashMap; +use std::fmt::{Display, Formatter}; + +/// Represents a partition spec in fluss. Partition columns and values are NOT of strict order, and +/// they need to be re-arranged to the correct order by comparing with a list of strictly ordered +/// partition keys. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PartitionSpec { + partition_spec: HashMap<String, String>, +} + +impl PartitionSpec { + pub fn new(partition_spec: HashMap<String, String>) -> Self { + Self { partition_spec } + } + + pub fn get_spec_map(&self) -> &HashMap<String, String> { + &self.partition_spec + } + + pub fn to_pb(&self) -> PbPartitionSpec { + PbPartitionSpec { + partition_key_values: self + .partition_spec + .iter() + .map(|(k, v)| PbKeyValue { + key: k.clone(), + value: v.clone(), + }) + .collect(), + } + } + + pub fn from_pb(pb: &PbPartitionSpec) -> Self { + let partition_spec = pb + .partition_key_values + .iter() + .map(|kv| (kv.key.clone(), kv.value.clone())) + .collect(); + Self { partition_spec } + } +} + +impl Display for PartitionSpec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "PartitionSpec{{{:?}}}", self.partition_spec) + } +} + +/// Represents a partition, which is the resolved version of PartitionSpec. The partition +/// spec is re-arranged into the correct order by comparing it with a list of strictly ordered +/// partition keys. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ResolvedPartitionSpec { + partition_keys: Vec<String>, + partition_values: Vec<String>, +} + +pub const PARTITION_SPEC_SEPARATOR: &str = "$"; + +impl ResolvedPartitionSpec { + pub fn new(partition_keys: Vec<String>, partition_values: Vec<String>) -> Result<Self> { + if partition_keys.len() != partition_values.len() { + return Err(Error::IllegalArgument { + message: "The number of partition keys and partition values should be the same." + .to_string(), + }); + } + Ok(Self { + partition_keys, + partition_values, + }) + } + + pub fn from_partition_spec( + partition_keys: Vec<String>, + partition_spec: &PartitionSpec, + ) -> Self { + let partition_values = + Self::get_reordered_partition_values(&partition_keys, partition_spec); + Self { + partition_keys, + partition_values, + } + } + + pub fn from_partition_value(partition_key: String, partition_value: String) -> Self { + Self { + partition_keys: vec![partition_key], + partition_values: vec![partition_value], + } + } + + pub fn from_partition_name(partition_keys: Vec<String>, partition_name: &str) -> Self { + let partition_values: Vec<String> = + partition_name.split('$').map(|s| s.to_string()).collect(); Review Comment: `ResolvedPartitionSpec::from_partition_name` splits on a hard-coded '$' instead of using `PARTITION_SPEC_SEPARATOR`. This makes the implementation easy to break if the separator constant ever changes; use the constant for parsing as well. ```suggestion partition_name .split(PARTITION_SPEC_SEPARATOR) .map(|s| s.to_string()) .collect(); ``` ########## crates/fluss/src/metadata/partition.rs: ########## @@ -0,0 +1,467 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::PartitionId; +use crate::error::{Error, Result}; +use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec}; +use std::collections::HashMap; +use std::fmt::{Display, Formatter}; + +/// Represents a partition spec in fluss. Partition columns and values are NOT of strict order, and +/// they need to be re-arranged to the correct order by comparing with a list of strictly ordered +/// partition keys. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PartitionSpec { + partition_spec: HashMap<String, String>, +} + +impl PartitionSpec { + pub fn new(partition_spec: HashMap<String, String>) -> Self { + Self { partition_spec } + } + + pub fn get_spec_map(&self) -> &HashMap<String, String> { + &self.partition_spec + } + + pub fn to_pb(&self) -> PbPartitionSpec { + PbPartitionSpec { + partition_key_values: self + .partition_spec + .iter() + .map(|(k, v)| PbKeyValue { + key: k.clone(), + value: v.clone(), + }) + .collect(), + } + } + + pub fn from_pb(pb: &PbPartitionSpec) -> Self { + let partition_spec = pb + .partition_key_values + .iter() + .map(|kv| (kv.key.clone(), kv.value.clone())) + .collect(); + Self { partition_spec } + } +} + +impl Display for PartitionSpec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "PartitionSpec{{{:?}}}", self.partition_spec) + } +} + +/// Represents a partition, which is the resolved version of PartitionSpec. The partition +/// spec is re-arranged into the correct order by comparing it with a list of strictly ordered +/// partition keys. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ResolvedPartitionSpec { + partition_keys: Vec<String>, + partition_values: Vec<String>, +} + +pub const PARTITION_SPEC_SEPARATOR: &str = "$"; + +impl ResolvedPartitionSpec { + pub fn new(partition_keys: Vec<String>, partition_values: Vec<String>) -> Result<Self> { + if partition_keys.len() != partition_values.len() { + return Err(Error::IllegalArgument { + message: "The number of partition keys and partition values should be the same." + .to_string(), + }); + } + Ok(Self { + partition_keys, + partition_values, + }) + } + + pub fn from_partition_spec( + partition_keys: Vec<String>, + partition_spec: &PartitionSpec, + ) -> Self { + let partition_values = + Self::get_reordered_partition_values(&partition_keys, partition_spec); + Self { + partition_keys, + partition_values, + } + } + + pub fn from_partition_value(partition_key: String, partition_value: String) -> Self { + Self { + partition_keys: vec![partition_key], + partition_values: vec![partition_value], + } + } + + pub fn from_partition_name(partition_keys: Vec<String>, partition_name: &str) -> Self { + let partition_values: Vec<String> = + partition_name.split('$').map(|s| s.to_string()).collect(); + Self { + partition_keys, + partition_values, + } + } + + pub fn from_partition_qualified_name(qualified_partition_name: &str) -> Result<Self> { + let mut keys = Vec::new(); + let mut values = Vec::new(); + + for pair in qualified_partition_name.split('/') { + let parts: Vec<&str> = pair.splitn(2, '=').collect(); + if parts.len() != 2 { + return Err(Error::IllegalArgument { + message: format!( + "Invalid partition name format. Expected key=value, got: {}", + pair + ), + }); + } + keys.push(parts[0].to_string()); + values.push(parts[1].to_string()); + } + + Ok(Self { + partition_keys: keys, + partition_values: values, + }) + } + + pub fn get_partition_keys(&self) -> &[String] { + &self.partition_keys + } + + pub fn get_partition_values(&self) -> &[String] { + &self.partition_values + } + + pub fn to_partition_spec(&self) -> PartitionSpec { + let mut spec_map = HashMap::new(); + for (i, key) in self.partition_keys.iter().enumerate() { + spec_map.insert(key.clone(), self.partition_values[i].clone()); + } + PartitionSpec::new(spec_map) + } + + /// Generate the partition name for a partition table of specify partition values. + /// + /// The partition name is in the following format: value1$value2$...$valueN + pub fn get_partition_name(&self) -> String { + self.partition_values.join(PARTITION_SPEC_SEPARATOR) + } + + /// Returns the qualified partition name for a partition spec. + /// The format is: key1=value1/key2=value2/.../keyN=valueN + pub fn get_partition_qualified_name(&self) -> String { + let mut sb = String::new(); + for (i, key) in self.partition_keys.iter().enumerate() { + sb.push_str(key); + sb.push('='); + sb.push_str(&self.partition_values[i]); + if i != self.partition_keys.len() - 1 { + sb.push('/'); + } + } + sb + } + + pub fn contains(&self, other: &ResolvedPartitionSpec) -> Result<bool> { + let other_partition_keys = other.get_partition_keys(); + let other_partition_values = other.get_partition_values(); + + let mut expected_partition_values = Vec::new(); + for other_partition_key in other_partition_keys { + let key_index = self + .partition_keys + .iter() + .position(|k| k == other_partition_key); + match key_index { + Some(idx) => expected_partition_values.push(self.partition_values[idx].clone()), + None => { + return Err(Error::IllegalArgument { + message: format!( + "table don't contains this partitionKey: {}", Review Comment: The error message in `ResolvedPartitionSpec::contains` is grammatically incorrect ("table don't contains this partitionKey") and mixes casing (partitionKey). Since this is surfaced to callers, please reword it to a clear, user-facing message (e.g., "table does not contain partition key: ..."). ```suggestion "table does not contain partition key: {}", ``` ########## crates/fluss/tests/integration/admin.rs: ########## @@ -223,6 +224,136 @@ mod admin_test { assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false); } + #[tokio::test] + async fn test_partition_apis() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection + .get_admin() + .await + .expect("Failed to get admin client"); + + let test_db_name = "test_partition_apis_db"; + let db_descriptor = DatabaseDescriptorBuilder::default() + .comment("Database for test_partition_apis") + .build(); + + admin + .create_database(test_db_name, true, Some(&db_descriptor)) + .await + .expect("Failed to create test database"); + + let test_table_name = "partitioned_table"; + let table_path = TablePath::new(test_db_name.to_string(), test_table_name.to_string()); + + let table_schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .column("dt", DataTypes::string()) + .primary_key(vec!["id".to_string(), "dt".to_string()]) + .build() + .expect("Failed to build table schema"); + + let table_descriptor = TableDescriptor::builder() + .schema(table_schema) + .distributed_by(Some(3), vec!["id".to_string()]) + .partitioned_by(vec!["dt".to_string()]) + .property("table.replication.factor", "1") + .log_format(LogFormat::ARROW) + .kv_format(KvFormat::COMPACTED) + .build() + .expect("Failed to build table descriptor"); + + admin + .create_table(&table_path, &table_descriptor, true) + .await + .expect("Failed to create partitioned table"); + + let partitions = admin + .list_partition_infos(&table_path) + .await + .expect("Failed to list partitions"); + assert!( + partitions.is_empty(), + "Expected no partitions initially, found {}", + partitions.len() + ); + + let mut partition_values = HashMap::new(); + partition_values.insert("dt".to_string(), "2024-01-15".to_string()); + let partition_spec = PartitionSpec::new(partition_values); + + admin + .create_partition(&table_path, &partition_spec, false) + .await + .expect("Failed to create partition"); + + let partitions = admin + .list_partition_infos(&table_path) + .await + .expect("Failed to list partitions"); + assert_eq!( + partitions.len(), + 1, + "Expected exactly one partition after creation" + ); + assert_eq!( + partitions[0].get_partition_name(), + "2024-01-15", + "Partition name mismatch" + ); + + // list with partial spec filter - should find the partition + let partitions_with_spec = admin + .list_partition_infos_with_spec(&table_path, Some(&partition_spec)) + .await + .expect("Failed to list partitions with spec"); Review Comment: The test claims to cover a "partial spec filter", but the table is only partitioned by a single key (dt) and the filter spec includes that key, so it only exercises exact-match filtering. To validate partial-spec semantics, consider partitioning by 2+ keys and querying with a subset of keys (e.g., create partitions for dt+region, then filter only by dt). ########## crates/fluss/src/metadata/table.rs: ########## @@ -697,26 +697,65 @@ impl TablePath { } } +/// A database name, table name and partition name combo. It's used to represent the physical path of +/// a bucket. If the bucket belongs to a partition (i.e., the table is a partitioned table), the +/// partition_name will be not null, otherwise null. Review Comment: This doc comment uses Java-style "null" terminology ("partition_name will be not null, otherwise null"), but `partition_name` is an `Option<String>`. Please update the wording to reflect `Some`/`None` to avoid misleading documentation. ```suggestion /// a bucket. If the bucket belongs to a partition (i.e., the table is a partitioned table), /// `partition_name` will be `Some(...)`; otherwise, it will be `None`. ``` ########## crates/fluss/src/metadata/partition.rs: ########## @@ -0,0 +1,467 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::PartitionId; +use crate::error::{Error, Result}; +use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec}; +use std::collections::HashMap; +use std::fmt::{Display, Formatter}; + +/// Represents a partition spec in fluss. Partition columns and values are NOT of strict order, and +/// they need to be re-arranged to the correct order by comparing with a list of strictly ordered +/// partition keys. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PartitionSpec { + partition_spec: HashMap<String, String>, +} + +impl PartitionSpec { + pub fn new(partition_spec: HashMap<String, String>) -> Self { + Self { partition_spec } + } + + pub fn get_spec_map(&self) -> &HashMap<String, String> { + &self.partition_spec + } + + pub fn to_pb(&self) -> PbPartitionSpec { + PbPartitionSpec { + partition_key_values: self + .partition_spec + .iter() + .map(|(k, v)| PbKeyValue { + key: k.clone(), + value: v.clone(), + }) + .collect(), + } + } + + pub fn from_pb(pb: &PbPartitionSpec) -> Self { + let partition_spec = pb + .partition_key_values + .iter() + .map(|kv| (kv.key.clone(), kv.value.clone())) + .collect(); + Self { partition_spec } + } +} + +impl Display for PartitionSpec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "PartitionSpec{{{:?}}}", self.partition_spec) + } +} + +/// Represents a partition, which is the resolved version of PartitionSpec. The partition +/// spec is re-arranged into the correct order by comparing it with a list of strictly ordered +/// partition keys. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ResolvedPartitionSpec { + partition_keys: Vec<String>, + partition_values: Vec<String>, +} + +pub const PARTITION_SPEC_SEPARATOR: &str = "$"; + +impl ResolvedPartitionSpec { + pub fn new(partition_keys: Vec<String>, partition_values: Vec<String>) -> Result<Self> { + if partition_keys.len() != partition_values.len() { + return Err(Error::IllegalArgument { + message: "The number of partition keys and partition values should be the same." + .to_string(), + }); + } + Ok(Self { + partition_keys, + partition_values, + }) + } + + pub fn from_partition_spec( + partition_keys: Vec<String>, + partition_spec: &PartitionSpec, + ) -> Self { + let partition_values = + Self::get_reordered_partition_values(&partition_keys, partition_spec); + Self { + partition_keys, + partition_values, + } + } + + pub fn from_partition_value(partition_key: String, partition_value: String) -> Self { + Self { + partition_keys: vec![partition_key], + partition_values: vec![partition_value], + } + } + + pub fn from_partition_name(partition_keys: Vec<String>, partition_name: &str) -> Self { + let partition_values: Vec<String> = + partition_name.split('$').map(|s| s.to_string()).collect(); + Self { + partition_keys, + partition_values, + } + } + + pub fn from_partition_qualified_name(qualified_partition_name: &str) -> Result<Self> { + let mut keys = Vec::new(); + let mut values = Vec::new(); + + for pair in qualified_partition_name.split('/') { + let parts: Vec<&str> = pair.splitn(2, '=').collect(); + if parts.len() != 2 { + return Err(Error::IllegalArgument { + message: format!( + "Invalid partition name format. Expected key=value, got: {}", + pair + ), + }); + } + keys.push(parts[0].to_string()); + values.push(parts[1].to_string()); + } + + Ok(Self { + partition_keys: keys, + partition_values: values, + }) + } + + pub fn get_partition_keys(&self) -> &[String] { + &self.partition_keys + } + + pub fn get_partition_values(&self) -> &[String] { + &self.partition_values + } + + pub fn to_partition_spec(&self) -> PartitionSpec { + let mut spec_map = HashMap::new(); + for (i, key) in self.partition_keys.iter().enumerate() { + spec_map.insert(key.clone(), self.partition_values[i].clone()); + } + PartitionSpec::new(spec_map) + } + + /// Generate the partition name for a partition table of specify partition values. Review Comment: Doc comment grammar: "for a partition table of specify partition values" is ungrammatical/unclear. Please rephrase (e.g., "with specified partition values") to improve readability. ```suggestion /// Generate the partition name for a partition table with specified partition values. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
