This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new bdc66a0 feat: Table metadata (#29)
bdc66a0 is described below
commit bdc66a0e984fdedaa81b252d35c72b3f3cb79047
Author: JanKaul <[email protected]>
AuthorDate: Mon Aug 21 10:53:05 2023 +0200
feat: Table metadata (#29)
* serde schemav1 & schemav2
* fix default schema id
* implement snapshot
* add partition spec
* add license
* add sortorder
* fix initial & write default
* serialize/deserialize table metadata
* impl table metadata
* fix docs
* fix clippy warnings
* change visibility
* fix rebase
* fix clippy warnings
* fix transform
* introduce static
* fix typo
* change spec export style
* improve table metadata v1 test
* improve table metadata v2 test
* delete temp file
* export all submodule types at spec module
* rename snapshotreference
* rename snapshot retention
* remove option from properties
* remove option from snapshot log
* improve builder
* introduce enum for manifest list files and manifests
* keep retention
* use arc for schema
* use arc for snapshot
* current snapshot returns option
* remove panic from snapshot conversion
* check if current_snapshot_id is -1
* fix schema
* use schema field as fallback in v1 table metadata
* use partition spec as fallback in v1 metadata
* fix parition spec
* introduce _serde module for schema
* introduce _serde module for snapshot
* introduce _serde module for table_metadata
* fix docs
* fix typo
* use minimal table metadata for v1 test
---
crates/iceberg/Cargo.toml | 2 +
crates/iceberg/src/lib.rs | 3 +
crates/iceberg/src/spec/datatypes.rs | 94 ++-
crates/iceberg/src/spec/mod.rs | 21 +-
crates/iceberg/src/spec/partition.rs | 103 ++++
crates/iceberg/src/spec/schema.rs | 124 ++++
crates/iceberg/src/spec/snapshot.rs | 346 +++++++++++
crates/iceberg/src/spec/sort.rs | 133 ++++
crates/iceberg/src/spec/table_metadata.rs | 976 ++++++++++++++++++++++++++++++
9 files changed, 1783 insertions(+), 19 deletions(-)
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 0b27a56..053acf8 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -39,8 +39,10 @@ chrono = "0.4"
uuid = "1.4.1"
ordered-float = "3.7.0"
bitvec = "1.0.1"
+serde_repr = "0.1.16"
itertools = "0.11"
bimap = "0.6"
+derive_builder = "0.12.0"
[dev-dependencies]
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index fb9db5b..8e77b97 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -19,6 +19,9 @@
#![deny(missing_docs)]
+#[macro_use]
+extern crate derive_builder;
+
mod error;
pub use error::Error;
pub use error::ErrorKind;
diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs
index 9b4c986..7b24eb0 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -21,10 +21,14 @@
use ::serde::de::{MapAccess, Visitor};
use serde::de::{Error, IntoDeserializer};
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
+use serde_json::Value as JsonValue;
use std::cell::OnceCell;
+use std::convert::identity;
use std::sync::Arc;
use std::{collections::HashMap, fmt, ops::Index};
+use super::values::Literal;
+
/// Field name for list type.
pub(crate) const LIST_FILED_NAME: &str = "element";
pub(crate) const MAP_KEY_FIELD_NAME: &str = "key";
@@ -341,8 +345,8 @@ impl fmt::Display for StructType {
}
}
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
-#[serde(rename_all = "kebab-case")]
+#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)]
+#[serde(from = "SerdeNestedField", into = "SerdeNestedField")]
/// A struct is a tuple of typed values. Each field in the tuple is named and
has an integer id that is unique in the table schema.
/// Each field can be either optional or required, meaning that values can (or
cannot) be null. Fields may be any type.
/// Fields may have an optional comment or doc string. Fields can have default
values.
@@ -354,17 +358,65 @@ pub struct NestedField {
/// Optional or required
pub required: bool,
/// Datatype
- #[serde(rename = "type")]
pub field_type: Box<Type>,
/// Fields may have an optional comment or doc string.
- #[serde(skip_serializing_if = "Option::is_none")]
pub doc: Option<String>,
/// Used to populate the field’s value for all records that were written
before the field was added to the schema
- #[serde(skip_serializing_if = "Option::is_none")]
- pub initial_default: Option<String>,
+ pub initial_default: Option<Literal>,
/// Used to populate the field’s value for any records written after the
field was added to the schema, if the writer does not supply the field’s value
+ pub write_default: Option<Literal>,
+}
+
+/// Serde representation of [`NestedField`].
+///
+/// [`NestedField`] is (de)serialized through this struct via
+/// `#[serde(from = "SerdeNestedField", into = "SerdeNestedField")]`. Default values are kept
+/// here as raw JSON because they can only be parsed into [`Literal`]s once the field type is
+/// known; that parsing happens in `From<SerdeNestedField> for NestedField`.
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+struct SerdeNestedField {
+ pub id: i32,
+ pub name: String,
+ pub required: bool,
+ // Serialized under the JSON key "type".
+ #[serde(rename = "type")]
+ pub field_type: Box<Type>,
#[serde(skip_serializing_if = "Option::is_none")]
- pub write_default: Option<String>,
+ pub doc: Option<String>,
+ // Raw JSON; parsed into a `Literal` against `field_type` when deserializing.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub initial_default: Option<JsonValue>,
+ // Raw JSON; parsed into a `Literal` against `field_type` when deserializing.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub write_default: Option<JsonValue>,
+}
+
+impl From<SerdeNestedField> for NestedField {
+ /// Builds a [`NestedField`] from its serde representation.
+ ///
+ /// Default values are parsed from JSON against `field_type`. A value that fails to parse
+ /// (`Err`) or yields no literal (`Ok(None)`) is silently stored as `None`, since `From`
+ /// cannot report an error.
+ fn from(value: SerdeNestedField) -> Self {
+ NestedField {
+ id: value.id,
+ name: value.name,
+ required: value.required,
+ // The defaults must be converted before `field_type` is moved out of `value`
+ // further down, because the conversion borrows it.
+ initial_default: value.initial_default.and_then(|x| {
+ Literal::try_from_json(x, &value.field_type)
+ .ok()
+ .and_then(identity)
+ }),
+ write_default: value.write_default.and_then(|x| {
+ Literal::try_from_json(x, &value.field_type)
+ .ok()
+ .and_then(identity)
+ }),
+ field_type: value.field_type,
+ doc: value.doc,
+ }
+ }
+}
+
+impl From<NestedField> for SerdeNestedField {
+ /// Converts a [`NestedField`] into its serde representation, turning the default
+ /// [`Literal`]s into JSON values.
+ fn from(value: NestedField) -> Self {
+ SerdeNestedField {
+ id: value.id,
+ name: value.name,
+ required: value.required,
+ field_type: value.field_type,
+ doc: value.doc,
+ initial_default: value.initial_default.map(|x| (&x).into()),
+ write_default: value.write_default.map(|x| (&x).into()),
+ }
+ }
}
/// Reference to nested field.
@@ -427,14 +479,14 @@ impl NestedField {
}
/// Set the field's initial default value.
- pub fn with_initial_default(mut self, value: impl ToString) -> Self {
- self.initial_default = Some(value.to_string());
+ pub fn with_initial_default(mut self, value: Literal) -> Self {
+ self.initial_default = Some(value);
self
}
- /// Set the field's initial default value.
- pub fn with_write_default(mut self, value: impl ToString) -> Self {
- self.write_default = Some(value.to_string());
+ /// Set the field's write default value.
+ ///
+ /// The write default populates the field for records written after the field was added
+ /// to the schema, if the writer does not supply the field's value.
+ pub fn with_write_default(mut self, value: Literal) -> Self {
+ self.write_default = Some(value);
self
}
}
@@ -581,6 +633,10 @@ pub struct MapType {
#[cfg(test)]
mod tests {
+ use uuid::Uuid;
+
+ use crate::spec::values::PrimitiveLiteral;
+
use super::*;
fn check_type_serde(json: &str, expected_type: Type) {
@@ -685,8 +741,12 @@ mod tests {
Type::Struct(StructType {
fields: vec![
NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Uuid))
-
.with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb")
-
.with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae")
+
.with_initial_default(Literal::Primitive(PrimitiveLiteral::UUID(
+
Uuid::parse_str("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb").unwrap(),
+ )))
+
.with_write_default(Literal::Primitive(PrimitiveLiteral::UUID(
+
Uuid::parse_str("ec5911be-b0a7-458c-8438-c9a3e53cffae").unwrap(),
+ )))
.into(),
NestedField::optional(2, "data",
Type::Primitive(PrimitiveType::Int)).into(),
],
@@ -749,8 +809,12 @@ mod tests {
let struct_type = Type::Struct(StructType::new(vec![
NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Uuid))
- .with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb")
- .with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae")
+
.with_initial_default(Literal::Primitive(PrimitiveLiteral::UUID(
+
Uuid::parse_str("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb").unwrap(),
+ )))
+ .with_write_default(Literal::Primitive(PrimitiveLiteral::UUID(
+
Uuid::parse_str("ec5911be-b0a7-458c-8438-c9a3e53cffae").unwrap(),
+ )))
.into(),
NestedField::optional(2, "data",
Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index e87bb1f..dedde72 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -17,7 +17,20 @@
//! Spec for Iceberg.
-pub mod datatypes;
-pub mod schema;
-pub mod transform;
-pub mod values;
+mod datatypes;
+mod partition;
+mod schema;
+mod snapshot;
+mod sort;
+mod table_metadata;
+mod transform;
+mod values;
+
+pub use datatypes::*;
+pub use partition::*;
+pub use schema::*;
+pub use snapshot::*;
+pub use sort::*;
+pub use table_metadata::*;
+pub use transform::*;
+pub use values::*;
diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs
new file mode 100644
index 0000000..505f248
--- /dev/null
+++ b/crates/iceberg/src/spec/partition.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*!
+ * Partitioning
+*/
+use serde::{Deserialize, Serialize};
+
+use super::transform::Transform;
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// Partition fields capture the transform from table data to partition values.
+pub struct PartitionField {
+    /// A source column id from the table’s schema
+    pub source_id: i32,
+    /// A partition field id that is used to identify a partition field and is unique within
+    /// a partition spec. In v2 table metadata, it is unique across all partition specs.
+    pub field_id: i32,
+    /// A partition name.
+    pub name: String,
+    /// A transform that is applied to the source column to produce a partition value.
+    pub transform: Transform,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
+#[serde(rename_all = "kebab-case")]
+#[builder(setter(prefix = "with"))]
+/// Partition spec that defines how to produce a tuple of partition values from a record.
+pub struct PartitionSpec {
+    /// Identifier for PartitionSpec
+    pub spec_id: i32,
+    /// Details of the partition spec. Empty for an unpartitioned table, which is why the
+    /// builder defaults it to an empty list when no partition field is added.
+    #[builder(setter(each(name = "with_partition_field")), default)]
+    pub fields: Vec<PartitionField>,
+}
+
+impl PartitionSpec {
+    /// Create a partition spec builder.
+    pub fn builder() -> PartitionSpecBuilder {
+        PartitionSpecBuilder::default()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn partition_spec() {
+        let partition_spec_json = r#"
+        {
+            "spec-id": 1,
+            "fields": [ {
+                "source-id": 4,
+                "field-id": 1000,
+                "name": "ts_day",
+                "transform": "day"
+            }, {
+                "source-id": 1,
+                "field-id": 1001,
+                "name": "id_bucket",
+                "transform": "bucket[16]"
+            }, {
+                "source-id": 2,
+                "field-id": 1002,
+                "name": "id_truncate",
+                "transform": "truncate[4]"
+            } ]
+        }
+        "#;
+
+        let partition_spec: PartitionSpec = serde_json::from_str(partition_spec_json).unwrap();
+        assert_eq!(1, partition_spec.spec_id);
+        assert_eq!(3, partition_spec.fields.len());
+
+        assert_eq!(4, partition_spec.fields[0].source_id);
+        assert_eq!(1000, partition_spec.fields[0].field_id);
+        assert_eq!("ts_day", partition_spec.fields[0].name);
+        assert_eq!(Transform::Day, partition_spec.fields[0].transform);
+
+        assert_eq!(1, partition_spec.fields[1].source_id);
+        assert_eq!(1001, partition_spec.fields[1].field_id);
+        assert_eq!("id_bucket", partition_spec.fields[1].name);
+        assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);
+
+        assert_eq!(2, partition_spec.fields[2].source_id);
+        assert_eq!(1002, partition_spec.fields[2].field_id);
+        assert_eq!("id_truncate", partition_spec.fields[2].name);
+        assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
+    }
+}
diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs
index e1c6f2c..654a10e 100644
--- a/crates/iceberg/src/spec/schema.rs
+++ b/crates/iceberg/src/spec/schema.rs
@@ -609,6 +609,92 @@ impl SchemaVisitor for IndexByName {
}
}
+pub(super) mod _serde {
+    //! This is a helper module that defines types to help with serialization/deserialization.
+    //! For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2]
+    //! struct and then converted into the [Schema] struct. Serialization works the other way
+    //! around. [SchemaV1] and [SchemaV2] are internal structs that are only used for
+    //! serialization and deserialization.
+    use serde::{Deserialize, Serialize};
+
+    use crate::{spec::StructType, Error, Result};
+
+    use super::{Schema, DEFAULT_SCHEMA_ID};
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v2 schema for serialization/deserialization
+    pub(crate) struct SchemaV2 {
+        pub schema_id: i32,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub identifier_field_ids: Option<Vec<i32>>,
+        #[serde(flatten)]
+        pub fields: StructType,
+    }
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v1 schema for serialization/deserialization.
+    /// Unlike v2, the schema id is optional.
+    pub(crate) struct SchemaV1 {
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub schema_id: Option<i32>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub identifier_field_ids: Option<Vec<i32>>,
+        #[serde(flatten)]
+        pub fields: StructType,
+    }
+
+    impl TryFrom<SchemaV2> for Schema {
+        type Error = Error;
+        /// Builds a [Schema]; missing identifier field ids become an empty set of ids.
+        fn try_from(value: SchemaV2) -> Result<Self> {
+            Schema::builder()
+                .with_schema_id(value.schema_id)
+                .with_fields(value.fields.fields().iter().cloned())
+                .with_identifier_field_ids(value.identifier_field_ids.unwrap_or_default())
+                .build()
+        }
+    }
+
+    impl TryFrom<SchemaV1> for Schema {
+        type Error = Error;
+        /// Builds a [Schema]; a missing schema id falls back to `DEFAULT_SCHEMA_ID`.
+        fn try_from(value: SchemaV1) -> Result<Self> {
+            Schema::builder()
+                .with_schema_id(value.schema_id.unwrap_or(DEFAULT_SCHEMA_ID))
+                .with_fields(value.fields.fields().iter().cloned())
+                .with_identifier_field_ids(value.identifier_field_ids.unwrap_or_default())
+                .build()
+        }
+    }
+
+    impl From<Schema> for SchemaV2 {
+        fn from(value: Schema) -> Self {
+            SchemaV2 {
+                schema_id: value.schema_id,
+                // Omit the key entirely instead of writing an empty list.
+                identifier_field_ids: if value.identifier_field_ids.is_empty() {
+                    None
+                } else {
+                    Some(value.identifier_field_ids.into_iter().collect())
+                },
+                fields: value.r#struct,
+            }
+        }
+    }
+
+    impl From<Schema> for SchemaV1 {
+        fn from(value: Schema) -> Self {
+            SchemaV1 {
+                schema_id: Some(value.schema_id),
+                // Omit the key entirely instead of writing an empty list.
+                identifier_field_ids: if value.identifier_field_ids.is_empty() {
+                    None
+                } else {
+                    Some(value.identifier_field_ids.into_iter().collect())
+                },
+                fields: value.r#struct,
+            }
+        }
+    }
+}
+
#[cfg(test)]
mod tests {
use crate::spec::datatypes::Type::{List, Map, Primitive, Struct};
@@ -616,6 +702,7 @@ mod tests {
ListType, MapType, NestedField, NestedFieldRef, PrimitiveType,
StructType, Type,
};
use crate::spec::schema::Schema;
+ use crate::spec::schema::_serde::SchemaV2;
use std::collections::HashMap;
#[test]
@@ -639,6 +726,43 @@ mod tests {
assert_eq!(None, schema.field_by_id(3));
}
+    #[test]
+    fn schema() {
+        let json = r#"
+        {
+            "type": "struct",
+            "schema-id": 1,
+            "fields": [ {
+                "id": 1,
+                "name": "id",
+                "required": true,
+                "type": "uuid"
+            }, {
+                "id": 2,
+                "name": "data",
+                "required": false,
+                "type": "int"
+            } ]
+        }
+        "#;
+
+        let parsed: SchemaV2 = serde_json::from_str(json).unwrap();
+        assert_eq!(1, parsed.schema_id);
+
+        // First field: required uuid column.
+        assert_eq!(
+            Box::new(Type::Primitive(PrimitiveType::Uuid)),
+            parsed.fields[0].field_type
+        );
+        assert_eq!(1, parsed.fields[0].id);
+        assert!(parsed.fields[0].required);
+
+        // Second field: optional int column.
+        assert_eq!(
+            Box::new(Type::Primitive(PrimitiveType::Int)),
+            parsed.fields[1].field_type
+        );
+        assert_eq!(2, parsed.fields[1].id);
+        assert!(!parsed.fields[1].required);
+    }
+
fn table_schema_simple() -> Schema {
Schema::builder()
.with_schema_id(1)
diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs
new file mode 100644
index 0000000..9a80288
--- /dev/null
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -0,0 +1,346 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*!
+ * Snapshots
+*/
+use std::collections::HashMap;
+
+use serde::{Deserialize, Serialize};
+
+use super::table_metadata::SnapshotLog;
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
+#[serde(rename_all = "lowercase")]
+/// The operation field is used by some operations, like snapshot expiration, to skip
+/// processing certain snapshots.
+pub enum Operation {
+    /// Only data files were added and no files were removed.
+    ///
+    /// This is the default operation, e.g. for v1 snapshots without a summary.
+    #[default]
+    Append,
+    /// Data and delete files were added and removed without changing table data;
+    /// i.e., compaction, changing the data file format, or relocating data files.
+    Replace,
+    /// Data and delete files were added and removed in a logical overwrite operation.
+    Overwrite,
+    /// Data files were removed and their contents logically deleted and/or delete files
+    /// were added to delete rows.
+    Delete,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+/// Summarises the changes in the snapshot.
+pub struct Summary {
+    /// The type of operation in the snapshot
+    pub operation: Operation,
+    /// Other summary data.
+    #[serde(flatten)]
+    pub other: HashMap<String, String>,
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, Builder)]
+#[builder(setter(prefix = "with"))]
+/// A snapshot represents the state of a table at some time and is used to access the complete
+/// set of data files in the table.
+pub struct Snapshot {
+    /// A unique long ID
+    snapshot_id: i64,
+    /// The snapshot ID of the snapshot’s parent.
+    /// Omitted for any snapshot with no parent
+    #[builder(default = "None")]
+    parent_snapshot_id: Option<i64>,
+    /// A monotonically increasing long that tracks the order of
+    /// changes to a table.
+    sequence_number: i64,
+    /// A timestamp when the snapshot was created, used for garbage
+    /// collection and table inspection
+    timestamp_ms: i64,
+    /// The location of a manifest list for this snapshot that
+    /// tracks manifest files with additional metadata.
+    manifest_list: ManifestList,
+    /// A string map that summarizes the snapshot changes, including operation.
+    summary: Summary,
+    /// ID of the table’s current schema when the snapshot was created.
+    /// It is optional (v1 snapshots may omit it), so the builder defaults it to `None`.
+    #[builder(setter(strip_option), default = "None")]
+    schema_id: Option<i64>,
+}
+
+/// Type to distinguish between a path to a manifest list file or a vector of manifest file
+/// locations.
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(untagged)]
+pub enum ManifestList {
+    /// Location of manifestlist file
+    ManifestListFile(String),
+    /// Manifestfile locations
+    ManifestFiles(Vec<String>),
+}
+
+impl Snapshot {
+    /// Get the id of the snapshot
+    #[inline]
+    pub fn snapshot_id(&self) -> i64 {
+        self.snapshot_id
+    }
+    /// Get the id of the parent snapshot, or `None` if the snapshot has no parent.
+    #[inline]
+    pub fn parent_snapshot_id(&self) -> Option<i64> {
+        self.parent_snapshot_id
+    }
+    /// Get sequence_number of the snapshot. Is 0 for Iceberg V1 tables.
+    #[inline]
+    pub fn sequence_number(&self) -> i64 {
+        self.sequence_number
+    }
+    /// Get the manifest list of the snapshot: either the location of a manifest list file
+    /// or, for v1 snapshots without one, the locations of the individual manifest files.
+    #[inline]
+    pub fn manifest_list(&self) -> &ManifestList {
+        &self.manifest_list
+    }
+    /// Get summary of the snapshot
+    #[inline]
+    pub fn summary(&self) -> &Summary {
+        &self.summary
+    }
+    /// Get the timestamp of when the snapshot was created
+    #[inline]
+    pub fn timestamp(&self) -> i64 {
+        self.timestamp_ms
+    }
+    /// Get the id of the table's schema when the snapshot was created, if it was recorded.
+    #[inline]
+    pub fn schema_id(&self) -> Option<i64> {
+        self.schema_id
+    }
+    /// Create snapshot builder
+    pub fn builder() -> SnapshotBuilder {
+        SnapshotBuilder::default()
+    }
+
+    /// Create the snapshot-log entry (timestamp and snapshot id) for this snapshot.
+    pub(crate) fn log(&self) -> SnapshotLog {
+        SnapshotLog {
+            timestamp_ms: self.timestamp_ms,
+            snapshot_id: self.snapshot_id,
+        }
+    }
+}
+
+pub(super) mod _serde {
+    //! This is a helper module that defines types to help with serialization/deserialization.
+    //! For deserialization the input first gets read into either the [SnapshotV1] or
+    //! [SnapshotV2] struct and then converted into the [Snapshot] struct. Serialization works
+    //! the other way around. [SnapshotV1] and [SnapshotV2] are internal structs that are only
+    //! used for serialization and deserialization.
+    use std::collections::HashMap;
+
+    use serde::{Deserialize, Serialize};
+
+    use crate::{Error, ErrorKind};
+
+    use super::{ManifestList, Operation, Snapshot, Summary};
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v2 snapshot for serialization/deserialization
+    pub(crate) struct SnapshotV2 {
+        pub snapshot_id: i64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub parent_snapshot_id: Option<i64>,
+        pub sequence_number: i64,
+        pub timestamp_ms: i64,
+        pub manifest_list: String,
+        pub summary: Summary,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub schema_id: Option<i64>,
+    }
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v1 snapshot for serialization/deserialization
+    pub(crate) struct SnapshotV1 {
+        pub snapshot_id: i64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub parent_snapshot_id: Option<i64>,
+        pub timestamp_ms: i64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub manifest_list: Option<String>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub manifests: Option<Vec<String>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub summary: Option<Summary>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub schema_id: Option<i64>,
+    }
+
+    impl From<SnapshotV2> for Snapshot {
+        fn from(v2: SnapshotV2) -> Self {
+            Snapshot {
+                snapshot_id: v2.snapshot_id,
+                parent_snapshot_id: v2.parent_snapshot_id,
+                sequence_number: v2.sequence_number,
+                timestamp_ms: v2.timestamp_ms,
+                manifest_list: ManifestList::ManifestListFile(v2.manifest_list),
+                summary: v2.summary,
+                schema_id: v2.schema_id,
+            }
+        }
+    }
+
+    impl From<Snapshot> for SnapshotV2 {
+        /// Converts a [Snapshot] into its v2 representation.
+        ///
+        /// # Panics
+        ///
+        /// Panics if the snapshot references individual manifest files
+        /// ([ManifestList::ManifestFiles]); [SnapshotV2] can only store the location of a
+        /// manifest list file.
+        fn from(snapshot: Snapshot) -> Self {
+            SnapshotV2 {
+                snapshot_id: snapshot.snapshot_id,
+                parent_snapshot_id: snapshot.parent_snapshot_id,
+                sequence_number: snapshot.sequence_number,
+                timestamp_ms: snapshot.timestamp_ms,
+                manifest_list: match snapshot.manifest_list {
+                    ManifestList::ManifestListFile(file) => file,
+                    ManifestList::ManifestFiles(_) => panic!("Wrong table format version. Can't convert a list of manifest files into a location of a manifest file."),
+                },
+                summary: snapshot.summary,
+                schema_id: snapshot.schema_id,
+            }
+        }
+    }
+
+    impl TryFrom<SnapshotV1> for Snapshot {
+        type Error = Error;
+
+        /// Converts a v1 snapshot.
+        ///
+        /// # Errors
+        ///
+        /// Returns [ErrorKind::DataInvalid] if neither `manifest-list` nor `manifests` is set.
+        fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
+            // A manifest list file takes precedence over an explicit list of manifests.
+            let manifest_list = match (v1.manifest_list, v1.manifests) {
+                (Some(file), _) => ManifestList::ManifestListFile(file),
+                (None, Some(files)) => ManifestList::ManifestFiles(files),
+                (None, None) => {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Neither manifestlist file or manifest files are provided.",
+                    ))
+                }
+            };
+            Ok(Snapshot {
+                snapshot_id: v1.snapshot_id,
+                parent_snapshot_id: v1.parent_snapshot_id,
+                // v1 snapshots have no sequence number; 0 is used for them.
+                sequence_number: 0,
+                timestamp_ms: v1.timestamp_ms,
+                manifest_list,
+                // v1 summaries are optional; fall back to an `append` summary.
+                summary: v1.summary.unwrap_or_else(|| Summary {
+                    operation: Operation::default(),
+                    other: HashMap::new(),
+                }),
+                schema_id: v1.schema_id,
+            })
+        }
+    }
+
+    impl From<Snapshot> for SnapshotV1 {
+        /// Converts a [Snapshot] into its v1 representation. The sequence number is dropped
+        /// because v1 has no field for it.
+        fn from(snapshot: Snapshot) -> Self {
+            let (manifest_list, manifests) = match snapshot.manifest_list {
+                ManifestList::ManifestListFile(file) => (Some(file), None),
+                ManifestList::ManifestFiles(files) => (None, Some(files)),
+            };
+            SnapshotV1 {
+                snapshot_id: snapshot.snapshot_id,
+                parent_snapshot_id: snapshot.parent_snapshot_id,
+                timestamp_ms: snapshot.timestamp_ms,
+                manifest_list,
+                manifests,
+                summary: Some(snapshot.summary),
+                schema_id: snapshot.schema_id,
+            }
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// Iceberg tables keep track of branches and tags using snapshot references.
+pub struct SnapshotReference {
+    /// A reference’s snapshot ID. The tagged snapshot or latest snapshot of a branch.
+    pub snapshot_id: i64,
+    /// Snapshot retention policy; its keys are flattened into the reference object.
+    #[serde(flatten)]
+    pub retention: SnapshotRetention,
+}
+
+impl SnapshotReference {
+    /// Create new snapshot reference
+    pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
+        SnapshotReference {
+            snapshot_id,
+            retention,
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "lowercase", tag = "type")]
+/// The snapshot expiration procedure removes snapshots from table metadata and applies the
+/// table’s retention policy.
+pub enum SnapshotRetention {
+    #[serde(rename_all = "kebab-case")]
+    /// Branches are mutable named references that can be updated by committing a new snapshot
+    /// as the branch’s referenced snapshot using the Commit Conflict Resolution and Retry
+    /// procedures.
+    Branch {
+        /// A positive number for the minimum number of snapshots to keep in a branch while
+        /// expiring snapshots.
+        /// Defaults to table property history.expire.min-snapshots-to-keep.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        min_snapshots_to_keep: Option<i32>,
+        /// A positive number for the max age of snapshots to keep when expiring, including
+        /// the latest snapshot.
+        /// Defaults to table property history.expire.max-snapshot-age-ms.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        max_snapshot_age_ms: Option<i64>,
+        /// For snapshot references except the main branch, a positive number for the max age
+        /// of the snapshot reference to keep while expiring snapshots.
+        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never
+        /// expires.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        max_ref_age_ms: Option<i64>,
+    },
+    #[serde(rename_all = "kebab-case")]
+    /// Tags are labels for individual snapshots.
+    Tag {
+        /// For snapshot references except the main branch, a positive number for the max age
+        /// of the snapshot reference to keep while expiring snapshots.
+        /// Defaults to table property history.expire.max-ref-age-ms. The main branch never
+        /// expires.
+        max_ref_age_ms: i64,
+    },
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use crate::spec::snapshot::{ManifestList, Operation, Snapshot, Summary, _serde::SnapshotV1};
+
+    #[test]
+    fn snapshot_v1() {
+        let record = r#"
+        {
+            "snapshot-id": 3051729675574597004,
+            "timestamp-ms": 1515100955770,
+            "summary": {
+                "operation": "append"
+            },
+            "manifest-list": "s3://b/wh/.../s1.avro",
+            "schema-id": 0
+        }
+        "#;
+
+        let result: Snapshot = serde_json::from_str::<SnapshotV1>(record)
+            .unwrap()
+            .try_into()
+            .unwrap();
+        assert_eq!(3051729675574597004, result.snapshot_id());
+        assert_eq!(1515100955770, result.timestamp());
+        // v1 snapshots carry no sequence number; the conversion uses 0.
+        assert_eq!(0, result.sequence_number());
+        assert_eq!(
+            Summary {
+                operation: Operation::Append,
+                other: HashMap::new()
+            },
+            *result.summary()
+        );
+        assert_eq!(
+            ManifestList::ManifestListFile("s3://b/wh/.../s1.avro".to_string()),
+            *result.manifest_list()
+        );
+    }
+}
diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs
new file mode 100644
index 0000000..357e68f
--- /dev/null
+++ b/crates/iceberg/src/spec/sort.rs
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*!
+ * Sorting
+*/
+use serde::{Deserialize, Serialize};
+
+use super::transform::Transform;
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+/// Sort direction of a sort field, either ascending or descending
+pub enum SortDirection {
+    /// Ascending
+    #[serde(rename = "asc")]
+    Ascending,
+    /// Descending
+    #[serde(rename = "desc")]
+    Descending,
+}
+
+/// Describes where null values are placed when sorted.
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+pub enum NullOrder {
+    /// Nulls are stored first (serialized as `nulls-first`)
+    #[serde(rename = "nulls-first")]
+    First,
+    /// Nulls are stored last (serialized as `nulls-last`)
+    #[serde(rename = "nulls-last")]
+    Last,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// Entry for every column that is to be sorted
+pub struct SortField {
+    /// A source column id from the table’s schema
+    pub source_id: i64,
+    /// A transform that is used to produce values to be sorted on from the source column.
+    pub transform: Transform,
+    /// A sort direction, that can only be either asc or desc
+    pub direction: SortDirection,
+    /// A null order that describes the order of null values when sorted.
+    pub null_order: NullOrder,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Builder)]
+#[serde(rename_all = "kebab-case")]
+#[builder(setter(prefix = "with"))]
+/// A sort order is defined by a sort order id and a list of sort fields.
+/// The order of the sort fields within the list defines the order in which the sort is
+/// applied to the data.
+pub struct SortOrder {
+    /// Identifier for SortOrder, order_id `0` is no sort order.
+    pub order_id: i64,
+    /// Details of the sort. The builder defaults it to an empty list.
+    #[builder(setter(each(name = "with_sort_field")), default)]
+    pub fields: Vec<SortField>,
+}
+
+impl SortOrder {
+    /// Returns a fresh [`SortOrderBuilder`] with nothing set yet.
+    pub fn builder() -> SortOrderBuilder {
+        Default::default()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn sort_field() {
+        let json = r#"
+        {
+            "transform": "bucket[4]",
+            "source-id": 3,
+            "direction": "desc",
+            "null-order": "nulls-last"
+        }
+        "#;
+
+        let SortField {
+            source_id,
+            transform,
+            direction,
+            null_order,
+        } = serde_json::from_str::<SortField>(json).unwrap();
+        assert_eq!(Transform::Bucket(4), transform);
+        assert_eq!(3, source_id);
+        assert_eq!(SortDirection::Descending, direction);
+        assert_eq!(NullOrder::Last, null_order);
+    }
+
+    #[test]
+    fn sort_order() {
+        let json = r#"
+        {
+            "order-id": 1,
+            "fields": [ {
+                "transform": "identity",
+                "source-id": 2,
+                "direction": "asc",
+                "null-order": "nulls-first"
+            }, {
+                "transform": "bucket[4]",
+                "source-id": 3,
+                "direction": "desc",
+                "null-order": "nulls-last"
+            } ]
+        }
+        "#;
+
+        let parsed: SortOrder = serde_json::from_str(json).unwrap();
+
+        let first = &parsed.fields[0];
+        assert_eq!(Transform::Identity, first.transform);
+        assert_eq!(2, first.source_id);
+        assert_eq!(SortDirection::Ascending, first.direction);
+        assert_eq!(NullOrder::First, first.null_order);
+
+        let second = &parsed.fields[1];
+        assert_eq!(Transform::Bucket(4), second.transform);
+        assert_eq!(3, second.source_id);
+        assert_eq!(SortDirection::Descending, second.direction);
+        assert_eq!(NullOrder::Last, second.null_order);
+    }
+}
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
new file mode 100644
index 0000000..40fe11a
--- /dev/null
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -0,0 +1,976 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*!
+Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata).
+The main struct here is [TableMetadata], which defines the data for a table.
+*/
+
+use std::{collections::HashMap, sync::Arc};
+
+use serde::{Deserialize, Serialize};
+use serde_repr::{Deserialize_repr, Serialize_repr};
+use uuid::Uuid;
+
+use crate::{Error, ErrorKind};
+
+use super::{
+ partition::PartitionSpec,
+ schema::Schema,
+ snapshot::{Snapshot, SnapshotReference, SnapshotRetention},
+ sort::SortOrder,
+};
+
+use _serde::TableMetadataEnum;
+
+/// Name of the branch ref that tracks the table's current snapshot.
+const MAIN_BRANCH: &str = "main";
+/// Spec id given to the single partition spec built from the legacy v1
+/// `partition-spec` field when no `partition-specs` list is present.
+const DEFAULT_SPEC_ID: i32 = 0;
+
+/// In-memory representation of an Iceberg table's metadata.
+///
+/// The struct is independent of the format version it was read from. It is
+/// (de)serialized through the version-specific layouts in the `_serde` module, and
+/// `format_version` selects which layout is used when it is written back out.
+#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)]
+#[serde(try_from = "TableMetadataEnum", into = "TableMetadataEnum")]
+pub struct TableMetadata {
+    /// Integer version of the table format.
+    format_version: FormatVersion,
+    /// A UUID that identifies the table.
+    table_uuid: Uuid,
+    /// The table's base location.
+    location: String,
+    /// The table's highest sequence number.
+    last_sequence_number: i64,
+    /// Timestamp in milliseconds from the unix epoch when the table was last updated.
+    last_updated_ms: i64,
+    /// The highest assigned column ID for the table.
+    last_column_id: i32,
+    /// The table's schemas, keyed by schema id.
+    schemas: HashMap<i32, Arc<Schema>>,
+    /// ID of the table's current schema.
+    current_schema_id: i32,
+    /// The table's partition specs, keyed by spec id.
+    partition_specs: HashMap<i32, PartitionSpec>,
+    /// ID of the "current" spec that writers should use by default.
+    default_spec_id: i32,
+    /// The highest assigned partition field ID across all partition specs for the table.
+    last_partition_id: i32,
+    /// A string to string map of table properties. This is used to control settings that
+    /// affect reading and writing and is not intended to be used for arbitrary metadata.
+    /// For example, `commit.retry.num-retries` is used to control the number of commit
+    /// retries.
+    properties: HashMap<String, String>,
+    /// ID of the current table snapshot; must be the same as the current ID of the main
+    /// branch in `refs`. `None` means the table has no current snapshot; the serialized
+    /// form writes `-1` in that case.
+    current_snapshot_id: Option<i64>,
+    /// The valid snapshots, keyed by snapshot id. Valid snapshots are snapshots for which
+    /// all data files exist in the file system. A data file must not be deleted from the
+    /// file system until the last snapshot in which it was listed is garbage collected.
+    snapshots: Option<HashMap<i64, Arc<Snapshot>>>,
+    /// Timestamp and snapshot ID pairs that encode changes to the current snapshot for
+    /// the table. Each time the current-snapshot-id is changed, a new entry should be
+    /// added with the last-updated-ms and the new current-snapshot-id. When snapshots are
+    /// expired from the list of valid snapshots, all entries before a snapshot that has
+    /// expired should be removed.
+    snapshot_log: Vec<SnapshotLog>,
+
+    /// Timestamp and metadata file location pairs that encode changes to the previous
+    /// metadata files for the table. Each time a new metadata file is created, a new entry
+    /// of the previous metadata file location should be added to the list. Tables can be
+    /// configured to remove oldest metadata log entries and keep a fixed-size log of the
+    /// most recent entries after a commit.
+    metadata_log: Vec<MetadataLog>,
+
+    /// The table's sort orders, keyed by order id.
+    sort_orders: HashMap<i64, SortOrder>,
+    /// Default sort order id of the table. Note that this could be used by writers, but
+    /// is not used when reading because reads use the specs stored in manifest files.
+    default_sort_order_id: i64,
+    /// A map of snapshot references. The map keys are the unique snapshot reference names
+    /// in the table, and the map values are snapshot reference objects. There is always a
+    /// main branch reference pointing to the current-snapshot-id even if the refs map is
+    /// null.
+    refs: HashMap<String, SnapshotReference>,
+}
+
+impl TableMetadata {
+    /// Returns the table's current schema, i.e. the entry of `schemas` keyed by
+    /// `current_schema_id`.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ErrorKind::DataInvalid`] if no schema with that id exists.
+    #[inline]
+    pub fn current_schema(&self) -> Result<Arc<Schema>, Error> {
+        self.schemas
+            .get(&self.current_schema_id)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Schema id {} not found!", self.current_schema_id),
+                )
+            })
+            .cloned()
+    }
+
+    /// Returns the partition spec keyed by `default_spec_id`.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ErrorKind::DataInvalid`] if no partition spec with that id exists.
+    #[inline]
+    pub fn default_partition_spec(&self) -> Result<&PartitionSpec, Error> {
+        self.partition_specs
+            .get(&self.default_spec_id)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Partition spec id {} not found!", self.default_spec_id),
+                )
+            })
+    }
+
+    /// Returns the current snapshot, or `Ok(None)` when the table has none.
+    ///
+    /// A current snapshot id that is absent from `snapshots` also yields `Ok(None)`.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ErrorKind::DataInvalid`] when exactly one of `current_snapshot_id` and
+    /// `snapshots` is set (other than the `-1` "no snapshot" id).
+    #[inline]
+    pub fn current_snapshot(&self) -> Result<Option<Arc<Snapshot>>, Error> {
+        match (&self.current_snapshot_id, &self.snapshots) {
+            (Some(snapshot_id), Some(snapshots)) => Ok(snapshots.get(snapshot_id).cloned()),
+            (Some(-1), None) => Ok(None),
+            (None, None) => Ok(None),
+            (Some(_), None) => Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Snapshot id is provided but there are no snapshots".to_string(),
+            )),
+            (None, Some(_)) => Err(Error::new(
+                ErrorKind::DataInvalid,
+                "There are snapshots but no snapshot id is provided".to_string(),
+            )),
+        }
+    }
+
+    /// Appends `snapshot` to the table and makes it the current snapshot.
+    ///
+    /// This updates `last_updated_ms`, `last_sequence_number` and `current_snapshot_id`.
+    /// It points the `main` branch ref at the new snapshot, creating the ref if it is
+    /// absent, and records the snapshot in both `snapshots` and `snapshot_log`.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ErrorKind::DataInvalid`], without modifying the table, if the table has
+    /// no snapshots but a non-empty snapshot log.
+    pub fn append_snapshot(&mut self, snapshot: Snapshot) -> Result<(), Error> {
+        // Validate before touching any field so that an error leaves `self` unchanged.
+        if self.snapshots.is_none() && !self.snapshot_log.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Snapshot log is not empty while there are no snapshots!",
+            ));
+        }
+
+        let snapshot_id = snapshot.snapshot_id();
+        self.last_updated_ms = snapshot.timestamp();
+        self.last_sequence_number = snapshot.sequence_number();
+        // Keep `current_snapshot_id` in sync with the main branch ref updated below.
+        self.current_snapshot_id = Some(snapshot_id);
+
+        self.refs
+            .entry(MAIN_BRANCH.to_string())
+            .and_modify(|s| {
+                s.snapshot_id = snapshot_id;
+            })
+            .or_insert_with(|| {
+                SnapshotReference::new(
+                    snapshot_id,
+                    SnapshotRetention::Branch {
+                        min_snapshots_to_keep: None,
+                        max_snapshot_age_ms: None,
+                        max_ref_age_ms: None,
+                    },
+                )
+            });
+
+        self.snapshot_log.push(snapshot.log());
+        self.snapshots
+            .get_or_insert_with(HashMap::new)
+            .insert(snapshot_id, Arc::new(snapshot));
+
+        Ok(())
+    }
+}
+
+pub(super) mod _serde {
+    //! Helper types for serializing and deserializing [TableMetadata].
+    //!
+    //! For deserialization the input is first read into either the [TableMetadataV1] or the
+    //! [TableMetadataV2] struct and then converted into the [TableMetadata] struct;
+    //! serialization works the other way around. [TableMetadataV1] and [TableMetadataV2]
+    //! are internal structs that are only used for serialization and deserialization.
+    use std::{collections::HashMap, sync::Arc};
+
+    use serde::{Deserialize, Serialize};
+    use uuid::Uuid;
+
+    use crate::{
+        spec::{
+            schema::_serde::{SchemaV1, SchemaV2},
+            snapshot::_serde::{SnapshotV1, SnapshotV2},
+            PartitionField, PartitionSpec, Schema, SnapshotReference, SnapshotRetention,
+            SortOrder,
+        },
+        Error,
+    };
+
+    use super::{
+        FormatVersion, MetadataLog, SnapshotLog, TableMetadata, DEFAULT_SPEC_ID, MAIN_BRANCH,
+    };
+
+    /// Version-agnostic wrapper used as the serde proxy for [TableMetadata].
+    ///
+    /// The enum is untagged, so serde tries the variants in declaration order when
+    /// deserializing: first [TableMetadataV2], then [TableMetadataV1].
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(untagged)]
+    pub(super) enum TableMetadataEnum {
+        V2(TableMetadataV2),
+        V1(TableMetadataV1),
+    }
+
+    /// Defines the structure of a v2 table metadata for serialization/deserialization
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    pub(super) struct TableMetadataV2 {
+        pub format_version: VersionNumber<2>,
+        pub table_uuid: Uuid,
+        pub location: String,
+        pub last_sequence_number: i64,
+        pub last_updated_ms: i64,
+        pub last_column_id: i32,
+        pub schemas: Vec<SchemaV2>,
+        pub current_schema_id: i32,
+        pub partition_specs: Vec<PartitionSpec>,
+        pub default_spec_id: i32,
+        pub last_partition_id: i32,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub properties: Option<HashMap<String, String>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub current_snapshot_id: Option<i64>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub snapshots: Option<Vec<SnapshotV2>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub snapshot_log: Option<Vec<SnapshotLog>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub metadata_log: Option<Vec<MetadataLog>>,
+        pub sort_orders: Vec<SortOrder>,
+        pub default_sort_order_id: i64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub refs: Option<HashMap<String, SnapshotReference>>,
+    }
+
+    /// Defines the structure of a v1 table metadata for serialization/deserialization
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    pub(super) struct TableMetadataV1 {
+        pub format_version: VersionNumber<1>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub table_uuid: Option<Uuid>,
+        pub location: String,
+        pub last_updated_ms: i64,
+        pub last_column_id: i32,
+        pub schema: SchemaV1,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub schemas: Option<Vec<SchemaV1>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub current_schema_id: Option<i32>,
+        pub partition_spec: Vec<PartitionField>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub partition_specs: Option<Vec<PartitionSpec>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub default_spec_id: Option<i32>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub last_partition_id: Option<i32>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub properties: Option<HashMap<String, String>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub current_snapshot_id: Option<i64>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub snapshots: Option<Vec<SnapshotV1>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub snapshot_log: Option<Vec<SnapshotLog>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub metadata_log: Option<Vec<MetadataLog>>,
+        pub sort_orders: Vec<SortOrder>,
+        pub default_sort_order_id: i64,
+    }
+
+    /// Helper to serialize and deserialize the format version.
+    ///
+    /// It serializes as the integer `V` and only deserializes from exactly that integer,
+    /// which is what lets the untagged [TableMetadataEnum] pick the right variant.
+    #[derive(Debug, PartialEq, Eq)]
+    pub(super) struct VersionNumber<const V: u8>;
+
+    impl<const V: u8> Serialize for VersionNumber<V> {
+        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+        where
+            S: serde::Serializer,
+        {
+            serializer.serialize_u8(V)
+        }
+    }
+
+    impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
+        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+        where
+            D: serde::Deserializer<'de>,
+        {
+            let value = u8::deserialize(deserializer)?;
+            if value == V {
+                Ok(VersionNumber::<V>)
+            } else {
+                Err(serde::de::Error::custom("Invalid Version"))
+            }
+        }
+    }
+
+    /// Maps the `-1` sentinel that the JSON format uses for "no current snapshot" to `None`.
+    fn normalize_snapshot_id(current_snapshot_id: Option<i64>) -> Option<i64> {
+        current_snapshot_id.filter(|&id| id != -1)
+    }
+
+    /// Builds the refs implied by the current snapshot alone.
+    ///
+    /// The result is a single `main` branch pointing at `current_snapshot_id`, or no refs
+    /// when the table has no current snapshot.
+    fn main_branch_refs(current_snapshot_id: Option<i64>) -> HashMap<String, SnapshotReference> {
+        current_snapshot_id
+            .map(|snapshot_id| {
+                HashMap::from_iter(vec![(
+                    MAIN_BRANCH.to_string(),
+                    SnapshotReference {
+                        snapshot_id,
+                        retention: SnapshotRetention::Branch {
+                            min_snapshots_to_keep: None,
+                            max_snapshot_age_ms: None,
+                            max_ref_age_ms: None,
+                        },
+                    },
+                )])
+            })
+            .unwrap_or_default()
+    }
+
+    impl TryFrom<TableMetadataEnum> for TableMetadata {
+        type Error = Error;
+        fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
+            match value {
+                TableMetadataEnum::V2(value) => value.try_into(),
+                TableMetadataEnum::V1(value) => value.try_into(),
+            }
+        }
+    }
+
+    impl From<TableMetadata> for TableMetadataEnum {
+        fn from(value: TableMetadata) -> Self {
+            match value.format_version {
+                FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
+                FormatVersion::V1 => TableMetadataEnum::V1(value.into()),
+            }
+        }
+    }
+
+    impl TryFrom<TableMetadataV2> for TableMetadata {
+        type Error = Error;
+        fn try_from(value: TableMetadataV2) -> Result<Self, Error> {
+            let current_snapshot_id = normalize_snapshot_id(value.current_snapshot_id);
+            Ok(TableMetadata {
+                format_version: FormatVersion::V2,
+                table_uuid: value.table_uuid,
+                location: value.location,
+                last_sequence_number: value.last_sequence_number,
+                last_updated_ms: value.last_updated_ms,
+                last_column_id: value.last_column_id,
+                schemas: HashMap::from_iter(
+                    value
+                        .schemas
+                        .into_iter()
+                        .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
+                        .collect::<Result<Vec<_>, Error>>()?,
+                ),
+                current_schema_id: value.current_schema_id,
+                partition_specs: HashMap::from_iter(
+                    value.partition_specs.into_iter().map(|x| (x.spec_id, x)),
+                ),
+                default_spec_id: value.default_spec_id,
+                last_partition_id: value.last_partition_id,
+                properties: value.properties.unwrap_or_default(),
+                current_snapshot_id,
+                snapshots: value.snapshots.map(|snapshots| {
+                    HashMap::from_iter(
+                        snapshots
+                            .into_iter()
+                            .map(|x| (x.snapshot_id, Arc::new(x.into()))),
+                    )
+                }),
+                snapshot_log: value.snapshot_log.unwrap_or_default(),
+                metadata_log: value.metadata_log.unwrap_or_default(),
+                sort_orders: HashMap::from_iter(
+                    value.sort_orders.into_iter().map(|x| (x.order_id, x)),
+                ),
+                default_sort_order_id: value.default_sort_order_id,
+                // Without explicit refs, only the main branch implied by the current snapshot exists.
+                refs: value
+                    .refs
+                    .unwrap_or_else(|| main_branch_refs(current_snapshot_id)),
+            })
+        }
+    }
+
+    impl TryFrom<TableMetadataV1> for TableMetadata {
+        type Error = Error;
+        fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
+            // Prefer the optional `schemas` list; fall back to the required legacy `schema`.
+            let schemas: HashMap<i32, Arc<Schema>> = value
+                .schemas
+                .map(|schemas| {
+                    Ok::<_, Error>(HashMap::from_iter(
+                        schemas
+                            .into_iter()
+                            .enumerate()
+                            .map(|(i, schema)| {
+                                Ok((
+                                    schema.schema_id.unwrap_or(i as i32),
+                                    Arc::new(schema.try_into()?),
+                                ))
+                            })
+                            .collect::<Result<Vec<_>, Error>>()?,
+                    ))
+                })
+                .or_else(|| {
+                    Some(value.schema.try_into().map(|schema: Schema| {
+                        HashMap::from_iter(vec![(schema.schema_id(), Arc::new(schema))])
+                    }))
+                })
+                .transpose()?
+                .unwrap_or_default();
+
+            // Prefer the optional `partition-specs` list; fall back to the legacy `partition-spec`.
+            let partition_specs: HashMap<i32, PartitionSpec> = HashMap::from_iter(
+                value
+                    .partition_specs
+                    .unwrap_or_else(|| {
+                        vec![PartitionSpec {
+                            spec_id: DEFAULT_SPEC_ID,
+                            fields: value.partition_spec,
+                        }]
+                    })
+                    .into_iter()
+                    .map(|x| (x.spec_id, x)),
+            );
+
+            let current_snapshot_id = normalize_snapshot_id(value.current_snapshot_id);
+            let current_schema_id = value
+                .current_schema_id
+                .unwrap_or_else(|| schemas.keys().copied().max().unwrap_or_default());
+            let default_spec_id = value
+                .default_spec_id
+                .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());
+            // `last-partition-id` is the highest partition *field* id, not a spec id.
+            let last_partition_id = value.last_partition_id.unwrap_or_else(|| {
+                partition_specs
+                    .values()
+                    .flat_map(|spec| spec.fields.iter().map(|field| field.field_id))
+                    .max()
+                    .unwrap_or_default()
+            });
+
+            Ok(TableMetadata {
+                format_version: FormatVersion::V1,
+                table_uuid: value.table_uuid.unwrap_or_default(),
+                location: value.location,
+                last_sequence_number: 0,
+                last_updated_ms: value.last_updated_ms,
+                last_column_id: value.last_column_id,
+                schemas,
+                current_schema_id,
+                partition_specs,
+                default_spec_id,
+                last_partition_id,
+                properties: value.properties.unwrap_or_default(),
+                current_snapshot_id,
+                snapshots: value
+                    .snapshots
+                    .map(|snapshots| {
+                        Ok::<_, Error>(HashMap::from_iter(
+                            snapshots
+                                .into_iter()
+                                .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
+                                .collect::<Result<Vec<_>, Error>>()?,
+                        ))
+                    })
+                    .transpose()?,
+                snapshot_log: value.snapshot_log.unwrap_or_default(),
+                metadata_log: value.metadata_log.unwrap_or_default(),
+                sort_orders: HashMap::from_iter(
+                    value.sort_orders.into_iter().map(|x| (x.order_id, x)),
+                ),
+                default_sort_order_id: value.default_sort_order_id,
+                // v1 has no refs; only create `main` when there actually is a current snapshot.
+                refs: main_branch_refs(current_snapshot_id),
+            })
+        }
+    }
+
+    impl From<TableMetadata> for TableMetadataV2 {
+        fn from(v: TableMetadata) -> Self {
+            TableMetadataV2 {
+                format_version: VersionNumber::<2>,
+                table_uuid: v.table_uuid,
+                location: v.location,
+                last_sequence_number: v.last_sequence_number,
+                last_updated_ms: v.last_updated_ms,
+                last_column_id: v.last_column_id,
+                schemas: v
+                    .schemas
+                    .into_values()
+                    .map(|x| {
+                        Arc::try_unwrap(x)
+                            .unwrap_or_else(|schema| schema.as_ref().clone())
+                            .into()
+                    })
+                    .collect(),
+                current_schema_id: v.current_schema_id,
+                partition_specs: v.partition_specs.into_values().collect(),
+                default_spec_id: v.default_spec_id,
+                last_partition_id: v.last_partition_id,
+                properties: if v.properties.is_empty() {
+                    None
+                } else {
+                    Some(v.properties)
+                },
+                // The JSON format encodes "no current snapshot" as -1.
+                current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
+                snapshots: v.snapshots.map(|snapshots| {
+                    snapshots
+                        .into_values()
+                        .map(|x| {
+                            Arc::try_unwrap(x)
+                                .unwrap_or_else(|snapshot| snapshot.as_ref().clone())
+                                .into()
+                        })
+                        .collect()
+                }),
+                snapshot_log: if v.snapshot_log.is_empty() {
+                    None
+                } else {
+                    Some(v.snapshot_log)
+                },
+                metadata_log: if v.metadata_log.is_empty() {
+                    None
+                } else {
+                    Some(v.metadata_log)
+                },
+                sort_orders: v.sort_orders.into_values().collect(),
+                default_sort_order_id: v.default_sort_order_id,
+                refs: Some(v.refs),
+            }
+        }
+    }
+
+    impl From<TableMetadata> for TableMetadataV1 {
+        /// Converts to the v1 layout. Refs are dropped because v1 has no `refs` field.
+        ///
+        /// # Panics
+        ///
+        /// Panics if `current_schema_id` does not refer to one of the table's schemas.
+        fn from(v: TableMetadata) -> Self {
+            TableMetadataV1 {
+                format_version: VersionNumber::<1>,
+                table_uuid: Some(v.table_uuid),
+                location: v.location,
+                last_updated_ms: v.last_updated_ms,
+                last_column_id: v.last_column_id,
+                schema: v
+                    .schemas
+                    .get(&v.current_schema_id)
+                    .expect("current_schema_id must refer to one of the table's schemas")
+                    .as_ref()
+                    .clone()
+                    .into(),
+                schemas: Some(
+                    v.schemas
+                        .into_values()
+                        .map(|x| {
+                            Arc::try_unwrap(x)
+                                .unwrap_or_else(|schema| schema.as_ref().clone())
+                                .into()
+                        })
+                        .collect(),
+                ),
+                current_schema_id: Some(v.current_schema_id),
+                partition_spec: v
+                    .partition_specs
+                    .get(&v.default_spec_id)
+                    .map(|x| x.fields.clone())
+                    .unwrap_or_default(),
+                partition_specs: Some(v.partition_specs.into_values().collect()),
+                default_spec_id: Some(v.default_spec_id),
+                last_partition_id: Some(v.last_partition_id),
+                properties: if v.properties.is_empty() {
+                    None
+                } else {
+                    Some(v.properties)
+                },
+                // The JSON format encodes "no current snapshot" as -1.
+                current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
+                snapshots: v.snapshots.map(|snapshots| {
+                    snapshots
+                        .into_values()
+                        .map(|x| {
+                            Arc::try_unwrap(x)
+                                .unwrap_or_else(|snapshot| snapshot.as_ref().clone())
+                                .into()
+                        })
+                        .collect()
+                }),
+                snapshot_log: if v.snapshot_log.is_empty() {
+                    None
+                } else {
+                    Some(v.snapshot_log)
+                },
+                metadata_log: if v.metadata_log.is_empty() {
+                    None
+                } else {
+                    Some(v.metadata_log)
+                },
+                sort_orders: v.sort_orders.into_values().collect(),
+                default_sort_order_id: v.default_sort_order_id,
+            }
+        }
+    }
+}
+
+/// Iceberg table format version.
+///
+/// The discriminants are the numeric version numbers. `Serialize_repr` and
+/// `Deserialize_repr` therefore read and write this enum as the integers `1` and `2`
+/// used by the `format-version` JSON field, not as the ASCII codes of '1' and '2'.
+#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
+#[repr(u8)]
+pub enum FormatVersion {
+    /// Iceberg spec version 1
+    V1 = 1,
+    /// Iceberg spec version 2
+    V2 = 2,
+}
+
+/// One entry of the table's metadata log: a previous metadata file and the time at
+/// which new metadata replaced it.
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub struct MetadataLog {
+    /// Location of the previous metadata file.
+    pub metadata_file: String,
+    /// Time the new metadata was created, in milliseconds.
+    pub timestamp_ms: i64,
+}
+
+/// One entry of the table's snapshot log: a snapshot id and the time it was recorded.
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub struct SnapshotLog {
+    /// Id of the logged snapshot.
+    pub snapshot_id: i64,
+    /// Last-updated timestamp recorded with this entry, in milliseconds.
+    pub timestamp_ms: i64,
+}
+
+#[cfg(test)]
+mod tests {
+
+    use std::{collections::HashMap, sync::Arc};
+
+    use anyhow::Result;
+    use uuid::Uuid;
+
+    use pretty_assertions::assert_eq;
+
+    use crate::spec::{
+        table_metadata::TableMetadata, ManifestList, NestedField, Operation, PartitionField,
+        PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention,
+        SortOrder, Summary, Transform, Type,
+    };
+
+    use super::{FormatVersion, MetadataLog, SnapshotLog};
+
+    /// A complete, valid v2 metadata document.
+    ///
+    /// The rejection tests derive their input from it by changing one field, so the
+    /// changed field is the only thing wrong with their input.
+    const VALID_V2_METADATA: &str = r#"
+        {
+            "format-version" : 2,
+            "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
+            "location": "s3://b/wh/data.db/table",
+            "last-sequence-number" : 1,
+            "last-updated-ms": 1515100955770,
+            "last-column-id": 1,
+            "schemas": [
+                {
+                    "schema-id" : 1,
+                    "type" : "struct",
+                    "fields" :[
+                        {
+                            "id": 1,
+                            "name": "struct_name",
+                            "required": true,
+                            "type": "fixed[1]"
+                        }
+                    ]
+                }
+            ],
+            "current-schema-id" : 1,
+            "partition-specs": [
+                {
+                    "spec-id": 1,
+                    "fields": [
+                        {
+                            "source-id": 4,
+                            "field-id": 1000,
+                            "name": "ts_day",
+                            "transform": "day"
+                        }
+                    ]
+                }
+            ],
+            "default-spec-id": 1,
+            "last-partition-id": 1000,
+            "properties": {
+                "commit.retry.num-retries": "1"
+            },
+            "metadata-log": [
+                {
+                    "metadata-file": "s3://bucket/.../v1.json",
+                    "timestamp-ms": 1515100
+                }
+            ],
+            "sort-orders": [],
+            "default-sort-order-id": 0
+        }
+    "#;
+
+    /// Asserts that `json` deserializes to `expected_type` and that serializing
+    /// `expected_type` and reading it back yields the same value.
+    fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
+        let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
+        assert_eq!(desered_type, expected_type);
+
+        let sered_json = serde_json::to_string(&expected_type).unwrap();
+        let parsed_json_value = serde_json::from_str::<TableMetadata>(&sered_json).unwrap();
+
+        assert_eq!(parsed_json_value, desered_type);
+    }
+
+    #[test]
+    fn test_table_data_v2() {
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![Arc::new(NestedField::required(
+                1,
+                "struct_name",
+                Type::Primitive(PrimitiveType::Fixed(1)),
+            ))])
+            .build()
+            .unwrap();
+
+        let partition_spec = PartitionSpec::builder()
+            .with_spec_id(1)
+            .with_partition_field(PartitionField {
+                name: "ts_day".to_string(),
+                transform: Transform::Day,
+                source_id: 4,
+                field_id: 1000,
+            })
+            .build()
+            .unwrap();
+
+        let expected = TableMetadata {
+            format_version: FormatVersion::V2,
+            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
+            location: "s3://b/wh/data.db/table".to_string(),
+            last_updated_ms: 1515100955770,
+            last_column_id: 1,
+            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
+            current_schema_id: 1,
+            partition_specs: HashMap::from_iter(vec![(1, partition_spec)]),
+            default_spec_id: 1,
+            last_partition_id: 1000,
+            default_sort_order_id: 0,
+            sort_orders: HashMap::from_iter(vec![]),
+            snapshots: None,
+            current_snapshot_id: None,
+            last_sequence_number: 1,
+            properties: HashMap::from_iter(vec![(
+                "commit.retry.num-retries".to_string(),
+                "1".to_string(),
+            )]),
+            snapshot_log: Vec::new(),
+            metadata_log: vec![MetadataLog {
+                metadata_file: "s3://bucket/.../v1.json".to_string(),
+                timestamp_ms: 1515100,
+            }],
+            refs: HashMap::new(),
+        };
+
+        check_table_metadata_serde(VALID_V2_METADATA, expected);
+    }
+
+    #[test]
+    fn test_table_data_v1() {
+        let data = r#"
+        {
+            "format-version" : 1,
+            "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
+            "location" : "/home/iceberg/warehouse/nyc/taxis",
+            "last-updated-ms" : 1662532818843,
+            "last-column-id" : 5,
+            "schema" : {
+                "type" : "struct",
+                "schema-id" : 0,
+                "fields" : [ {
+                    "id" : 1,
+                    "name" : "vendor_id",
+                    "required" : false,
+                    "type" : "long"
+                }, {
+                    "id" : 2,
+                    "name" : "trip_id",
+                    "required" : false,
+                    "type" : "long"
+                }, {
+                    "id" : 3,
+                    "name" : "trip_distance",
+                    "required" : false,
+                    "type" : "float"
+                }, {
+                    "id" : 4,
+                    "name" : "fare_amount",
+                    "required" : false,
+                    "type" : "double"
+                }, {
+                    "id" : 5,
+                    "name" : "store_and_fwd_flag",
+                    "required" : false,
+                    "type" : "string"
+                } ]
+            },
+            "partition-spec" : [ {
+                "name" : "vendor_id",
+                "transform" : "identity",
+                "source-id" : 1,
+                "field-id" : 1000
+            } ],
+            "last-partition-id" : 1000,
+            "default-sort-order-id" : 0,
+            "sort-orders" : [ {
+                "order-id" : 0,
+                "fields" : [ ]
+            } ],
+            "properties" : {
+                "owner" : "root"
+            },
+            "current-snapshot-id" : 638933773299822130,
+            "refs" : {
+                "main" : {
+                    "snapshot-id" : 638933773299822130,
+                    "type" : "branch"
+                }
+            },
+            "snapshots" : [ {
+                "snapshot-id" : 638933773299822130,
+                "timestamp-ms" : 1662532818843,
+                "sequence-number" : 0,
+                "summary" : {
+                    "operation" : "append",
+                    "spark.app.id" : "local-1662532784305",
+                    "added-data-files" : "4",
+                    "added-records" : "4",
+                    "added-files-size" : "6001"
+                },
+                "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
+                "schema-id" : 0
+            } ],
+            "snapshot-log" : [ {
+                "timestamp-ms" : 1662532818843,
+                "snapshot-id" : 638933773299822130
+            } ],
+            "metadata-log" : [ {
+                "timestamp-ms" : 1662532805245,
+                "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
+            } ]
+        }
+        "#;
+
+        let schema = Schema::builder()
+            .with_fields(vec![
+                Arc::new(NestedField::optional(
+                    1,
+                    "vendor_id",
+                    Type::Primitive(PrimitiveType::Long),
+                )),
+                Arc::new(NestedField::optional(
+                    2,
+                    "trip_id",
+                    Type::Primitive(PrimitiveType::Long),
+                )),
+                Arc::new(NestedField::optional(
+                    3,
+                    "trip_distance",
+                    Type::Primitive(PrimitiveType::Float),
+                )),
+                Arc::new(NestedField::optional(
+                    4,
+                    "fare_amount",
+                    Type::Primitive(PrimitiveType::Double),
+                )),
+                Arc::new(NestedField::optional(
+                    5,
+                    "store_and_fwd_flag",
+                    Type::Primitive(PrimitiveType::String),
+                )),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec = PartitionSpec::builder()
+            .with_spec_id(0)
+            .with_partition_field(PartitionField {
+                name: "vendor_id".to_string(),
+                transform: Transform::Identity,
+                source_id: 1,
+                field_id: 1000,
+            })
+            .build()
+            .unwrap();
+
+        let sort_order = SortOrder::builder().with_order_id(0).build().unwrap();
+
+        let snapshot = Snapshot::builder()
+            .with_snapshot_id(638933773299822130)
+            .with_timestamp_ms(1662532818843)
+            .with_sequence_number(0)
+            .with_schema_id(0)
+            .with_manifest_list(ManifestList::ManifestListFile("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string()))
+            .with_summary(Summary {
+                operation: Operation::Append,
+                other: HashMap::from_iter(vec![
+                    ("spark.app.id".to_string(), "local-1662532784305".to_string()),
+                    ("added-data-files".to_string(), "4".to_string()),
+                    ("added-records".to_string(), "4".to_string()),
+                    ("added-files-size".to_string(), "6001".to_string()),
+                ]),
+            })
+            .build()
+            .unwrap();
+
+        let expected = TableMetadata {
+            format_version: FormatVersion::V1,
+            table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
+            location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
+            last_updated_ms: 1662532818843,
+            last_column_id: 5,
+            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
+            current_schema_id: 0,
+            partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
+            default_spec_id: 0,
+            last_partition_id: 1000,
+            default_sort_order_id: 0,
+            sort_orders: HashMap::from_iter(vec![(0, sort_order)]),
+            snapshots: Some(HashMap::from_iter(vec![(
+                638933773299822130,
+                Arc::new(snapshot),
+            )])),
+            current_snapshot_id: Some(638933773299822130),
+            last_sequence_number: 0,
+            properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]),
+            snapshot_log: vec![SnapshotLog {
+                snapshot_id: 638933773299822130,
+                timestamp_ms: 1662532818843,
+            }],
+            metadata_log: vec![MetadataLog {
+                metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(),
+                timestamp_ms: 1662532805245,
+            }],
+            refs: HashMap::from_iter(vec![(
+                "main".to_string(),
+                SnapshotReference {
+                    snapshot_id: 638933773299822130,
+                    retention: SnapshotRetention::Branch {
+                        min_snapshots_to_keep: None,
+                        max_snapshot_age_ms: None,
+                        max_ref_age_ms: None,
+                    },
+                },
+            )]),
+        };
+
+        check_table_metadata_serde(data, expected);
+    }
+
+    #[test]
+    fn test_invalid_table_uuid() -> Result<()> {
+        // The template parses on its own, so the failure below can only come from the UUID.
+        assert!(serde_json::from_str::<TableMetadata>(VALID_V2_METADATA).is_ok());
+
+        let data = VALID_V2_METADATA.replace("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", "xxxx");
+        assert_ne!(data, VALID_V2_METADATA, "template must contain the table uuid");
+        assert!(serde_json::from_str::<TableMetadata>(&data).is_err());
+        Ok(())
+    }
+
+    #[test]
+    fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
+        // The template parses on its own, so each failure below can only come from the version.
+        assert!(serde_json::from_str::<TableMetadata>(VALID_V2_METADATA).is_ok());
+
+        for version in &["1", "3"] {
+            let data = VALID_V2_METADATA.replace(
+                r#""format-version" : 2"#,
+                &format!(r#""format-version" : {}"#, version),
+            );
+            assert_ne!(data, VALID_V2_METADATA, "template must contain the format version");
+            assert!(serde_json::from_str::<TableMetadata>(&data).is_err());
+        }
+        Ok(())
+    }
+}