This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new d206c1d feat: support UnboundPartitionSpec (#106)
d206c1d is described below
commit d206c1d5d299022bbb90223fc6e1166ebef74ae2
Author: yi wang <[email protected]>
AuthorDate: Thu Dec 7 20:28:10 2023 +0800
feat: support UnboundPartitionSpec (#106)
* Implement unbound partition spec.
* little update
* Update in tablecreate & addspec
* fixup: add some tests.
* Put comments before derive
---
.gitignore | 1 +
crates/catalog/rest/src/catalog.rs | 16 +++---
crates/iceberg/src/catalog/mod.rs | 36 +++++-------
crates/iceberg/src/spec/partition.rs | 108 +++++++++++++++++++++++++++++++++--
4 files changed, 126 insertions(+), 35 deletions(-)
diff --git a/.gitignore b/.gitignore
index 10ed67b..72c3484 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,4 +18,5 @@
/target
/Cargo.lock
.idea
+.vscode
**/.DS_Store
\ No newline at end of file
diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs
index 8037d8c..1dfbe79 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -521,7 +521,7 @@ mod _serde {
use serde_derive::{Deserialize, Serialize};
- use iceberg::spec::{PartitionSpec, Schema, SortOrder, TableMetadata};
+ use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
use iceberg::{Error, ErrorKind, Namespace, TableIdent, TableRequirement, TableUpdate};
pub(super) const OK: u16 = 200u16;
@@ -660,7 +660,7 @@ mod _serde {
pub(super) name: String,
pub(super) location: Option<String>,
pub(super) schema: Schema,
- pub(super) partition_spec: Option<PartitionSpec>,
+ pub(super) partition_spec: Option<UnboundPartitionSpec>,
pub(super) write_order: Option<SortOrder>,
pub(super) stage_create: Option<bool>,
pub(super) properties: Option<HashMap<String, String>>,
@@ -686,9 +686,9 @@ mod tests {
use chrono::{TimeZone, Utc};
use iceberg::spec::ManifestListLocation::ManifestListFile;
use iceberg::spec::{
- FormatVersion, NestedField, NullOrder, Operation, PartitionField, PartitionSpec,
- PrimitiveType, Schema, Snapshot, SnapshotLog, SortDirection, SortField, SortOrder, Summary,
- Transform, Type,
+ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
+ SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
+ UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::transaction::Transaction;
use mockito::{Mock, Server, ServerGuard};
@@ -1233,14 +1233,12 @@ mod tests {
)
.properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
.partition_spec(
- PartitionSpec::builder()
- .with_fields(vec![PartitionField::builder()
+ UnboundPartitionSpec::builder()
+ .with_fields(vec![UnboundPartitionField::builder()
.source_id(1)
- .field_id(1000)
.transform(Transform::Truncate(3))
.name("id".to_string())
.build()])
- .with_spec_id(1)
.build()
.unwrap(),
)
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index d13a46b..2ddeace 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -20,7 +20,9 @@
use serde_derive::{Deserialize, Serialize};
use urlencoding::encode;
-use crate::spec::{FormatVersion, PartitionSpec, Schema, Snapshot, SnapshotReference, SortOrder};
+use crate::spec::{
+ FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, UnboundPartitionSpec,
+};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
use async_trait::async_trait;
@@ -226,7 +228,7 @@ pub struct TableCreation {
pub schema: Schema,
/// The partition spec of the table, could be None.
#[builder(default, setter(strip_option))]
- pub partition_spec: Option<PartitionSpec>,
+ pub partition_spec: Option<UnboundPartitionSpec>,
/// The sort order of the table.
#[builder(default, setter(strip_option))]
pub sort_order: Option<SortOrder>,
@@ -361,7 +363,7 @@ pub enum TableUpdate {
/// Add a new partition spec to the table
AddSpec {
/// The partition spec to add.
- spec: PartitionSpec,
+ spec: UnboundPartitionSpec,
},
/// Set table's default spec
#[serde(rename_all = "kebab-case")]
@@ -429,9 +431,9 @@ pub enum TableUpdate {
mod tests {
use crate::spec::ManifestListLocation::ManifestListFile;
use crate::spec::{
- FormatVersion, NestedField, NullOrder, Operation, PartitionField, PartitionSpec,
- PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection,
- SortField, SortOrder, Summary, Transform, Type,
+ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
+ SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary,
+ Transform, Type, UnboundPartitionField, UnboundPartitionSpec,
};
use crate::{NamespaceIdent, TableIdent, TableRequirement, TableUpdate};
use serde::de::DeserializeOwned;
@@ -758,23 +760,19 @@ mod tests {
{
"action": "add-spec",
"spec": {
- "spec-id": 1,
"fields": [
{
"source-id": 4,
- "field-id": 1000,
"name": "ts_day",
"transform": "day"
},
{
"source-id": 1,
- "field-id": 1001,
"name": "id_bucket",
"transform": "bucket[16]"
},
{
"source-id": 2,
- "field-id": 1002,
"name": "id_truncate",
"transform": "truncate[4]"
}
@@ -783,28 +781,24 @@ mod tests {
}
"#,
TableUpdate::AddSpec {
- spec: PartitionSpec::builder()
- .with_spec_id(1)
- .with_partition_field(
- PartitionField::builder()
+ spec: UnboundPartitionSpec::builder()
+ .with_unbound_partition_field(
+ UnboundPartitionField::builder()
.source_id(4)
- .field_id(1000)
.name("ts_day".to_string())
.transform(Transform::Day)
.build(),
)
- .with_partition_field(
- PartitionField::builder()
+ .with_unbound_partition_field(
+ UnboundPartitionField::builder()
.source_id(1)
- .field_id(1001)
.name("id_bucket".to_string())
.transform(Transform::Bucket(16))
.build(),
)
- .with_partition_field(
- PartitionField::builder()
+ .with_unbound_partition_field(
+ UnboundPartitionField::builder()
.source_id(2)
- .field_id(1002)
.name("id_truncate".to_string())
.transform(Transform::Truncate(4))
.build(),
diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs
index cfdbb6f..16395dc 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -26,9 +26,9 @@ use super::transform::Transform;
/// Reference to [`PartitionSpec`].
pub type PartitionSpecRef = Arc<PartitionSpec>;
+/// Partition fields capture the transform from table data to partition values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
-/// Partition fields capture the transform from table data to partition values.
pub struct PartitionField {
/// A source column id from the table’s schema
pub source_id: i32,
@@ -41,10 +41,10 @@ pub struct PartitionField {
pub transform: Transform,
}
+/// Partition spec that defines how to produce a tuple of partition values from a record.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
#[serde(rename_all = "kebab-case")]
#[builder(setter(prefix = "with"))]
-/// Partition spec that defines how to produce a tuple of partition values from a record.
pub struct PartitionSpec {
/// Identifier for PartitionSpec
pub spec_id: i32,
@@ -60,13 +60,51 @@ impl PartitionSpec {
}
}
+/// Reference to [`UnboundPartitionSpec`].
+pub type UnboundPartitionSpecRef = Arc<UnboundPartitionSpec>;
+/// Unbound partition field can be built without a schema and later bound to a schema.
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
+#[serde(rename_all = "kebab-case")]
+pub struct UnboundPartitionField {
+ /// A source column id from the table’s schema
+ pub source_id: i32,
+ /// A partition field id that is used to identify a partition field and is unique within a partition spec.
+ /// In v2 table metadata, it is unique across all partition specs.
+ #[builder(default, setter(strip_option))]
+ pub partition_id: Option<i32>,
+ /// A partition name.
+ pub name: String,
+ /// A transform that is applied to the source column to produce a partition value.
+ pub transform: Transform,
+}
+
+/// Unbound partition spec can be built without a schema and later bound to a schema.
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
+#[serde(rename_all = "kebab-case")]
+#[builder(setter(prefix = "with"))]
+pub struct UnboundPartitionSpec {
+ /// Identifier for PartitionSpec
+ #[builder(default, setter(strip_option))]
+ pub spec_id: Option<i32>,
+ /// Details of the partition spec
+ #[builder(setter(each(name = "with_unbound_partition_field")))]
+ pub fields: Vec<UnboundPartitionField>,
+}
+
+impl UnboundPartitionSpec {
+ /// Create unbound partition spec builder
+ pub fn builder() -> UnboundPartitionSpecBuilder {
+ UnboundPartitionSpecBuilder::default()
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
#[test]
- fn partition_spec() {
- let sort_order = r#"
+ fn test_partition_spec() {
+ let spec = r#"
{
"spec-id": 1,
"fields": [ {
@@ -88,7 +126,7 @@ mod tests {
}
"#;
- let partition_spec: PartitionSpec = serde_json::from_str(sort_order).unwrap();
+ let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
assert_eq!(4, partition_spec.fields[0].source_id);
assert_eq!(1000, partition_spec.fields[0].field_id);
assert_eq!("ts_day", partition_spec.fields[0].name);
@@ -104,4 +142,64 @@ mod tests {
assert_eq!("id_truncate", partition_spec.fields[2].name);
assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
}
+
+ #[test]
+ fn test_unbound_partition_spec() {
+ let spec = r#"
+ {
+ "spec-id": 1,
+ "fields": [ {
+ "source-id": 4,
+ "partition-id": 1000,
+ "name": "ts_day",
+ "transform": "day"
+ }, {
+ "source-id": 1,
+ "partition-id": 1001,
+ "name": "id_bucket",
+ "transform": "bucket[16]"
+ }, {
+ "source-id": 2,
+ "partition-id": 1002,
+ "name": "id_truncate",
+ "transform": "truncate[4]"
+ } ]
+ }
+ "#;
+
+ let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
+ assert_eq!(Some(1), partition_spec.spec_id);
+
+ assert_eq!(4, partition_spec.fields[0].source_id);
+ assert_eq!(Some(1000), partition_spec.fields[0].partition_id);
+ assert_eq!("ts_day", partition_spec.fields[0].name);
+ assert_eq!(Transform::Day, partition_spec.fields[0].transform);
+
+ assert_eq!(1, partition_spec.fields[1].source_id);
+ assert_eq!(Some(1001), partition_spec.fields[1].partition_id);
+ assert_eq!("id_bucket", partition_spec.fields[1].name);
+ assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);
+
+ assert_eq!(2, partition_spec.fields[2].source_id);
+ assert_eq!(Some(1002), partition_spec.fields[2].partition_id);
+ assert_eq!("id_truncate", partition_spec.fields[2].name);
+ assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
+
+ let spec = r#"
+ {
+ "fields": [ {
+ "source-id": 4,
+ "name": "ts_day",
+ "transform": "day"
+ } ]
+ }
+ "#;
+ let partition_spec: UnboundPartitionSpec = serde_json::from_str(spec).unwrap();
+ assert_eq!(None, partition_spec.spec_id);
+
+ assert_eq!(4, partition_spec.fields[0].source_id);
+ assert_eq!(None, partition_spec.fields[0].partition_id);
+ assert_eq!("ts_day", partition_spec.fields[0].name);
+ assert_eq!(Transform::Day, partition_spec.fields[0].transform);
+ }
}