This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 9862026 feat: Partition Binding and safe PartitionSpecBuilder (#491)
9862026 is described below
commit 9862026b9f3c885a82e7b8b8da414c0c97436537
Author: Christian <[email protected]>
AuthorDate: Wed Aug 14 16:30:26 2024 +0200
feat: Partition Binding and safe PartitionSpecBuilder (#491)
* Initial commit
* Fixes
* Replace UnboundPartitionSpec Builder
* Fix tests, allow year, month day partition
* Comments
* typos
* Fix UnboundBuild setting partition_id
* Add test for unbound spec without partition ids
* Fix into_unbound fn name
* Split bound & unbound Partition builder, change add_partition_fields
* Improve comment
* Fix fmt
* Review fixes
* Remove partition_names() HashSet creation
---
crates/catalog/memory/src/catalog.rs | 3 +-
crates/catalog/rest/src/catalog.rs | 6 +-
crates/iceberg/src/catalog/mod.rs | 34 +-
.../src/expr/visitors/expression_evaluator.rs | 14 +-
.../expr/visitors/inclusive_metrics_evaluator.rs | 11 +-
.../src/expr/visitors/inclusive_projection.rs | 110 +--
crates/iceberg/src/spec/manifest.rs | 8 +-
crates/iceberg/src/spec/partition.rs | 854 ++++++++++++++++++++-
crates/iceberg/src/spec/table_metadata.rs | 78 +-
9 files changed, 945 insertions(+), 173 deletions(-)
diff --git a/crates/catalog/memory/src/catalog.rs
b/crates/catalog/memory/src/catalog.rs
index d86bbfe..44086f8 100644
--- a/crates/catalog/memory/src/catalog.rs
+++ b/crates/catalog/memory/src/catalog.rs
@@ -358,9 +358,8 @@ mod tests {
assert_eq!(metadata.current_schema().as_ref(), expected_schema);
- let expected_partition_spec = PartitionSpec::builder()
+ let expected_partition_spec = PartitionSpec::builder(expected_schema)
.with_spec_id(0)
- .with_fields(vec![])
.build()
.unwrap();
diff --git a/crates/catalog/rest/src/catalog.rs
b/crates/catalog/rest/src/catalog.rs
index 30f2e29..d74c8de 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -1467,13 +1467,13 @@ mod tests {
.properties(HashMap::from([("owner".to_string(),
"testx".to_string())]))
.partition_spec(
UnboundPartitionSpec::builder()
- .with_fields(vec![UnboundPartitionField::builder()
+ .add_partition_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.transform(Transform::Truncate(3))
.name("id".to_string())
.build()])
- .build()
- .unwrap(),
+ .unwrap()
+ .build(),
)
.sort_order(
SortOrder::builder()
diff --git a/crates/iceberg/src/catalog/mod.rs
b/crates/iceberg/src/catalog/mod.rs
index bc98772..aa2311b 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -229,7 +229,7 @@ pub struct TableCreation {
/// The schema of the table.
pub schema: Schema,
/// The partition spec of the table, could be None.
- #[builder(default, setter(strip_option))]
+ #[builder(default, setter(strip_option, into))]
pub partition_spec: Option<UnboundPartitionSpec>,
/// The sort order of the table.
#[builder(default, setter(strip_option))]
@@ -476,7 +476,7 @@ mod tests {
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType,
Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField,
SortOrder, Summary,
- TableMetadataBuilder, Transform, Type, UnboundPartitionField,
UnboundPartitionSpec,
+ TableMetadataBuilder, Transform, Type, UnboundPartitionSpec,
};
use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement,
TableUpdate};
@@ -820,29 +820,13 @@ mod tests {
"#,
TableUpdate::AddSpec {
spec: UnboundPartitionSpec::builder()
- .with_unbound_partition_field(
- UnboundPartitionField::builder()
- .source_id(4)
- .name("ts_day".to_string())
- .transform(Transform::Day)
- .build(),
- )
- .with_unbound_partition_field(
- UnboundPartitionField::builder()
- .source_id(1)
- .name("id_bucket".to_string())
- .transform(Transform::Bucket(16))
- .build(),
- )
- .with_unbound_partition_field(
- UnboundPartitionField::builder()
- .source_id(2)
- .name("id_truncate".to_string())
- .transform(Transform::Truncate(4))
- .build(),
- )
- .build()
- .unwrap(),
+ .add_partition_field(4, "ts_day".to_string(),
Transform::Day)
+ .unwrap()
+ .add_partition_field(1, "id_bucket".to_string(),
Transform::Bucket(16))
+ .unwrap()
+ .add_partition_field(2, "id_truncate".to_string(),
Transform::Truncate(4))
+ .unwrap()
+ .build(),
},
);
}
diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs
b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
index 3700a9b..d8a47ec 100644
--- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
@@ -258,8 +258,9 @@ mod tests {
UnaryExpression,
};
use crate::spec::{
- DataContentType, DataFile, DataFileFormat, Datum, Literal,
NestedField, PartitionField,
- PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef,
Struct, Transform, Type,
+ DataContentType, DataFile, DataFileFormat, Datum, Literal,
NestedField, PartitionSpec,
+ PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform,
Type,
+ UnboundPartitionField,
};
use crate::Result;
@@ -274,14 +275,15 @@ mod tests {
))])
.build()?;
- let spec = PartitionSpec::builder()
+ let spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
- .with_fields(vec![PartitionField::builder()
+ .add_unbound_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
- .field_id(1)
+ .partition_id(1)
.transform(Transform::Identity)
.build()])
+ .unwrap()
.build()
.unwrap();
@@ -298,7 +300,7 @@ mod tests {
let partition_fields = partition_type.fields().to_owned();
let partition_schema = Schema::builder()
- .with_schema_id(partition_spec.spec_id)
+ .with_schema_id(partition_spec.spec_id())
.with_fields(partition_fields)
.build()?;
diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
index 430ebfc..e8e7337 100644
--- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
@@ -495,8 +495,8 @@ mod test {
UnaryExpression,
};
use crate::spec::{
- DataContentType, DataFile, DataFileFormat, Datum, NestedField,
PartitionField,
- PartitionSpec, PrimitiveType, Schema, Struct, Transform, Type,
+ DataContentType, DataFile, DataFileFormat, Datum, NestedField,
PartitionSpec,
+ PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField,
};
const INT_MIN_VALUE: i32 = 30;
@@ -1656,14 +1656,15 @@ mod test {
.unwrap();
let table_schema_ref = Arc::new(table_schema);
- let partition_spec = PartitionSpec::builder()
+ let partition_spec = PartitionSpec::builder(&table_schema_ref)
.with_spec_id(1)
- .with_fields(vec![PartitionField::builder()
+ .add_unbound_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
- .field_id(1)
+ .partition_id(1)
.transform(Transform::Identity)
.build()])
+ .unwrap()
.build()
.unwrap();
let partition_spec_ref = Arc::new(partition_spec);
diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs
b/crates/iceberg/src/expr/visitors/inclusive_projection.rs
index 9cfbb4f..716f086 100644
--- a/crates/iceberg/src/expr/visitors/inclusive_projection.rs
+++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs
@@ -40,7 +40,7 @@ impl InclusiveProjection {
fn get_parts_for_field_id(&mut self, field_id: i32) ->
&Vec<PartitionField> {
if let std::collections::hash_map::Entry::Vacant(e) =
self.cached_parts.entry(field_id) {
let mut parts: Vec<PartitionField> = vec![];
- for partition_spec_field in &self.partition_spec.fields {
+ for partition_spec_field in self.partition_spec.fields() {
if partition_spec_field.source_id == field_id {
parts.push(partition_spec_field.clone())
}
@@ -236,6 +236,7 @@ mod tests {
use crate::expr::{Bind, Predicate, Reference};
use crate::spec::{
Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType,
Schema, Transform, Type,
+ UnboundPartitionField,
};
fn build_test_schema() -> Schema {
@@ -265,9 +266,8 @@ mod tests {
fn test_inclusive_projection_logic_ops() {
let schema = build_test_schema();
- let partition_spec = PartitionSpec::builder()
+ let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
- .with_fields(vec![])
.build()
.unwrap();
@@ -296,14 +296,17 @@ mod tests {
fn test_inclusive_projection_identity_transform() {
let schema = build_test_schema();
- let partition_spec = PartitionSpec::builder()
+ let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
- .with_fields(vec![PartitionField::builder()
- .source_id(1)
- .name("a".to_string())
- .field_id(1)
- .transform(Transform::Identity)
- .build()])
+ .add_unbound_field(
+ UnboundPartitionField::builder()
+ .source_id(1)
+ .name("a".to_string())
+ .partition_id(1)
+ .transform(Transform::Identity)
+ .build(),
+ )
+ .unwrap()
.build()
.unwrap();
@@ -330,30 +333,29 @@ mod tests {
fn test_inclusive_projection_date_transforms() {
let schema = build_test_schema();
- let partition_spec = PartitionSpec::builder()
- .with_spec_id(1)
- .with_fields(vec![
- PartitionField::builder()
- .source_id(2)
- .name("year".to_string())
- .field_id(2)
- .transform(Transform::Year)
- .build(),
- PartitionField::builder()
- .source_id(2)
- .name("month".to_string())
- .field_id(2)
- .transform(Transform::Month)
- .build(),
- PartitionField::builder()
- .source_id(2)
- .name("day".to_string())
- .field_id(2)
- .transform(Transform::Day)
- .build(),
- ])
- .build()
- .unwrap();
+ let partition_spec = PartitionSpec {
+ spec_id: 1,
+ fields: vec![
+ PartitionField {
+ source_id: 2,
+ name: "year".to_string(),
+ field_id: 1000,
+ transform: Transform::Year,
+ },
+ PartitionField {
+ source_id: 2,
+ name: "month".to_string(),
+ field_id: 1001,
+ transform: Transform::Month,
+ },
+ PartitionField {
+ source_id: 2,
+ name: "day".to_string(),
+ field_id: 1002,
+ transform: Transform::Day,
+ },
+ ],
+ };
let arc_schema = Arc::new(schema);
let arc_partition_spec = Arc::new(partition_spec);
@@ -378,14 +380,17 @@ mod tests {
fn test_inclusive_projection_truncate_transform() {
let schema = build_test_schema();
- let partition_spec = PartitionSpec::builder()
+ let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
- .with_fields(vec![PartitionField::builder()
- .source_id(3)
- .name("name".to_string())
- .field_id(3)
- .transform(Transform::Truncate(4))
- .build()])
+ .add_unbound_field(
+ UnboundPartitionField::builder()
+ .source_id(3)
+ .name("name_truncate".to_string())
+ .partition_id(3)
+ .transform(Transform::Truncate(4))
+ .build(),
+ )
+ .unwrap()
.build()
.unwrap();
@@ -398,7 +403,7 @@ mod tests {
// applying InclusiveProjection to bound_predicate
// should result in the 'name STARTS WITH "Testy McTest"'
- // predicate being transformed to 'name STARTS WITH "Test"',
+ // predicate being transformed to 'name_truncate STARTS WITH "Test"',
// since a `Truncate(4)` partition will map values of
// name that start with "Testy McTest" into a partition
// for values of name that start with the first four letters
@@ -406,7 +411,7 @@ mod tests {
let mut inclusive_projection =
InclusiveProjection::new(arc_partition_spec.clone());
let result = inclusive_projection.project(&bound_predicate).unwrap();
- let expected = "name STARTS WITH \"Test\"".to_string();
+ let expected = "name_truncate STARTS WITH \"Test\"".to_string();
assert_eq!(result.to_string(), expected)
}
@@ -415,14 +420,17 @@ mod tests {
fn test_inclusive_projection_bucket_transform() {
let schema = build_test_schema();
- let partition_spec = PartitionSpec::builder()
+ let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
- .with_fields(vec![PartitionField::builder()
- .source_id(1)
- .name("a".to_string())
- .field_id(1)
- .transform(Transform::Bucket(7))
- .build()])
+ .add_unbound_field(
+ UnboundPartitionField::builder()
+ .source_id(1)
+ .name("a_bucket[7]".to_string())
+ .partition_id(1)
+ .transform(Transform::Bucket(7))
+ .build(),
+ )
+ .unwrap()
.build()
.unwrap();
@@ -440,7 +448,7 @@ mod tests {
let mut inclusive_projection =
InclusiveProjection::new(arc_partition_spec.clone());
let result = inclusive_projection.project(&bound_predicate).unwrap();
- let expected = "a = 2".to_string();
+ let expected = "a_bucket[7] = 2".to_string();
assert_eq!(result.to_string(), expected)
}
diff --git a/crates/iceberg/src/spec/manifest.rs
b/crates/iceberg/src/spec/manifest.rs
index e2f8251..14b8a80 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -227,14 +227,14 @@ impl ManifestWriter {
)?;
avro_writer.add_user_metadata(
"partition-spec".to_string(),
- to_vec(&manifest.metadata.partition_spec.fields).map_err(|err| {
+ to_vec(&manifest.metadata.partition_spec.fields()).map_err(|err| {
Error::new(ErrorKind::DataInvalid, "Fail to serialize
partition spec")
.with_source(err)
})?,
)?;
avro_writer.add_user_metadata(
"partition-spec-id".to_string(),
- manifest.metadata.partition_spec.spec_id.to_string(),
+ manifest.metadata.partition_spec.spec_id().to_string(),
)?;
avro_writer.add_user_metadata(
"format-version".to_string(),
@@ -300,12 +300,12 @@ impl ManifestWriter {
self.output.write(Bytes::from(content)).await?;
let partition_summary =
-
self.get_field_summary_vec(&manifest.metadata.partition_spec.fields);
+
self.get_field_summary_vec(manifest.metadata.partition_spec.fields());
Ok(ManifestFile {
manifest_path: self.output.location().to_string(),
manifest_length: length as i64,
- partition_spec_id: manifest.metadata.partition_spec.spec_id,
+ partition_spec_id: manifest.metadata.partition_spec.spec_id(),
content: manifest.metadata.content,
// sequence_number and min_sequence_number with
UNASSIGNED_SEQUENCE_NUMBER will be replace with
// real sequence number in `ManifestListWriter`.
diff --git a/crates/iceberg/src/spec/partition.rs
b/crates/iceberg/src/spec/partition.rs
index f1244e4..055b48e 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -25,7 +25,10 @@ use typed_builder::TypedBuilder;
use super::transform::Transform;
use super::{NestedField, Schema, StructType};
-use crate::{Error, ErrorKind};
+use crate::{Error, ErrorKind, Result};
+
+pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999;
+pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0;
/// Reference to [`PartitionSpec`].
pub type PartitionSpecRef = Arc<PartitionSpec>;
@@ -44,22 +47,37 @@ pub struct PartitionField {
pub transform: Transform,
}
+impl PartitionField {
+ /// To unbound partition field
+ pub fn into_unbound(self) -> UnboundPartitionField {
+ self.into()
+ }
+}
+
/// Partition spec that defines how to produce a tuple of partition values
from a record.
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default,
Builder)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
#[serde(rename_all = "kebab-case")]
-#[builder(setter(prefix = "with"))]
pub struct PartitionSpec {
/// Identifier for PartitionSpec
- pub spec_id: i32,
+ pub(crate) spec_id: i32,
/// Details of the partition spec
- #[builder(setter(each(name = "with_partition_field")))]
- pub fields: Vec<PartitionField>,
+ pub(crate) fields: Vec<PartitionField>,
}
impl PartitionSpec {
/// Create partition spec builer
- pub fn builder() -> PartitionSpecBuilder {
- PartitionSpecBuilder::default()
+ pub fn builder(schema: &Schema) -> PartitionSpecBuilder {
+ PartitionSpecBuilder::new(schema)
+ }
+
+ /// Spec id of the partition spec
+ pub fn spec_id(&self) -> i32 {
+ self.spec_id
+ }
+
+ /// Fields of the partition spec
+ pub fn fields(&self) -> &[PartitionField] {
+ &self.fields
}
/// Returns if the partition spec is unpartitioned.
@@ -74,7 +92,7 @@ impl PartitionSpec {
}
/// Returns the partition type of this partition spec.
- pub fn partition_type(&self, schema: &Schema) -> Result<StructType, Error>
{
+ pub fn partition_type(&self, schema: &Schema) -> Result<StructType> {
let mut fields = Vec::with_capacity(self.fields.len());
for partition_field in &self.fields {
let field = schema
@@ -96,6 +114,13 @@ impl PartitionSpec {
}
Ok(StructType::new(fields))
}
+
+ /// Turn this partition spec into an unbound partition spec.
+ ///
+ /// The `field_id` is retained as `partition_id` in the unbound partition
spec.
+ pub fn to_unbound(self) -> UnboundPartitionSpec {
+ self.into()
+ }
}
/// Reference to [`UnboundPartitionSpec`].
@@ -117,16 +142,13 @@ pub struct UnboundPartitionField {
}
/// Unbound partition spec can be built without a schema and later bound to a
schema.
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default,
Builder)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
#[serde(rename_all = "kebab-case")]
-#[builder(setter(prefix = "with"))]
pub struct UnboundPartitionSpec {
/// Identifier for PartitionSpec
- #[builder(default, setter(strip_option))]
- pub spec_id: Option<i32>,
+ pub(crate) spec_id: Option<i32>,
/// Details of the partition spec
- #[builder(setter(each(name = "with_unbound_partition_field")))]
- pub fields: Vec<UnboundPartitionField>,
+ pub(crate) fields: Vec<UnboundPartitionField>,
}
impl UnboundPartitionSpec {
@@ -134,6 +156,424 @@ impl UnboundPartitionSpec {
pub fn builder() -> UnboundPartitionSpecBuilder {
UnboundPartitionSpecBuilder::default()
}
+
+ /// Bind this unbound partition spec to a schema.
+ pub fn bind(self, schema: &Schema) -> Result<PartitionSpec> {
+ PartitionSpecBuilder::new_from_unbound(self, schema)?.build()
+ }
+
+ /// Spec id of the partition spec
+ pub fn spec_id(&self) -> Option<i32> {
+ self.spec_id
+ }
+
+ /// Fields of the partition spec
+ pub fn fields(&self) -> &[UnboundPartitionField] {
+ &self.fields
+ }
+}
+
+impl From<PartitionField> for UnboundPartitionField {
+ fn from(field: PartitionField) -> Self {
+ UnboundPartitionField {
+ source_id: field.source_id,
+ partition_id: Some(field.field_id),
+ name: field.name,
+ transform: field.transform,
+ }
+ }
+}
+
+impl From<PartitionSpec> for UnboundPartitionSpec {
+ fn from(spec: PartitionSpec) -> Self {
+ UnboundPartitionSpec {
+ spec_id: Some(spec.spec_id),
+ fields: spec.fields.into_iter().map(Into::into).collect(),
+ }
+ }
+}
+
+/// Builder for an [`UnboundPartitionSpec`].
+#[derive(Debug, Default)]
+pub struct UnboundPartitionSpecBuilder {
+ spec_id: Option<i32>,
+ fields: Vec<UnboundPartitionField>,
+}
+
+impl UnboundPartitionSpecBuilder {
+    /// Create a new, empty unbound partition spec builder (no spec id, no fields).
+ pub fn new() -> Self {
+ Self {
+ spec_id: None,
+ fields: vec![],
+ }
+ }
+
+ /// Set the spec id for the partition spec.
+ pub fn with_spec_id(mut self, spec_id: i32) -> Self {
+ self.spec_id = Some(spec_id);
+ self
+ }
+
+    /// Add a new partition field built from a source field id, a target name
and a transform. Its partition id is left unset.
+ pub fn add_partition_field(
+ self,
+ source_id: i32,
+ target_name: impl ToString,
+ transformation: Transform,
+ ) -> Result<Self> {
+ let field = UnboundPartitionField {
+ source_id,
+ partition_id: None,
+ name: target_name.to_string(),
+ transform: transformation,
+ };
+ self.add_partition_field_internal(field)
+ }
+
+ /// Add multiple partition fields to the partition spec.
+ pub fn add_partition_fields(
+ self,
+ fields: impl IntoIterator<Item = UnboundPartitionField>,
+ ) -> Result<Self> {
+ let mut builder = self;
+ for field in fields {
+ builder = builder.add_partition_field_internal(field)?;
+ }
+ Ok(builder)
+ }
+
+ fn add_partition_field_internal(mut self, field: UnboundPartitionField) ->
Result<Self> {
+ self.check_name_set_and_unique(&field.name)?;
+ self.check_for_redundant_partitions(field.source_id,
&field.transform)?;
+ if let Some(partition_id) = field.partition_id {
+ self.check_partition_id_unique(partition_id)?;
+ }
+ self.fields.push(field);
+ Ok(self)
+ }
+
+ /// Build the unbound partition spec.
+ pub fn build(self) -> UnboundPartitionSpec {
+ UnboundPartitionSpec {
+ spec_id: self.spec_id,
+ fields: self.fields,
+ }
+ }
+}
+
+/// Create valid partition specs for a given schema.
+#[derive(Debug)]
+pub struct PartitionSpecBuilder<'a> {
+ spec_id: Option<i32>,
+ last_assigned_field_id: i32,
+ fields: Vec<UnboundPartitionField>,
+ schema: &'a Schema,
+}
+
+impl<'a> PartitionSpecBuilder<'a> {
+ /// Create a new partition spec builder with the given schema.
+ pub fn new(schema: &'a Schema) -> Self {
+ Self {
+ spec_id: None,
+ fields: vec![],
+ last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID,
+ schema,
+ }
+ }
+
+ /// Create a new partition spec builder from an existing unbound partition
spec.
+ pub fn new_from_unbound(unbound: UnboundPartitionSpec, schema: &'a Schema)
-> Result<Self> {
+ let mut builder =
+
Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID));
+
+ for field in unbound.fields {
+ builder = builder.add_unbound_field(field)?;
+ }
+ Ok(builder)
+ }
+
+ /// Set the last assigned field id for the partition spec.
+ ///
+ /// Set this field when a new partition spec is created for an existing
TableMetaData.
+ /// As `field_id` must be unique in V2 metadata, this should be set to
+ /// the highest field id used previously.
+ pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32)
-> Self {
+ self.last_assigned_field_id = last_assigned_field_id;
+ self
+ }
+
+ /// Set the spec id for the partition spec.
+ pub fn with_spec_id(mut self, spec_id: i32) -> Self {
+ self.spec_id = Some(spec_id);
+ self
+ }
+
+ /// Add a new partition field to the partition spec.
+ pub fn add_partition_field(
+ self,
+ source_name: impl AsRef<str>,
+ target_name: impl Into<String>,
+ transform: Transform,
+ ) -> Result<Self> {
+ let source_id = self
+ .schema
+ .field_by_name(source_name.as_ref())
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot find source column with name: {} in schema",
+ source_name.as_ref()
+ ),
+ )
+ })?
+ .id;
+ let field = UnboundPartitionField {
+ source_id,
+ partition_id: None,
+ name: target_name.into(),
+ transform,
+ };
+
+ self.add_unbound_field(field)
+ }
+
+ /// Add a new partition field to the partition spec.
+ ///
+ /// If `partition_id` is set, it is used as the field id.
+ /// Otherwise, a new `field_id` is assigned.
+ pub fn add_unbound_field(mut self, field: UnboundPartitionField) ->
Result<Self> {
+ self.check_name_set_and_unique(&field.name)?;
+ self.check_for_redundant_partitions(field.source_id,
&field.transform)?;
+ Self::check_name_does_not_collide_with_schema(&field, self.schema)?;
+ Self::check_transform_compatibility(&field, self.schema)?;
+ if let Some(partition_id) = field.partition_id {
+ self.check_partition_id_unique(partition_id)?;
+ }
+
+ // Non-fallible from here
+ self.fields.push(field);
+ Ok(self)
+ }
+
+    /// Wrapper around `add_unbound_field` to add multiple partition fields.
+ pub fn add_unbound_fields(
+ self,
+ fields: impl IntoIterator<Item = UnboundPartitionField>,
+ ) -> Result<Self> {
+ let mut builder = self;
+ for field in fields {
+ builder = builder.add_unbound_field(field)?;
+ }
+ Ok(builder)
+ }
+
+ /// Build a bound partition spec with the given schema.
+ pub fn build(self) -> Result<PartitionSpec> {
+ let fields = Self::set_field_ids(self.fields,
self.last_assigned_field_id)?;
+ Ok(PartitionSpec {
+ spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID),
+ fields,
+ })
+ }
+
+ fn set_field_ids(
+ fields: Vec<UnboundPartitionField>,
+ last_assigned_field_id: i32,
+ ) -> Result<Vec<PartitionField>> {
+ let mut last_assigned_field_id = last_assigned_field_id;
+ // Already assigned partition ids. If we see one of these during
iteration,
+ // we skip it.
+ let assigned_ids = fields
+ .iter()
+ .filter_map(|f| f.partition_id)
+ .collect::<std::collections::HashSet<_>>();
+
+ fn _check_add_1(prev: i32) -> Result<i32> {
+ prev.checked_add(1).ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Cannot assign more partition ids. Overflow.",
+ )
+ })
+ }
+
+ let mut bound_fields = Vec::with_capacity(fields.len());
+ for field in fields.into_iter() {
+ let partition_id = if let Some(partition_id) = field.partition_id {
+ last_assigned_field_id = std::cmp::max(last_assigned_field_id,
partition_id);
+ partition_id
+ } else {
+ last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
+ while assigned_ids.contains(&last_assigned_field_id) {
+ last_assigned_field_id =
_check_add_1(last_assigned_field_id)?;
+ }
+ last_assigned_field_id
+ };
+
+ bound_fields.push(PartitionField {
+ source_id: field.source_id,
+ field_id: partition_id,
+ name: field.name,
+ transform: field.transform,
+ })
+ }
+
+ Ok(bound_fields)
+ }
+
+ /// Ensure that the partition name is unique among columns in the schema.
+ /// Duplicate names are allowed if:
+ /// 1. The column is sourced from the column with the same name.
+ /// 2. AND the transformation is identity
+ fn check_name_does_not_collide_with_schema(
+ field: &UnboundPartitionField,
+ schema: &Schema,
+ ) -> Result<()> {
+ match schema.field_by_name(field.name.as_str()) {
+ Some(schema_collision) => {
+ if field.transform == Transform::Identity {
+ if schema_collision.id == field.source_id {
+ Ok(())
+ } else {
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot create identity partition sourced from
different field in schema. Field name '{}' has id `{}` in schema but partition
source id is `{}`",
+ field.name, schema_collision.id,
field.source_id
+ ),
+ ))
+ }
+ } else {
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot create partition with name: '{}' that
conflicts with schema field and is not an identity transform.",
+ field.name
+ ),
+ ))
+ }
+ }
+ None => Ok(()),
+ }
+ }
+
+    /// Ensure that the transformation of the field is compatible with the type of
the field
+ /// in the schema. Implicitly also checks if the source field exists in
the schema.
+ fn check_transform_compatibility(field: &UnboundPartitionField, schema:
&Schema) -> Result<()> {
+ let schema_field = schema.field_by_id(field.source_id).ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot find partition source field with id `{}` in
schema",
+ field.source_id
+ ),
+ )
+ })?;
+
+ if field.transform != Transform::Void {
+ if !schema_field.field_type.is_primitive() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot partition by non-primitive source field:
'{}'.",
+ schema_field.field_type
+ ),
+ ));
+ }
+
+ if field
+ .transform
+ .result_type(&schema_field.field_type)
+ .is_err()
+ {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Invalid source type: '{}' for transform: '{}'.",
+ schema_field.field_type,
+ field.transform.dedup_name()
+ ),
+ ));
+ }
+ }
+
+ Ok(())
+ }
+}
+
+/// Contains checks that are common to both PartitionSpecBuilder and
UnboundPartitionSpecBuilder
+trait CorePartitionSpecValidator {
+ /// Ensure that the partition name is unique among the partition fields
and is not empty.
+ fn check_name_set_and_unique(&self, name: &str) -> Result<()> {
+ if name.is_empty() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Cannot use empty partition name",
+ ));
+ }
+
+ if self.fields().iter().any(|f| f.name == name) {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Cannot use partition name more than once: {}", name),
+ ));
+ }
+ Ok(())
+ }
+
+    /// For a single source column, transformations must be unique.
+ fn check_for_redundant_partitions(&self, source_id: i32, transform:
&Transform) -> Result<()> {
+ let collision = self.fields().iter().find(|f| {
+ f.source_id == source_id && f.transform.dedup_name() ==
transform.dedup_name()
+ });
+
+ if let Some(collision) = collision {
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot add redundant partition with source id `{}`
and transform `{}`. A partition with the same source id and transform already
exists with name `{}`",
+ source_id, transform.dedup_name(), collision.name
+ ),
+ ))
+ } else {
+ Ok(())
+ }
+ }
+
+ /// Check field / partition_id unique within the partition spec if set
+ fn check_partition_id_unique(&self, field_id: i32) -> Result<()> {
+ if self
+ .fields()
+ .iter()
+ .any(|f| f.partition_id == Some(field_id))
+ {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot use field id more than once in one PartitionSpec:
{}",
+ field_id
+ ),
+ ));
+ }
+
+ Ok(())
+ }
+
+ fn fields(&self) -> &Vec<UnboundPartitionField>;
+}
+
+impl CorePartitionSpecValidator for PartitionSpecBuilder<'_> {
+ fn fields(&self) -> &Vec<UnboundPartitionField> {
+ &self.fields
+ }
+}
+
+impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder {
+ fn fields(&self) -> &Vec<UnboundPartitionField> {
+ &self.fields
+ }
}
#[cfg(test)]
@@ -184,9 +624,21 @@ mod tests {
#[test]
fn test_is_unpartitioned() {
- let partition_spec = PartitionSpec::builder()
+ let schema = Schema::builder()
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(crate::spec::PrimitiveType::Int))
+ .into(),
+ NestedField::required(
+ 2,
+ "name",
+ Type::Primitive(crate::spec::PrimitiveType::String),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap();
+ let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
- .with_fields(vec![])
.build()
.unwrap();
assert!(
@@ -194,23 +646,20 @@ mod tests {
"Empty partition spec should be unpartitioned"
);
- let partition_spec = PartitionSpec::builder()
- .with_partition_field(
- PartitionField::builder()
+ let partition_spec = PartitionSpec::builder(&schema)
+ .add_unbound_fields(vec![
+ UnboundPartitionField::builder()
.source_id(1)
- .field_id(1)
.name("id".to_string())
.transform(Transform::Identity)
.build(),
- )
- .with_partition_field(
- PartitionField::builder()
+ UnboundPartitionField::builder()
.source_id(2)
- .field_id(2)
- .name("name".to_string())
+ .name("name_string".to_string())
.transform(Transform::Void)
.build(),
- )
+ ])
+ .unwrap()
.with_spec_id(1)
.build()
.unwrap();
@@ -219,24 +668,21 @@ mod tests {
"Partition spec with one non void transform should not be
unpartitioned"
);
- let partition_spec = PartitionSpec::builder()
+ let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
- .with_partition_field(
- PartitionField::builder()
+ .add_unbound_fields(vec![
+ UnboundPartitionField::builder()
.source_id(1)
- .field_id(1)
- .name("id".to_string())
+ .name("id_void".to_string())
.transform(Transform::Void)
.build(),
- )
- .with_partition_field(
- PartitionField::builder()
+ UnboundPartitionField::builder()
.source_id(2)
- .field_id(2)
- .name("name".to_string())
+ .name("name_void".to_string())
.transform(Transform::Void)
.build(),
- )
+ ])
+ .unwrap()
.build()
.unwrap();
assert!(
@@ -489,4 +935,336 @@ mod tests {
assert!(partition_spec.partition_type(&schema).is_err());
}
+
+ /// Adding a second partition field whose name ("ts_day") is already used
+ /// must make the `UnboundPartitionSpec` builder return an error.
+ #[test]
+ fn test_builder_disallow_duplicate_names() {
+ UnboundPartitionSpec::builder()
+ .add_partition_field(1, "ts_day".to_string(), Transform::Day)
+ .unwrap()
+ // Same name, different source id: still rejected.
+ .add_partition_field(2, "ts_day".to_string(), Transform::Day)
+ .unwrap_err();
+ }
+
+ /// Two unbound fields that request the same explicit partition field id
+ /// (1000) must be rejected when the second one is added.
+ #[test]
+ fn test_builder_disallow_duplicate_field_ids() {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+ .into(),
+ NestedField::required(
+ 2,
+ "name",
+ Type::Primitive(crate::spec::PrimitiveType::String),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap();
+ PartitionSpec::builder(&schema)
+ .add_unbound_field(UnboundPartitionField {
+ source_id: 1,
+ partition_id: Some(1000),
+ name: "id".to_string(),
+ transform: Transform::Identity,
+ })
+ .unwrap()
+ // Different source column and name, but the same partition_id as above.
+ .add_unbound_field(UnboundPartitionField {
+ source_id: 2,
+ partition_id: Some(1000),
+ name: "id_bucket".to_string(),
+ transform: Transform::Bucket(16),
+ })
+ .unwrap_err();
+ }
+
+ /// Partition field ids: explicit ids are kept as given, and a field added
+ /// without one is auto-assigned (here it gets 1013, one past the explicit
+ /// 1012). Fields keep the order in which they were added.
+ #[test]
+ fn test_builder_auto_assign_field_ids() {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+ .into(),
+ NestedField::required(
+ 2,
+ "name",
+ Type::Primitive(crate::spec::PrimitiveType::String),
+ )
+ .into(),
+ NestedField::required(
+ 3,
+ "ts",
+ Type::Primitive(crate::spec::PrimitiveType::Timestamp),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap();
+ let spec = PartitionSpec::builder(&schema)
+ .with_spec_id(1)
+ .add_unbound_field(UnboundPartitionField {
+ source_id: 1,
+ name: "id".to_string(),
+ transform: Transform::Identity,
+ partition_id: Some(1012),
+ })
+ .unwrap()
+ .add_unbound_field(UnboundPartitionField {
+ source_id: 2,
+ name: "name_void".to_string(),
+ transform: Transform::Void,
+ partition_id: None,
+ })
+ .unwrap()
+ // An explicit ID is kept even if it's lower than IDs already assigned.
+ .add_unbound_field(UnboundPartitionField {
+ source_id: 3,
+ name: "year".to_string(),
+ transform: Transform::Year,
+ partition_id: Some(1),
+ })
+ .unwrap()
+ .build()
+ .unwrap();
+
+ assert_eq!(1012, spec.fields[0].field_id);
+ assert_eq!(1013, spec.fields[1].field_id);
+ assert_eq!(1, spec.fields[2].field_id);
+ }
+
+ /// An empty spec builds against a valid schema, and a bucket field that
+ /// names its source column ("id") binds to source_id 1 and receives the
+ /// auto-assigned field_id 1000.
+ #[test]
+ fn test_builder_valid_schema() {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+ .into(),
+ NestedField::required(
+ 2,
+ "name",
+ Type::Primitive(crate::spec::PrimitiveType::String),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap();
+
+ PartitionSpec::builder(&schema)
+ .with_spec_id(1)
+ .build()
+ .unwrap();
+
+ let spec = PartitionSpec::builder(&schema)
+ .with_spec_id(1)
+ .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16))
+ .unwrap()
+ .build()
+ .unwrap();
+
+ assert_eq!(spec, PartitionSpec {
+ spec_id: 1,
+ fields: vec![PartitionField {
+ source_id: 1,
+ field_id: 1000,
+ name: "id_bucket[16]".to_string(),
+ transform: Transform::Bucket(16),
+ }]
+ });
+ }
+
+ /// A non-identity partition field may not reuse a schema column name:
+ /// bucketing `id` into a partition field also named "id" is rejected, and
+ /// the error message mentions "conflicts with schema".
+ #[test]
+ fn test_collision_with_schema_name() {
+ let schema = Schema::builder()
+ .with_fields(vec![NestedField::required(
+ 1,
+ "id",
+ Type::Primitive(crate::spec::PrimitiveType::Int),
+ )
+ .into()])
+ .build()
+ .unwrap();
+
+ // Sanity check: an empty spec against this schema is valid.
+ PartitionSpec::builder(&schema)
+ .with_spec_id(1)
+ .build()
+ .unwrap();
+
+ let err = PartitionSpec::builder(&schema)
+ .with_spec_id(1)
+ .add_unbound_field(UnboundPartitionField {
+ source_id: 1,
+ partition_id: None,
+ name: "id".to_string(),
+ transform: Transform::Bucket(16),
+ })
+ .unwrap_err();
+ assert!(err.message().contains("conflicts with schema"))
+ }
+
+ /// An identity partition field may share the name of its own source column
+ /// ("id" from source 1). Using that same name for an identity field over a
+ /// different column (source 2, "number") is rejected.
+ #[test]
+ fn test_builder_collision_is_ok_for_identity_transforms() {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+ .into(),
+ NestedField::required(
+ 2,
+ "number",
+ Type::Primitive(crate::spec::PrimitiveType::Int),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap();
+
+ PartitionSpec::builder(&schema)
+ .with_spec_id(1)
+ .build()
+ .unwrap();
+
+ PartitionSpec::builder(&schema)
+ .with_spec_id(1)
+ .add_unbound_field(UnboundPartitionField {
+ source_id: 1,
+ partition_id: None,
+ name: "id".to_string(),
+ transform: Transform::Identity,
+ })
+ .unwrap()
+ .build()
+ .unwrap();
+
+ // Not OK for different source id
+ PartitionSpec::builder(&schema)
+ .with_spec_id(1)
+ .add_unbound_field(UnboundPartitionField {
+ source_id: 2,
+ partition_id: None,
+ name: "id".to_string(),
+ transform: Transform::Identity,
+ })
+ .unwrap_err();
+ }
+
+ /// Every partition field's source_id must refer to a column in the schema.
+ /// Ids 1 and 2 are accepted; a batch containing source_id 4 (not in the
+ /// schema) makes `add_unbound_fields` itself return an error.
+ #[test]
+ fn test_builder_all_source_ids_must_exist() {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+ .into(),
+ NestedField::required(
+ 2,
+ "name",
+ Type::Primitive(crate::spec::PrimitiveType::String),
+ )
+ .into(),
+ NestedField::required(
+ 3,
+ "ts",
+ Type::Primitive(crate::spec::PrimitiveType::Timestamp),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap();
+
+ // Valid
+ PartitionSpec::builder(&schema)
+ .with_spec_id(1)
+ .add_unbound_fields(vec![
+ UnboundPartitionField {
+ source_id: 1,
+ partition_id: None,
+ name: "id_bucket".to_string(),
+ transform: Transform::Bucket(16),
+ },
+ UnboundPartitionField {
+ source_id: 2,
+ partition_id: None,
+ name: "name".to_string(),
+ transform: Transform::Identity,
+ },
+ ])
+ .unwrap()
+ .build()
+ .unwrap();
+
+ // Invalid
+ PartitionSpec::builder(&schema)
+ .with_spec_id(1)
+ .add_unbound_fields(vec![
+ UnboundPartitionField {
+ source_id: 1,
+ partition_id: None,
+ name: "id_bucket".to_string(),
+ transform: Transform::Bucket(16),
+ },
+ UnboundPartitionField {
+ source_id: 4,
+ partition_id: None,
+ name: "name".to_string(),
+ transform: Transform::Identity,
+ },
+ ])
+ .unwrap_err();
+ }
+
+ /// Applying the same transform (`Bucket(16)`) to the same source column
+ /// twice, even under a different name, is rejected as redundant. The error
+ /// message mentions "redundant partition".
+ #[test]
+ fn test_builder_disallows_redundant() {
+ let err = UnboundPartitionSpec::builder()
+ .with_spec_id(1)
+ .add_partition_field(1, "id_bucket[16]".to_string(), Transform::Bucket(16))
+ .unwrap()
+ .add_partition_field(
+ 1,
+ "id_bucket_with_other_name".to_string(),
+ Transform::Bucket(16),
+ )
+ .unwrap_err();
+ assert!(err.message().contains("redundant partition"));
+ }
+
+ /// A transform that does not apply to the source column's type is rejected
+ /// when the field is added: here `Year` on an `Int` column.
+ #[test]
+ fn test_builder_incompatible_transforms_disallowed() {
+ let schema = Schema::builder()
+ .with_fields(vec![NestedField::required(
+ 1,
+ "id",
+ Type::Primitive(crate::spec::PrimitiveType::Int),
+ )
+ .into()])
+ .build()
+ .unwrap();
+
+ PartitionSpec::builder(&schema)
+ .with_spec_id(1)
+ .add_unbound_field(UnboundPartitionField {
+ source_id: 1,
+ partition_id: None,
+ name: "id_year".to_string(),
+ transform: Transform::Year,
+ })
+ .unwrap_err();
+ }
+
+ /// The unbound builder does not assign partition ids. A field added with
+ /// `partition_id: None` is stored unchanged in the built
+ /// `UnboundPartitionSpec`.
+ #[test]
+ fn test_build_unbound_specs_without_partition_id() {
+ let spec = UnboundPartitionSpec::builder()
+ .with_spec_id(1)
+ .add_partition_fields(vec![UnboundPartitionField {
+ source_id: 1,
+ partition_id: None,
+ name: "id_bucket[16]".to_string(),
+ transform: Transform::Bucket(16),
+ }])
+ .unwrap()
+ .build();
+
+ assert_eq!(spec, UnboundPartitionSpec {
+ spec_id: Some(1),
+ fields: vec![UnboundPartitionField {
+ source_id: 1,
+ partition_id: None,
+ name: "id_bucket[16]".to_string(),
+ transform: Transform::Bucket(16),
+ }]
+ });
+ }
}
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index 53bcabb..cd7f046 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -32,12 +32,12 @@ use uuid::Uuid;
use super::snapshot::{Snapshot, SnapshotReference, SnapshotRetention};
use super::{
PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, SnapshotRef,
SortOrder, SortOrderRef,
+ DEFAULT_PARTITION_SPEC_ID,
};
use crate::error::{timestamp_ms_to_utc, Result};
use crate::{Error, ErrorKind, TableCreation};
static MAIN_BRANCH: &str = "main";
-static DEFAULT_SPEC_ID: i32 = 0;
static DEFAULT_SORT_ORDER_ID: i64 = 0;
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
@@ -187,8 +187,8 @@ impl TableMetadata {
/// Get default partition spec
#[inline]
pub fn default_partition_spec(&self) -> Option<&PartitionSpecRef> {
- if self.default_spec_id == DEFAULT_SPEC_ID {
- self.partition_spec_by_id(DEFAULT_SPEC_ID)
+ if self.default_spec_id == DEFAULT_PARTITION_SPEC_ID {
+ self.partition_spec_by_id(DEFAULT_PARTITION_SPEC_ID)
} else {
Some(
self.partition_spec_by_id(self.default_spec_id)
@@ -308,9 +308,9 @@ impl TableMetadataBuilder {
))
}
None => HashMap::from([(
- DEFAULT_SPEC_ID,
+ DEFAULT_PARTITION_SPEC_ID,
Arc::new(PartitionSpec {
- spec_id: DEFAULT_SPEC_ID,
+ spec_id: DEFAULT_PARTITION_SPEC_ID,
fields: vec![],
}),
)]),
@@ -347,7 +347,7 @@ impl TableMetadataBuilder {
current_schema_id: schema.schema_id(),
schemas: HashMap::from([(schema.schema_id(), Arc::new(schema))]),
partition_specs,
- default_spec_id: DEFAULT_SPEC_ID,
+ default_spec_id: DEFAULT_PARTITION_SPEC_ID,
last_partition_id: 0,
properties,
current_snapshot_id: None,
@@ -391,8 +391,8 @@ pub(super) mod _serde {
use uuid::Uuid;
use super::{
- FormatVersion, MetadataLog, SnapshotLog, TableMetadata,
DEFAULT_SORT_ORDER_ID,
- DEFAULT_SPEC_ID, MAIN_BRANCH,
+ FormatVersion, MetadataLog, SnapshotLog, TableMetadata,
DEFAULT_PARTITION_SPEC_ID,
+ DEFAULT_SORT_ORDER_ID, MAIN_BRANCH,
};
use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2};
@@ -568,7 +568,7 @@ pub(super) mod _serde {
value
.partition_specs
.into_iter()
- .map(|x| (x.spec_id, Arc::new(x))),
+ .map(|x| (x.spec_id(), Arc::new(x))),
),
default_spec_id: value.default_spec_id,
last_partition_id: value.last_partition_id,
@@ -643,12 +643,12 @@ pub(super) mod _serde {
.partition_specs
.unwrap_or_else(|| {
vec![PartitionSpec {
- spec_id: DEFAULT_SPEC_ID,
+ spec_id: DEFAULT_PARTITION_SPEC_ID,
fields: value.partition_spec,
}]
})
.into_iter()
- .map(|x| (x.spec_id, Arc::new(x))),
+ .map(|x| (x.spec_id(), Arc::new(x))),
);
Ok(TableMetadata {
format_version: FormatVersion::V1,
@@ -808,7 +808,7 @@ pub(super) mod _serde {
partition_spec: v
.partition_specs
.get(&v.default_spec_id)
- .map(|x| x.fields.clone())
+ .map(|x| x.fields().to_vec())
.unwrap_or_default(),
partition_specs: Some(
v.partition_specs
@@ -935,7 +935,7 @@ mod tests {
use crate::spec::{
NestedField, NullOrder, Operation, PartitionField, PartitionSpec,
PrimitiveType, Schema,
Snapshot, SnapshotReference, SnapshotRetention, SortDirection,
SortField, SortOrder,
- Summary, Transform, Type,
+ Summary, Transform, Type, UnboundPartitionField,
};
use crate::TableCreation;
@@ -1020,16 +1020,15 @@ mod tests {
.build()
.unwrap();
- let partition_spec = PartitionSpec::builder()
- .with_spec_id(1)
- .with_partition_field(PartitionField {
+ let partition_spec = PartitionSpec {
+ spec_id: 1,
+ fields: vec![PartitionField {
name: "ts_day".to_string(),
transform: Transform::Day,
source_id: 4,
field_id: 1000,
- })
- .build()
- .unwrap();
+ }],
+ };
let expected = TableMetadata {
format_version: FormatVersion::V2,
@@ -1179,14 +1178,10 @@ mod tests {
.build()
.unwrap();
- let partition_spec = PartitionSpec::builder()
+ let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(0)
- .with_partition_field(PartitionField {
- name: "vendor_id".to_string(),
- transform: Transform::Identity,
- source_id: 1,
- field_id: 1000,
- })
+ .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
+ .unwrap()
.build()
.unwrap();
@@ -1292,14 +1287,15 @@ mod tests {
.build()
.unwrap();
- let partition_spec = PartitionSpec::builder()
+ let partition_spec = PartitionSpec::builder(&schema1)
.with_spec_id(0)
- .with_partition_field(PartitionField {
+ .add_unbound_field(UnboundPartitionField {
name: "x".to_string(),
transform: Transform::Identity,
source_id: 1,
- field_id: 1000,
+ partition_id: Some(1000),
})
+ .unwrap()
.build()
.unwrap();
@@ -1414,14 +1410,15 @@ mod tests {
.build()
.unwrap();
- let partition_spec = PartitionSpec::builder()
+ let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(0)
- .with_partition_field(PartitionField {
+ .add_unbound_field(UnboundPartitionField {
name: "x".to_string(),
transform: Transform::Identity,
source_id: 1,
- field_id: 1000,
+ partition_id: Some(1000),
})
+ .unwrap()
.build()
.unwrap();
@@ -1493,14 +1490,15 @@ mod tests {
.build()
.unwrap();
- let partition_spec = PartitionSpec::builder()
+ let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(0)
- .with_partition_field(PartitionField {
+ .add_unbound_field(UnboundPartitionField {
name: "x".to_string(),
transform: Transform::Identity,
source_id: 1,
- field_id: 1000,
+ partition_id: Some(1000),
})
+ .unwrap()
.build()
.unwrap();
@@ -1686,10 +1684,12 @@ mod tests {
table_metadata.partition_specs,
HashMap::from([(
0,
- Arc::new(PartitionSpec {
- spec_id: 0,
- fields: vec![]
- })
+ Arc::new(
+
PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap())
+ .with_spec_id(0)
+ .build()
+ .unwrap()
+ )
)])
);
assert_eq!(