This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new bdc66a0  feat: Table metadata (#29)
bdc66a0 is described below

commit bdc66a0e984fdedaa81b252d35c72b3f3cb79047
Author: JanKaul <[email protected]>
AuthorDate: Mon Aug 21 10:53:05 2023 +0200

    feat: Table metadata (#29)
    
    * serde schemav1 & schemav2
    
    * fix default schema id
    
    * implement snapshot
    
    * add partition spec
    
    * add license
    
    * add sortorder
    
    * fix initial & write default
    
    * serialize/deserialize table metadata
    
    * impl table metadata
    
    * fix docs
    
    * fix clippy warnings
    
    * change visibility
    
    * fix rebase
    
    * fix clippy warnings
    
    * fix transform
    
    * introduce static
    
    * fix typo
    
    * change spec export style
    
    * improve table metadata v1 test
    
    * improve table metadata v2 test
    
    * delete temp file
    
    * export all submodule types at spec module
    
    * rename snapshotreference
    
    * rename snapshot retention
    
    * remove option from properties
    
    * remove option from snapshot log
    
    * improve builder
    
    * introduce enum for manifest list files and manifests
    
    * keep retention
    
    * use arc for schema
    
    * use arc for snapshot
    
    * current snapshot returns option
    
    * remove panic from snapshot conversion
    
    * check if current_snapshot_id is -1
    
    * fix schema
    
    * use schema field as fallback in v1 table metadata
    
    * use partition spec as fallback in v1 metadata
    
    * fix partition spec
    
    * introduce _serde module for schema
    
    * introduce _serde module for snapshot
    
    * introduce _serde module for table_metadata
    
    * fix docs
    
    * fix typo
    
    * use minimal table metadata for v1 test
---
 crates/iceberg/Cargo.toml                 |   2 +
 crates/iceberg/src/lib.rs                 |   3 +
 crates/iceberg/src/spec/datatypes.rs      |  94 ++-
 crates/iceberg/src/spec/mod.rs            |  21 +-
 crates/iceberg/src/spec/partition.rs      | 103 ++++
 crates/iceberg/src/spec/schema.rs         | 124 ++++
 crates/iceberg/src/spec/snapshot.rs       | 346 +++++++++++
 crates/iceberg/src/spec/sort.rs           | 133 ++++
 crates/iceberg/src/spec/table_metadata.rs | 976 ++++++++++++++++++++++++++++++
 9 files changed, 1783 insertions(+), 19 deletions(-)

diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 0b27a56..053acf8 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -39,8 +39,10 @@ chrono = "0.4"
 uuid = "1.4.1"
 ordered-float = "3.7.0"
 bitvec = "1.0.1"
+serde_repr = "0.1.16"
 itertools = "0.11"
 bimap = "0.6"
+derive_builder = "0.12.0"
 
 
 [dev-dependencies]
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index fb9db5b..8e77b97 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -19,6 +19,9 @@
 
 #![deny(missing_docs)]
 
+#[macro_use]
+extern crate derive_builder;
+
 mod error;
 pub use error::Error;
 pub use error::ErrorKind;
diff --git a/crates/iceberg/src/spec/datatypes.rs 
b/crates/iceberg/src/spec/datatypes.rs
index 9b4c986..7b24eb0 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -21,10 +21,14 @@
 use ::serde::de::{MapAccess, Visitor};
 use serde::de::{Error, IntoDeserializer};
 use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
+use serde_json::Value as JsonValue;
 use std::cell::OnceCell;
+use std::convert::identity;
 use std::sync::Arc;
 use std::{collections::HashMap, fmt, ops::Index};
 
+use super::values::Literal;
+
 /// Field name for list type.
 pub(crate) const LIST_FILED_NAME: &str = "element";
 pub(crate) const MAP_KEY_FIELD_NAME: &str = "key";
@@ -341,8 +345,8 @@ impl fmt::Display for StructType {
     }
 }
 
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
-#[serde(rename_all = "kebab-case")]
+#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)]
+#[serde(from = "SerdeNestedField", into = "SerdeNestedField")]
 /// A struct is a tuple of typed values. Each field in the tuple is named and 
has an integer id that is unique in the table schema.
 /// Each field can be either optional or required, meaning that values can (or 
cannot) be null. Fields may be any type.
 /// Fields may have an optional comment or doc string. Fields can have default 
values.
@@ -354,17 +358,65 @@ pub struct NestedField {
     /// Optional or required
     pub required: bool,
     /// Datatype
-    #[serde(rename = "type")]
     pub field_type: Box<Type>,
     /// Fields may have an optional comment or doc string.
-    #[serde(skip_serializing_if = "Option::is_none")]
     pub doc: Option<String>,
     /// Used to populate the field’s value for all records that were written 
before the field was added to the schema
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub initial_default: Option<String>,
+    pub initial_default: Option<Literal>,
     /// Used to populate the field’s value for any records written after the 
field was added to the schema, if the writer does not supply the field’s value
+    pub write_default: Option<Literal>,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+struct SerdeNestedField {
+    pub id: i32,
+    pub name: String,
+    pub required: bool,
+    #[serde(rename = "type")]
+    pub field_type: Box<Type>,
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub write_default: Option<String>,
+    pub doc: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub initial_default: Option<JsonValue>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub write_default: Option<JsonValue>,
+}
+
+/// Converts the serde helper representation into the public [`NestedField`].
+///
+/// NOTE(review): a default value that `Literal::try_from_json` fails to parse
+/// for the field's type is dropped here (`.ok()` discards the error), so the
+/// field ends up without that default rather than deserialization failing.
+impl From<SerdeNestedField> for NestedField {
+    fn from(value: SerdeNestedField) -> Self {
+        NestedField {
+            id: value.id,
+            name: value.name,
+            required: value.required,
+            // `.ok()` throws away any conversion error; `.and_then(identity)`
+            // then collapses the nested `Option` returned by `try_from_json`.
+            initial_default: value.initial_default.and_then(|x| {
+                Literal::try_from_json(x, &value.field_type)
+                    .ok()
+                    .and_then(identity)
+            }),
+            write_default: value.write_default.and_then(|x| {
+                Literal::try_from_json(x, &value.field_type)
+                    .ok()
+                    .and_then(identity)
+            }),
+            // Moved only after both defaults are computed, because the
+            // closures above still borrow `value.field_type`.
+            field_type: value.field_type,
+            doc: value.doc,
+        }
+    }
+}
+
+impl From<NestedField> for SerdeNestedField {
+    fn from(value: NestedField) -> Self {
+        SerdeNestedField {
+            id: value.id,
+            name: value.name,
+            required: value.required,
+            field_type: value.field_type,
+            doc: value.doc,
+            initial_default: value.initial_default.map(|x| (&x).into()),
+            write_default: value.write_default.map(|x| (&x).into()),
+        }
+    }
 }
 
 /// Reference to nested field.
@@ -427,14 +479,14 @@ impl NestedField {
     }
 
     /// Set the field's initial default value.
-    pub fn with_initial_default(mut self, value: impl ToString) -> Self {
-        self.initial_default = Some(value.to_string());
+    pub fn with_initial_default(mut self, value: Literal) -> Self {
+        self.initial_default = Some(value);
         self
     }
 
     /// Set the field's write default value.
-    pub fn with_write_default(mut self, value: impl ToString) -> Self {
-        self.write_default = Some(value.to_string());
+    pub fn with_write_default(mut self, value: Literal) -> Self {
+        self.write_default = Some(value);
         self
     }
 }
@@ -581,6 +633,10 @@ pub struct MapType {
 
 #[cfg(test)]
 mod tests {
+    use uuid::Uuid;
+
+    use crate::spec::values::PrimitiveLiteral;
+
     use super::*;
 
     fn check_type_serde(json: &str, expected_type: Type) {
@@ -685,8 +741,12 @@ mod tests {
             Type::Struct(StructType {
                 fields: vec![
                     NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Uuid))
-                        
.with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb")
-                        
.with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae")
+                        
.with_initial_default(Literal::Primitive(PrimitiveLiteral::UUID(
+                            
Uuid::parse_str("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb").unwrap(),
+                        )))
+                        
.with_write_default(Literal::Primitive(PrimitiveLiteral::UUID(
+                            
Uuid::parse_str("ec5911be-b0a7-458c-8438-c9a3e53cffae").unwrap(),
+                        )))
                         .into(),
                     NestedField::optional(2, "data", 
Type::Primitive(PrimitiveType::Int)).into(),
                 ],
@@ -749,8 +809,12 @@ mod tests {
 
         let struct_type = Type::Struct(StructType::new(vec![
             NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Uuid))
-                .with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb")
-                .with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae")
+                
.with_initial_default(Literal::Primitive(PrimitiveLiteral::UUID(
+                    
Uuid::parse_str("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb").unwrap(),
+                )))
+                .with_write_default(Literal::Primitive(PrimitiveLiteral::UUID(
+                    
Uuid::parse_str("ec5911be-b0a7-458c-8438-c9a3e53cffae").unwrap(),
+                )))
                 .into(),
             NestedField::optional(2, "data", 
Type::Primitive(PrimitiveType::Int)).into(),
             NestedField::required(
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index e87bb1f..dedde72 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -17,7 +17,20 @@
 
 //! Spec for Iceberg.
 
-pub mod datatypes;
-pub mod schema;
-pub mod transform;
-pub mod values;
+mod datatypes;
+mod partition;
+mod schema;
+mod snapshot;
+mod sort;
+mod table_metadata;
+mod transform;
+mod values;
+
+pub use datatypes::*;
+pub use partition::*;
+pub use schema::*;
+pub use snapshot::*;
+pub use sort::*;
+pub use table_metadata::*;
+pub use transform::*;
+pub use values::*;
diff --git a/crates/iceberg/src/spec/partition.rs 
b/crates/iceberg/src/spec/partition.rs
new file mode 100644
index 0000000..505f248
--- /dev/null
+++ b/crates/iceberg/src/spec/partition.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*!
+ * Partitioning
+*/
+use serde::{Deserialize, Serialize};
+
+use super::transform::Transform;
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// Partition fields capture the transform from table data to partition values.
+pub struct PartitionField {
+    /// A source column id from the table’s schema
+    pub source_id: i32,
+    /// A partition field id that is used to identify a partition field and is 
unique within a partition spec.
+    /// In v2 table metadata, it is unique across all partition specs.
+    pub field_id: i32,
+    /// A partition name.
+    pub name: String,
+    /// A transform that is applied to the source column to produce a 
partition value.
+    pub transform: Transform,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, 
Builder)]
+#[serde(rename_all = "kebab-case")]
+#[builder(setter(prefix = "with"))]
+///  Partition spec that defines how to produce a tuple of partition values 
from a record.
+pub struct PartitionSpec {
+    /// Identifier for PartitionSpec
+    pub spec_id: i32,
+    /// Details of the partition spec
+    #[builder(setter(each(name = "with_partition_field")))]
+    pub fields: Vec<PartitionField>,
+}
+
+impl PartitionSpec {
+    /// Create a partition spec builder
+    pub fn builder() -> PartitionSpecBuilder {
+        PartitionSpecBuilder::default()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Deserializes a three-field partition spec from JSON and checks the
+    /// spec id, the number of fields and every parsed field.
+    #[test]
+    fn partition_spec() {
+        let spec_json = r#"
+        {
+        "spec-id": 1,
+        "fields": [ {
+            "source-id": 4,
+            "field-id": 1000,
+            "name": "ts_day",
+            "transform": "day"
+            }, {
+            "source-id": 1,
+            "field-id": 1001,
+            "name": "id_bucket",
+            "transform": "bucket[16]"
+            }, {
+            "source-id": 2,
+            "field-id": 1002,
+            "name": "id_truncate",
+            "transform": "truncate[4]"
+            } ]
+        }
+        "#;
+
+        let partition_spec: PartitionSpec = serde_json::from_str(spec_json).unwrap();
+        // The spec id and field count were previously left unchecked.
+        assert_eq!(1, partition_spec.spec_id);
+        assert_eq!(3, partition_spec.fields.len());
+
+        assert_eq!(4, partition_spec.fields[0].source_id);
+        assert_eq!(1000, partition_spec.fields[0].field_id);
+        assert_eq!("ts_day", partition_spec.fields[0].name);
+        assert_eq!(Transform::Day, partition_spec.fields[0].transform);
+
+        assert_eq!(1, partition_spec.fields[1].source_id);
+        assert_eq!(1001, partition_spec.fields[1].field_id);
+        assert_eq!("id_bucket", partition_spec.fields[1].name);
+        assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);
+
+        assert_eq!(2, partition_spec.fields[2].source_id);
+        assert_eq!(1002, partition_spec.fields[2].field_id);
+        assert_eq!("id_truncate", partition_spec.fields[2].name);
+        assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
+    }
+}
diff --git a/crates/iceberg/src/spec/schema.rs 
b/crates/iceberg/src/spec/schema.rs
index e1c6f2c..654a10e 100644
--- a/crates/iceberg/src/spec/schema.rs
+++ b/crates/iceberg/src/spec/schema.rs
@@ -609,6 +609,92 @@ impl SchemaVisitor for IndexByName {
     }
 }
 
+pub(super) mod _serde {
+    //! This is a helper module that defines types to help with serialization/deserialization.
+    //! For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct
+    //! and then converted into the [Schema] struct. Serialization works the other way around.
+    //! [SchemaV1] and [SchemaV2] are internal structs that are only used for serialization and deserialization.
+    use serde::{Deserialize, Serialize};
+
+    use crate::{spec::StructType, Error, Result};
+
+    use super::{Schema, DEFAULT_SCHEMA_ID};
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v2 schema for serialization/deserialization
+    pub(crate) struct SchemaV2 {
+        pub schema_id: i32,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub identifier_field_ids: Option<Vec<i32>>,
+        #[serde(flatten)]
+        pub fields: StructType,
+    }
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v1 schema for serialization/deserialization
+    pub(crate) struct SchemaV1 {
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub schema_id: Option<i32>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub identifier_field_ids: Option<Vec<i32>>,
+        #[serde(flatten)]
+        pub fields: StructType,
+    }
+
+    impl TryFrom<SchemaV2> for Schema {
+        type Error = Error;
+
+        /// Builds a [`Schema`] from its v2 serialization form.
+        ///
+        /// A missing `identifier-field-ids` list is treated as empty.
+        ///
+        /// # Errors
+        ///
+        /// Returns whatever error the schema builder's `build` reports.
+        fn try_from(value: SchemaV2) -> Result<Self> {
+            // The leftover `dbg!(&value)` was removed: it wrote the whole
+            // schema to stderr on every deserialization.
+            Schema::builder()
+                .with_schema_id(value.schema_id)
+                .with_fields(value.fields.fields().iter().cloned())
+                .with_identifier_field_ids(value.identifier_field_ids.unwrap_or_default())
+                .build()
+        }
+    }
+
+    impl TryFrom<SchemaV1> for Schema {
+        type Error = Error;
+        fn try_from(value: SchemaV1) -> Result<Self> {
+            Schema::builder()
+                .with_schema_id(value.schema_id.unwrap_or(DEFAULT_SCHEMA_ID))
+                .with_fields(value.fields.fields().iter().cloned())
+                
.with_identifier_field_ids(value.identifier_field_ids.unwrap_or_default())
+                .build()
+        }
+    }
+
+    impl From<Schema> for SchemaV2 {
+        fn from(value: Schema) -> Self {
+            SchemaV2 {
+                schema_id: value.schema_id,
+                identifier_field_ids: if value.identifier_field_ids.is_empty() 
{
+                    None
+                } else {
+                    Some(value.identifier_field_ids.into_iter().collect())
+                },
+                fields: value.r#struct,
+            }
+        }
+    }
+
+    impl From<Schema> for SchemaV1 {
+        fn from(value: Schema) -> Self {
+            SchemaV1 {
+                schema_id: Some(value.schema_id),
+                identifier_field_ids: if value.identifier_field_ids.is_empty() 
{
+                    None
+                } else {
+                    Some(value.identifier_field_ids.into_iter().collect())
+                },
+                fields: value.r#struct,
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::spec::datatypes::Type::{List, Map, Primitive, Struct};
@@ -616,6 +702,7 @@ mod tests {
         ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, 
StructType, Type,
     };
     use crate::spec::schema::Schema;
+    use crate::spec::schema::_serde::SchemaV2;
     use std::collections::HashMap;
 
     #[test]
@@ -639,6 +726,43 @@ mod tests {
         assert_eq!(None, schema.field_by_id(3));
     }
 
+    #[test]
+    fn schema() {
+        let record = r#"
+        {
+            "type": "struct",
+            "schema-id": 1,
+            "fields": [ {
+            "id": 1,
+            "name": "id",
+            "required": true,
+            "type": "uuid"
+            }, {
+            "id": 2,
+            "name": "data",
+            "required": false,
+            "type": "int"
+            } ]
+            }
+        "#;
+
+        let result: SchemaV2 = serde_json::from_str(record).unwrap();
+        assert_eq!(1, result.schema_id);
+        assert_eq!(
+            Box::new(Type::Primitive(PrimitiveType::Uuid)),
+            result.fields[0].field_type
+        );
+        assert_eq!(1, result.fields[0].id);
+        assert!(result.fields[0].required);
+
+        assert_eq!(
+            Box::new(Type::Primitive(PrimitiveType::Int)),
+            result.fields[1].field_type
+        );
+        assert_eq!(2, result.fields[1].id);
+        assert!(!result.fields[1].required);
+    }
+
     fn table_schema_simple() -> Schema {
         Schema::builder()
             .with_schema_id(1)
diff --git a/crates/iceberg/src/spec/snapshot.rs 
b/crates/iceberg/src/spec/snapshot.rs
new file mode 100644
index 0000000..9a80288
--- /dev/null
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -0,0 +1,346 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*!
+ * Snapshots
+*/
+use std::collections::HashMap;
+
+use serde::{Deserialize, Serialize};
+
+use super::table_metadata::SnapshotLog;
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "lowercase")]
+/// The operation field is used by some operations, like snapshot expiration, 
to skip processing certain snapshots.
+pub enum Operation {
+    /// Only data files were added and no files were removed.
+    Append,
+    /// Data and delete files were added and removed without changing table 
data;
+    /// i.e., compaction, changing the data file format, or relocating data 
files.
+    Replace,
+    /// Data and delete files were added and removed in a logical overwrite 
operation.
+    Overwrite,
+    /// Data files were removed and their contents logically deleted and/or 
delete files were added to delete rows.
+    Delete,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+/// Summarises the changes in the snapshot.
+pub struct Summary {
+    /// The type of operation in the snapshot
+    pub operation: Operation,
+    /// Other summary data.
+    #[serde(flatten)]
+    pub other: HashMap<String, String>,
+}
+
+impl Default for Operation {
+    fn default() -> Operation {
+        Self::Append
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, Builder)]
+#[builder(setter(prefix = "with"))]
+/// A snapshot represents the state of a table at some time and is used to
+/// access the complete set of data files in the table.
+pub struct Snapshot {
+    /// A unique long ID
+    snapshot_id: i64,
+    /// The snapshot ID of the snapshot’s parent.
+    /// Omitted for any snapshot with no parent
+    #[builder(default = "None")]
+    parent_snapshot_id: Option<i64>,
+    /// A monotonically increasing long that tracks the order of
+    /// changes to a table.
+    sequence_number: i64,
+    /// A timestamp when the snapshot was created, used for garbage
+    /// collection and table inspection
+    timestamp_ms: i64,
+    /// The location of a manifest list for this snapshot that
+    /// tracks manifest files with additional metadata.
+    manifest_list: ManifestList,
+    /// A string map that summarizes the snapshot changes, including operation.
+    summary: Summary,
+    /// ID of the table’s current schema when the snapshot was created.
+    /// Optional: the builder leaves it as `None` unless it is set explicitly,
+    /// mirroring `parent_snapshot_id`. Without the default the generated
+    /// builder would reject snapshots that have no schema id.
+    #[builder(setter(strip_option), default = "None")]
+    schema_id: Option<i64>,
+}
+
+/// Type to distinguish between a path to a manifestlist file or a vector of 
manifestfile locations
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(untagged)]
+pub enum ManifestList {
+    /// Location of manifestlist file
+    ManifestListFile(String),
+    /// Manifestfile locations
+    ManifestFiles(Vec<String>),
+}
+
+impl Snapshot {
+    /// Get the id of the snapshot
+    #[inline]
+    pub fn snapshot_id(&self) -> i64 {
+        self.snapshot_id
+    }
+    /// Get sequence_number of the snapshot. Is 0 for Iceberg V1 tables.
+    #[inline]
+    pub fn sequence_number(&self) -> i64 {
+        self.sequence_number
+    }
+    /// Get location of manifest_list file
+    #[inline]
+    pub fn manifest_list(&self) -> &ManifestList {
+        &self.manifest_list
+    }
+    /// Get summary of the snapshot
+    #[inline]
+    pub fn summary(&self) -> &Summary {
+        &self.summary
+    }
+    /// Get the timestamp of when the snapshot was created
+    #[inline]
+    pub fn timestamp(&self) -> i64 {
+        self.timestamp_ms
+    }
+    /// Create snapshot builder
+    pub fn builder() -> SnapshotBuilder {
+        SnapshotBuilder::default()
+    }
+
+    pub(crate) fn log(&self) -> SnapshotLog {
+        SnapshotLog {
+            timestamp_ms: self.timestamp_ms,
+            snapshot_id: self.snapshot_id,
+        }
+    }
+}
+
+pub(super) mod _serde {
+    //! This is a helper module that defines types to help with serialization/deserialization.
+    //! For deserialization the input first gets read into either the [SnapshotV1] or [SnapshotV2] struct
+    //! and then converted into the [Snapshot] struct. Serialization works the other way around.
+    //! [SnapshotV1] and [SnapshotV2] are internal structs that are only used for serialization and deserialization.
+    use std::collections::HashMap;
+
+    use serde::{Deserialize, Serialize};
+
+    use crate::{Error, ErrorKind};
+
+    use super::{ManifestList, Operation, Snapshot, Summary};
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v2 snapshot for 
serialization/deserialization
+    pub(crate) struct SnapshotV2 {
+        pub snapshot_id: i64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub parent_snapshot_id: Option<i64>,
+        pub sequence_number: i64,
+        pub timestamp_ms: i64,
+        pub manifest_list: String,
+        pub summary: Summary,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub schema_id: Option<i64>,
+    }
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v1 snapshot for 
serialization/deserialization
+    pub(crate) struct SnapshotV1 {
+        pub snapshot_id: i64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub parent_snapshot_id: Option<i64>,
+        pub timestamp_ms: i64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub manifest_list: Option<String>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub manifests: Option<Vec<String>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub summary: Option<Summary>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub schema_id: Option<i64>,
+    }
+
+    impl From<SnapshotV2> for Snapshot {
+        fn from(v2: SnapshotV2) -> Self {
+            Snapshot {
+                snapshot_id: v2.snapshot_id,
+                parent_snapshot_id: v2.parent_snapshot_id,
+                sequence_number: v2.sequence_number,
+                timestamp_ms: v2.timestamp_ms,
+                manifest_list: 
ManifestList::ManifestListFile(v2.manifest_list),
+                summary: v2.summary,
+                schema_id: v2.schema_id,
+            }
+        }
+    }
+
+    /// Converts a [`Snapshot`] into its v2 serialization form.
+    ///
+    /// # Panics
+    ///
+    /// Panics when the snapshot holds [`ManifestList::ManifestFiles`], because
+    /// [`SnapshotV2`] only carries a single `manifest_list` location string.
+    ///
+    /// NOTE(review): the parameter is named `v2` although it is the
+    /// version-independent [`Snapshot`]; a fallible `TryFrom` would avoid the
+    /// panic but changes the interface.
+    impl From<Snapshot> for SnapshotV2 {
+        fn from(v2: Snapshot) -> Self {
+            SnapshotV2 {
+            snapshot_id: v2.snapshot_id,
+            parent_snapshot_id: v2.parent_snapshot_id,
+            sequence_number: v2.sequence_number,
+            timestamp_ms: v2.timestamp_ms,
+            // Only the manifest-list-file form can be represented in v2.
+            manifest_list: match v2.manifest_list {
+                ManifestList::ManifestListFile(file) => file,
+                ManifestList::ManifestFiles(_) => panic!("Wrong table format 
version. Can't convert a list of manifest files into a location of a manifest 
file.")
+            },
+            summary: v2.summary,
+            schema_id: v2.schema_id,
+        }
+        }
+    }
+
+    impl TryFrom<SnapshotV1> for Snapshot {
+        type Error = Error;
+
+        fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
+            Ok(Snapshot {
+                snapshot_id: v1.snapshot_id,
+                parent_snapshot_id: v1.parent_snapshot_id,
+                sequence_number: 0,
+                timestamp_ms: v1.timestamp_ms,
+                manifest_list: match (v1.manifest_list, v1.manifests) {
+                    (Some(file), _) => ManifestList::ManifestListFile(file),
+                    (None, Some(files)) => ManifestList::ManifestFiles(files),
+                    (None, None) => {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            "Neither manifestlist file or manifest files are 
provided.",
+                        ))
+                    }
+                },
+                summary: v1.summary.unwrap_or(Summary {
+                    operation: Operation::default(),
+                    other: HashMap::new(),
+                }),
+                schema_id: v1.schema_id,
+            })
+        }
+    }
+
+    impl From<Snapshot> for SnapshotV1 {
+        /// Converts a [`Snapshot`] into its v1 serialization form.
+        ///
+        /// Exactly one of `manifest_list` / `manifests` is populated, depending
+        /// on which [`ManifestList`] variant the snapshot holds. The sequence
+        /// number has no v1 field and is dropped; the summary is always written.
+        fn from(snapshot: Snapshot) -> Self {
+            let Snapshot {
+                snapshot_id,
+                parent_snapshot_id,
+                timestamp_ms,
+                manifest_list,
+                summary,
+                schema_id,
+                ..
+            } = snapshot;
+
+            // Split the enum into the two mutually exclusive v1 fields.
+            let (manifest_list, manifests) = match manifest_list {
+                ManifestList::ManifestFiles(paths) => (None, Some(paths)),
+                ManifestList::ManifestListFile(path) => (Some(path), None),
+            };
+
+            SnapshotV1 {
+                snapshot_id,
+                parent_snapshot_id,
+                timestamp_ms,
+                manifest_list,
+                manifests,
+                summary: Some(summary),
+                schema_id,
+            }
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// Iceberg tables keep track of branches and tags using snapshot references.
+pub struct SnapshotReference {
+    /// A reference’s snapshot ID. The tagged snapshot or latest snapshot of a 
branch.
+    pub snapshot_id: i64,
+    #[serde(flatten)]
+    /// Snapshot retention policy
+    pub retention: SnapshotRetention,
+}
+
+impl SnapshotReference {
+    /// Create new snapshot reference
+    pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
+        SnapshotReference {
+            snapshot_id,
+            retention,
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "lowercase", tag = "type")]
+/// The snapshot expiration procedure removes snapshots from table metadata 
and applies the table’s retention policy.
+pub enum SnapshotRetention {
+    #[serde(rename_all = "kebab-case")]
+    /// Branches are mutable named references that can be updated by 
committing a new snapshot as
+    /// the branch’s referenced snapshot using the Commit Conflict Resolution 
and Retry procedures.
+    Branch {
+        /// A positive number for the minimum number of snapshots to keep in a 
branch while expiring snapshots.
+        /// Defaults to table property history.expire.min-snapshots-to-keep.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        min_snapshots_to_keep: Option<i32>,
+        /// A positive number for the max age of snapshots to keep when 
expiring, including the latest snapshot.
+        /// Defaults to table property history.expire.max-snapshot-age-ms.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        max_snapshot_age_ms: Option<i64>,
+        /// For snapshot references except the main branch, a positive number 
for the max age of the snapshot reference to keep while expiring snapshots.
+        /// Defaults to table property history.expire.max-ref-age-ms. The main 
branch never expires.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        max_ref_age_ms: Option<i64>,
+    },
+    #[serde(rename_all = "kebab-case")]
+    /// Tags are labels for individual snapshots.
+    Tag {
+        /// For snapshot references except the main branch, a positive number 
for the max age of the snapshot reference to keep while expiring snapshots.
+        /// Defaults to table property history.expire.max-ref-age-ms. The main 
branch never expires.
+        max_ref_age_ms: i64,
+    },
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use crate::spec::snapshot::{ManifestList, Operation, Snapshot, Summary, 
_serde::SnapshotV1};
+
+    #[test]
+    fn schema() {
+        let record = r#"
+        {
+            "snapshot-id": 3051729675574597004,
+            "timestamp-ms": 1515100955770,
+            "summary": {
+                "operation": "append"
+            },
+            "manifest-list": "s3://b/wh/.../s1.avro",
+            "schema-id": 0
+        }
+        "#;
+
+        let result: Snapshot = serde_json::from_str::<SnapshotV1>(record)
+            .unwrap()
+            .try_into()
+            .unwrap();
+        assert_eq!(3051729675574597004, result.snapshot_id());
+        assert_eq!(1515100955770, result.timestamp());
+        assert_eq!(
+            Summary {
+                operation: Operation::Append,
+                other: HashMap::new()
+            },
+            *result.summary()
+        );
+        assert_eq!(
+            
ManifestList::ManifestListFile("s3://b/wh/.../s1.avro".to_string()),
+            *result.manifest_list()
+        );
+    }
+}
diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs
new file mode 100644
index 0000000..357e68f
--- /dev/null
+++ b/crates/iceberg/src/spec/sort.rs
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*!
+ * Sorting
+*/
+use serde::{Deserialize, Serialize};
+
+use super::transform::Transform;
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+/// Direction in which a sort field is ordered, either ascending or descending.
+pub enum SortDirection {
+    /// Ascending order; (de)serialized as the string `asc`.
+    #[serde(rename = "asc")]
+    Ascending,
+    /// Descending order; (de)serialized as the string `desc`.
+    #[serde(rename = "desc")]
+    Descending,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+/// Describes where null values are placed when sorted.
+pub enum NullOrder {
+    #[serde(rename = "nulls-first")]
+    /// Nulls are stored first; (de)serialized as the string `nulls-first`.
+    First,
+    #[serde(rename = "nulls-last")]
+    /// Nulls are stored last; (de)serialized as the string `nulls-last`.
+    Last,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// Entry for every column that is to be sorted.
+///
+/// Field names are (de)serialized in kebab-case, e.g. `source-id` and `null-order`.
+pub struct SortField {
+    /// A source column id from the table’s schema
+    pub source_id: i64,
+    /// A transform that is used to produce values to be sorted on from the source column.
+    pub transform: Transform,
+    /// A sort direction, that can only be either asc or desc
+    pub direction: SortDirection,
+    /// A null order that describes the order of null values when sorted.
+    pub null_order: NullOrder,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Builder)]
+#[serde(rename_all = "kebab-case")]
+#[builder(setter(prefix = "with"))]
+/// A sort order is defined by a sort order id and a list of sort fields.
+/// The order of the sort fields within the list defines the order in which the sort is
+/// applied to the data.
+///
+/// Field names are (de)serialized in kebab-case (`order-id`, `fields`). The derived builder
+/// is configured with the setter prefix `with`.
+pub struct SortOrder {
+    /// Identifier for SortOrder, order_id `0` is no sort order.
+    pub order_id: i64,
+    /// Details of the sort. The builder exposes `with_sort_field` to add one field at a
+    /// time, and the list defaults to empty when no field is added.
+    #[builder(setter(each(name = "with_sort_field")), default)]
+    pub fields: Vec<SortField>,
+}
+
+impl SortOrder {
+    /// Returns a fresh [`SortOrderBuilder`] in its default state, ready to be configured
+    /// and turned into a [`SortOrder`].
+    pub fn builder() -> SortOrderBuilder {
+        Default::default()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// A single sort field in kebab-case JSON deserializes into the matching `SortField`.
+    #[test]
+    fn sort_field() {
+        let sort_field = r#"
+        {
+            "transform": "bucket[4]",
+            "source-id": 3,
+            "direction": "desc",
+            "null-order": "nulls-last"
+         }
+        "#;
+
+        let field: SortField = serde_json::from_str(sort_field).unwrap();
+        assert_eq!(Transform::Bucket(4), field.transform);
+        assert_eq!(3, field.source_id);
+        assert_eq!(SortDirection::Descending, field.direction);
+        assert_eq!(NullOrder::Last, field.null_order);
+    }
+
+    /// A sort order with two fields deserializes with the fields in document order.
+    #[test]
+    fn sort_order() {
+        let sort_order = r#"
+        {
+        "order-id": 1,
+        "fields": [ {
+            "transform": "identity",
+            "source-id": 2,
+            "direction": "asc",
+            "null-order": "nulls-first"
+         }, {
+            "transform": "bucket[4]",
+            "source-id": 3,
+            "direction": "desc",
+            "null-order": "nulls-last"
+         } ]
+        }
+        "#;
+
+        let order: SortOrder = serde_json::from_str(sort_order).unwrap();
+        // First entry: identity transform, ascending, nulls first.
+        assert_eq!(Transform::Identity, order.fields[0].transform);
+        assert_eq!(2, order.fields[0].source_id);
+        assert_eq!(SortDirection::Ascending, order.fields[0].direction);
+        assert_eq!(NullOrder::First, order.fields[0].null_order);
+
+        // Second entry: bucket[4] transform, descending, nulls last.
+        assert_eq!(Transform::Bucket(4), order.fields[1].transform);
+        assert_eq!(3, order.fields[1].source_id);
+        assert_eq!(SortDirection::Descending, order.fields[1].direction);
+        assert_eq!(NullOrder::Last, order.fields[1].null_order);
+    }
+}
diff --git a/crates/iceberg/src/spec/table_metadata.rs 
b/crates/iceberg/src/spec/table_metadata.rs
new file mode 100644
index 0000000..40fe11a
--- /dev/null
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -0,0 +1,976 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*!
+Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata).
+The main struct here is [TableMetadata] which defines the data for a table.
+*/
+
+use std::{collections::HashMap, sync::Arc};
+
+use serde::{Deserialize, Serialize};
+use serde_repr::{Deserialize_repr, Serialize_repr};
+use uuid::Uuid;
+
+use crate::{Error, ErrorKind};
+
+use super::{
+    partition::PartitionSpec,
+    schema::Schema,
+    snapshot::{Snapshot, SnapshotReference, SnapshotRetention},
+    sort::SortOrder,
+};
+
+use _serde::TableMetadataEnum;
+
+/// Name of the branch reference that tracks the table's current snapshot in `refs`.
+static MAIN_BRANCH: &str = "main";
+/// Spec id assigned to the partition spec that is synthesised from the legacy v1
+/// `partition-spec` field when a v1 document carries no `partition-specs` list.
+static DEFAULT_SPEC_ID: i32 = 0;
+
+#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)]
+#[serde(try_from = "TableMetadataEnum", into = "TableMetadataEnum")]
+/// Version-independent, in-memory representation of the table metadata.
+///
+/// Serialization and deserialization are routed through `TableMetadataEnum` (see the
+/// serde attribute above), so both v1 and v2 documents end up in this one struct.
+pub struct TableMetadata {
+    /// Integer Version for the format.
+    format_version: FormatVersion,
+    /// A UUID that identifies the table
+    table_uuid: Uuid,
+    /// The table's base location
+    location: String,
+    /// The table's highest sequence number
+    last_sequence_number: i64,
+    /// Timestamp in milliseconds from the unix epoch when the table was last updated.
+    last_updated_ms: i64,
+    /// An integer; the highest assigned column ID for the table.
+    last_column_id: i32,
+    /// The table's schemas, keyed by schema-id.
+    schemas: HashMap<i32, Arc<Schema>>,
+    /// ID of the table’s current schema.
+    current_schema_id: i32,
+    /// The table's partition specs (full partition spec objects), keyed by spec-id.
+    partition_specs: HashMap<i32, PartitionSpec>,
+    /// ID of the “current” spec that writers should use by default.
+    default_spec_id: i32,
+    /// An integer; the highest assigned partition field ID across all partition specs
+    /// for the table.
+    last_partition_id: i32,
+    /// A string to string map of table properties. This is used to control settings that
+    /// affect reading and writing and is not intended to be used for arbitrary metadata.
+    /// For example, commit.retry.num-retries is used to control the number of commit retries.
+    properties: HashMap<String, String>,
+    /// long ID of the current table snapshot; must be the same as the current
+    /// ID of the main branch in refs. `None` means there is no current snapshot
+    /// (the serialized marker `-1` is mapped to `None` on deserialization).
+    current_snapshot_id: Option<i64>,
+    /// The valid snapshots, keyed by snapshot id. Valid snapshots are snapshots for which
+    /// all data files exist in the file system. A data file must not be deleted
+    /// from the file system until the last snapshot in which it was listed is
+    /// garbage collected.
+    snapshots: Option<HashMap<i64, Arc<Snapshot>>>,
+    /// A list (optional) of timestamp and snapshot ID pairs that encodes changes
+    /// to the current snapshot for the table. Each time the current-snapshot-id
+    /// is changed, a new entry should be added with the last-updated-ms
+    /// and the new current-snapshot-id. When snapshots are expired from
+    /// the list of valid snapshots, all entries before a snapshot that has
+    /// expired should be removed.
+    snapshot_log: Vec<SnapshotLog>,
+
+    /// A list (optional) of timestamp and metadata file location pairs
+    /// that encodes changes to the previous metadata files for the table.
+    /// Each time a new metadata file is created, a new entry of the
+    /// previous metadata file location should be added to the list.
+    /// Tables can be configured to remove oldest metadata log entries and
+    /// keep a fixed-size log of the most recent entries after a commit.
+    metadata_log: Vec<MetadataLog>,
+
+    /// The table's sort orders (full sort order objects), keyed by order-id.
+    sort_orders: HashMap<i64, SortOrder>,
+    /// Default sort order id of the table. Note that this could be used by
+    /// writers, but is not used when reading because reads use the specs
+    /// stored in manifest files.
+    default_sort_order_id: i64,
+    /// A map of snapshot references. The map keys are the unique snapshot reference
+    /// names in the table, and the map values are snapshot reference objects.
+    /// There is always a main branch reference pointing to the current-snapshot-id
+    /// even if the refs map is null.
+    refs: HashMap<String, SnapshotReference>,
+}
+
+impl TableMetadata {
+    /// Returns the schema registered under `current_schema_id`.
+    ///
+    /// # Errors
+    /// Returns an [`ErrorKind::DataInvalid`] error if `schemas` has no entry for that id.
+    #[inline]
+    pub fn current_schema(&self) -> Result<Arc<Schema>, Error> {
+        self.schemas
+            .get(&self.current_schema_id)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Schema id {} not found!", self.current_schema_id),
+                )
+            })
+            .cloned()
+    }
+
+    /// Returns the partition spec registered under `default_spec_id`.
+    ///
+    /// # Errors
+    /// Returns an [`ErrorKind::DataInvalid`] error if `partition_specs` has no entry for
+    /// that id.
+    #[inline]
+    pub fn default_partition_spec(&self) -> Result<&PartitionSpec, Error> {
+        self.partition_specs
+            .get(&self.default_spec_id)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Partition spec id {} not found!", self.default_spec_id),
+                )
+            })
+    }
+
+    /// Returns the current snapshot, if any.
+    ///
+    /// `Ok(None)` is returned when there is neither a current snapshot id nor a snapshot
+    /// map, when the id is the `-1` marker without a snapshot map, and when the id has no
+    /// matching entry in the snapshot map.
+    ///
+    /// # Errors
+    /// Returns an [`ErrorKind::DataInvalid`] error when exactly one of
+    /// `current_snapshot_id` and `snapshots` is set (other than the `-1` case above).
+    #[inline]
+    pub fn current_snapshot(&self) -> Result<Option<Arc<Snapshot>>, Error> {
+        match (&self.current_snapshot_id, &self.snapshots) {
+            (Some(snapshot_id), Some(snapshots)) => Ok(snapshots.get(snapshot_id).cloned()),
+            (Some(-1), None) => Ok(None),
+            (None, None) => Ok(None),
+            (Some(_), None) => Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Snapshot id is provided but there are no snapshots".to_string(),
+            )),
+            (None, Some(_)) => Err(Error::new(
+                ErrorKind::DataInvalid,
+                "There are snapshots but no snapshot id is provided".to_string(),
+            )),
+        }
+    }
+
+    /// Appends `snapshot` to the table and makes it the current snapshot.
+    ///
+    /// On success this updates `last_updated_ms` and `last_sequence_number` from the
+    /// snapshot, points `current_snapshot_id` and the main branch reference at it
+    /// (creating the reference if it does not exist yet), appends an entry to the snapshot
+    /// log and stores the snapshot in the snapshot map.
+    ///
+    /// # Errors
+    /// Returns an [`ErrorKind::DataInvalid`] error, without modifying the metadata, if the
+    /// snapshot log already has entries although the table has no snapshots.
+    pub fn append_snapshot(&mut self, snapshot: Snapshot) -> Result<(), Error> {
+        // Validate first so that a rejected call leaves the metadata untouched.
+        if self.snapshots.is_none() && !self.snapshot_log.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Snapshot log is not empty while there are no snapshots!",
+            ));
+        }
+
+        let snapshot_id = snapshot.snapshot_id();
+
+        self.last_updated_ms = snapshot.timestamp();
+        self.last_sequence_number = snapshot.sequence_number();
+        // Keep the current snapshot id in sync with the main branch reference; otherwise
+        // `current_snapshot` would see snapshots without a current snapshot id.
+        self.current_snapshot_id = Some(snapshot_id);
+
+        self.refs
+            .entry(MAIN_BRANCH.to_string())
+            .and_modify(|s| {
+                s.snapshot_id = snapshot_id;
+            })
+            .or_insert_with(|| {
+                SnapshotReference::new(
+                    snapshot_id,
+                    SnapshotRetention::Branch {
+                        min_snapshots_to_keep: None,
+                        max_snapshot_age_ms: None,
+                        max_ref_age_ms: None,
+                    },
+                )
+            });
+
+        self.snapshot_log.push(snapshot.log());
+        self.snapshots
+            .get_or_insert_with(HashMap::new)
+            .insert(snapshot_id, Arc::new(snapshot));
+
+        Ok(())
+    }
+}
+
+pub(super) mod _serde {
+    //! This is a helper module that defines types to help with serialization/deserialization.
+    //! For deserialization the input first gets read into either the [TableMetadataV1] or
+    //! [TableMetadataV2] struct and then converted into the [TableMetadata] struct.
+    //! Serialization works the other way around.
+    //! [TableMetadataV1] and [TableMetadataV2] are internal structs that are only used for
+    //! serialization and deserialization.
+    use std::{collections::HashMap, sync::Arc};
+
+    use serde::{Deserialize, Serialize};
+    use uuid::Uuid;
+
+    use crate::{
+        spec::{
+            schema::_serde::{SchemaV1, SchemaV2},
+            snapshot::_serde::{SnapshotV1, SnapshotV2},
+            PartitionField, PartitionSpec, Schema, SnapshotReference, 
SnapshotRetention, SortOrder,
+        },
+        Error,
+    };
+
+    use super::{
+        FormatVersion, MetadataLog, SnapshotLog, TableMetadata, 
DEFAULT_SPEC_ID, MAIN_BRANCH,
+    };
+
+    /// Untagged wrapper over the versioned wire formats.
+    ///
+    /// Being `untagged`, deserialization tries the variants in declaration order: `V2`
+    /// first, then `V1`. The `format_version` marker field of each variant only accepts
+    /// its own version number, which is what rejects the non-matching variant.
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(untagged)]
+    pub(super) enum TableMetadataEnum {
+        /// Format version 2 document.
+        V2(TableMetadataV2),
+        /// Format version 1 document.
+        V1(TableMetadataV1),
+    }
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v2 table metadata for serialization/deserialization.
+    ///
+    /// Field names are (de)serialized in kebab-case. `Option` fields are omitted from the
+    /// output when they are `None`.
+    pub(super) struct TableMetadataV2 {
+        /// Marker that only accepts the number `2`.
+        pub format_version: VersionNumber<2>,
+        pub table_uuid: Uuid,
+        pub location: String,
+        pub last_sequence_number: i64,
+        pub last_updated_ms: i64,
+        pub last_column_id: i32,
+        /// All schemas as a list; converted into a map keyed by schema id.
+        pub schemas: Vec<SchemaV2>,
+        pub current_schema_id: i32,
+        /// All partition specs as a list; converted into a map keyed by spec id.
+        pub partition_specs: Vec<PartitionSpec>,
+        pub default_spec_id: i32,
+        pub last_partition_id: i32,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub properties: Option<HashMap<String, String>>,
+        /// `-1` is treated as "no current snapshot" by the conversion into `TableMetadata`.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub current_snapshot_id: Option<i64>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub snapshots: Option<Vec<SnapshotV2>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub snapshot_log: Option<Vec<SnapshotLog>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub metadata_log: Option<Vec<MetadataLog>>,
+        /// All sort orders as a list; converted into a map keyed by order id.
+        pub sort_orders: Vec<SortOrder>,
+        pub default_sort_order_id: i64,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub refs: Option<HashMap<String, SnapshotReference>>,
+    }
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v1 table metadata for serialization/deserialization.
+    ///
+    /// Field names are (de)serialized in kebab-case. `Option` fields are omitted from the
+    /// output when they are `None`.
+    pub(super) struct TableMetadataV1 {
+        /// Marker that only accepts the number `1`.
+        pub format_version: VersionNumber<1>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub table_uuid: Option<Uuid>,
+        pub location: String,
+        pub last_updated_ms: i64,
+        pub last_column_id: i32,
+        /// The single schema of the document; used as a fallback when `schemas` is absent.
+        pub schema: SchemaV1,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub schemas: Option<Vec<SchemaV1>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub current_schema_id: Option<i32>,
+        /// The fields of the single partition spec; used as a fallback when
+        /// `partition_specs` is absent.
+        pub partition_spec: Vec<PartitionField>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub partition_specs: Option<Vec<PartitionSpec>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub default_spec_id: Option<i32>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub last_partition_id: Option<i32>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub properties: Option<HashMap<String, String>>,
+        /// `-1` is treated as "no current snapshot" by the conversion into `TableMetadata`.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub current_snapshot_id: Option<i64>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub snapshots: Option<Vec<SnapshotV1>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub snapshot_log: Option<Vec<SnapshotLog>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub metadata_log: Option<Vec<MetadataLog>>,
+        // NOTE(review): `sort_orders` and `default_sort_order_id` are required here, so a
+        // v1 document that lacks them fails to deserialize. Confirm against the table spec
+        // whether they should be optional for v1.
+        pub sort_orders: Vec<SortOrder>,
+        pub default_sort_order_id: i64,
+    }
+
+    /// Zero-sized marker for the format version field: it serializes as the number `V`
+    /// and deserialization only succeeds for exactly that number.
+    #[derive(Debug, PartialEq, Eq)]
+    pub(super) struct VersionNumber<const V: u8>;
+
+    impl<const V: u8> Serialize for VersionNumber<V> {
+        /// Emits the const parameter `V` as a `u8`.
+        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+        where
+            S: serde::Serializer,
+        {
+            serializer.serialize_u8(V)
+        }
+    }
+
+    impl<'de, const V: u8> Deserialize<'de> for VersionNumber<V> {
+        /// Reads a `u8` and accepts it only if it equals `V`; any other number is
+        /// rejected with a custom "Invalid Version" error.
+        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+        where
+            D: serde::Deserializer<'de>,
+        {
+            match u8::deserialize(deserializer)? {
+                found if found == V => Ok(VersionNumber::<V>),
+                _ => Err(serde::de::Error::custom("Invalid Version")),
+            }
+        }
+    }
+
+    impl TryFrom<TableMetadataEnum> for TableMetadata {
+        type Error = Error;
+
+        /// Unwraps the versioned document and delegates to the conversion of that version.
+        fn try_from(value: TableMetadataEnum) -> Result<Self, Error> {
+            match value {
+                TableMetadataEnum::V1(v1) => TableMetadata::try_from(v1),
+                TableMetadataEnum::V2(v2) => TableMetadata::try_from(v2),
+            }
+        }
+    }
+
+    impl From<TableMetadata> for TableMetadataEnum {
+        /// Picks the wire format that matches the metadata's `format_version`.
+        fn from(value: TableMetadata) -> Self {
+            match value.format_version {
+                FormatVersion::V1 => Self::V1(TableMetadataV1::from(value)),
+                FormatVersion::V2 => Self::V2(TableMetadataV2::from(value)),
+            }
+        }
+    }
+
+    impl TryFrom<TableMetadataV2> for TableMetadata {
+        type Error = Error;
+
+        /// Builds the version-independent [TableMetadata] from a deserialized v2 document.
+        ///
+        /// Lists of schemas, partition specs, sort orders and snapshots are turned into
+        /// maps keyed by their ids, missing optional collections become empty, and a
+        /// current snapshot id of `-1` is treated as "no current snapshot". When the
+        /// document has no `refs`, a main branch pointing at the current snapshot is
+        /// synthesised (or the map is left empty if there is no current snapshot).
+        ///
+        /// Fails if any schema cannot be converted.
+        fn try_from(value: TableMetadataV2) -> Result<Self, self::Error> {
+            let current_snapshot_id = value.current_snapshot_id.filter(|id| *id != -1);
+
+            let schemas: HashMap<i32, Arc<Schema>> = value
+                .schemas
+                .into_iter()
+                .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?))))
+                .collect::<Result<_, Error>>()?;
+
+            let partition_specs: HashMap<i32, PartitionSpec> = value
+                .partition_specs
+                .into_iter()
+                .map(|spec| (spec.spec_id, spec))
+                .collect();
+
+            let sort_orders: HashMap<i64, SortOrder> = value
+                .sort_orders
+                .into_iter()
+                .map(|order| (order.order_id, order))
+                .collect();
+
+            let refs = match (value.refs, current_snapshot_id) {
+                (Some(refs), _) => refs,
+                (None, Some(snapshot_id)) => HashMap::from_iter(vec![(
+                    MAIN_BRANCH.to_string(),
+                    SnapshotReference {
+                        snapshot_id,
+                        retention: SnapshotRetention::Branch {
+                            min_snapshots_to_keep: None,
+                            max_snapshot_age_ms: None,
+                            max_ref_age_ms: None,
+                        },
+                    },
+                )]),
+                (None, None) => HashMap::new(),
+            };
+
+            Ok(TableMetadata {
+                format_version: FormatVersion::V2,
+                table_uuid: value.table_uuid,
+                location: value.location,
+                last_sequence_number: value.last_sequence_number,
+                last_updated_ms: value.last_updated_ms,
+                last_column_id: value.last_column_id,
+                schemas,
+                current_schema_id: value.current_schema_id,
+                partition_specs,
+                default_spec_id: value.default_spec_id,
+                last_partition_id: value.last_partition_id,
+                properties: value.properties.unwrap_or_default(),
+                current_snapshot_id,
+                snapshots: value.snapshots.map(|snapshots| {
+                    snapshots
+                        .into_iter()
+                        .map(|snapshot| (snapshot.snapshot_id, Arc::new(snapshot.into())))
+                        .collect()
+                }),
+                snapshot_log: value.snapshot_log.unwrap_or_default(),
+                metadata_log: value.metadata_log.unwrap_or_default(),
+                sort_orders,
+                default_sort_order_id: value.default_sort_order_id,
+                refs,
+            })
+        }
+    }
+
+    impl TryFrom<TableMetadataV1> for TableMetadata {
+        type Error = Error;
+
+        /// Builds the version-independent [TableMetadata] from a deserialized v1 document.
+        ///
+        /// * `schemas` falls back to the single `schema` field when absent; a schema
+        ///   without an id in the list is keyed by its position.
+        /// * `partition_specs` falls back to one spec with id `DEFAULT_SPEC_ID` built
+        ///   from the `partition_spec` fields.
+        /// * Missing `current_schema_id` / `default_spec_id` default to the highest known
+        ///   schema / spec id.
+        /// * A current snapshot id of `-1` is treated as "no current snapshot".
+        /// * `refs` gets a main branch pointing at the current snapshot, or stays empty
+        ///   when there is no current snapshot (matching the v2 conversion).
+        /// * `last_sequence_number` is always `0`.
+        ///
+        /// Fails if any schema or snapshot cannot be converted.
+        fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
+            // Normalise once so that `current_snapshot_id` and `refs` always agree.
+            let current_snapshot_id = value.current_snapshot_id.filter(|id| *id != -1);
+
+            let schemas: HashMap<i32, Arc<Schema>> = match value.schemas {
+                Some(schemas) => schemas
+                    .into_iter()
+                    .enumerate()
+                    .map(|(i, schema)| {
+                        Ok((
+                            schema.schema_id.unwrap_or(i as i32),
+                            Arc::new(schema.try_into()?),
+                        ))
+                    })
+                    .collect::<Result<_, Error>>()?,
+                None => {
+                    let schema: Schema = value.schema.try_into()?;
+                    HashMap::from_iter(vec![(schema.schema_id(), Arc::new(schema))])
+                }
+            };
+
+            let specs = match value.partition_specs {
+                Some(specs) => specs,
+                None => vec![PartitionSpec {
+                    spec_id: DEFAULT_SPEC_ID,
+                    fields: value.partition_spec,
+                }],
+            };
+            let partition_specs: HashMap<i32, PartitionSpec> =
+                specs.into_iter().map(|spec| (spec.spec_id, spec)).collect();
+
+            let max_schema_id = schemas.keys().copied().max().unwrap_or_default();
+            let max_spec_id = partition_specs.keys().copied().max().unwrap_or_default();
+
+            // Only reference a snapshot from `main` if there actually is a current one.
+            let refs = match current_snapshot_id {
+                Some(snapshot_id) => HashMap::from_iter(vec![(
+                    MAIN_BRANCH.to_string(),
+                    SnapshotReference {
+                        snapshot_id,
+                        retention: SnapshotRetention::Branch {
+                            min_snapshots_to_keep: None,
+                            max_snapshot_age_ms: None,
+                            max_ref_age_ms: None,
+                        },
+                    },
+                )]),
+                None => HashMap::new(),
+            };
+
+            Ok(TableMetadata {
+                format_version: FormatVersion::V1,
+                table_uuid: value.table_uuid.unwrap_or_default(),
+                location: value.location,
+                last_sequence_number: 0,
+                last_updated_ms: value.last_updated_ms,
+                last_column_id: value.last_column_id,
+                current_schema_id: value.current_schema_id.unwrap_or(max_schema_id),
+                default_spec_id: value.default_spec_id.unwrap_or(max_spec_id),
+                // NOTE(review): this fallback uses the highest partition *spec* id, while
+                // the field is documented as the highest partition *field* id. Confirm
+                // whether it should be derived from the spec fields instead.
+                last_partition_id: value.last_partition_id.unwrap_or(max_spec_id),
+                partition_specs,
+                schemas,
+                properties: value.properties.unwrap_or_default(),
+                current_snapshot_id,
+                snapshots: value
+                    .snapshots
+                    .map(|snapshots| {
+                        Ok::<_, Error>(HashMap::from_iter(
+                            snapshots
+                                .into_iter()
+                                .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
+                                .collect::<Result<Vec<_>, Error>>()?,
+                        ))
+                    })
+                    .transpose()?,
+                snapshot_log: value.snapshot_log.unwrap_or_default(),
+                metadata_log: value.metadata_log.unwrap_or_default(),
+                sort_orders: HashMap::from_iter(
+                    value.sort_orders.into_iter().map(|x| (x.order_id, x)),
+                ),
+                default_sort_order_id: value.default_sort_order_id,
+                refs,
+            })
+        }
+    }
+
+    impl From<TableMetadata> for TableMetadataV2 {
+        /// Flattens the in-memory metadata into the v2 wire format.
+        ///
+        /// Maps are written as lists of their values, empty properties and logs are
+        /// omitted (`None`), a missing current snapshot id is written as `-1`, and `refs`
+        /// is always written.
+        fn from(v: TableMetadata) -> Self {
+            // Reuse the value when this is the only handle, otherwise clone it.
+            let schemas: Vec<SchemaV2> = v
+                .schemas
+                .into_values()
+                .map(|shared| {
+                    Arc::try_unwrap(shared)
+                        .unwrap_or_else(|still_shared| (*still_shared).clone())
+                        .into()
+                })
+                .collect();
+
+            let snapshots: Option<Vec<SnapshotV2>> = v.snapshots.map(|by_id| {
+                by_id
+                    .into_values()
+                    .map(|shared| {
+                        Arc::try_unwrap(shared)
+                            .unwrap_or_else(|still_shared| (*still_shared).clone())
+                            .into()
+                    })
+                    .collect()
+            });
+
+            TableMetadataV2 {
+                format_version: VersionNumber::<2>,
+                table_uuid: v.table_uuid,
+                location: v.location,
+                last_sequence_number: v.last_sequence_number,
+                last_updated_ms: v.last_updated_ms,
+                last_column_id: v.last_column_id,
+                schemas,
+                current_schema_id: v.current_schema_id,
+                partition_specs: v.partition_specs.into_values().collect(),
+                default_spec_id: v.default_spec_id,
+                last_partition_id: v.last_partition_id,
+                properties: Some(v.properties).filter(|props| !props.is_empty()),
+                current_snapshot_id: Some(v.current_snapshot_id.unwrap_or(-1)),
+                snapshots,
+                snapshot_log: Some(v.snapshot_log).filter(|log| !log.is_empty()),
+                metadata_log: Some(v.metadata_log).filter(|log| !log.is_empty()),
+                sort_orders: v.sort_orders.into_values().collect(),
+                default_sort_order_id: v.default_sort_order_id,
+                refs: Some(v.refs),
+            }
+        }
+    }
+
+    impl From<TableMetadata> for TableMetadataV1 {
+        /// Flattens the in-memory metadata into the v1 wire format.
+        ///
+        /// The legacy `schema` and `partition_spec` fields are filled from the current
+        /// schema and the default partition spec (empty field list if that spec is
+        /// missing). Maps are written as lists of their values, empty properties and logs
+        /// are omitted (`None`), and a missing current snapshot id is written as `-1`.
+        /// `last_sequence_number` and `refs` have no v1 field and are dropped.
+        ///
+        /// # Panics
+        /// Panics if `current_schema_id` has no entry in `schemas`, because the v1 format
+        /// requires a `schema` and this conversion cannot report an error.
+        fn from(v: TableMetadata) -> Self {
+            // Both legacy fields must be read before the maps are consumed below.
+            let schema: SchemaV1 = v
+                .schemas
+                .get(&v.current_schema_id)
+                .expect("current_schema_id must refer to an entry in schemas")
+                .as_ref()
+                .clone()
+                .into();
+            let partition_spec = v
+                .partition_specs
+                .get(&v.default_spec_id)
+                .map(|x| x.fields.clone())
+                .unwrap_or_default();
+
+            TableMetadataV1 {
+                format_version: VersionNumber::<1>,
+                table_uuid: Some(v.table_uuid),
+                location: v.location,
+                last_updated_ms: v.last_updated_ms,
+                last_column_id: v.last_column_id,
+                schema,
+                schemas: Some(
+                    v.schemas
+                        .into_values()
+                        .map(|x| {
+                            // Reuse the schema when this is the only handle, else clone.
+                            Arc::try_unwrap(x)
+                                .unwrap_or_else(|schema| schema.as_ref().clone())
+                                .into()
+                        })
+                        .collect(),
+                ),
+                current_schema_id: Some(v.current_schema_id),
+                partition_spec,
+                partition_specs: Some(v.partition_specs.into_values().collect()),
+                default_spec_id: Some(v.default_spec_id),
+                last_partition_id: Some(v.last_partition_id),
+                properties: if v.properties.is_empty() {
+                    None
+                } else {
+                    Some(v.properties)
+                },
+                current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
+                snapshots: v.snapshots.map(|snapshots| {
+                    snapshots
+                        .into_values()
+                        .map(|x| {
+                            Arc::try_unwrap(x)
+                                .unwrap_or_else(|snapshot| snapshot.as_ref().clone())
+                                .into()
+                        })
+                        .collect()
+                }),
+                snapshot_log: if v.snapshot_log.is_empty() {
+                    None
+                } else {
+                    Some(v.snapshot_log)
+                },
+                metadata_log: if v.metadata_log.is_empty() {
+                    None
+                } else {
+                    Some(v.metadata_log)
+                },
+                sort_orders: v.sort_orders.into_values().collect(),
+                default_sort_order_id: v.default_sort_order_id,
+            }
+        }
+    }
+}
+
+#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
+#[repr(u8)]
+/// Iceberg format version.
+///
+/// The `serde_repr` derives (de)serialize the variant as its `u8` discriminant, so the
+/// discriminants must be the plain numbers `1` and `2` rather than the ASCII codes of
+/// the digits.
+pub enum FormatVersion {
+    /// Iceberg spec version 1
+    V1 = 1u8,
+    /// Iceberg spec version 2
+    V2 = 2u8,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// One entry of the metadata log, which encodes changes to the previous metadata files
+/// for the table. Field names are (de)serialized in kebab-case.
+pub struct MetadataLog {
+    /// Location of the metadata file this entry refers to.
+    pub metadata_file: String,
+    /// Timestamp of this entry in milliseconds (serialized as `timestamp-ms`).
+    pub timestamp_ms: i64,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// One entry of the snapshot log, which records changes of the table's current snapshot.
+/// Field names are (de)serialized in kebab-case.
+pub struct SnapshotLog {
+    /// Id of the snapshot.
+    pub snapshot_id: i64,
+    /// Timestamp of this entry in milliseconds (serialized as `timestamp-ms`).
+    pub timestamp_ms: i64,
+}
+
+#[cfg(test)]
+mod tests {
+
+    use std::{collections::HashMap, sync::Arc};
+
+    use anyhow::Result;
+    use uuid::Uuid;
+
+    use pretty_assertions::assert_eq;
+
+    use crate::spec::{
+        table_metadata::TableMetadata, ManifestList, NestedField, Operation, 
PartitionField,
+        PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, 
SnapshotRetention,
+        SortOrder, Summary, Transform, Type,
+    };
+
+    use super::{FormatVersion, MetadataLog, SnapshotLog};
+
+    /// Round-trip helper shared by the metadata tests.
+    ///
+    /// Asserts that `json` deserializes to `expected_type`, then serializes
+    /// `expected_type`, parses that output again and asserts the result still
+    /// equals the value obtained from `json`.
+    fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
+        let from_json = serde_json::from_str::<TableMetadata>(json).unwrap();
+        assert_eq!(from_json, expected_type);
+
+        let reserialized = serde_json::to_string(&expected_type).unwrap();
+        let round_tripped: TableMetadata = serde_json::from_str(&reserialized).unwrap();
+
+        assert_eq!(round_tripped, from_json);
+    }
+
+    /// Checks a format-version 2 metadata document with one schema, one
+    /// partition spec, one metadata-log entry and no snapshots against a
+    /// hand-built `TableMetadata`, including a serialize/parse round trip.
+    #[test]
+    fn test_table_data_v2() {
+        let json = r#"
+            {
+                "format-version" : 2,
+                "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
+                "location": "s3://b/wh/data.db/table",
+                "last-sequence-number" : 1,
+                "last-updated-ms": 1515100955770,
+                "last-column-id": 1,
+                "schemas": [
+                    {
+                        "schema-id" : 1,
+                        "type" : "struct",
+                        "fields" :[
+                            {
+                                "id": 1,
+                                "name": "struct_name",
+                                "required": true,
+                                "type": "fixed[1]"
+                            }
+                        ]
+                    }
+                ],
+                "current-schema-id" : 1,
+                "partition-specs": [
+                    {
+                        "spec-id": 1,
+                        "fields": [
+                            {  
+                                "source-id": 4,  
+                                "field-id": 1000,  
+                                "name": "ts_day",  
+                                "transform": "day"
+                            } 
+                        ]
+                    }
+                ],
+                "default-spec-id": 1,
+                "last-partition-id": 1000,
+                "properties": {
+                    "commit.retry.num-retries": "1"
+                },
+                "metadata-log": [
+                    {  
+                        "metadata-file": "s3://bucket/.../v1.json",  
+                        "timestamp-ms": 1515100
+                    }
+                ],
+                "sort-orders": [],
+                "default-sort-order-id": 0
+            }
+        "#;
+
+        // The single required `fixed[1]` column declared in the JSON schema.
+        let struct_name_field = Arc::new(NestedField::required(
+            1,
+            "struct_name",
+            Type::Primitive(PrimitiveType::Fixed(1)),
+        ));
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![struct_name_field])
+            .build()
+            .unwrap();
+
+        // The `ts_day` partition field declared under `partition-specs`.
+        let ts_day = PartitionField {
+            source_id: 4,
+            field_id: 1000,
+            name: String::from("ts_day"),
+            transform: Transform::Day,
+        };
+        let partition_spec = PartitionSpec::builder()
+            .with_spec_id(1)
+            .with_partition_field(ts_day)
+            .build()
+            .unwrap();
+
+        let previous_metadata = MetadataLog {
+            metadata_file: String::from("s3://bucket/.../v1.json"),
+            timestamp_ms: 1515100,
+        };
+
+        let expected = TableMetadata {
+            format_version: FormatVersion::V2,
+            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
+            location: String::from("s3://b/wh/data.db/table"),
+            last_sequence_number: 1,
+            last_updated_ms: 1515100955770,
+            last_column_id: 1,
+            schemas: HashMap::from([(1, Arc::new(schema))]),
+            current_schema_id: 1,
+            partition_specs: HashMap::from([(1, partition_spec)]),
+            default_spec_id: 1,
+            last_partition_id: 1000,
+            properties: HashMap::from([(
+                String::from("commit.retry.num-retries"),
+                String::from("1"),
+            )]),
+            // The document carries neither snapshots nor a snapshot log.
+            current_snapshot_id: None,
+            snapshots: None,
+            snapshot_log: vec![],
+            metadata_log: vec![previous_metadata],
+            sort_orders: HashMap::new(),
+            default_sort_order_id: 0,
+            refs: HashMap::new(),
+        };
+
+        check_table_metadata_serde(json, expected);
+    }
+
+    /// Checks a format-version 1 metadata document (single `schema`, single
+    /// `partition-spec`, one snapshot referenced by the `main` branch) against
+    /// a hand-built `TableMetadata`, including a serialize/parse round trip.
+    #[test]
+    fn test_table_data_v1() {
+        let json = r#"
+        {
+            "format-version" : 1,
+            "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7",
+            "location" : "/home/iceberg/warehouse/nyc/taxis",
+            "last-updated-ms" : 1662532818843,
+            "last-column-id" : 5,
+            "schema" : {
+              "type" : "struct",
+              "schema-id" : 0,
+              "fields" : [ {
+                "id" : 1,
+                "name" : "vendor_id",
+                "required" : false,
+                "type" : "long"
+              }, {
+                "id" : 2,
+                "name" : "trip_id",
+                "required" : false,
+                "type" : "long"
+              }, {
+                "id" : 3,
+                "name" : "trip_distance",
+                "required" : false,
+                "type" : "float"
+              }, {
+                "id" : 4,
+                "name" : "fare_amount",
+                "required" : false,
+                "type" : "double"
+              }, {
+                "id" : 5,
+                "name" : "store_and_fwd_flag",
+                "required" : false,
+                "type" : "string"
+              } ]
+            },
+            "partition-spec" : [ {
+              "name" : "vendor_id",
+              "transform" : "identity",
+              "source-id" : 1,
+              "field-id" : 1000
+            } ],
+            "last-partition-id" : 1000,
+            "default-sort-order-id" : 0,
+            "sort-orders" : [ {
+              "order-id" : 0,
+              "fields" : [ ]
+            } ],
+            "properties" : {
+              "owner" : "root"
+            },
+            "current-snapshot-id" : 638933773299822130,
+            "refs" : {
+              "main" : {
+                "snapshot-id" : 638933773299822130,
+                "type" : "branch"
+              }
+            },
+            "snapshots" : [ {
+              "snapshot-id" : 638933773299822130,
+              "timestamp-ms" : 1662532818843,
+              "sequence-number" : 0,
+              "summary" : {
+                "operation" : "append",
+                "spark.app.id" : "local-1662532784305",
+                "added-data-files" : "4",
+                "added-records" : "4",
+                "added-files-size" : "6001"
+              },
+              "manifest-list" : 
"/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
+              "schema-id" : 0
+            } ],
+            "snapshot-log" : [ {
+              "timestamp-ms" : 1662532818843,
+              "snapshot-id" : 638933773299822130
+            } ],
+            "metadata-log" : [ {
+              "timestamp-ms" : 1662532805245,
+              "metadata-file" : 
"/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json"
+            } ]
+          }
+        "#;
+
+        // The one snapshot id that the document repeats in several places.
+        let snapshot_id = 638933773299822130_i64;
+
+        // Every column in the JSON schema is optional, so build them from a table.
+        let columns = [
+            (1, "vendor_id", PrimitiveType::Long),
+            (2, "trip_id", PrimitiveType::Long),
+            (3, "trip_distance", PrimitiveType::Float),
+            (4, "fare_amount", PrimitiveType::Double),
+            (5, "store_and_fwd_flag", PrimitiveType::String),
+        ];
+        let fields = columns
+            .into_iter()
+            .map(|(id, name, primitive)| {
+                Arc::new(NestedField::optional(id, name, Type::Primitive(primitive)))
+            })
+            .collect::<Vec<_>>();
+        let schema = Schema::builder().with_fields(fields).build().unwrap();
+
+        let vendor_id_partition = PartitionField {
+            source_id: 1,
+            field_id: 1000,
+            name: String::from("vendor_id"),
+            transform: Transform::Identity,
+        };
+        let partition_spec = PartitionSpec::builder()
+            .with_spec_id(0)
+            .with_partition_field(vendor_id_partition)
+            .build()
+            .unwrap();
+
+        let sort_order = SortOrder::builder().with_order_id(0).build().unwrap();
+
+        let manifest_list = ManifestList::ManifestListFile(String::from(
+            "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro",
+        ));
+        let summary = Summary {
+            operation: Operation::Append,
+            other: HashMap::from([
+                (
+                    String::from("spark.app.id"),
+                    String::from("local-1662532784305"),
+                ),
+                (String::from("added-data-files"), String::from("4")),
+                (String::from("added-records"), String::from("4")),
+                (String::from("added-files-size"), String::from("6001")),
+            ]),
+        };
+        let snapshot = Snapshot::builder()
+            .with_snapshot_id(snapshot_id)
+            .with_timestamp_ms(1662532818843)
+            .with_sequence_number(0)
+            .with_schema_id(0)
+            .with_manifest_list(manifest_list)
+            .with_summary(summary)
+            .build()
+            .unwrap();
+
+        // The `main` ref is a branch without any retention settings in the JSON.
+        let main_branch = SnapshotReference {
+            snapshot_id,
+            retention: SnapshotRetention::Branch {
+                min_snapshots_to_keep: None,
+                max_snapshot_age_ms: None,
+                max_ref_age_ms: None,
+            },
+        };
+
+        let previous_metadata = MetadataLog {
+            metadata_file: String::from(
+                "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json",
+            ),
+            timestamp_ms: 1662532805245,
+        };
+
+        let expected = TableMetadata {
+            format_version: FormatVersion::V1,
+            table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(),
+            location: String::from("/home/iceberg/warehouse/nyc/taxis"),
+            last_sequence_number: 0,
+            last_updated_ms: 1662532818843,
+            last_column_id: 5,
+            schemas: HashMap::from([(0, Arc::new(schema))]),
+            current_schema_id: 0,
+            partition_specs: HashMap::from([(0, partition_spec)]),
+            default_spec_id: 0,
+            last_partition_id: 1000,
+            properties: HashMap::from([(String::from("owner"), String::from("root"))]),
+            current_snapshot_id: Some(snapshot_id),
+            snapshots: Some(HashMap::from([(snapshot_id, Arc::new(snapshot))])),
+            snapshot_log: vec![SnapshotLog {
+                snapshot_id,
+                timestamp_ms: 1662532818843,
+            }],
+            metadata_log: vec![previous_metadata],
+            sort_orders: HashMap::from([(0, sort_order)]),
+            default_sort_order_id: 0,
+            refs: HashMap::from([(String::from("main"), main_branch)]),
+        };
+
+        check_table_metadata_serde(json, expected);
+    }
+
+    /// A document whose `table-uuid` is not a UUID must be rejected.
+    // NOTE(review): the input contains only `format-version` and `table-uuid`,
+    // so the error may also stem from the other fields being absent — confirm
+    // whether a complete document with a bad UUID was intended.
+    #[test]
+    fn test_invalid_table_uuid() -> Result<()> {
+        let json = r#"
+            {
+                "format-version" : 2,
+                "table-uuid": "xxxx"
+            }
+        "#;
+        let parsed = serde_json::from_str::<TableMetadata>(json);
+        assert!(parsed.is_err());
+        Ok(())
+    }
+    /// A document that consists of nothing but a `format-version` must be rejected.
+    // NOTE(review): the input uses `"format-version" : 1`, so the failure
+    // presumably comes from the missing fields rather than from the version
+    // number itself — confirm that this matches the intent behind the test name.
+    #[test]
+    fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> {
+        let json = r#"
+            {
+                "format-version" : 1
+            }
+        "#;
+        let parsed = serde_json::from_str::<TableMetadata>(json);
+        assert!(parsed.is_err());
+        Ok(())
+    }
+}

Reply via email to