This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 6e0bcf56 feat: Safer PartitionSpec & SchemalessPartitionSpec (#645)
6e0bcf56 is described below
commit 6e0bcf56028e0d20d5ceeedf87dbb3db7c929ee3
Author: Christian <[email protected]>
AuthorDate: Sat Nov 2 21:10:03 2024 -0700
feat: Safer PartitionSpec & SchemalessPartitionSpec (#645)
* SchemalessPartitionSpec
* Traits -> Enum
* Remove PartitionSpec enum
* Address comments
---
crates/catalog/memory/src/catalog.rs | 6 +-
crates/catalog/sql/src/catalog.rs | 7 +-
.../src/expr/visitors/expression_evaluator.rs | 136 +++---
.../expr/visitors/inclusive_metrics_evaluator.rs | 13 +-
.../src/expr/visitors/inclusive_projection.rs | 155 +++++--
crates/iceberg/src/io/object_cache.rs | 2 +-
crates/iceberg/src/scan.rs | 8 +-
crates/iceberg/src/spec/manifest.rs | 516 ++++++++++-----------
crates/iceberg/src/spec/partition.rs | 433 ++++++++++++-----
crates/iceberg/src/spec/table_metadata.rs | 315 ++++++++-----
.../src/writer/file_writer/location_generator.rs | 5 +-
11 files changed, 969 insertions(+), 627 deletions(-)
diff --git a/crates/catalog/memory/src/catalog.rs
b/crates/catalog/memory/src/catalog.rs
index 1da04482..e4192aae 100644
--- a/crates/catalog/memory/src/catalog.rs
+++ b/crates/catalog/memory/src/catalog.rs
@@ -283,7 +283,7 @@ mod tests {
use std::iter::FromIterator;
use iceberg::io::FileIOBuilder;
- use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
+ use iceberg::spec::{BoundPartitionSpec, NestedField, PrimitiveType, Schema, SortOrder, Type};
use regex::Regex;
use tempfile::TempDir;
@@ -355,7 +355,7 @@ mod tests {
assert_eq!(metadata.current_schema().as_ref(), expected_schema);
- let expected_partition_spec = PartitionSpec::builder(expected_schema)
+ let expected_partition_spec = BoundPartitionSpec::builder((*expected_schema).clone())
.with_spec_id(0)
.build()
.unwrap();
@@ -365,7 +365,7 @@ mod tests {
.partition_specs_iter()
.map(|p| p.as_ref())
.collect_vec(),
- vec![&expected_partition_spec]
+ vec![&expected_partition_spec.into_schemaless()]
);
let expected_sorted_order = SortOrder::builder()
diff --git a/crates/catalog/sql/src/catalog.rs
b/crates/catalog/sql/src/catalog.rs
index b7976d9d..abf22ffd 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -781,7 +781,7 @@ mod tests {
use std::hash::Hash;
use iceberg::io::FileIOBuilder;
- use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
+ use iceberg::spec::{BoundPartitionSpec, NestedField, PrimitiveType, Schema, SortOrder, Type};
use iceberg::table::Table;
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation,
TableIdent};
use itertools::Itertools;
@@ -874,10 +874,11 @@ mod tests {
assert_eq!(metadata.current_schema().as_ref(), expected_schema);
- let expected_partition_spec = PartitionSpec::builder(expected_schema)
+ let expected_partition_spec = BoundPartitionSpec::builder(expected_schema.clone())
.with_spec_id(0)
.build()
- .unwrap();
+ .unwrap()
+ .into_schemaless();
assert_eq!(
metadata
diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs
b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
index 8f3c2971..2add5761 100644
--- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
@@ -258,15 +258,13 @@ mod tests {
UnaryExpression,
};
use crate::spec::{
- DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionSpec,
- PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type,
+ BoundPartitionSpec, BoundPartitionSpecRef, DataContentType, DataFile, DataFileFormat,
+ Datum, Literal, NestedField, PrimitiveType, Schema, Struct, Transform, Type,
UnboundPartitionField,
};
use crate::Result;
- fn create_schema_and_partition_spec(
- r#type: PrimitiveType,
- ) -> Result<(SchemaRef, PartitionSpecRef)> {
+ fn create_partition_spec(r#type: PrimitiveType) -> Result<BoundPartitionSpecRef> {
let schema = Schema::builder()
.with_fields(vec![Arc::new(NestedField::optional(
1,
@@ -275,7 +273,7 @@ mod tests {
))])
.build()?;
- let spec = PartitionSpec::builder(&schema)
+ let spec = BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.add_unbound_fields(vec![UnboundPartitionField::builder()
.source_id(1)
@@ -287,16 +285,15 @@ mod tests {
.build()
.unwrap();
- Ok((Arc::new(schema), Arc::new(spec)))
+ Ok(Arc::new(spec))
}
fn create_partition_filter(
- schema: &Schema,
- partition_spec: PartitionSpecRef,
+ partition_spec: BoundPartitionSpecRef,
predicate: &BoundPredicate,
case_sensitive: bool,
) -> Result<BoundPredicate> {
- let partition_type = partition_spec.partition_type(schema)?;
+ let partition_type = partition_spec.partition_type();
let partition_fields = partition_type.fields().to_owned();
let partition_schema = Schema::builder()
@@ -304,7 +301,8 @@ mod tests {
.with_fields(partition_fields)
.build()?;
- let mut inclusive_projection = InclusiveProjection::new(partition_spec);
+ let mut inclusive_projection =
+     InclusiveProjection::new((*partition_spec).clone().into_schemaless().into());
let partition_filter = inclusive_projection
.project(predicate)?
@@ -315,13 +313,11 @@ mod tests {
}
fn create_expression_evaluator(
- schema: &Schema,
- partition_spec: PartitionSpecRef,
+ partition_spec: BoundPartitionSpecRef,
predicate: &BoundPredicate,
case_sensitive: bool,
) -> Result<ExpressionEvaluator> {
- let partition_filter =
-     create_partition_filter(schema, partition_spec, predicate, case_sensitive)?;
+ let partition_filter = create_partition_filter(partition_spec, predicate, case_sensitive)?;
Ok(ExpressionEvaluator::new(partition_filter))
}
@@ -375,7 +371,7 @@ mod tests {
#[test]
fn test_expr_or() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Binary(BinaryExpression::new(
PredicateOperator::LessThan,
@@ -387,10 +383,10 @@ mod tests {
Reference::new("a"),
Datum::float(0.4),
)))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -404,7 +400,7 @@ mod tests {
#[test]
fn test_expr_and() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Binary(BinaryExpression::new(
PredicateOperator::LessThan,
@@ -416,10 +412,10 @@ mod tests {
Reference::new("a"),
Datum::float(0.4),
)))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -433,17 +429,17 @@ mod tests {
#[test]
fn test_expr_not_in() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Set(SetExpression::new(
PredicateOperator::NotIn,
Reference::new("a"),
FnvHashSet::from_iter([Datum::float(0.9), Datum::float(1.2),
Datum::float(2.4)]),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -457,17 +453,17 @@ mod tests {
#[test]
fn test_expr_in() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Set(SetExpression::new(
PredicateOperator::In,
Reference::new("a"),
FnvHashSet::from_iter([Datum::float(1.0), Datum::float(1.2),
Datum::float(2.4)]),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -481,17 +477,17 @@ mod tests {
#[test]
fn test_expr_not_starts_with() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::String)?;
+ let partition_spec = create_partition_spec(PrimitiveType::String)?;
let predicate = Predicate::Binary(BinaryExpression::new(
PredicateOperator::NotStartsWith,
Reference::new("a"),
Datum::string("not"),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_string();
@@ -505,17 +501,17 @@ mod tests {
#[test]
fn test_expr_starts_with() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::String)?;
+ let partition_spec = create_partition_spec(PrimitiveType::String)?;
let predicate = Predicate::Binary(BinaryExpression::new(
PredicateOperator::StartsWith,
Reference::new("a"),
Datum::string("test"),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_string();
@@ -529,17 +525,17 @@ mod tests {
#[test]
fn test_expr_not_eq() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Binary(BinaryExpression::new(
PredicateOperator::NotEq,
Reference::new("a"),
Datum::float(0.9),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -553,17 +549,17 @@ mod tests {
#[test]
fn test_expr_eq() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Binary(BinaryExpression::new(
PredicateOperator::Eq,
Reference::new("a"),
Datum::float(1.0),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -577,17 +573,17 @@ mod tests {
#[test]
fn test_expr_greater_than_or_eq() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Binary(BinaryExpression::new(
PredicateOperator::GreaterThanOrEq,
Reference::new("a"),
Datum::float(1.0),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -601,17 +597,17 @@ mod tests {
#[test]
fn test_expr_greater_than() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Binary(BinaryExpression::new(
PredicateOperator::GreaterThan,
Reference::new("a"),
Datum::float(0.9),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -625,17 +621,17 @@ mod tests {
#[test]
fn test_expr_less_than_or_eq() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Binary(BinaryExpression::new(
PredicateOperator::LessThanOrEq,
Reference::new("a"),
Datum::float(1.0),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -649,17 +645,17 @@ mod tests {
#[test]
fn test_expr_less_than() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Binary(BinaryExpression::new(
PredicateOperator::LessThan,
Reference::new("a"),
Datum::float(1.1),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -673,15 +669,15 @@ mod tests {
#[test]
fn test_expr_is_not_nan() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Unary(UnaryExpression::new(
PredicateOperator::NotNan,
Reference::new("a"),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -695,15 +691,15 @@ mod tests {
#[test]
fn test_expr_is_nan() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Unary(UnaryExpression::new(
PredicateOperator::IsNan,
Reference::new("a"),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -717,15 +713,15 @@ mod tests {
#[test]
fn test_expr_is_not_null() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Unary(UnaryExpression::new(
PredicateOperator::NotNull,
Reference::new("a"),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -739,15 +735,15 @@ mod tests {
#[test]
fn test_expr_is_null() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
let predicate = Predicate::Unary(UnaryExpression::new(
PredicateOperator::IsNull,
Reference::new("a"),
))
- .bind(schema.clone(), case_sensitive)?;
+ .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -761,11 +757,12 @@ mod tests {
#[test]
fn test_expr_always_false() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
- let predicate = Predicate::AlwaysFalse.bind(schema.clone(),
case_sensitive)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
+ let predicate =
+ Predicate::AlwaysFalse.bind(partition_spec.schema_ref().clone(),
case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
@@ -779,11 +776,12 @@ mod tests {
#[test]
fn test_expr_always_true() -> Result<()> {
let case_sensitive = true;
- let (schema, partition_spec) =
create_schema_and_partition_spec(PrimitiveType::Float)?;
- let predicate = Predicate::AlwaysTrue.bind(schema.clone(),
case_sensitive)?;
+ let partition_spec = create_partition_spec(PrimitiveType::Float)?;
+ let predicate =
+ Predicate::AlwaysTrue.bind(partition_spec.schema_ref().clone(),
case_sensitive)?;
let expression_evaluator =
- create_expression_evaluator(&schema, partition_spec, &predicate,
case_sensitive)?;
+ create_expression_evaluator(partition_spec, &predicate,
case_sensitive)?;
let data_file = create_data_file_float();
diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
index a2ee4722..1cdc7577 100644
--- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
@@ -495,7 +495,7 @@ mod test {
UnaryExpression,
};
use crate::spec::{
- DataContentType, DataFile, DataFileFormat, Datum, NestedField,
PartitionSpec,
+ BoundPartitionSpec, DataContentType, DataFile, DataFileFormat, Datum,
NestedField,
PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField,
};
@@ -504,10 +504,10 @@ mod test {
#[test]
fn test_data_file_no_partitions() {
- let (table_schema_ref, _partition_spec_ref) =
create_test_schema_and_partition_spec();
+ let partition_spec_ref = create_test_partition_spec();
let partition_filter = Predicate::AlwaysTrue
- .bind(table_schema_ref.clone(), false)
+ .bind(partition_spec_ref.schema_ref().clone(), false)
.unwrap();
let case_sensitive = false;
@@ -1645,7 +1645,7 @@ mod test {
assert!(result, "Should read: NotIn on no nulls column");
}
- fn create_test_schema_and_partition_spec() -> (Arc<Schema>, Arc<PartitionSpec>) {
+ fn create_test_partition_spec() -> Arc<BoundPartitionSpec> {
let table_schema = Schema::builder()
.with_fields(vec![Arc::new(NestedField::optional(
1,
@@ -1656,7 +1656,7 @@ mod test {
.unwrap();
let table_schema_ref = Arc::new(table_schema);
- let partition_spec = PartitionSpec::builder(&table_schema_ref)
+ let partition_spec =
BoundPartitionSpec::builder(table_schema_ref.clone())
.with_spec_id(1)
.add_unbound_fields(vec![UnboundPartitionField::builder()
.source_id(1)
@@ -1667,8 +1667,7 @@ mod test {
.unwrap()
.build()
.unwrap();
- let partition_spec_ref = Arc::new(partition_spec);
- (table_schema_ref, partition_spec_ref)
+ Arc::new(partition_spec)
}
fn not_null(reference: &str) -> BoundPredicate {
diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs
b/crates/iceberg/src/expr/visitors/inclusive_projection.rs
index 2087207e..7c6e0b2d 100644
--- a/crates/iceberg/src/expr/visitors/inclusive_projection.rs
+++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs
@@ -21,16 +21,16 @@ use fnv::FnvHashSet;
use crate::expr::visitors::bound_predicate_visitor::{visit,
BoundPredicateVisitor};
use crate::expr::{BoundPredicate, BoundReference, Predicate};
-use crate::spec::{Datum, PartitionField, PartitionSpecRef};
+use crate::spec::{Datum, PartitionField, SchemalessPartitionSpecRef};
use crate::Error;
pub(crate) struct InclusiveProjection {
- partition_spec: PartitionSpecRef,
+ partition_spec: SchemalessPartitionSpecRef,
cached_parts: HashMap<i32, Vec<PartitionField>>,
}
impl InclusiveProjection {
- pub(crate) fn new(partition_spec: PartitionSpecRef) -> Self {
+ pub(crate) fn new(partition_spec: SchemalessPartitionSpecRef) -> Self {
Self {
partition_spec,
cached_parts: HashMap::new(),
@@ -235,7 +235,7 @@ mod tests {
use crate::expr::visitors::inclusive_projection::InclusiveProjection;
use crate::expr::{Bind, Predicate, Reference};
use crate::spec::{
- Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType,
Schema, Transform, Type,
+ BoundPartitionSpec, Datum, NestedField, PrimitiveType, Schema,
Transform, Type,
UnboundPartitionField,
};
@@ -265,13 +265,14 @@ mod tests {
#[test]
fn test_inclusive_projection_logic_ops() {
let schema = build_test_schema();
+ let arc_schema = Arc::new(schema);
- let partition_spec = PartitionSpec::builder(&schema)
+ let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
.with_spec_id(1)
.build()
- .unwrap();
+ .unwrap()
+ .into_schemaless();
- let arc_schema = Arc::new(schema);
let arc_partition_spec = Arc::new(partition_spec);
// this predicate contains only logic operators,
@@ -295,8 +296,9 @@ mod tests {
#[test]
fn test_inclusive_projection_identity_transform() {
let schema = build_test_schema();
+ let arc_schema = Arc::new(schema);
- let partition_spec = PartitionSpec::builder(&schema)
+ let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
.with_spec_id(1)
.add_unbound_field(
UnboundPartitionField::builder()
@@ -308,9 +310,9 @@ mod tests {
)
.unwrap()
.build()
- .unwrap();
+ .unwrap()
+ .into_schemaless();
- let arc_schema = Arc::new(schema);
let arc_partition_spec = Arc::new(partition_spec);
let unbound_predicate = Reference::new("a").less_than(Datum::int(10));
@@ -321,7 +323,7 @@ mod tests {
// should result in the same Predicate as the original
// `unbound_predicate`, since we have just a single partition field,
// and it has an Identity transform
- let mut inclusive_projection =
InclusiveProjection::new(arc_partition_spec.clone());
+ let mut inclusive_projection =
InclusiveProjection::new(arc_partition_spec);
let result = inclusive_projection.project(&bound_predicate).unwrap();
let expected = "a < 10".to_string();
@@ -330,34 +332,95 @@ mod tests {
}
#[test]
- fn test_inclusive_projection_date_transforms() {
+ fn test_inclusive_projection_date_year_transform() {
let schema = build_test_schema();
+ let arc_schema = Arc::new(schema);
+
+ let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
+ .with_spec_id(1)
+ .add_unbound_fields(vec![UnboundPartitionField {
+ source_id: 2,
+ name: "year".to_string(),
+ field_id: Some(1000),
+ transform: Transform::Year,
+ }])
+ .unwrap()
+ .build()
+ .unwrap()
+ .into_schemaless();
+
+ let arc_partition_spec = Arc::new(partition_spec);
+
+ let unbound_predicate =
+
Reference::new("date").less_than(Datum::date_from_str("2024-01-01").unwrap());
+
+ let bound_predicate = unbound_predicate.bind(arc_schema.clone(),
false).unwrap();
+
+ // applying InclusiveProjection to bound_predicate
+ // should result in a predicate that correctly handles
+ // year, month and date
+ let mut inclusive_projection =
InclusiveProjection::new(arc_partition_spec);
+ let result = inclusive_projection.project(&bound_predicate).unwrap();
+
+ let expected = "year <= 53".to_string();
+
+ assert_eq!(result.to_string(), expected);
+ }
+
+ #[test]
+ fn test_inclusive_projection_date_month_transform() {
+ let schema = build_test_schema();
+ let arc_schema = Arc::new(schema);
+
+ let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
+ .with_spec_id(1)
+ .add_unbound_fields(vec![UnboundPartitionField {
+ source_id: 2,
+ name: "month".to_string(),
+ field_id: Some(1000),
+ transform: Transform::Month,
+ }])
+ .unwrap()
+ .build()
+ .unwrap()
+ .into_schemaless();
+
+ let arc_partition_spec = Arc::new(partition_spec);
+
+ let unbound_predicate =
+
Reference::new("date").less_than(Datum::date_from_str("2024-01-01").unwrap());
+
+ let bound_predicate = unbound_predicate.bind(arc_schema.clone(),
false).unwrap();
- let partition_spec = PartitionSpec {
- spec_id: 1,
- fields: vec![
- PartitionField {
- source_id: 2,
- name: "year".to_string(),
- field_id: 1000,
- transform: Transform::Year,
- },
- PartitionField {
- source_id: 2,
- name: "month".to_string(),
- field_id: 1001,
- transform: Transform::Month,
- },
- PartitionField {
- source_id: 2,
- name: "day".to_string(),
- field_id: 1002,
- transform: Transform::Day,
- },
- ],
- };
+ // applying InclusiveProjection to bound_predicate
+ // should result in a predicate that correctly handles
+ // year, month and date
+ let mut inclusive_projection =
InclusiveProjection::new(arc_partition_spec);
+ let result = inclusive_projection.project(&bound_predicate).unwrap();
+
+ let expected = "month <= 647".to_string();
+
+ assert_eq!(result.to_string(), expected);
+ }
+ #[test]
+ fn test_inclusive_projection_date_day_transform() {
+ let schema = build_test_schema();
let arc_schema = Arc::new(schema);
+
+ let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
+ .with_spec_id(1)
+ .add_unbound_fields(vec![UnboundPartitionField {
+ source_id: 2,
+ name: "day".to_string(),
+ field_id: Some(1000),
+ transform: Transform::Day,
+ }])
+ .unwrap()
+ .build()
+ .unwrap()
+ .into_schemaless();
+
let arc_partition_spec = Arc::new(partition_spec);
let unbound_predicate =
@@ -368,10 +431,10 @@ mod tests {
// applying InclusiveProjection to bound_predicate
// should result in a predicate that correctly handles
// year, month and date
- let mut inclusive_projection =
InclusiveProjection::new(arc_partition_spec.clone());
+ let mut inclusive_projection =
InclusiveProjection::new(arc_partition_spec);
let result = inclusive_projection.project(&bound_predicate).unwrap();
- let expected = "((year <= 53) AND (month <= 647)) AND (day <= 19722)".to_string();
+ let expected = "day <= 19722".to_string();
assert_eq!(result.to_string(), expected);
}
@@ -379,8 +442,9 @@ mod tests {
#[test]
fn test_inclusive_projection_truncate_transform() {
let schema = build_test_schema();
+ let arc_schema = Arc::new(schema);
- let partition_spec = PartitionSpec::builder(&schema)
+ let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
.with_spec_id(1)
.add_unbound_field(
UnboundPartitionField::builder()
@@ -392,9 +456,9 @@ mod tests {
)
.unwrap()
.build()
- .unwrap();
+ .unwrap()
+ .into_schemaless();
- let arc_schema = Arc::new(schema);
let arc_partition_spec = Arc::new(partition_spec);
let unbound_predicate =
Reference::new("name").starts_with(Datum::string("Testy McTest"));
@@ -408,7 +472,7 @@ mod tests {
// name that start with "Testy McTest" into a partition
// for values of name that start with the first four letters
// of that, ie "Test".
- let mut inclusive_projection =
InclusiveProjection::new(arc_partition_spec.clone());
+ let mut inclusive_projection =
InclusiveProjection::new(arc_partition_spec);
let result = inclusive_projection.project(&bound_predicate).unwrap();
let expected = "name_truncate STARTS WITH \"Test\"".to_string();
@@ -419,8 +483,9 @@ mod tests {
#[test]
fn test_inclusive_projection_bucket_transform() {
let schema = build_test_schema();
+ let arc_schema = Arc::new(schema);
- let partition_spec = PartitionSpec::builder(&schema)
+ let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
.with_spec_id(1)
.add_unbound_field(
UnboundPartitionField::builder()
@@ -432,9 +497,9 @@ mod tests {
)
.unwrap()
.build()
- .unwrap();
+ .unwrap()
+ .into_schemaless();
- let arc_schema = Arc::new(schema);
let arc_partition_spec = Arc::new(partition_spec);
let unbound_predicate = Reference::new("a").equal_to(Datum::int(10));
@@ -445,7 +510,7 @@ mod tests {
// should result in the "a = 10" predicate being
// transformed into "a = 2", since 10 gets bucketed
// to 2 with a Bucket(7) partition
- let mut inclusive_projection =
InclusiveProjection::new(arc_partition_spec.clone());
+ let mut inclusive_projection =
InclusiveProjection::new(arc_partition_spec);
let result = inclusive_projection.project(&bound_predicate).unwrap();
let expected = "a_bucket[7] = 2".to_string();
diff --git a/crates/iceberg/src/io/object_cache.rs
b/crates/iceberg/src/io/object_cache.rs
index 731072a5..35b6a2c9 100644
--- a/crates/iceberg/src/io/object_cache.rs
+++ b/crates/iceberg/src/io/object_cache.rs
@@ -262,7 +262,7 @@ mod tests {
)
.write(Manifest::new(
ManifestMetadata::builder()
- .schema((*current_schema).clone())
+ .schema(current_schema.clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index d0355454..89e8846f 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -706,7 +706,7 @@ impl PartitionFilterCache {
&self,
spec_id: i32,
table_metadata: &TableMetadataRef,
- schema: &SchemaRef,
+ schema: &Schema,
case_sensitive: bool,
filter: BoundPredicate,
) -> Result<Arc<BoundPredicate>> {
@@ -732,11 +732,11 @@ impl PartitionFilterCache {
format!("Could not find partition spec for id {}", spec_id),
))?;
- let partition_type = partition_spec.partition_type(schema.as_ref())?;
+ let partition_type = partition_spec.partition_type(schema)?;
let partition_fields = partition_type.fields().to_owned();
let partition_schema = Arc::new(
Schema::builder()
- .with_schema_id(partition_spec.spec_id)
+ .with_schema_id(partition_spec.spec_id())
.with_fields(partition_fields)
.build()?,
);
@@ -1057,7 +1057,7 @@ mod tests {
)
.write(Manifest::new(
ManifestMetadata::builder()
- .schema((*current_schema).clone())
+ .schema(current_schema.clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
diff --git a/crates/iceberg/src/spec/manifest.rs
b/crates/iceberg/src/spec/manifest.rs
index 997908d5..085200b7 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -30,8 +30,8 @@ use typed_builder::TypedBuilder;
use self::_const_schema::{manifest_schema_v1, manifest_schema_v2};
use super::{
- Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile,
PartitionSpec, Schema,
- SchemaId, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER,
+ BoundPartitionSpec, Datum, FieldSummary, FormatVersion,
ManifestContentType, ManifestFile,
+ Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER,
UNASSIGNED_SEQUENCE_NUMBER,
};
use crate::error::Result;
use crate::io::OutputFile;
@@ -55,7 +55,7 @@ impl Manifest {
let metadata = ManifestMetadata::parse(meta)?;
// Parse manifest entries
- let partition_type =
metadata.partition_spec.partition_type(&metadata.schema)?;
+ let partition_type = metadata.partition_spec.partition_type();
let entries = match metadata.format_version {
FormatVersion::V1 => {
@@ -65,7 +65,7 @@ impl Manifest {
.into_iter()
.map(|value| {
from_value::<_serde::ManifestEntryV1>(&value?)?
- .try_into(&partition_type, &metadata.schema)
+ .try_into(partition_type, &metadata.schema)
})
.collect::<Result<Vec<_>>>()?
}
@@ -76,7 +76,7 @@ impl Manifest {
.into_iter()
.map(|value| {
from_value::<_serde::ManifestEntryV2>(&value?)?
- .try_into(&partition_type, &metadata.schema)
+ .try_into(partition_type, &metadata.schema)
})
.collect::<Result<Vec<_>>>()?
}
@@ -206,10 +206,7 @@ impl ManifestWriter {
/// Write a manifest.
pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> {
// Create the avro writer
- let partition_type = manifest
- .metadata
- .partition_spec
- .partition_type(&manifest.metadata.schema)?;
+ let partition_type = manifest.metadata.partition_spec.partition_type();
let table_schema = &manifest.metadata.schema;
let avro_schema = match manifest.metadata.format_version {
FormatVersion::V1 => manifest_schema_v1(partition_type.clone())?,
@@ -284,12 +281,12 @@ impl ManifestWriter {
let value = match manifest.metadata.format_version {
FormatVersion::V1 =>
to_value(_serde::ManifestEntryV1::try_from(
(*entry).clone(),
- &partition_type,
+ partition_type,
)?)?
.resolve(&avro_schema)?,
FormatVersion::V2 =>
to_value(_serde::ManifestEntryV2::try_from(
(*entry).clone(),
- &partition_type,
+ partition_type,
)?)?
.resolve(&avro_schema)?,
};
@@ -705,11 +702,11 @@ mod _const_schema {
pub struct ManifestMetadata {
/// The table schema at the time the manifest
/// was written
- schema: Schema,
+ schema: SchemaRef,
/// ID of the schema used to write the manifest as a string
schema_id: SchemaId,
/// The partition spec used to write the manifest
- partition_spec: PartitionSpec,
+ partition_spec: BoundPartitionSpec,
/// Table format version number of the manifest as a string
format_version: FormatVersion,
/// Type of content files tracked by the manifest: “data” or “deletes”
@@ -719,7 +716,7 @@ pub struct ManifestMetadata {
impl ManifestMetadata {
/// Parse from metadata in avro file.
pub fn parse(meta: &HashMap<String, Vec<u8>>) -> Result<Self> {
- let schema = {
+ let schema = Arc::new({
let bs = meta.get("schema").ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
@@ -733,7 +730,7 @@ impl ManifestMetadata {
)
.with_source(err)
})?
- };
+ });
let schema_id: i32 = meta
.get("schema-id")
.map(|bs| {
@@ -776,7 +773,10 @@ impl ManifestMetadata {
})
.transpose()?
.unwrap_or(0);
- PartitionSpec { spec_id, fields }
+ BoundPartitionSpec::builder(schema.clone())
+ .with_spec_id(spec_id)
+ .add_unbound_fields(fields.into_iter().map(|f|
f.into_unbound()))?
+ .build()?
};
let format_version = if let Some(bs) = meta.get("format-version") {
serde_json::from_slice::<FormatVersion>(bs).map_err(|err| {
@@ -1519,82 +1519,82 @@ mod tests {
#[tokio::test]
async fn test_parse_manifest_v2_unpartition() {
+ let schema = Arc::new(
+ Schema::builder()
+ .with_fields(vec![
+ // id v_int v_long v_float v_double v_varchar v_bool
v_date v_timestamp v_decimal v_ts_ntz
+ Arc::new(NestedField::optional(
+ 1,
+ "id",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::optional(
+ 2,
+ "v_int",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ Arc::new(NestedField::optional(
+ 3,
+ "v_long",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::optional(
+ 4,
+ "v_float",
+ Type::Primitive(PrimitiveType::Float),
+ )),
+ Arc::new(NestedField::optional(
+ 5,
+ "v_double",
+ Type::Primitive(PrimitiveType::Double),
+ )),
+ Arc::new(NestedField::optional(
+ 6,
+ "v_varchar",
+ Type::Primitive(PrimitiveType::String),
+ )),
+ Arc::new(NestedField::optional(
+ 7,
+ "v_bool",
+ Type::Primitive(PrimitiveType::Boolean),
+ )),
+ Arc::new(NestedField::optional(
+ 8,
+ "v_date",
+ Type::Primitive(PrimitiveType::Date),
+ )),
+ Arc::new(NestedField::optional(
+ 9,
+ "v_timestamp",
+ Type::Primitive(PrimitiveType::Timestamptz),
+ )),
+ Arc::new(NestedField::optional(
+ 10,
+ "v_decimal",
+ Type::Primitive(PrimitiveType::Decimal {
+ precision: 36,
+ scale: 10,
+ }),
+ )),
+ Arc::new(NestedField::optional(
+ 11,
+ "v_ts_ntz",
+ Type::Primitive(PrimitiveType::Timestamp),
+ )),
+ Arc::new(NestedField::optional(
+ 12,
+ "v_ts_ns_ntz",
+ Type::Primitive(PrimitiveType::TimestampNs),
+ )),
+ ])
+ .build()
+ .unwrap(),
+ );
let manifest = Manifest {
metadata: ManifestMetadata {
schema_id: 0,
- schema: Schema::builder()
- .with_fields(vec![
- // id v_int v_long v_float v_double v_varchar v_bool
v_date v_timestamp v_decimal v_ts_ntz
- Arc::new(NestedField::optional(
- 1,
- "id",
- Type::Primitive(PrimitiveType::Long),
- )),
- Arc::new(NestedField::optional(
- 2,
- "v_int",
- Type::Primitive(PrimitiveType::Int),
- )),
- Arc::new(NestedField::optional(
- 3,
- "v_long",
- Type::Primitive(PrimitiveType::Long),
- )),
- Arc::new(NestedField::optional(
- 4,
- "v_float",
- Type::Primitive(PrimitiveType::Float),
- )),
- Arc::new(NestedField::optional(
- 5,
- "v_double",
- Type::Primitive(PrimitiveType::Double),
- )),
- Arc::new(NestedField::optional(
- 6,
- "v_varchar",
- Type::Primitive(PrimitiveType::String),
- )),
- Arc::new(NestedField::optional(
- 7,
- "v_bool",
- Type::Primitive(PrimitiveType::Boolean),
- )),
- Arc::new(NestedField::optional(
- 8,
- "v_date",
- Type::Primitive(PrimitiveType::Date),
- )),
- Arc::new(NestedField::optional(
- 9,
- "v_timestamp",
- Type::Primitive(PrimitiveType::Timestamptz),
- )),
- Arc::new(NestedField::optional(
- 10,
- "v_decimal",
- Type::Primitive(PrimitiveType::Decimal {
- precision: 36,
- scale: 10,
- }),
- )),
- Arc::new(NestedField::optional(
- 11,
- "v_ts_ntz",
- Type::Primitive(PrimitiveType::Timestamp),
- )),
- Arc::new(NestedField::optional(
- 12,
- "v_ts_ns_ntz",
- Type::Primitive(PrimitiveType::TimestampNs
- ))),
- ])
- .build()
- .unwrap(),
- partition_spec: PartitionSpec {
- spec_id: 0,
- fields: vec![],
- },
+ schema: schema.clone(),
+ partition_spec:
BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
content: ManifestContentType::Data,
format_version: FormatVersion::V2,
},
@@ -1633,94 +1633,83 @@ mod tests {
#[tokio::test]
async fn test_parse_manifest_v2_partition() {
+ let schema = Arc::new(
+ Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::optional(
+ 1,
+ "id",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::optional(
+ 2,
+ "v_int",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ Arc::new(NestedField::optional(
+ 3,
+ "v_long",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::optional(
+ 4,
+ "v_float",
+ Type::Primitive(PrimitiveType::Float),
+ )),
+ Arc::new(NestedField::optional(
+ 5,
+ "v_double",
+ Type::Primitive(PrimitiveType::Double),
+ )),
+ Arc::new(NestedField::optional(
+ 6,
+ "v_varchar",
+ Type::Primitive(PrimitiveType::String),
+ )),
+ Arc::new(NestedField::optional(
+ 7,
+ "v_bool",
+ Type::Primitive(PrimitiveType::Boolean),
+ )),
+ Arc::new(NestedField::optional(
+ 8,
+ "v_date",
+ Type::Primitive(PrimitiveType::Date),
+ )),
+ Arc::new(NestedField::optional(
+ 9,
+ "v_timestamp",
+ Type::Primitive(PrimitiveType::Timestamptz),
+ )),
+ Arc::new(NestedField::optional(
+ 10,
+ "v_decimal",
+ Type::Primitive(PrimitiveType::Decimal {
+ precision: 36,
+ scale: 10,
+ }),
+ )),
+ Arc::new(NestedField::optional(
+ 11,
+ "v_ts_ntz",
+ Type::Primitive(PrimitiveType::Timestamp),
+ )),
+ Arc::new(NestedField::optional(
+ 12,
+ "v_ts_ns_ntz",
+ Type::Primitive(PrimitiveType::TimestampNs),
+ )),
+ ])
+ .build()
+ .unwrap(),
+ );
let manifest = Manifest {
metadata: ManifestMetadata {
schema_id: 0,
- schema: Schema::builder()
- .with_fields(vec![
- Arc::new(NestedField::optional(
- 1,
- "id",
- Type::Primitive(PrimitiveType::Long),
- )),
- Arc::new(NestedField::optional(
- 2,
- "v_int",
- Type::Primitive(PrimitiveType::Int),
- )),
- Arc::new(NestedField::optional(
- 3,
- "v_long",
- Type::Primitive(PrimitiveType::Long),
- )),
- Arc::new(NestedField::optional(
- 4,
- "v_float",
- Type::Primitive(PrimitiveType::Float),
- )),
- Arc::new(NestedField::optional(
- 5,
- "v_double",
- Type::Primitive(PrimitiveType::Double),
- )),
- Arc::new(NestedField::optional(
- 6,
- "v_varchar",
- Type::Primitive(PrimitiveType::String),
- )),
- Arc::new(NestedField::optional(
- 7,
- "v_bool",
- Type::Primitive(PrimitiveType::Boolean),
- )),
- Arc::new(NestedField::optional(
- 8,
- "v_date",
- Type::Primitive(PrimitiveType::Date),
- )),
- Arc::new(NestedField::optional(
- 9,
- "v_timestamp",
- Type::Primitive(PrimitiveType::Timestamptz),
- )),
- Arc::new(NestedField::optional(
- 10,
- "v_decimal",
- Type::Primitive(PrimitiveType::Decimal {
- precision: 36,
- scale: 10,
- }),
- )),
- Arc::new(NestedField::optional(
- 11,
- "v_ts_ntz",
- Type::Primitive(PrimitiveType::Timestamp),
- )),
- Arc::new(NestedField::optional(
- 12,
- "v_ts_ns_ntz",
- Type::Primitive(PrimitiveType::TimestampNs
- )))
- ])
- .build()
- .unwrap(),
- partition_spec: PartitionSpec {
- spec_id: 0,
- fields: vec![
- PartitionField {
- name: "v_int".to_string(),
- transform: Transform::Identity,
- source_id: 2,
- field_id: 1000,
- },
- PartitionField {
- name: "v_long".to_string(),
- transform: Transform::Identity,
- source_id: 3,
- field_id: 1001,
- },
- ],
- },
+ schema: schema.clone(),
+ partition_spec: BoundPartitionSpec::builder(schema)
+ .with_spec_id(0).add_partition_field("v_int", "v_int",
Transform::Identity).unwrap()
+ .add_partition_field("v_long", "v_long",
Transform::Identity).unwrap().build().unwrap(),
content: ManifestContentType::Data,
format_version: FormatVersion::V2,
},
@@ -1802,34 +1791,34 @@ mod tests {
#[tokio::test]
async fn test_parse_manifest_v1_unpartition() {
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ Arc::new(NestedField::optional(
+ 1,
+ "id",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ Arc::new(NestedField::optional(
+ 2,
+ "data",
+ Type::Primitive(PrimitiveType::String),
+ )),
+ Arc::new(NestedField::optional(
+ 3,
+ "comment",
+ Type::Primitive(PrimitiveType::String),
+ )),
+ ])
+ .build()
+ .unwrap(),
+ );
let manifest = Manifest {
metadata: ManifestMetadata {
schema_id: 1,
- schema: Schema::builder()
- .with_schema_id(1)
- .with_fields(vec![
- Arc::new(NestedField::optional(
- 1,
- "id",
- Type::Primitive(PrimitiveType::Int),
- )),
- Arc::new(NestedField::optional(
- 2,
- "data",
- Type::Primitive(PrimitiveType::String),
- )),
- Arc::new(NestedField::optional(
- 3,
- "comment",
- Type::Primitive(PrimitiveType::String),
- )),
- ])
- .build()
- .unwrap(),
- partition_spec: PartitionSpec {
- spec_id: 0,
- fields: vec![],
- },
+ schema: schema.clone(),
+ partition_spec:
BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
content: ManifestContentType::Data,
format_version: FormatVersion::V1,
},
@@ -1867,38 +1856,33 @@ mod tests {
#[tokio::test]
async fn test_parse_manifest_v1_partition() {
+ let schema = Arc::new(
+ Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::optional(
+ 1,
+ "id",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::optional(
+ 2,
+ "data",
+ Type::Primitive(PrimitiveType::String),
+ )),
+ Arc::new(NestedField::optional(
+ 3,
+ "category",
+ Type::Primitive(PrimitiveType::String),
+ )),
+ ])
+ .build()
+ .unwrap(),
+ );
let manifest = Manifest {
metadata: ManifestMetadata {
schema_id: 0,
- schema: Schema::builder()
- .with_fields(vec![
- Arc::new(NestedField::optional(
- 1,
- "id",
- Type::Primitive(PrimitiveType::Long),
- )),
- Arc::new(NestedField::optional(
- 2,
- "data",
- Type::Primitive(PrimitiveType::String),
- )),
- Arc::new(NestedField::optional(
- 3,
- "category",
- Type::Primitive(PrimitiveType::String),
- )),
- ])
- .build()
- .unwrap(),
- partition_spec: PartitionSpec {
- spec_id: 0,
- fields: vec![PartitionField {
- name: "category".to_string(),
- transform: Transform::Identity,
- source_id: 3,
- field_id: 1000,
- }],
- },
+ schema: schema.clone(),
+ partition_spec:
BoundPartitionSpec::builder(schema).add_partition_field("category", "category",
Transform::Identity).unwrap().build().unwrap(),
content: ManifestContentType::Data,
format_version: FormatVersion::V1,
},
@@ -1956,28 +1940,28 @@ mod tests {
#[tokio::test]
async fn test_parse_manifest_with_schema_evolution() {
+ let schema = Arc::new(
+ Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::optional(
+ 1,
+ "id",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::optional(
+ 2,
+ "v_int",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ ])
+ .build()
+ .unwrap(),
+ );
let manifest = Manifest {
metadata: ManifestMetadata {
schema_id: 0,
- schema: Schema::builder()
- .with_fields(vec![
- Arc::new(NestedField::optional(
- 1,
- "id",
- Type::Primitive(PrimitiveType::Long),
- )),
- Arc::new(NestedField::optional(
- 2,
- "v_int",
- Type::Primitive(PrimitiveType::Int),
- )),
- ])
- .build()
- .unwrap(),
- partition_spec: PartitionSpec {
- spec_id: 0,
- fields: vec![],
- },
+ schema: schema.clone(),
+ partition_spec:
BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
content: ManifestContentType::Data,
format_version: FormatVersion::V2,
},
@@ -2028,28 +2012,28 @@ mod tests {
// Compared with original manifest, the lower_bounds and upper_bounds
no longer has data for field 3, and
// other parts should be same.
+ let schema = Arc::new(
+ Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::optional(
+ 1,
+ "id",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::optional(
+ 2,
+ "v_int",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ ])
+ .build()
+ .unwrap(),
+ );
let expected_manifest = Manifest {
metadata: ManifestMetadata {
schema_id: 0,
- schema: Schema::builder()
- .with_fields(vec![
- Arc::new(NestedField::optional(
- 1,
- "id",
- Type::Primitive(PrimitiveType::Long),
- )),
- Arc::new(NestedField::optional(
- 2,
- "v_int",
- Type::Primitive(PrimitiveType::Int),
- )),
- ])
- .build()
- .unwrap(),
- partition_spec: PartitionSpec {
- spec_id: 0,
- fields: vec![],
- },
+ schema: schema.clone(),
+ partition_spec:
BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
content: ManifestContentType::Data,
format_version: FormatVersion::V2,
},
diff --git a/crates/iceberg/src/spec/partition.rs
b/crates/iceberg/src/spec/partition.rs
index 36763df7..75e5d924 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -24,14 +24,15 @@ use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use super::transform::Transform;
-use super::{NestedField, Schema, StructType};
+use super::{NestedField, Schema, SchemaRef, StructType};
use crate::{Error, ErrorKind, Result};
pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999;
pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0;
-/// Reference to [`PartitionSpec`].
-pub type PartitionSpecRef = Arc<PartitionSpec>;
+/// Reference to [`BoundPartitionSpec`].
+pub type BoundPartitionSpecRef = Arc<BoundPartitionSpec>;
+
/// Partition fields capture the transform from table data to partition values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
@@ -54,22 +55,51 @@ impl PartitionField {
}
}
-/// Partition spec that defines how to produce a tuple of partition values
from a record.
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
+/// Partition spec that defines how to produce a tuple of partition values
from a record.
+/// `PartitionSpec` is bound to a specific schema.
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct BoundPartitionSpec {
+ /// Identifier for PartitionSpec
+ spec_id: i32,
+ /// Details of the partition spec
+ fields: Vec<PartitionField>,
+ /// The schema this partition spec is bound to
+ schema: SchemaRef,
+ /// Type of the partition spec
+ partition_type: StructType,
+}
+
+/// Reference to [`SchemalessPartitionSpec`].
+pub type SchemalessPartitionSpecRef = Arc<SchemalessPartitionSpec>;
+/// Partition spec that defines how to produce a tuple of partition values
from a record.
+/// Schemaless partition specs are never constructed manually. They occur when
a table is mutated
+/// and partition spec and schemas are updated. While old partition specs are
retained, the bound
+/// schema might not be available anymore as part of the table metadata.
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
-pub struct PartitionSpec {
+pub struct SchemalessPartitionSpec {
/// Identifier for PartitionSpec
- pub(crate) spec_id: i32,
+ spec_id: i32,
/// Details of the partition spec
- pub(crate) fields: Vec<PartitionField>,
+ fields: Vec<PartitionField>,
}
-impl PartitionSpec {
+impl BoundPartitionSpec {
/// Create partition spec builder
- pub fn builder(schema: &Schema) -> PartitionSpecBuilder {
+ pub fn builder(schema: impl Into<SchemaRef>) -> PartitionSpecBuilder {
PartitionSpecBuilder::new(schema)
}
+    /// Get a new unpartitioned partition spec
+ pub fn unpartition_spec(schema: impl Into<SchemaRef>) -> Self {
+ Self {
+ spec_id: DEFAULT_PARTITION_SPEC_ID,
+ fields: vec![],
+ schema: schema.into(),
+ partition_type: StructType::new(vec![]),
+ }
+ }
+
/// Spec id of the partition spec
pub fn spec_id(&self) -> i32 {
self.spec_id
@@ -80,39 +110,21 @@ impl PartitionSpec {
&self.fields
}
+ /// The schema this partition spec is bound to
+ pub fn schema(&self) -> &Schema {
+ &self.schema
+ }
+
+ /// The schema ref this partition spec is bound to
+ pub fn schema_ref(&self) -> &SchemaRef {
+ &self.schema
+ }
+
/// Returns if the partition spec is unpartitioned.
///
/// A [`PartitionSpec`] is unpartitioned if it has no fields or all fields
are [`Transform::Void`] transform.
pub fn is_unpartitioned(&self) -> bool {
- self.fields.is_empty()
- || self
- .fields
- .iter()
- .all(|f| matches!(f.transform, Transform::Void))
- }
-
- /// Returns the partition type of this partition spec.
- pub fn partition_type(&self, schema: &Schema) -> Result<StructType> {
- let mut fields = Vec::with_capacity(self.fields.len());
- for partition_field in &self.fields {
- let field = schema
- .field_by_id(partition_field.source_id)
- .ok_or_else(|| {
- Error::new(
- ErrorKind::DataInvalid,
- format!(
- "No column with source column id {} in schema
{:?}",
- partition_field.source_id, schema
- ),
- )
- })?;
- let res_type =
partition_field.transform.result_type(&field.field_type)?;
- let field =
- NestedField::optional(partition_field.field_id,
&partition_field.name, res_type)
- .into();
- fields.push(field);
- }
- Ok(StructType::new(fields))
+ self.fields.is_empty() || self.fields.iter().all(|f| f.transform ==
Transform::Void)
}
/// Turn this partition spec into an unbound partition spec.
@@ -122,6 +134,29 @@ impl PartitionSpec {
self.into()
}
+ /// Turn this partition spec into a preserved partition spec.
+ pub fn into_schemaless(self) -> SchemalessPartitionSpec {
+ self.into()
+ }
+
+ /// Check if this partition spec has sequential partition ids.
+ /// Sequential ids start from 1000 and increment by 1 for each field.
+ /// This is required for spec version 1
+ pub fn has_sequential_ids(&self) -> bool {
+ has_sequential_ids(self.fields.iter().map(|f| f.field_id))
+ }
+
+ /// Get the highest field id in the partition spec.
+ /// If the partition spec is unpartitioned, it returns the last
unpartitioned last assigned id (999).
+ pub fn highest_field_id(&self) -> Option<i32> {
+ self.fields.iter().map(|f| f.field_id).max()
+ }
+
+ /// Returns the partition type of this partition spec.
+ pub fn partition_type(&self) -> &StructType {
+ &self.partition_type
+ }
+
/// Check if this partition spec is compatible with another partition spec.
///
/// Returns true if the partition spec is equal to the other spec with
partition field ids ignored and
@@ -131,15 +166,15 @@ impl PartitionSpec {
/// * Field names
/// * Source column ids
/// * Transforms
- pub fn is_compatible_with(&self, other: &UnboundPartitionSpec) -> bool {
+ pub fn is_compatible_with_schemaless(&self, other:
&SchemalessPartitionSpec) -> bool {
if self.fields.len() != other.fields.len() {
return false;
}
- for (this_field, other_field) in self.fields.iter().zip(&other.fields)
{
+ for (this_field, other_field) in
self.fields.iter().zip(other.fields.iter()) {
if this_field.source_id != other_field.source_id
- || this_field.transform != other_field.transform
|| this_field.name != other_field.name
+ || this_field.transform != other_field.transform
{
return false;
}
@@ -147,33 +182,40 @@ impl PartitionSpec {
true
}
+}
- /// Check if this partition spec has sequential partition ids.
- /// Sequential ids start from 1000 and increment by 1 for each field.
- /// This is required for spec version 1
- pub fn has_sequential_ids(&self) -> bool {
- for (index, field) in self.fields.iter().enumerate() {
- let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)
- .checked_add(1)
- .and_then(|id| id.checked_add(index as i64))
- .unwrap_or(i64::MAX);
+impl SchemalessPartitionSpec {
+ /// Fields of the partition spec
+ pub fn fields(&self) -> &[PartitionField] {
+ &self.fields
+ }
- if field.field_id as i64 != expected_id {
- return false;
- }
+ /// Spec id of the partition spec
+ pub fn spec_id(&self) -> i32 {
+ self.spec_id
+ }
+
+ /// Bind this schemaless partition spec to a schema.
+ pub fn bind(self, schema: impl Into<SchemaRef>) ->
Result<BoundPartitionSpec> {
+ PartitionSpecBuilder::new_from_unbound(self.into_unbound(),
schema)?.build()
+ }
+
+    /// Get a new unpartitioned partition spec
+ pub fn unpartition_spec() -> Self {
+ Self {
+ spec_id: DEFAULT_PARTITION_SPEC_ID,
+ fields: vec![],
}
+ }
- true
+ /// Returns the partition type of this partition spec.
+ pub fn partition_type(&self, schema: &Schema) -> Result<StructType> {
+ PartitionSpecBuilder::partition_type(&self.fields, schema)
}
- /// Get the highest field id in the partition spec.
- /// If the partition spec is unpartitioned, it returns the last
unpartitioned last assigned id (999).
- pub fn highest_field_id(&self) -> i32 {
- self.fields
- .iter()
- .map(|f| f.field_id)
- .max()
- .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID)
+ /// Convert to unbound partition spec
+ pub fn into_unbound(self) -> UnboundPartitionSpec {
+ self.into()
}
}
@@ -212,7 +254,7 @@ impl UnboundPartitionSpec {
}
/// Bind this unbound partition spec to a schema.
- pub fn bind(self, schema: &Schema) -> Result<PartitionSpec> {
+ pub fn bind(self, schema: impl Into<SchemaRef>) ->
Result<BoundPartitionSpec> {
PartitionSpecBuilder::new_from_unbound(self, schema)?.build()
}
@@ -235,6 +277,21 @@ impl UnboundPartitionSpec {
}
}
+fn has_sequential_ids(field_ids: impl Iterator<Item = i32>) -> bool {
+ for (index, field_id) in field_ids.enumerate() {
+ let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)
+ .checked_add(1)
+ .and_then(|id| id.checked_add(index as i64))
+ .unwrap_or(i64::MAX);
+
+ if field_id as i64 != expected_id {
+ return false;
+ }
+ }
+
+ true
+}
+
impl From<PartitionField> for UnboundPartitionField {
fn from(field: PartitionField) -> Self {
UnboundPartitionField {
@@ -246,8 +303,17 @@ impl From<PartitionField> for UnboundPartitionField {
}
}
-impl From<PartitionSpec> for UnboundPartitionSpec {
- fn from(spec: PartitionSpec) -> Self {
+impl From<BoundPartitionSpec> for UnboundPartitionSpec {
+ fn from(spec: BoundPartitionSpec) -> Self {
+ UnboundPartitionSpec {
+ spec_id: Some(spec.spec_id),
+ fields: spec.fields.into_iter().map(Into::into).collect(),
+ }
+ }
+}
+
+impl From<SchemalessPartitionSpec> for UnboundPartitionSpec {
+ fn from(spec: SchemalessPartitionSpec) -> Self {
UnboundPartitionSpec {
spec_id: Some(spec.spec_id),
fields: spec.fields.into_iter().map(Into::into).collect(),
@@ -255,6 +321,15 @@ impl From<PartitionSpec> for UnboundPartitionSpec {
}
}
+impl From<BoundPartitionSpec> for SchemalessPartitionSpec {
+ fn from(spec: BoundPartitionSpec) -> Self {
+ SchemalessPartitionSpec {
+ spec_id: spec.spec_id,
+ fields: spec.fields,
+ }
+ }
+}
+
/// Create a new UnboundPartitionSpec
#[derive(Debug, Default)]
pub struct UnboundPartitionSpecBuilder {
@@ -326,26 +401,29 @@ impl UnboundPartitionSpecBuilder {
/// Create valid partition specs for a given schema.
#[derive(Debug)]
-pub struct PartitionSpecBuilder<'a> {
+pub struct PartitionSpecBuilder {
spec_id: Option<i32>,
last_assigned_field_id: i32,
fields: Vec<UnboundPartitionField>,
- schema: &'a Schema,
+ schema: SchemaRef,
}
-impl<'a> PartitionSpecBuilder<'a> {
+impl PartitionSpecBuilder {
/// Create a new partition spec builder with the given schema.
- pub fn new(schema: &'a Schema) -> Self {
+ pub fn new(schema: impl Into<SchemaRef>) -> Self {
Self {
spec_id: None,
fields: vec![],
last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID,
- schema,
+ schema: schema.into(),
}
}
/// Create a new partition spec builder from an existing unbound partition
spec.
- pub fn new_from_unbound(unbound: UnboundPartitionSpec, schema: &'a Schema)
-> Result<Self> {
+ pub fn new_from_unbound(
+ unbound: UnboundPartitionSpec,
+ schema: impl Into<SchemaRef>,
+ ) -> Result<Self> {
let mut builder =
Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID));
@@ -408,8 +486,8 @@ impl<'a> PartitionSpecBuilder<'a> {
pub fn add_unbound_field(mut self, field: UnboundPartitionField) ->
Result<Self> {
self.check_name_set_and_unique(&field.name)?;
self.check_for_redundant_partitions(field.source_id,
&field.transform)?;
- Self::check_name_does_not_collide_with_schema(&field, self.schema)?;
- Self::check_transform_compatibility(&field, self.schema)?;
+ Self::check_name_does_not_collide_with_schema(&field, &self.schema)?;
+ Self::check_transform_compatibility(&field, &self.schema)?;
if let Some(partition_field_id) = field.field_id {
self.check_partition_id_unique(partition_field_id)?;
}
@@ -432,11 +510,14 @@ impl<'a> PartitionSpecBuilder<'a> {
}
/// Build a bound partition spec with the given schema.
- pub fn build(self) -> Result<PartitionSpec> {
+ pub fn build(self) -> Result<BoundPartitionSpec> {
let fields = Self::set_field_ids(self.fields,
self.last_assigned_field_id)?;
- Ok(PartitionSpec {
+ let partition_type = Self::partition_type(&fields, &self.schema)?;
+ Ok(BoundPartitionSpec {
spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID),
fields,
+ partition_type,
+ schema: self.schema,
})
}
@@ -485,6 +566,32 @@ impl<'a> PartitionSpecBuilder<'a> {
Ok(bound_fields)
}
+ /// Returns the partition type of this partition spec.
+ fn partition_type(fields: &Vec<PartitionField>, schema: &Schema) ->
Result<StructType> {
+ let mut struct_fields = Vec::with_capacity(fields.len());
+ for partition_field in fields {
+ let field = schema
+ .field_by_id(partition_field.source_id)
+ .ok_or_else(|| {
+ Error::new(
+ // This should never occur as
check_transform_compatibility
+ // already ensures that the source field exists in the
schema
+ ErrorKind::Unexpected,
+ format!(
+ "No column with source column id {} in schema
{:?}",
+ partition_field.source_id, schema
+ ),
+ )
+ })?;
+ let res_type =
partition_field.transform.result_type(&field.field_type)?;
+ let field =
+ NestedField::optional(partition_field.field_id,
&partition_field.name, res_type)
+ .into();
+ struct_fields.push(field);
+ }
+ Ok(StructType::new(struct_fields))
+ }
+
/// Ensure that the partition name is unique among columns in the schema.
/// Duplicate names are allowed if:
/// 1. The column is sourced from the column with the same name.
@@ -622,7 +729,7 @@ trait CorePartitionSpecValidator {
fn fields(&self) -> &Vec<UnboundPartitionField>;
}
-impl CorePartitionSpecValidator for PartitionSpecBuilder<'_> {
+impl CorePartitionSpecValidator for PartitionSpecBuilder {
fn fields(&self) -> &Vec<UnboundPartitionField> {
&self.fields
}
@@ -637,7 +744,7 @@ impl CorePartitionSpecValidator for
UnboundPartitionSpecBuilder {
#[cfg(test)]
mod tests {
use super::*;
- use crate::spec::Type;
+ use crate::spec::{PrimitiveType, Type};
#[test]
fn test_partition_spec() {
@@ -663,7 +770,7 @@ mod tests {
}
"#;
- let partition_spec: PartitionSpec =
serde_json::from_str(spec).unwrap();
+ let partition_spec: SchemalessPartitionSpec =
serde_json::from_str(spec).unwrap();
assert_eq!(4, partition_spec.fields[0].source_id);
assert_eq!(1000, partition_spec.fields[0].field_id);
assert_eq!("ts_day", partition_spec.fields[0].name);
@@ -695,7 +802,7 @@ mod tests {
])
.build()
.unwrap();
- let partition_spec = PartitionSpec::builder(&schema)
+ let partition_spec = BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.build()
.unwrap();
@@ -704,7 +811,7 @@ mod tests {
"Empty partition spec should be unpartitioned"
);
- let partition_spec = PartitionSpec::builder(&schema)
+ let partition_spec = BoundPartitionSpec::builder(schema.clone())
.add_unbound_fields(vec![
UnboundPartitionField::builder()
.source_id(1)
@@ -726,7 +833,7 @@ mod tests {
"Partition spec with one non void transform should not be
unpartitioned"
);
- let partition_spec = PartitionSpec::builder(&schema)
+ let partition_spec = BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.add_unbound_fields(vec![
UnboundPartitionField::builder()
@@ -809,6 +916,32 @@ mod tests {
assert_eq!(Transform::Day, partition_spec.fields[0].transform);
}
+ #[test]
+ fn test_new_unpartition() {
+ let schema = Schema::builder()
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(crate::spec::PrimitiveType::Int))
+ .into(),
+ NestedField::required(
+ 2,
+ "name",
+ Type::Primitive(crate::spec::PrimitiveType::String),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap();
+ let partition_spec = BoundPartitionSpec::builder(schema.clone())
+ .with_spec_id(0)
+ .build()
+ .unwrap();
+ let partition_type = partition_spec.partition_type();
+ assert_eq!(0, partition_type.fields().len());
+
+ let unpartition_spec = BoundPartitionSpec::unpartition_spec(schema);
+ assert_eq!(partition_spec, unpartition_spec);
+ }
+
#[test]
fn test_partition_type() {
let spec = r#"
@@ -833,7 +966,7 @@ mod tests {
}
"#;
- let partition_spec: PartitionSpec =
serde_json::from_str(spec).unwrap();
+ let partition_spec: SchemalessPartitionSpec =
serde_json::from_str(spec).unwrap();
let schema = Schema::builder()
.with_fields(vec![
NestedField::required(1, "id",
Type::Primitive(crate::spec::PrimitiveType::Int))
@@ -909,7 +1042,7 @@ mod tests {
}
"#;
- let partition_spec: PartitionSpec =
serde_json::from_str(spec).unwrap();
+ let partition_spec: SchemalessPartitionSpec =
serde_json::from_str(spec).unwrap();
let schema = Schema::builder()
.with_fields(vec![
NestedField::required(1, "id",
Type::Primitive(crate::spec::PrimitiveType::Int))
@@ -976,7 +1109,7 @@ mod tests {
}
"#;
- let partition_spec: PartitionSpec =
serde_json::from_str(spec).unwrap();
+ let partition_spec: SchemalessPartitionSpec =
serde_json::from_str(spec).unwrap();
let schema = Schema::builder()
.with_fields(vec![
NestedField::required(1, "id",
Type::Primitive(crate::spec::PrimitiveType::Int))
@@ -994,6 +1127,50 @@ mod tests {
assert!(partition_spec.partition_type(&schema).is_err());
}
+ #[test]
+ fn test_schemaless_bind_schema_keeps_field_ids_and_spec_id() {
+ let schema: Schema = Schema::builder()
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(crate::spec::PrimitiveType::Int))
+ .into(),
+ NestedField::required(
+ 2,
+ "name",
+ Type::Primitive(crate::spec::PrimitiveType::String),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap();
+
+ let partition_spec = BoundPartitionSpec::builder(schema.clone())
+ .with_spec_id(99)
+ .add_unbound_field(UnboundPartitionField {
+ source_id: 1,
+ field_id: Some(1010),
+ name: "id".to_string(),
+ transform: Transform::Identity,
+ })
+ .unwrap()
+ .add_unbound_field(UnboundPartitionField {
+ source_id: 2,
+ field_id: Some(1001),
+ name: "name_void".to_string(),
+ transform: Transform::Void,
+ })
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let schemaless_partition_spec =
SchemalessPartitionSpec::from(partition_spec.clone());
+ let bound_partition_spec =
schemaless_partition_spec.bind(schema).unwrap();
+
+ assert_eq!(partition_spec, bound_partition_spec);
+ assert_eq!(partition_spec.fields[0].field_id, 1010);
+ assert_eq!(partition_spec.fields[1].field_id, 1001);
+ assert_eq!(bound_partition_spec.spec_id(), 99);
+ }
+
#[test]
fn test_builder_disallow_duplicate_names() {
UnboundPartitionSpec::builder()
@@ -1018,7 +1195,7 @@ mod tests {
])
.build()
.unwrap();
- PartitionSpec::builder(&schema)
+ BoundPartitionSpec::builder(schema.clone())
.add_unbound_field(UnboundPartitionField {
source_id: 1,
field_id: Some(1000),
@@ -1056,7 +1233,7 @@ mod tests {
])
.build()
.unwrap();
- let spec = PartitionSpec::builder(&schema)
+ let spec = BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
@@ -1104,26 +1281,33 @@ mod tests {
.build()
.unwrap();
- PartitionSpec::builder(&schema)
+ BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.build()
.unwrap();
- let spec = PartitionSpec::builder(&schema)
+ let spec = BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.add_partition_field("id", "id_bucket[16]", Transform::Bucket(16))
.unwrap()
.build()
.unwrap();
- assert_eq!(spec, PartitionSpec {
+ assert_eq!(spec, BoundPartitionSpec {
spec_id: 1,
+ schema: schema.into(),
fields: vec![PartitionField {
source_id: 1,
field_id: 1000,
name: "id_bucket[16]".to_string(),
transform: Transform::Bucket(16),
- }]
+ }],
+ partition_type: StructType::new(vec![NestedField::optional(
+ 1000,
+ "id_bucket[16]",
+ Type::Primitive(PrimitiveType::Int)
+ )
+ .into()])
});
}
@@ -1139,12 +1323,12 @@ mod tests {
.build()
.unwrap();
- PartitionSpec::builder(&schema)
+ BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.build()
.unwrap();
- let err = PartitionSpec::builder(&schema)
+ let err = BoundPartitionSpec::builder(schema)
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
@@ -1172,12 +1356,12 @@ mod tests {
.build()
.unwrap();
- PartitionSpec::builder(&schema)
+ BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.build()
.unwrap();
- PartitionSpec::builder(&schema)
+ BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
@@ -1190,7 +1374,7 @@ mod tests {
.unwrap();
// Not OK for different source id
- PartitionSpec::builder(&schema)
+ BoundPartitionSpec::builder(schema)
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 2,
@@ -1224,7 +1408,7 @@ mod tests {
.unwrap();
// Valid
- PartitionSpec::builder(&schema)
+ BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.add_unbound_fields(vec![
UnboundPartitionField {
@@ -1245,7 +1429,7 @@ mod tests {
.unwrap();
// Invalid
- PartitionSpec::builder(&schema)
+ BoundPartitionSpec::builder(schema)
.with_spec_id(1)
.add_unbound_fields(vec![
UnboundPartitionField {
@@ -1291,7 +1475,7 @@ mod tests {
.build()
.unwrap();
- PartitionSpec::builder(&schema)
+ BoundPartitionSpec::builder(schema)
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
@@ -1342,7 +1526,7 @@ mod tests {
.build()
.unwrap();
- let partition_spec_1 = PartitionSpec::builder(&schema)
+ let partition_spec_1 = BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
@@ -1354,7 +1538,7 @@ mod tests {
.build()
.unwrap();
- let partition_spec_2 = PartitionSpec::builder(&schema)
+ let partition_spec_2 = BoundPartitionSpec::builder(schema)
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
@@ -1366,7 +1550,7 @@ mod tests {
.build()
.unwrap();
-
assert!(partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+
assert!(partition_spec_1.is_compatible_with_schemaless(&partition_spec_2.into_schemaless()));
}
#[test]
@@ -1381,7 +1565,7 @@ mod tests {
.build()
.unwrap();
- let partition_spec_1 = PartitionSpec::builder(&schema)
+ let partition_spec_1 = BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
@@ -1393,7 +1577,7 @@ mod tests {
.build()
.unwrap();
- let partition_spec_2 = PartitionSpec::builder(&schema)
+ let partition_spec_2 = BoundPartitionSpec::builder(schema)
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
@@ -1405,7 +1589,9 @@ mod tests {
.build()
.unwrap();
-
assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+ assert!(
+
!partition_spec_1.is_compatible_with_schemaless(&partition_spec_2.into_schemaless())
+ );
}
#[test]
@@ -1424,7 +1610,7 @@ mod tests {
.build()
.unwrap();
- let partition_spec_1 = PartitionSpec::builder(&schema)
+ let partition_spec_1 = BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
@@ -1436,7 +1622,7 @@ mod tests {
.build()
.unwrap();
- let partition_spec_2 = PartitionSpec::builder(&schema)
+ let partition_spec_2 = BoundPartitionSpec::builder(schema)
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 2,
@@ -1448,7 +1634,9 @@ mod tests {
.build()
.unwrap();
-
assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+ assert!(
+
!partition_spec_1.is_compatible_with_schemaless(&partition_spec_2.into_schemaless())
+ );
}
#[test]
@@ -1467,7 +1655,7 @@ mod tests {
.build()
.unwrap();
- let partition_spec_1 = PartitionSpec::builder(&schema)
+ let partition_spec_1 = BoundPartitionSpec::builder(schema.clone())
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
@@ -1486,7 +1674,7 @@ mod tests {
.build()
.unwrap();
- let partition_spec_2 = PartitionSpec::builder(&schema)
+ let partition_spec_2 = BoundPartitionSpec::builder(schema)
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 2,
@@ -1505,17 +1693,20 @@ mod tests {
.build()
.unwrap();
-
assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+ assert!(
+
!partition_spec_1.is_compatible_with_schemaless(&partition_spec_2.into_schemaless())
+ );
}
#[test]
fn test_highest_field_id_unpartitioned() {
- let spec =
PartitionSpec::builder(&Schema::builder().with_fields(vec![]).build().unwrap())
- .with_spec_id(1)
- .build()
- .unwrap();
+ let spec =
+
BoundPartitionSpec::builder(Schema::builder().with_fields(vec![]).build().unwrap())
+ .with_spec_id(1)
+ .build()
+ .unwrap();
- assert_eq!(UNPARTITIONED_LAST_ASSIGNED_ID, spec.highest_field_id());
+ assert!(spec.highest_field_id().is_none());
}
#[test]
@@ -1534,7 +1725,7 @@ mod tests {
.build()
.unwrap();
- let spec = PartitionSpec::builder(&schema)
+ let spec = BoundPartitionSpec::builder(schema)
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
@@ -1553,7 +1744,7 @@ mod tests {
.build()
.unwrap();
- assert_eq!(1001, spec.highest_field_id());
+ assert_eq!(Some(1001), spec.highest_field_id());
}
#[test]
@@ -1572,7 +1763,7 @@ mod tests {
.build()
.unwrap();
- let spec = PartitionSpec::builder(&schema)
+ let spec = BoundPartitionSpec::builder(schema)
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
@@ -1612,7 +1803,7 @@ mod tests {
.build()
.unwrap();
- let spec = PartitionSpec::builder(&schema)
+ let spec = BoundPartitionSpec::builder(schema)
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
@@ -1652,7 +1843,7 @@ mod tests {
.build()
.unwrap();
- let spec = PartitionSpec::builder(&schema)
+ let spec = BoundPartitionSpec::builder(schema)
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
diff --git a/crates/iceberg/src/spec/table_metadata.rs
b/crates/iceberg/src/spec/table_metadata.rs
index cde70937..4a2e3ab7 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -31,8 +31,8 @@ use uuid::Uuid;
use super::snapshot::SnapshotReference;
use super::{
- PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, Snapshot,
SnapshotRef, SnapshotRetention,
- SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID,
+ BoundPartitionSpec, BoundPartitionSpecRef, SchemaId, SchemaRef,
SchemalessPartitionSpecRef,
+ Snapshot, SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef,
DEFAULT_PARTITION_SPEC_ID,
};
use crate::error::{timestamp_ms_to_utc, Result};
use crate::{Error, ErrorKind, TableCreation};
@@ -118,9 +118,9 @@ pub struct TableMetadata {
/// ID of the table’s current schema.
pub(crate) current_schema_id: i32,
/// A list of partition specs, stored as full partition spec objects.
- pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
+ pub(crate) partition_specs: HashMap<i32, SchemalessPartitionSpecRef>,
/// ID of the “current” spec that writers should use by default.
- pub(crate) default_spec_id: i32,
+ pub(crate) default_spec: BoundPartitionSpecRef,
/// An integer; the highest assigned partition field ID across all
partition specs for the table.
pub(crate) last_partition_id: i32,
///A string to string map of table properties. This is used to control
settings that
@@ -222,21 +222,26 @@ impl TableMetadata {
/// Returns all partition specs.
#[inline]
- pub fn partition_specs_iter(&self) -> impl Iterator<Item =
&PartitionSpecRef> {
+ pub fn partition_specs_iter(&self) -> impl Iterator<Item =
&SchemalessPartitionSpecRef> {
self.partition_specs.values()
}
/// Lookup partition spec by id.
#[inline]
- pub fn partition_spec_by_id(&self, spec_id: i32) ->
Option<&PartitionSpecRef> {
+ pub fn partition_spec_by_id(&self, spec_id: i32) ->
Option<&SchemalessPartitionSpecRef> {
self.partition_specs.get(&spec_id)
}
/// Get default partition spec
#[inline]
- pub fn default_partition_spec(&self) -> &PartitionSpecRef {
- self.partition_spec_by_id(self.default_spec_id)
- .expect("Default partition spec id set, but not found in table
metadata")
+ pub fn default_partition_spec(&self) -> &BoundPartitionSpecRef {
+ &self.default_spec
+ }
+
+ #[inline]
+ /// Returns spec id of the "current" partition spec.
+ pub fn default_partition_spec_id(&self) -> i32 {
+ self.default_spec.spec_id()
}
/// Returns all snapshots
@@ -352,29 +357,18 @@ impl TableMetadata {
Ok(self)
}
- /// If the default partition spec is specified but the spec is not present
in specs, add it
+ /// If the default partition spec is not present in specs, add it
fn try_normalize_partition_spec(&mut self) -> Result<()> {
- if self.partition_spec_by_id(self.default_spec_id).is_some() {
- return Ok(());
- }
-
- if self.default_spec_id != DEFAULT_PARTITION_SPEC_ID {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!(
- "No partition spec exists with the default spec id {}.",
- self.default_spec_id
- ),
- ));
+ if self
+ .partition_spec_by_id(self.default_spec.spec_id())
+ .is_none()
+ {
+ self.partition_specs.insert(
+ self.default_spec.spec_id(),
+
Arc::new(Arc::unwrap_or_clone(self.default_spec.clone()).into_schemaless()),
+ );
}
- let partition_spec = PartitionSpec {
- spec_id: DEFAULT_PARTITION_SPEC_ID,
- fields: vec![],
- };
- self.partition_specs
- .insert(DEFAULT_PARTITION_SPEC_ID, Arc::new(partition_spec));
-
Ok(())
}
@@ -565,6 +559,8 @@ impl TableMetadataBuilder {
properties,
} = table_creation;
+ let schema: Arc<super::Schema> = Arc::new(schema);
+ let unpartition_spec =
BoundPartitionSpec::unpartition_spec(schema.clone());
let partition_specs = match partition_spec {
Some(_) => {
return Err(Error::new(
@@ -573,11 +569,8 @@ impl TableMetadataBuilder {
))
}
None => HashMap::from([(
- DEFAULT_PARTITION_SPEC_ID,
- Arc::new(PartitionSpec {
- spec_id: DEFAULT_PARTITION_SPEC_ID,
- fields: vec![],
- }),
+ unpartition_spec.spec_id(),
+ Arc::new(unpartition_spec.clone().into_schemaless()),
)]),
};
@@ -607,9 +600,9 @@ impl TableMetadataBuilder {
last_updated_ms: Utc::now().timestamp_millis(),
last_column_id: schema.highest_field_id(),
current_schema_id: schema.schema_id(),
- schemas: HashMap::from([(schema.schema_id(), Arc::new(schema))]),
+ schemas: HashMap::from([(schema.schema_id(), schema)]),
partition_specs,
- default_spec_id: DEFAULT_PARTITION_SPEC_ID,
+ default_spec: BoundPartitionSpecRef::new(unpartition_spec),
last_partition_id: 0,
properties,
current_snapshot_id: None,
@@ -661,8 +654,8 @@ pub(super) mod _serde {
use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2};
use crate::spec::{
- PartitionField, PartitionSpec, Schema, Snapshot, SnapshotReference,
SnapshotRetention,
- SortOrder,
+ BoundPartitionSpec, PartitionField, Schema, SchemaRef,
SchemalessPartitionSpec, Snapshot,
+ SnapshotReference, SnapshotRetention, SortOrder,
};
use crate::{Error, ErrorKind};
@@ -685,7 +678,7 @@ pub(super) mod _serde {
pub last_column_id: i32,
pub schemas: Vec<SchemaV2>,
pub current_schema_id: i32,
- pub partition_specs: Vec<PartitionSpec>,
+ pub partition_specs: Vec<SchemalessPartitionSpec>,
pub default_spec_id: i32,
pub last_partition_id: i32,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -721,7 +714,7 @@ pub(super) mod _serde {
pub current_schema_id: Option<i32>,
pub partition_spec: Vec<PartitionField>,
#[serde(skip_serializing_if = "Option::is_none")]
- pub partition_specs: Option<Vec<PartitionSpec>>,
+ pub partition_specs: Option<Vec<SchemalessPartitionSpec>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub default_spec_id: Option<i32>,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -809,6 +802,44 @@ pub(super) mod _serde {
.map(|schema| Ok((schema.schema_id,
Arc::new(schema.try_into()?))))
.collect::<Result<Vec<_>, Error>>()?,
);
+
+ let current_schema: &SchemaRef =
+ schemas.get(&value.current_schema_id).ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "No schema exists with the current schema id {}.",
+ value.current_schema_id
+ ),
+ )
+ })?;
+ let partition_specs = HashMap::from_iter(
+ value
+ .partition_specs
+ .into_iter()
+ .map(|x| (x.spec_id(), Arc::new(x))),
+ );
+ let default_spec_id = value.default_spec_id;
+ let default_spec = partition_specs
+ .get(&value.default_spec_id)
+ .map(|schemaless_spec| {
+ (*schemaless_spec.clone())
+ .clone()
+ .bind(current_schema.clone())
+ })
+ .transpose()?
+ .or_else(|| {
+ (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
+ .then(||
BoundPartitionSpec::unpartition_spec(current_schema.clone()))
+ })
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Default partition spec {default_spec_id} not
found"),
+ )
+ })?
+ .into();
+
let mut metadata = TableMetadata {
format_version: FormatVersion::V2,
table_uuid: value.table_uuid,
@@ -818,13 +849,8 @@ pub(super) mod _serde {
last_column_id: value.last_column_id,
current_schema_id: value.current_schema_id,
schemas,
- partition_specs: HashMap::from_iter(
- value
- .partition_specs
- .into_iter()
- .map(|x| (x.spec_id(), Arc::new(x))),
- ),
- default_spec_id: value.default_spec_id,
+ partition_specs,
+ default_spec,
last_partition_id: value.last_partition_id,
properties: value.properties.unwrap_or_default(),
current_snapshot_id,
@@ -876,6 +902,7 @@ pub(super) mod _serde {
} else {
value.current_snapshot_id
};
+
let schemas = value
.schemas
.map(|schemas| {
@@ -900,18 +927,49 @@ pub(super) mod _serde {
})
.transpose()?
.unwrap_or_default();
- let partition_specs = HashMap::from_iter(
- value
- .partition_specs
- .unwrap_or_else(|| {
- vec![PartitionSpec {
- spec_id: DEFAULT_PARTITION_SPEC_ID,
- fields: value.partition_spec,
- }]
- })
- .into_iter()
- .map(|x| (x.spec_id(), Arc::new(x))),
- );
+ let current_schema_id = value
+ .current_schema_id
+ .unwrap_or_else(||
schemas.keys().copied().max().unwrap_or_default());
+ let current_schema = schemas
+ .get(&current_schema_id)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "No schema exists with the current schema id {}.",
+ current_schema_id
+ ),
+ )
+ })?
+ .clone();
+
+ let partition_specs = match value.partition_specs {
+ Some(partition_specs) => partition_specs,
+ None =>
vec![BoundPartitionSpec::builder(current_schema.clone())
+ .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
+
.add_unbound_fields(value.partition_spec.into_iter().map(|f| f.into_unbound()))?
+ .build()?
+ .into_schemaless()],
+ }
+ .into_iter()
+ .map(|x| (x.spec_id(), Arc::new(x)))
+ .collect::<HashMap<_, _>>();
+
+ let default_spec_id = value
+ .default_spec_id
+ .unwrap_or_else(||
partition_specs.keys().copied().max().unwrap_or_default());
+ let default_spec = partition_specs
+ .get(&default_spec_id)
+ .map(|x|
Arc::unwrap_or_clone(x.clone()).bind(current_schema.clone()))
+ .transpose()?
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Default partition spec {default_spec_id} not
found"),
+ )
+ })?
+ .into();
+
let mut metadata = TableMetadata {
format_version: FormatVersion::V1,
table_uuid: value.table_uuid.unwrap_or_default(),
@@ -919,12 +977,8 @@ pub(super) mod _serde {
last_sequence_number: 0,
last_updated_ms: value.last_updated_ms,
last_column_id: value.last_column_id,
- current_schema_id: value
- .current_schema_id
- .unwrap_or_else(||
schemas.keys().copied().max().unwrap_or_default()),
- default_spec_id: value
- .default_spec_id
- .unwrap_or_else(||
partition_specs.keys().copied().max().unwrap_or_default()),
+ current_schema_id,
+ default_spec,
last_partition_id: value
.last_partition_id
.unwrap_or_else(||
partition_specs.keys().copied().max().unwrap_or_default()),
@@ -998,7 +1052,7 @@ pub(super) mod _serde {
.into_values()
.map(|x| Arc::try_unwrap(x).unwrap_or_else(|s|
s.as_ref().clone()))
.collect(),
- default_spec_id: v.default_spec_id,
+ default_spec_id: v.default_spec.spec_id(),
last_partition_id: v.last_partition_id,
properties: Some(v.properties),
current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
@@ -1067,18 +1121,14 @@ pub(super) mod _serde {
.collect(),
),
current_schema_id: Some(v.current_schema_id),
- partition_spec: v
- .partition_specs
- .get(&v.default_spec_id)
- .map(|x| x.fields().to_vec())
- .unwrap_or_default(),
+ partition_spec: v.default_spec.fields().to_vec(),
partition_specs: Some(
v.partition_specs
.into_values()
.map(|x| Arc::try_unwrap(x).unwrap_or_else(|s|
s.as_ref().clone()))
.collect(),
),
- default_spec_id: Some(v.default_spec_id),
+ default_spec_id: Some(v.default_spec.spec_id()),
last_partition_id: Some(v.last_partition_id),
properties: if v.properties.is_empty() {
None
@@ -1195,9 +1245,9 @@ mod tests {
use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
use crate::spec::table_metadata::TableMetadata;
use crate::spec::{
- NestedField, NullOrder, Operation, PartitionField, PartitionSpec,
PrimitiveType, Schema,
- Snapshot, SnapshotReference, SnapshotRetention, SortDirection,
SortField, SortOrder,
- Summary, Transform, Type, UnboundPartitionField,
+ BoundPartitionSpec, NestedField, NullOrder, Operation, PrimitiveType,
Schema, Snapshot,
+ SnapshotReference, SnapshotRetention, SortDirection, SortField,
SortOrder, Summary,
+ Transform, Type, UnboundPartitionField,
};
use crate::TableCreation;
@@ -1238,6 +1288,12 @@ mod tests {
"name": "struct_name",
"required": true,
"type": "fixed[1]"
+ },
+ {
+ "id": 4,
+ "name": "ts",
+ "required": true,
+ "type": "timestamp"
}
]
}
@@ -1279,23 +1335,32 @@ mod tests {
let schema = Schema::builder()
.with_schema_id(1)
- .with_fields(vec![Arc::new(NestedField::required(
- 1,
- "struct_name",
- Type::Primitive(PrimitiveType::Fixed(1)),
- ))])
+ .with_fields(vec![
+ Arc::new(NestedField::required(
+ 1,
+ "struct_name",
+ Type::Primitive(PrimitiveType::Fixed(1)),
+ )),
+ Arc::new(NestedField::required(
+ 4,
+ "ts",
+ Type::Primitive(PrimitiveType::Timestamp),
+ )),
+ ])
.build()
.unwrap();
- let partition_spec = PartitionSpec {
- spec_id: 0,
- fields: vec![PartitionField {
+ let partition_spec = BoundPartitionSpec::builder(schema.clone())
+ .with_spec_id(0)
+ .add_unbound_field(UnboundPartitionField {
name: "ts_day".to_string(),
transform: Transform::Day,
source_id: 4,
- field_id: 1000,
- }],
- };
+ field_id: Some(1000),
+ })
+ .unwrap()
+ .build()
+ .unwrap();
let expected = TableMetadata {
format_version: FormatVersion::V2,
@@ -1305,8 +1370,11 @@ mod tests {
last_column_id: 1,
schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
current_schema_id: 1,
- partition_specs: HashMap::from_iter(vec![(0,
partition_spec.into())]),
- default_spec_id: 0,
+ partition_specs: HashMap::from_iter(vec![(
+ 0,
+ partition_spec.clone().into_schemaless().into(),
+ )]),
+ default_spec: partition_spec.into(),
last_partition_id: 1000,
default_sort_order_id: 0,
sort_orders: HashMap::from_iter(vec![(0,
SortOrder::unsorted_order().into())]),
@@ -1445,7 +1513,8 @@ mod tests {
.build()
.unwrap();
- let partition_spec = PartitionSpec::builder(&schema)
+ let schema = Arc::new(schema);
+ let partition_spec = BoundPartitionSpec::builder(schema.clone())
.with_spec_id(0)
.add_partition_field("vendor_id", "vendor_id", Transform::Identity)
.unwrap()
@@ -1472,10 +1541,10 @@ mod tests {
location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
last_updated_ms: 1662532818843,
last_column_id: 5,
- schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
+ schemas: HashMap::from_iter(vec![(0, schema)]),
current_schema_id: 0,
- partition_specs: HashMap::from_iter(vec![(0,
partition_spec.into())]),
- default_spec_id: 0,
+ partition_specs: HashMap::from_iter(vec![(0,
partition_spec.clone().into_schemaless().into())]),
+ default_spec: Arc::new(partition_spec),
last_partition_id: 1000,
default_sort_order_id: 0,
sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
@@ -1514,6 +1583,12 @@ mod tests {
"name": "struct_name",
"required": true,
"type": "fixed[1]"
+ },
+ {
+ "id": 4,
+ "name": "ts",
+ "required": true,
+ "type": "timestamp"
}
]
}
@@ -1614,6 +1689,12 @@ mod tests {
"name": "struct_name",
"required": true,
"type": "fixed[1]"
+ },
+ {
+ "id": 4,
+ "name": "ts",
+ "required": true,
+ "type": "timestamp"
}
]
}
@@ -1699,6 +1780,12 @@ mod tests {
"name": "struct_name",
"required": true,
"type": "fixed[1]"
+ },
+ {
+ "id": 4,
+ "name": "ts",
+ "required": true,
+ "type": "timestamp"
}
]
}
@@ -1828,7 +1915,7 @@ mod tests {
.build()
.unwrap();
- let partition_spec = PartitionSpec::builder(&schema1)
+ let partition_spec = BoundPartitionSpec::builder(schema2.clone())
.with_spec_id(0)
.add_unbound_field(UnboundPartitionField {
name: "x".to_string(),
@@ -1889,8 +1976,11 @@ mod tests {
last_column_id: 3,
schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1,
Arc::new(schema2))]),
current_schema_id: 1,
- partition_specs: HashMap::from_iter(vec![(0,
partition_spec.into())]),
- default_spec_id: 0,
+ partition_specs: HashMap::from_iter(vec![(
+ 0,
+ partition_spec.clone().into_schemaless().into(),
+ )]),
+ default_spec: Arc::new(partition_spec),
last_partition_id: 1000,
default_sort_order_id: 3,
sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
@@ -1951,7 +2041,7 @@ mod tests {
.build()
.unwrap();
- let partition_spec = PartitionSpec::builder(&schema)
+ let partition_spec = BoundPartitionSpec::builder(schema.clone())
.with_spec_id(0)
.add_unbound_field(UnboundPartitionField {
name: "x".to_string(),
@@ -1988,8 +2078,11 @@ mod tests {
last_column_id: 3,
schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
current_schema_id: 0,
- partition_specs: HashMap::from_iter(vec![(0,
partition_spec.into())]),
- default_spec_id: 0,
+ partition_specs: HashMap::from_iter(vec![(
+ 0,
+ partition_spec.clone().into_schemaless().into(),
+ )]),
+ default_spec: Arc::new(partition_spec),
last_partition_id: 1000,
default_sort_order_id: 3,
sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
@@ -2031,7 +2124,7 @@ mod tests {
.build()
.unwrap();
- let partition_spec = PartitionSpec::builder(&schema)
+ let partition_spec = BoundPartitionSpec::builder(schema.clone())
.with_spec_id(0)
.add_unbound_field(UnboundPartitionField {
name: "x".to_string(),
@@ -2051,8 +2144,11 @@ mod tests {
last_column_id: 3,
schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
current_schema_id: 0,
- partition_specs: HashMap::from_iter(vec![(0,
partition_spec.into())]),
- default_spec_id: 0,
+ partition_specs: HashMap::from_iter(vec![(
+ 0,
+ partition_spec.clone().into_schemaless().into(),
+ )]),
+ default_spec: Arc::new(partition_spec),
last_partition_id: 0,
default_sort_order_id: 0,
// Sort order is added during deserialization for V2 compatibility
@@ -2165,16 +2261,22 @@ mod tests {
fn test_default_partition_spec() {
let default_spec_id = 1234;
let mut table_meta_data =
get_test_table_metadata("TableMetadataV2Valid.json");
- table_meta_data.default_spec_id = default_spec_id;
+ let partition_spec =
+
BoundPartitionSpec::unpartition_spec(table_meta_data.current_schema().clone());
+ table_meta_data.default_spec = partition_spec.clone().into();
table_meta_data
.partition_specs
- .insert(default_spec_id, Arc::new(PartitionSpec::default()));
+ .insert(default_spec_id,
Arc::new(partition_spec.into_schemaless()));
assert_eq!(
- table_meta_data.default_partition_spec(),
- table_meta_data
+ (*table_meta_data.default_partition_spec().clone())
+ .clone()
+ .into_schemaless(),
+ (*table_meta_data
.partition_spec_by_id(default_spec_id)
.unwrap()
+ .clone())
+ .clone()
);
}
#[test]
@@ -2225,10 +2327,11 @@ mod tests {
HashMap::from([(
0,
Arc::new(
-
PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap())
+
BoundPartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone())
.with_spec_id(0)
.build()
.unwrap()
+ .into_schemaless()
)
)])
);
diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs
b/crates/iceberg/src/writer/file_writer/location_generator.rs
index 44326190..def18b58 100644
--- a/crates/iceberg/src/writer/file_writer/location_generator.rs
+++ b/crates/iceberg/src/writer/file_writer/location_generator.rs
@@ -132,7 +132,7 @@ pub(crate) mod test {
use uuid::Uuid;
use super::LocationGenerator;
- use crate::spec::{FormatVersion, TableMetadata};
+ use crate::spec::{BoundPartitionSpec, FormatVersion, TableMetadata};
use crate::writer::file_writer::location_generator::{
FileNameGenerator, WRITE_DATA_LOCATION, WRITE_FOLDER_STORAGE_LOCATION,
};
@@ -156,6 +156,7 @@ pub(crate) mod test {
#[test]
fn test_default_location_generate() {
+ let schema = crate::spec::Schema::builder().build().unwrap();
let mut table_metadata = TableMetadata {
format_version: FormatVersion::V2,
table_uuid:
Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
@@ -165,7 +166,7 @@ pub(crate) mod test {
schemas: HashMap::new(),
current_schema_id: 1,
partition_specs: HashMap::new(),
- default_spec_id: 1,
+ default_spec: BoundPartitionSpec::unpartition_spec(schema).into(),
last_partition_id: 1000,
default_sort_order_id: 0,
sort_orders: HashMap::from_iter(vec![]),