This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new ab4f69a  View Spec implementation (#331)
ab4f69a is described below

commit ab4f69a64115fc66edff0fa0a5c5032c53e138e7
Author: Christian <[email protected]>
AuthorDate: Sun Jul 28 16:02:22 2024 +0200

    View Spec implementation (#331)
    
    * Add support for ViewSpec
    
    * Fix typos
    
    * Fix typos
    
    * clippy is always right
    
    * Add tests
    
    * Remove new_view_version test function
    
    * Remove append_version
    
    * View Representations Struct
    
    * ViewRepresentation case insensitive
    
    * Add fallible methods for ViewRepresentationsBuilder
    
    * Add tests for fallible ViewRepresentationsBuilder methods
    
    * Introduce ViewVersionId as i32
    
    * Iterator for &'a ViewRepresentations
    
    * Improve comments
    
    Co-authored-by: Renjie Liu <[email protected]>
    
    * Add test_view_metadata_v1_file_valid
    
    * Fix view_version iter
    
    * Remove ViewRepresentationsBuilder
    
    * Fix comment
    
    * Timestamp error handling
    
    * Fallible Timestamp Conversion from Millis
    
    * Fix Initial view Version = 1
    
    * Cleanup
    
    * Hide ViewMetadata iter() type
    
    * timestamp_ms_to_utc -> error.rs
    
    * TableMetadata timestamp conversion -> utility function
    
    * Improve error context
    
    * timestamp_ms_to_utc: LocalResult::None -> DataInvalid
    
    * Fix obsolete comment
    
    * ViewRepresentation::SqlViewRepresentation -> ::Sql
    
    * Fix broken clippy from rebase
    
    ---------
    
    Co-authored-by: Renjie Liu <[email protected]>
---
 Cargo.toml                                         |   3 +-
 crates/catalog/rest/src/catalog.rs                 |  14 +-
 crates/iceberg/src/catalog/mod.rs                  |  27 +-
 crates/iceberg/src/error.rs                        |  24 +
 crates/iceberg/src/lib.rs                          |   2 +-
 crates/iceberg/src/spec/mod.rs                     |   4 +
 crates/iceberg/src/spec/table_metadata.rs          |  28 +-
 crates/iceberg/src/spec/view_metadata.rs           | 728 +++++++++++++++++++++
 crates/iceberg/src/spec/view_version.rs            | 313 +++++++++
 .../ViewMetadataUnsupportedVersion.json            |  58 ++
 .../ViewMetadataV1CurrentVersionNotFound.json      |  58 ++
 .../ViewMetadataV1MissingCurrentVersion.json       |  57 ++
 .../view_metadata/ViewMetadataV1MissingSchema.json |  56 ++
 .../ViewMetadataV1SchemaNotFound.json              |  58 ++
 .../view_metadata/ViewMetadataV1Valid.json         |  58 ++
 15 files changed, 1474 insertions(+), 14 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index c4f8482..60307bd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -85,7 +85,8 @@ tempfile = "3.8"
 tokio = { version = "1", default-features = false }
 typed-builder = "^0.19"
 url = "2"
-uuid = "1.6.1"
+urlencoding = "2"
+uuid = { version = "1.6.1", features = ["v7"] }
 volo-thrift = "0.10"
 hive_metastore = "0.1.0"
 tera = "1"
diff --git a/crates/catalog/rest/src/catalog.rs 
b/crates/catalog/rest/src/catalog.rs
index aab615c..30f2e29 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -1321,7 +1321,7 @@ mod tests {
         );
         assert_eq!(
             Utc.timestamp_millis_opt(1646787054459).unwrap(),
-            table.metadata().last_updated_ms()
+            table.metadata().last_updated_timestamp().unwrap()
         );
         assert_eq!(
             vec![&Arc::new(
@@ -1511,7 +1511,11 @@ mod tests {
         );
         assert_eq!(
             1657810967051,
-            table.metadata().last_updated_ms().timestamp_millis()
+            table
+                .metadata()
+                .last_updated_timestamp()
+                .unwrap()
+                .timestamp_millis()
         );
         assert_eq!(
             vec![&Arc::new(
@@ -1682,7 +1686,11 @@ mod tests {
         );
         assert_eq!(
             1657810967051,
-            table.metadata().last_updated_ms().timestamp_millis()
+            table
+                .metadata()
+                .last_updated_timestamp()
+                .unwrap()
+                .timestamp_millis()
         );
         assert_eq!(
             vec![&Arc::new(
diff --git a/crates/iceberg/src/catalog/mod.rs 
b/crates/iceberg/src/catalog/mod.rs
index 5c63e1e..bc98772 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -29,7 +29,7 @@ use uuid::Uuid;
 
 use crate::spec::{
     FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, 
TableMetadataBuilder,
-    UnboundPartitionSpec,
+    UnboundPartitionSpec, ViewRepresentations,
 };
 use crate::table::Table;
 use crate::{Error, ErrorKind, Result};
@@ -439,6 +439,31 @@ impl TableUpdate {
     }
 }
 
+/// ViewCreation represents the creation of a view in the catalog.
+#[derive(Debug, TypedBuilder)]
+pub struct ViewCreation {
+    /// The name of the view.
+    pub name: String,
+    /// The view's base location; used to create metadata file locations
+    pub location: String,
+    /// Representations for the view.
+    pub representations: ViewRepresentations,
+    /// The schema of the view.
+    pub schema: Schema,
+    /// The properties of the view.
+    #[builder(default)]
+    pub properties: HashMap<String, String>,
+    /// The default namespace to use when a reference in the SELECT is a 
single identifier
+    pub default_namespace: NamespaceIdent,
+    /// Default catalog to use when a reference in the SELECT does not contain 
a catalog
+    #[builder(default)]
+    pub default_catalog: Option<String>,
+    /// A string to string map of summary metadata about the version
+    /// Typical keys are "engine-name" and "engine-version"
+    #[builder(default)]
+    pub summary: HashMap<String, String>,
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index 6270b43..6f7fd7c 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -19,6 +19,8 @@ use std::backtrace::{Backtrace, BacktraceStatus};
 use std::fmt;
 use std::fmt::{Debug, Display, Formatter};
 
+use chrono::{DateTime, TimeZone as _, Utc};
+
 /// Result that is a wrapper of `Result<T, iceberg::Error>`
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -331,6 +333,28 @@ define_from_err!(
 
 define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
 
+/// Converts a timestamp in milliseconds to `DateTime<Utc>`, handling errors.
+///
+/// # Arguments
+///
+/// * `timestamp_ms` - The timestamp in milliseconds to convert.
+///
+/// # Returns
+///
+/// This function returns a `Result<DateTime<Utc>, Error>` which is `Ok` with 
the `DateTime<Utc>` if the conversion is successful,
+/// or an `Err` with an appropriate error if the timestamp is ambiguous or 
invalid.
+pub(crate) fn timestamp_ms_to_utc(timestamp_ms: i64) -> Result<DateTime<Utc>> {
+    match Utc.timestamp_millis_opt(timestamp_ms) {
+        chrono::LocalResult::Single(t) => Ok(t),
+        chrono::LocalResult::Ambiguous(_, _) => Err(Error::new(
+            ErrorKind::Unexpected,
+            "Ambiguous timestamp with two possible results",
+        )),
+        chrono::LocalResult::None => Err(Error::new(ErrorKind::DataInvalid, 
"Invalid timestamp")),
+    }
+    .map_err(|e| e.with_context("timestamp value", timestamp_ms.to_string()))
+}
+
 /// Helper macro to check arguments.
 ///
 ///
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 35d5932..9682fa1 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -29,7 +29,7 @@ mod catalog;
 
 pub use catalog::{
     Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, 
TableIdent, TableRequirement,
-    TableUpdate,
+    TableUpdate, ViewCreation,
 };
 
 pub mod table;
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index 199fc4a..793f00d 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -27,6 +27,8 @@ mod sort;
 mod table_metadata;
 mod transform;
 mod values;
+mod view_metadata;
+mod view_version;
 
 pub use datatypes::*;
 pub use manifest::*;
@@ -38,3 +40,5 @@ pub use sort::*;
 pub use table_metadata::*;
 pub use transform::*;
 pub use values::*;
+pub use view_metadata::*;
+pub use view_version::*;
diff --git a/crates/iceberg/src/spec/table_metadata.rs 
b/crates/iceberg/src/spec/table_metadata.rs
index d9a09d8..53bcabb 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -24,7 +24,7 @@ use std::fmt::{Display, Formatter};
 use std::sync::Arc;
 
 use _serde::TableMetadataEnum;
-use chrono::{DateTime, TimeZone, Utc};
+use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use serde_repr::{Deserialize_repr, Serialize_repr};
 use uuid::Uuid;
@@ -33,7 +33,7 @@ use super::snapshot::{Snapshot, SnapshotReference, 
SnapshotRetention};
 use super::{
     PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, SnapshotRef, 
SortOrder, SortOrderRef,
 };
-use crate::error::Result;
+use crate::error::{timestamp_ms_to_utc, Result};
 use crate::{Error, ErrorKind, TableCreation};
 
 static MAIN_BRANCH: &str = "main";
@@ -143,8 +143,14 @@ impl TableMetadata {
 
     /// Returns last updated time.
     #[inline]
-    pub fn last_updated_ms(&self) -> DateTime<Utc> {
-        Utc.timestamp_millis_opt(self.last_updated_ms).unwrap()
+    pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
+        timestamp_ms_to_utc(self.last_updated_ms)
+    }
+
+    /// Returns last updated time in milliseconds.
+    #[inline]
+    pub fn last_updated_ms(&self) -> i64 {
+        self.last_updated_ms
     }
 
     /// Returns schemas
@@ -328,7 +334,7 @@ impl TableMetadataBuilder {
 
         let table_metadata = TableMetadata {
             format_version: FormatVersion::V2,
-            table_uuid: Uuid::new_v4(),
+            table_uuid: Uuid::now_v7(),
             location: location.ok_or_else(|| {
                 Error::new(
                     ErrorKind::DataInvalid,
@@ -472,7 +478,7 @@ pub(super) mod _serde {
 
     /// Helper to serialize and deserialize the format version.
     #[derive(Debug, PartialEq, Eq)]
-    pub(super) struct VersionNumber<const V: u8>;
+    pub(crate) struct VersionNumber<const V: u8>;
 
     impl Serialize for TableMetadata {
         fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
@@ -903,8 +909,14 @@ pub struct SnapshotLog {
 
 impl SnapshotLog {
     /// Returns the last updated timestamp as a DateTime<Utc> with millisecond 
precision
-    pub fn timestamp(self) -> DateTime<Utc> {
-        Utc.timestamp_millis_opt(self.timestamp_ms).unwrap()
+    pub fn timestamp(self) -> Result<DateTime<Utc>> {
+        timestamp_ms_to_utc(self.timestamp_ms)
+    }
+
+    /// Returns the timestamp in milliseconds
+    #[inline]
+    pub fn timestamp_ms(&self) -> i64 {
+        self.timestamp_ms
     }
 }
 
diff --git a/crates/iceberg/src/spec/view_metadata.rs 
b/crates/iceberg/src/spec/view_metadata.rs
new file mode 100644
index 0000000..cc46f07
--- /dev/null
+++ b/crates/iceberg/src/spec/view_metadata.rs
@@ -0,0 +1,728 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the [view 
metadata](https://iceberg.apache.org/view-spec/#view-metadata).
+//! The main struct here is [ViewMetadata] which defines the data for a view.
+
+use std::cmp::Ordering;
+use std::collections::HashMap;
+use std::fmt::{Display, Formatter};
+use std::sync::Arc;
+
+use _serde::ViewMetadataEnum;
+use chrono::{DateTime, MappedLocalTime, TimeZone, Utc};
+use serde::{Deserialize, Serialize};
+use serde_repr::{Deserialize_repr, Serialize_repr};
+use uuid::Uuid;
+
+use super::view_version::{ViewVersion, ViewVersionId, ViewVersionRef};
+use super::{SchemaId, SchemaRef};
+use crate::catalog::ViewCreation;
+use crate::error::Result;
+
+/// Reference to [`ViewMetadata`].
+pub type ViewMetadataRef = Arc<ViewMetadata>;
+
+pub(crate) static INITIAL_VIEW_VERSION_ID: i32 = 1;
+
+#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
+#[serde(try_from = "ViewMetadataEnum", into = "ViewMetadataEnum")]
+/// Fields for the version 1 of the view metadata.
+///
+/// We assume that this data structure is always valid, so accessors panic on 
violated invariants.
+/// We check the validity of this data structure when constructing it.
+pub struct ViewMetadata {
+    /// Integer Version for the format.
+    pub(crate) format_version: ViewFormatVersion,
+    /// A UUID that identifies the view, generated when the view is created.
+    pub(crate) view_uuid: Uuid,
+    /// The view's base location; used to create metadata file locations
+    pub(crate) location: String,
+    /// ID of the current version of the view (version-id)
+    pub(crate) current_version_id: ViewVersionId,
+    /// A list of known versions of the view
+    pub(crate) versions: HashMap<ViewVersionId, ViewVersionRef>,
+    /// A list of version log entries with the timestamp and version-id for 
every
+    /// change to current-version-id
+    pub(crate) version_log: Vec<ViewVersionLog>,
+    /// A list of schemas, stored as objects with schema-id.
+    pub(crate) schemas: HashMap<i32, SchemaRef>,
+    /// A string to string map of view properties.
+    /// Properties are used for metadata such as comment and for settings that
+    /// affect view maintenance. This is not intended to be used for arbitrary 
metadata.
+    pub(crate) properties: HashMap<String, String>,
+}
+
+impl ViewMetadata {
+    /// Returns format version of this metadata.
+    #[inline]
+    pub fn format_version(&self) -> ViewFormatVersion {
+        self.format_version
+    }
+
+    /// Returns uuid of current view.
+    #[inline]
+    pub fn uuid(&self) -> Uuid {
+        self.view_uuid
+    }
+
+    /// Returns view location.
+    #[inline]
+    pub fn location(&self) -> &str {
+        self.location.as_str()
+    }
+
+    /// Returns the current version id.
+    #[inline]
+    pub fn current_version_id(&self) -> ViewVersionId {
+        self.current_version_id
+    }
+
+    /// Returns all view versions.
+    #[inline]
+    pub fn versions(&self) -> impl Iterator<Item = &ViewVersionRef> {
+        self.versions.values()
+    }
+
+    /// Lookup a view version by id.
+    #[inline]
+    pub fn version_by_id(&self, version_id: ViewVersionId) -> 
Option<&ViewVersionRef> {
+        self.versions.get(&version_id)
+    }
+
+    /// Returns the current view version.
+    #[inline]
+    pub fn current_version(&self) -> &ViewVersionRef {
+        self.versions
+            .get(&self.current_version_id)
+            .expect("Current version id set, but not found in view versions")
+    }
+
+    /// Returns schemas
+    #[inline]
+    pub fn schemas_iter(&self) -> impl Iterator<Item = &SchemaRef> {
+        self.schemas.values()
+    }
+
+    /// Lookup schema by id.
+    #[inline]
+    pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
+        self.schemas.get(&schema_id)
+    }
+
+    /// Get current schema
+    #[inline]
+    pub fn current_schema(&self) -> &SchemaRef {
+        let schema_id = self.current_version().schema_id();
+        self.schema_by_id(schema_id)
+            .expect("Current schema id set, but not found in view metadata")
+    }
+
+    /// Returns properties of the view.
+    #[inline]
+    pub fn properties(&self) -> &HashMap<String, String> {
+        &self.properties
+    }
+
+    /// Returns view history.
+    #[inline]
+    pub fn history(&self) -> &[ViewVersionLog] {
+        &self.version_log
+    }
+}
+
+/// Manipulating view metadata.
+pub struct ViewMetadataBuilder(ViewMetadata);
+
+impl ViewMetadataBuilder {
+    /// Creates a new view metadata builder from the given view metadata.
+    pub fn new(origin: ViewMetadata) -> Self {
+        Self(origin)
+    }
+
+    /// Creates a new view metadata builder from the given view creation.
+    pub fn from_view_creation(view_creation: ViewCreation) -> Result<Self> {
+        let ViewCreation {
+            location,
+            schema,
+            properties,
+            name: _,
+            representations,
+            default_catalog,
+            default_namespace,
+            summary,
+        } = view_creation;
+        let initial_version_id = super::INITIAL_VIEW_VERSION_ID;
+        let version = ViewVersion::builder()
+            .with_default_catalog(default_catalog)
+            .with_default_namespace(default_namespace)
+            .with_representations(representations)
+            .with_schema_id(schema.schema_id())
+            .with_summary(summary)
+            .with_timestamp_ms(Utc::now().timestamp_millis())
+            .with_version_id(initial_version_id)
+            .build();
+
+        let versions = HashMap::from_iter(vec![(initial_version_id, 
version.into())]);
+
+        let view_metadata = ViewMetadata {
+            format_version: ViewFormatVersion::V1,
+            view_uuid: Uuid::now_v7(),
+            location,
+            current_version_id: initial_version_id,
+            versions,
+            version_log: Vec::new(),
+            schemas: HashMap::from_iter(vec![(schema.schema_id(), 
Arc::new(schema))]),
+            properties,
+        };
+
+        Ok(Self(view_metadata))
+    }
+
+    /// Changes uuid of view metadata.
+    pub fn assign_uuid(mut self, uuid: Uuid) -> Result<Self> {
+        self.0.view_uuid = uuid;
+        Ok(self)
+    }
+
+    /// Returns the new view metadata after changes.
+    pub fn build(self) -> Result<ViewMetadata> {
+        Ok(self.0)
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// A log entry recording when a view version became the current version.
+pub struct ViewVersionLog {
+    /// ID that current-version-id was set to
+    version_id: ViewVersionId,
+    /// Timestamp when the view's current-version-id was updated (ms from 
epoch)
+    timestamp_ms: i64,
+}
+
+impl ViewVersionLog {
+    #[inline]
+    /// Creates a new view version log.
+    pub fn new(version_id: ViewVersionId, timestamp: i64) -> Self {
+        Self {
+            version_id,
+            timestamp_ms: timestamp,
+        }
+    }
+
+    /// Returns the version id.
+    #[inline]
+    pub fn version_id(&self) -> ViewVersionId {
+        self.version_id
+    }
+
+    /// Returns the timestamp in milliseconds from epoch.
+    #[inline]
+    pub fn timestamp_ms(&self) -> i64 {
+        self.timestamp_ms
+    }
+
+    /// Returns the result of converting the millisecond timestamp to a 
`DateTime<Utc>`, wrapped in a `MappedLocalTime`.
+    pub fn timestamp(self) -> MappedLocalTime<DateTime<Utc>> {
+        Utc.timestamp_millis_opt(self.timestamp_ms)
+    }
+}
+
+pub(super) mod _serde {
+    /// This is a helper module that defines types to help with 
serialization/deserialization.
+    /// For deserialization the input first gets read into the 
[ViewMetadataV1] struct
+    /// and then converted into the [ViewMetadata] struct. Serialization works 
the other way around.
+    /// [ViewMetadataV1] is an internal struct that is only used for 
serialization and deserialization.
+    use std::{collections::HashMap, sync::Arc};
+
+    use serde::{Deserialize, Serialize};
+    use uuid::Uuid;
+
+    use super::{ViewFormatVersion, ViewVersionId, ViewVersionLog};
+    use crate::spec::schema::_serde::SchemaV2;
+    use crate::spec::table_metadata::_serde::VersionNumber;
+    use crate::spec::view_version::_serde::ViewVersionV1;
+    use crate::spec::{ViewMetadata, ViewVersion};
+    use crate::{Error, ErrorKind};
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(untagged)]
+    pub(super) enum ViewMetadataEnum {
+        V1(ViewMetadataV1),
+    }
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v1 view metadata for 
serialization/deserialization
+    pub(super) struct ViewMetadataV1 {
+        pub format_version: VersionNumber<1>,
+        pub(super) view_uuid: Uuid,
+        pub(super) location: String,
+        pub(super) current_version_id: ViewVersionId,
+        pub(super) versions: Vec<ViewVersionV1>,
+        pub(super) version_log: Vec<ViewVersionLog>,
+        pub(super) schemas: Vec<SchemaV2>,
+        pub(super) properties: Option<std::collections::HashMap<String, 
String>>,
+    }
+
+    impl Serialize for ViewMetadata {
+        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+        where S: serde::Serializer {
+            // we must do a clone here
+            let metadata_enum: ViewMetadataEnum =
+                self.clone().try_into().map_err(serde::ser::Error::custom)?;
+
+            metadata_enum.serialize(serializer)
+        }
+    }
+
+    impl TryFrom<ViewMetadataEnum> for ViewMetadata {
+        type Error = Error;
+        fn try_from(value: ViewMetadataEnum) -> Result<Self, Error> {
+            match value {
+                ViewMetadataEnum::V1(value) => value.try_into(),
+            }
+        }
+    }
+
+    impl TryFrom<ViewMetadata> for ViewMetadataEnum {
+        type Error = Error;
+        fn try_from(value: ViewMetadata) -> Result<Self, Error> {
+            Ok(match value.format_version {
+                ViewFormatVersion::V1 => ViewMetadataEnum::V1(value.into()),
+            })
+        }
+    }
+
+    impl TryFrom<ViewMetadataV1> for ViewMetadata {
+        type Error = Error;
+        fn try_from(value: ViewMetadataV1) -> Result<Self, self::Error> {
+            let schemas = HashMap::from_iter(
+                value
+                    .schemas
+                    .into_iter()
+                    .map(|schema| Ok((schema.schema_id, 
Arc::new(schema.try_into()?))))
+                    .collect::<Result<Vec<_>, Error>>()?,
+            );
+            let versions = HashMap::from_iter(
+                value
+                    .versions
+                    .into_iter()
+                    .map(|x| Ok((x.version_id, 
Arc::new(ViewVersion::from(x)))))
+                    .collect::<Result<Vec<_>, Error>>()?,
+            );
+            // Make sure the current version and its schema exist
+            let current_version =
+                versions
+                    .get(&value.current_version_id)
+                    .ok_or(self::Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "No version exists with the current version id 
{}.",
+                            value.current_version_id
+                        ),
+                    ))?;
+            if !schemas.contains_key(&current_version.schema_id()) {
+                return Err(self::Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "No schema exists with the schema id {}.",
+                        current_version.schema_id()
+                    ),
+                ));
+            }
+
+            Ok(ViewMetadata {
+                format_version: ViewFormatVersion::V1,
+                view_uuid: value.view_uuid,
+                location: value.location,
+                schemas,
+                properties: value.properties.unwrap_or_default(),
+                current_version_id: value.current_version_id,
+                versions,
+                version_log: value.version_log,
+            })
+        }
+    }
+
+    impl From<ViewMetadata> for ViewMetadataV1 {
+        fn from(v: ViewMetadata) -> Self {
+            let schemas = v
+                .schemas
+                .into_values()
+                .map(|x| {
+                    Arc::try_unwrap(x)
+                        .unwrap_or_else(|schema| schema.as_ref().clone())
+                        .into()
+                })
+                .collect();
+            let versions = v
+                .versions
+                .into_values()
+                .map(|x| {
+                    Arc::try_unwrap(x)
+                        .unwrap_or_else(|version| version.as_ref().clone())
+                        .into()
+                })
+                .collect();
+            ViewMetadataV1 {
+                format_version: VersionNumber::<1>,
+                view_uuid: v.view_uuid,
+                location: v.location,
+                schemas,
+                properties: Some(v.properties),
+                current_version_id: v.current_version_id,
+                versions,
+                version_log: v.version_log,
+            }
+        }
+    }
+}
+
+#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
+#[repr(u8)]
+/// Iceberg format version
+pub enum ViewFormatVersion {
+    /// Iceberg view spec version 1
+    V1 = 1u8,
+}
+
+impl PartialOrd for ViewFormatVersion {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for ViewFormatVersion {
+    fn cmp(&self, other: &Self) -> Ordering {
+        (*self as u8).cmp(&(*other as u8))
+    }
+}
+
+impl Display for ViewFormatVersion {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            ViewFormatVersion::V1 => write!(f, "v1"),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::fs;
+    use std::sync::Arc;
+
+    use anyhow::Result;
+    use pretty_assertions::assert_eq;
+    use uuid::Uuid;
+
+    use super::{ViewFormatVersion, ViewMetadataBuilder, ViewVersionLog};
+    use crate::spec::{
+        NestedField, PrimitiveType, Schema, SqlViewRepresentation, Type, 
ViewMetadata,
+        ViewRepresentations, ViewVersion,
+    };
+    use crate::{NamespaceIdent, ViewCreation};
+
+    fn check_view_metadata_serde(json: &str, expected_type: ViewMetadata) {
+        let desered_type: ViewMetadata = serde_json::from_str(json).unwrap();
+        assert_eq!(desered_type, expected_type);
+
+        let sered_json = serde_json::to_string(&expected_type).unwrap();
+        let parsed_json_value = 
serde_json::from_str::<ViewMetadata>(&sered_json).unwrap();
+
+        assert_eq!(parsed_json_value, desered_type);
+    }
+
+    fn get_test_view_metadata(file_name: &str) -> ViewMetadata {
+        let path = format!("testdata/view_metadata/{}", file_name);
+        let metadata: String = fs::read_to_string(path).unwrap();
+
+        serde_json::from_str(&metadata).unwrap()
+    }
+
+    #[test]
+    fn test_view_data_v1() {
+        let data = r#"
+        {
+            "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+            "format-version" : 1,
+            "location" : "s3://bucket/warehouse/default.db/event_agg",
+            "current-version-id" : 1,
+            "properties" : {
+              "comment" : "Daily event counts"
+            },
+            "versions" : [ {
+              "version-id" : 1,
+              "timestamp-ms" : 1573518431292,
+              "schema-id" : 1,
+              "default-catalog" : "prod",
+              "default-namespace" : [ "default" ],
+              "summary" : {
+                "engine-name" : "Spark",
+                "engineVersion" : "3.3.2"
+              },
+              "representations" : [ {
+                "type" : "sql",
+                "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM 
events\nGROUP BY 2",
+                "dialect" : "spark"
+              } ]
+            } ],
+            "schemas": [ {
+              "schema-id": 1,
+              "type" : "struct",
+              "fields" : [ {
+                "id" : 1,
+                "name" : "event_count",
+                "required" : false,
+                "type" : "int",
+                "doc" : "Count of events"
+              } ]
+            } ],
+            "version-log" : [ {
+              "timestamp-ms" : 1573518431292,
+              "version-id" : 1
+            } ]
+          }
+        "#;
+
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![Arc::new(
+                NestedField::optional(1, "event_count", 
Type::Primitive(PrimitiveType::Int))
+                    .with_doc("Count of events"),
+            )])
+            .build()
+            .unwrap();
+        let version = ViewVersion::builder()
+            .with_version_id(1)
+            .with_timestamp_ms(1573518431292)
+            .with_schema_id(1)
+            .with_default_catalog("prod".to_string().into())
+            
.with_default_namespace(NamespaceIdent::from_vec(vec!["default".to_string()]).unwrap())
+            .with_summary(HashMap::from_iter(vec![
+                ("engineVersion".to_string(), "3.3.2".to_string()),
+                ("engine-name".to_string(), "Spark".to_string()),
+            ]))
+            
.with_representations(ViewRepresentations(vec![SqlViewRepresentation {
+                sql: "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM 
events\nGROUP BY 2"
+                    .to_string(),
+                dialect: "spark".to_string(),
+            }
+            .into()]))
+            .build();
+
+        let expected = ViewMetadata {
+            format_version: ViewFormatVersion::V1,
+            view_uuid: 
Uuid::parse_str("fa6506c3-7681-40c8-86dc-e36561f83385").unwrap(),
+            location: "s3://bucket/warehouse/default.db/event_agg".to_string(),
+            current_version_id: 1,
+            versions: HashMap::from_iter(vec![(1, Arc::new(version))]),
+            version_log: vec![ViewVersionLog {
+                timestamp_ms: 1573518431292,
+                version_id: 1,
+            }],
+            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
+            properties: HashMap::from_iter(vec![(
+                "comment".to_string(),
+                "Daily event counts".to_string(),
+            )]),
+        };
+
+        check_view_metadata_serde(data, expected);
+    }
+
+    #[test]
+    fn test_invalid_view_uuid() -> Result<()> {
+        let data = r#"
+            {
+                "format-version" : 1,
+                "view-uuid": "xxxx"
+            }
+        "#;
+        assert!(serde_json::from_str::<ViewMetadata>(data).is_err());
+        Ok(())
+    }
+
+    #[test]
+    fn test_view_builder_from_view_creation() {
+        let representations = ViewRepresentations(vec![SqlViewRepresentation {
+            sql: "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM 
events\nGROUP BY 2"
+                .to_string(),
+            dialect: "spark".to_string(),
+        }
+        .into()]);
+        let creation = ViewCreation::builder()
+            .location("s3://bucket/warehouse/default.db/event_agg".to_string())
+            .name("view".to_string())
+            .schema(Schema::builder().build().unwrap())
+            
.default_namespace(NamespaceIdent::from_vec(vec!["default".to_string()]).unwrap())
+            .representations(representations)
+            .build();
+
+        let metadata = ViewMetadataBuilder::from_view_creation(creation)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(
+            metadata.location(),
+            "s3://bucket/warehouse/default.db/event_agg"
+        );
+        assert_eq!(metadata.current_version_id(), 1);
+        assert_eq!(metadata.versions().count(), 1);
+        assert_eq!(metadata.schemas_iter().count(), 1);
+        assert_eq!(metadata.properties().len(), 0);
+    }
+
+    #[test]
+    fn test_view_metadata_v1_file_valid() {
+        let metadata =
+            
fs::read_to_string("testdata/view_metadata/ViewMetadataV1Valid.json").unwrap();
+
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                Arc::new(
+                    NestedField::optional(1, "event_count", 
Type::Primitive(PrimitiveType::Int))
+                        .with_doc("Count of events"),
+                ),
+                Arc::new(NestedField::optional(
+                    2,
+                    "event_date",
+                    Type::Primitive(PrimitiveType::Date),
+                )),
+            ])
+            .build()
+            .unwrap();
+
+        let version = ViewVersion::builder()
+            .with_version_id(1)
+            .with_timestamp_ms(1573518431292)
+            .with_schema_id(1)
+            .with_default_catalog("prod".to_string().into())
+            
.with_default_namespace(NamespaceIdent::from_vec(vec!["default".to_string()]).unwrap())
+            .with_summary(HashMap::from_iter(vec![
+                ("engineVersion".to_string(), "3.3.2".to_string()),
+                ("engine-name".to_string(), "Spark".to_string()),
+            ]))
+            
.with_representations(ViewRepresentations(vec![SqlViewRepresentation {
+                sql: "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM 
events\nGROUP BY 2"
+                    .to_string(),
+                dialect: "spark".to_string(),
+            }
+            .into()]))
+            .build();
+
+        let expected = ViewMetadata {
+            format_version: ViewFormatVersion::V1,
+            view_uuid: 
Uuid::parse_str("fa6506c3-7681-40c8-86dc-e36561f83385").unwrap(),
+            location: "s3://bucket/warehouse/default.db/event_agg".to_string(),
+            current_version_id: 1,
+            versions: HashMap::from_iter(vec![(1, Arc::new(version))]),
+            version_log: vec![ViewVersionLog {
+                timestamp_ms: 1573518431292,
+                version_id: 1,
+            }],
+            schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
+            properties: HashMap::from_iter(vec![(
+                "comment".to_string(),
+                "Daily event counts".to_string(),
+            )]),
+        };
+
+        check_view_metadata_serde(&metadata, expected);
+    }
+
+    #[test]
+    fn test_view_builder_assign_uuid() {
+        let metadata = get_test_view_metadata("ViewMetadataV1Valid.json");
+        let metadata_builder = ViewMetadataBuilder::new(metadata);
+        let uuid = Uuid::new_v4();
+        let metadata = 
metadata_builder.assign_uuid(uuid).unwrap().build().unwrap();
+        assert_eq!(metadata.uuid(), uuid);
+    }
+
+    #[test]
+    fn test_view_metadata_v1_unsupported_version() {
+        let metadata =
+            
fs::read_to_string("testdata/view_metadata/ViewMetadataUnsupportedVersion.json")
+                .unwrap();
+
+        let desered: Result<ViewMetadata, serde_json::Error> = 
serde_json::from_str(&metadata);
+
+        assert_eq!(
+            desered.unwrap_err().to_string(),
+            "data did not match any variant of untagged enum ViewMetadataEnum"
+        )
+    }
+
+    #[test]
+    fn test_view_metadata_v1_version_not_found() {
+        let metadata =
+            
fs::read_to_string("testdata/view_metadata/ViewMetadataV1CurrentVersionNotFound.json")
+                .unwrap();
+
+        let desered: Result<ViewMetadata, serde_json::Error> = 
serde_json::from_str(&metadata);
+
+        assert_eq!(
+            desered.unwrap_err().to_string(),
+            "DataInvalid => No version exists with the current version id 2."
+        )
+    }
+
+    #[test]
+    fn test_view_metadata_v1_schema_not_found() {
+        let metadata =
+            
fs::read_to_string("testdata/view_metadata/ViewMetadataV1SchemaNotFound.json").unwrap();
+
+        let desered: Result<ViewMetadata, serde_json::Error> = 
serde_json::from_str(&metadata);
+
+        assert_eq!(
+            desered.unwrap_err().to_string(),
+            "DataInvalid => No schema exists with the schema id 2."
+        )
+    }
+
+    #[test]
+    fn test_view_metadata_v1_missing_schema_for_version() {
+        let metadata =
+            
fs::read_to_string("testdata/view_metadata/ViewMetadataV1MissingSchema.json").unwrap();
+
+        let desered: Result<ViewMetadata, serde_json::Error> = 
serde_json::from_str(&metadata);
+
+        assert_eq!(
+            desered.unwrap_err().to_string(),
+            "data did not match any variant of untagged enum ViewMetadataEnum"
+        )
+    }
+
+    #[test]
+    fn test_view_metadata_v1_missing_current_version() {
+        let metadata =
+            
fs::read_to_string("testdata/view_metadata/ViewMetadataV1MissingCurrentVersion.json")
+                .unwrap();
+
+        let desered: Result<ViewMetadata, serde_json::Error> = 
serde_json::from_str(&metadata);
+
+        assert_eq!(
+            desered.unwrap_err().to_string(),
+            "data did not match any variant of untagged enum ViewMetadataEnum"
+        )
+    }
+}
diff --git a/crates/iceberg/src/spec/view_version.rs 
b/crates/iceberg/src/spec/view_version.rs
new file mode 100644
index 0000000..30686b5
--- /dev/null
+++ b/crates/iceberg/src/spec/view_version.rs
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*!
+ * View Versions!
+ */
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use _serde::ViewVersionV1;
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use typed_builder::TypedBuilder;
+
+use super::view_metadata::ViewVersionLog;
+use crate::catalog::NamespaceIdent;
+use crate::error::{timestamp_ms_to_utc, Result};
+use crate::spec::{SchemaId, SchemaRef, ViewMetadata};
+use crate::{Error, ErrorKind};
+
+/// Reference to [`ViewVersion`].
+pub type ViewVersionRef = Arc<ViewVersion>;
+
+/// Alias for the integer type used for view version ids.
+pub type ViewVersionId = i32;
+
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, TypedBuilder)]
+#[serde(from = "ViewVersionV1", into = "ViewVersionV1")]
+#[builder(field_defaults(setter(prefix = "with_")))]
+/// A view version represents the definition of a view at a specific point in 
time.
+pub struct ViewVersion {
+    /// A unique ID for this view version (a [`ViewVersionId`])
+    version_id: ViewVersionId,
+    /// ID of the schema for the view version
+    schema_id: SchemaId,
+    /// Timestamp when the version was created (ms from epoch)
+    timestamp_ms: i64,
+    /// A string to string map of summary metadata about the version
+    summary: HashMap<String, String>,
+    /// A list of representations for the view definition.
+    representations: ViewRepresentations,
+    /// Catalog name to use when a reference in the SELECT does not contain a 
catalog
+    #[builder(default = None)]
+    default_catalog: Option<String>,
+    /// Namespace to use when a reference in the SELECT is a single identifier
+    default_namespace: NamespaceIdent,
+}
+
+impl ViewVersion {
+    /// Get the version id of this view version.
+    #[inline]
+    pub fn version_id(&self) -> ViewVersionId {
+        self.version_id
+    }
+
+    /// Get the schema id of this view version.
+    #[inline]
+    pub fn schema_id(&self) -> SchemaId {
+        self.schema_id
+    }
+
+    /// Get the timestamp of when the view version was created
+    #[inline]
+    pub fn timestamp(&self) -> Result<DateTime<Utc>> {
+        timestamp_ms_to_utc(self.timestamp_ms)
+    }
+
+    /// Get the timestamp of when the view version was created in milliseconds 
since epoch
+    #[inline]
+    pub fn timestamp_ms(&self) -> i64 {
+        self.timestamp_ms
+    }
+
+    /// Get summary of the view version
+    #[inline]
+    pub fn summary(&self) -> &HashMap<String, String> {
+        &self.summary
+    }
+
+    /// Get the representations of this view version
+    #[inline]
+    pub fn representations(&self) -> &ViewRepresentations {
+        &self.representations
+    }
+
+    /// Get the default catalog for this view version
+    #[inline]
+    pub fn default_catalog(&self) -> Option<&String> {
+        self.default_catalog.as_ref()
+    }
+
+    /// Get the default namespace to use when a reference in the SELECT is a 
single identifier
+    #[inline]
+    pub fn default_namespace(&self) -> &NamespaceIdent {
+        &self.default_namespace
+    }
+
+    /// Get the schema of this view version from `view_metadata`; `DataInvalid` error if its schema id is not found.
+    pub fn schema(&self, view_metadata: &ViewMetadata) -> Result<SchemaRef> {
+        let r = view_metadata
+            .schema_by_id(self.schema_id())
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Schema with id {} not found", self.schema_id()),
+                )
+            })
+            .cloned();
+        r
+    }
+
+    /// Retrieve the history log entry for this view version.
+    #[allow(dead_code)]
+    pub(crate) fn log(&self) -> ViewVersionLog {
+        ViewVersionLog::new(self.version_id, self.timestamp_ms)
+    }
+}
+
+/// A list of view representations.
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
+pub struct ViewRepresentations(pub(crate) Vec<ViewRepresentation>);
+
+impl ViewRepresentations {
+    #[inline]
+    /// Get the number of representations
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    #[inline]
+    /// Check if there are no representations
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+
+    /// Get an iterator over the representations
+    pub fn iter(&self) -> impl Iterator<Item = &'_ ViewRepresentation> {
+        self.0.iter()
+    }
+}
+
+// Iterator for ViewRepresentations
+impl IntoIterator for ViewRepresentations {
+    type Item = ViewRepresentation;
+    type IntoIter = std::vec::IntoIter<Self::Item>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.0.into_iter()
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(tag = "type")]
+/// View definitions can be represented in multiple ways.
+/// Representations are documented ways to express a view definition.
+// ToDo: Make unique per Dialect
+pub enum ViewRepresentation {
+    #[serde(rename = "sql")]
+    /// The SQL representation stores the view definition as a SQL SELECT; see [`SqlViewRepresentation`].
+    Sql(SqlViewRepresentation),
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// The SQL representation stores the view definition as a SQL SELECT,
+/// with metadata such as the SQL dialect.
+pub struct SqlViewRepresentation {
+    #[serde(rename = "sql")]
+    /// The SQL SELECT statement that defines the view.
+    pub sql: String,
+    #[serde(rename = "dialect")]
+    /// The dialect of the sql SELECT statement (e.g., "trino" or "spark")
+    pub dialect: String,
+}
+
+pub(super) mod _serde {
+    /// This is a helper module that defines types to help with 
serialization/deserialization.
+    /// For deserialization the input first gets read into the 
[`ViewVersionV1`] struct.
+    /// It is then converted into the [`ViewVersion`] struct. Serialization works the 
other way around.
+    /// [`ViewVersionV1`] is an internal struct that is only used for 
serialization and deserialization.
+    use serde::{Deserialize, Serialize};
+
+    use super::{ViewRepresentation, ViewRepresentations, ViewVersion};
+    use crate::catalog::NamespaceIdent;
+
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "kebab-case")]
+    /// Defines the structure of a v1 view version for 
serialization/deserialization
+    pub(crate) struct ViewVersionV1 {
+        pub version_id: i32,
+        pub schema_id: i32,
+        pub timestamp_ms: i64,
+        pub summary: std::collections::HashMap<String, String>,
+        pub representations: Vec<ViewRepresentation>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub default_catalog: Option<String>,
+        pub default_namespace: NamespaceIdent,
+    }
+
+    impl From<ViewVersionV1> for ViewVersion {
+        fn from(v1: ViewVersionV1) -> Self {
+            ViewVersion {
+                version_id: v1.version_id,
+                schema_id: v1.schema_id,
+                timestamp_ms: v1.timestamp_ms,
+                summary: v1.summary,
+                representations: ViewRepresentations(v1.representations),
+                default_catalog: v1.default_catalog,
+                default_namespace: v1.default_namespace,
+            }
+        }
+    }
+
+    impl From<ViewVersion> for ViewVersionV1 {
+        fn from(v1: ViewVersion) -> Self {
+            ViewVersionV1 {
+                version_id: v1.version_id,
+                schema_id: v1.schema_id,
+                timestamp_ms: v1.timestamp_ms,
+                summary: v1.summary,
+                representations: v1.representations.0,
+                default_catalog: v1.default_catalog,
+                default_namespace: v1.default_namespace,
+            }
+        }
+    }
+}
+
+impl From<SqlViewRepresentation> for ViewRepresentation {
+    fn from(sql: SqlViewRepresentation) -> Self {
+        ViewRepresentation::Sql(sql)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use chrono::{TimeZone, Utc};
+
+    use crate::spec::view_version::ViewVersion;
+    use crate::spec::view_version::_serde::ViewVersionV1;
+    use crate::spec::ViewRepresentations;
+
+    #[test]
+    fn view_version() {
+        let record = serde_json::json!(
+        {
+            "version-id" : 1,
+            "timestamp-ms" : 1573518431292i64,
+            "schema-id" : 1,
+            "default-catalog" : "prod",
+            "default-namespace" : [ "default" ],
+            "summary" : {
+              "engine-name" : "Spark",
+              "engineVersion" : "3.3.2"
+            },
+            "representations" : [ {
+              "type" : "sql",
+              "sql" : "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM 
events\nGROUP BY 2",
+              "dialect" : "spark"
+            } ]
+          }
+        );
+
+        let result: ViewVersion = 
serde_json::from_value::<ViewVersionV1>(record.clone())
+            .unwrap()
+            .into();
+
+        // Roundtrip
+        assert_eq!(serde_json::to_value(result.clone()).unwrap(), record);
+
+        assert_eq!(result.version_id(), 1);
+        assert_eq!(
+            result.timestamp().unwrap(),
+            Utc.timestamp_millis_opt(1573518431292).unwrap()
+        );
+        assert_eq!(result.schema_id(), 1);
+        assert_eq!(result.default_catalog, Some("prod".to_string()));
+        assert_eq!(result.summary(), &{
+            let mut map = std::collections::HashMap::new();
+            map.insert("engine-name".to_string(), "Spark".to_string());
+            map.insert("engineVersion".to_string(), "3.3.2".to_string());
+            map
+        });
+        assert_eq!(
+            result.representations().to_owned(),
+            ViewRepresentations(vec![super::ViewRepresentation::Sql(
+                super::SqlViewRepresentation {
+                    sql: "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM 
events\nGROUP BY 2"
+                        .to_string(),
+                    dialect: "spark".to_string(),
+                },
+            )])
+        );
+        assert_eq!(
+            result.default_namespace.inner(),
+            vec!["default".to_string()]
+        );
+    }
+}
diff --git 
a/crates/iceberg/testdata/view_metadata/ViewMetadataUnsupportedVersion.json 
b/crates/iceberg/testdata/view_metadata/ViewMetadataUnsupportedVersion.json
new file mode 100644
index 0000000..c5627b8
--- /dev/null
+++ b/crates/iceberg/testdata/view_metadata/ViewMetadataUnsupportedVersion.json
@@ -0,0 +1,58 @@
+{
+  "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+  "format-version": 2,
+  "location": "s3://bucket/warehouse/default.db/event_agg",
+  "current-version-id": 1,
+  "properties": {
+    "comment": "Daily event counts"
+  },
+  "versions": [
+    {
+      "version-id": 1,
+      "timestamp-ms": 1573518431292,
+      "schema-id": 1,
+      "default-catalog": "prod",
+      "default-namespace": [
+        "default"
+      ],
+      "summary": {
+        "engine-name": "Spark",
+        "engineVersion": "3.3.2"
+      },
+      "representations": [
+        {
+          "type": "sql",
+          "sql": "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM 
events\nGROUP BY 2",
+          "dialect": "spark"
+        }
+      ]
+    }
+  ],
+  "schemas": [
+    {
+      "schema-id": 1,
+      "type": "struct",
+      "fields": [
+        {
+          "id": 1,
+          "name": "event_count",
+          "required": false,
+          "type": "int",
+          "doc": "Count of events"
+        },
+        {
+          "id": 2,
+          "name": "event_date",
+          "required": false,
+          "type": "date"
+        }
+      ]
+    }
+  ],
+  "version-log": [
+    {
+      "timestamp-ms": 1573518431292,
+      "version-id": 1
+    }
+  ]
+}
\ No newline at end of file
diff --git 
a/crates/iceberg/testdata/view_metadata/ViewMetadataV1CurrentVersionNotFound.json
 
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1CurrentVersionNotFound.json
new file mode 100644
index 0000000..4ba94ca
--- /dev/null
+++ 
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1CurrentVersionNotFound.json
@@ -0,0 +1,58 @@
+{
+  "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+  "format-version": 1,
+  "location": "s3://bucket/warehouse/default.db/event_agg",
+  "current-version-id": 2,
+  "properties": {
+    "comment": "Daily event counts"
+  },
+  "versions": [
+    {
+      "version-id": 1,
+      "timestamp-ms": 1573518431292,
+      "schema-id": 1,
+      "default-catalog": "prod",
+      "default-namespace": [
+        "default"
+      ],
+      "summary": {
+        "engine-name": "Spark",
+        "engineVersion": "3.3.2"
+      },
+      "representations": [
+        {
+          "type": "sql",
+          "sql": "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM 
events\nGROUP BY 2",
+          "dialect": "spark"
+        }
+      ]
+    }
+  ],
+  "schemas": [
+    {
+      "schema-id": 1,
+      "type": "struct",
+      "fields": [
+        {
+          "id": 1,
+          "name": "event_count",
+          "required": false,
+          "type": "int",
+          "doc": "Count of events"
+        },
+        {
+          "id": 2,
+          "name": "event_date",
+          "required": false,
+          "type": "date"
+        }
+      ]
+    }
+  ],
+  "version-log": [
+    {
+      "timestamp-ms": 1573518431292,
+      "version-id": 1
+    }
+  ]
+}
\ No newline at end of file
diff --git 
a/crates/iceberg/testdata/view_metadata/ViewMetadataV1MissingCurrentVersion.json
 
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1MissingCurrentVersion.json
new file mode 100644
index 0000000..c210881
--- /dev/null
+++ 
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1MissingCurrentVersion.json
@@ -0,0 +1,57 @@
+{
+  "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+  "format-version": 1,
+  "location": "s3://bucket/warehouse/default.db/event_agg",
+  "properties": {
+    "comment": "Daily event counts"
+  },
+  "versions": [
+    {
+      "version-id": 1,
+      "timestamp-ms": 1573518431292,
+      "schema-id": 1,
+      "default-catalog": "prod",
+      "default-namespace": [
+        "default"
+      ],
+      "summary": {
+        "engine-name": "Spark",
+        "engineVersion": "3.3.2"
+      },
+      "representations": [
+        {
+          "type": "sql",
+          "sql": "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM 
events\nGROUP BY 2",
+          "dialect": "spark"
+        }
+      ]
+    }
+  ],
+  "schemas": [
+    {
+      "schema-id": 1,
+      "type": "struct",
+      "fields": [
+        {
+          "id": 1,
+          "name": "event_count",
+          "required": false,
+          "type": "int",
+          "doc": "Count of events"
+        },
+        {
+          "id": 2,
+          "name": "event_date",
+          "required": false,
+          "type": "date"
+        }
+      ]
+    }
+  ],
+  "version-log": [
+    {
+      "timestamp-ms": 1573518431292,
+      "version-id": 1
+    }
+  ]
+}
\ No newline at end of file
diff --git 
a/crates/iceberg/testdata/view_metadata/ViewMetadataV1MissingSchema.json 
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1MissingSchema.json
new file mode 100644
index 0000000..b5b454b
--- /dev/null
+++ b/crates/iceberg/testdata/view_metadata/ViewMetadataV1MissingSchema.json
@@ -0,0 +1,56 @@
+{
+  "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+  "format-version": 1,
+  "location": "s3://bucket/warehouse/default.db/event_agg",
+  "properties": {
+    "comment": "Daily event counts"
+  },
+  "versions": [
+    {
+      "version-id": 1,
+      "timestamp-ms": 1573518431292,
+      "default-catalog": "prod",
+      "default-namespace": [
+        "default"
+      ],
+      "summary": {
+        "engine-name": "Spark",
+        "engineVersion": "3.3.2"
+      },
+      "representations": [
+        {
+          "type": "sql",
+          "sql": "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM 
events\nGROUP BY 2",
+          "dialect": "spark"
+        }
+      ]
+    }
+  ],
+  "schemas": [
+    {
+      "schema-id": 1,
+      "type": "struct",
+      "fields": [
+        {
+          "id": 1,
+          "name": "event_count",
+          "required": false,
+          "type": "int",
+          "doc": "Count of events"
+        },
+        {
+          "id": 2,
+          "name": "event_date",
+          "required": false,
+          "type": "date"
+        }
+      ]
+    }
+  ],
+  "version-log": [
+    {
+      "timestamp-ms": 1573518431292,
+      "version-id": 1
+    }
+  ]
+}
\ No newline at end of file
diff --git 
a/crates/iceberg/testdata/view_metadata/ViewMetadataV1SchemaNotFound.json 
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1SchemaNotFound.json
new file mode 100644
index 0000000..0026d22
--- /dev/null
+++ b/crates/iceberg/testdata/view_metadata/ViewMetadataV1SchemaNotFound.json
@@ -0,0 +1,58 @@
+{
+  "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+  "format-version": 1,
+  "location": "s3://bucket/warehouse/default.db/event_agg",
+  "current-version-id": 1,
+  "properties": {
+    "comment": "Daily event counts"
+  },
+  "versions": [
+    {
+      "version-id": 1,
+      "timestamp-ms": 1573518431292,
+      "schema-id": 2,
+      "default-catalog": "prod",
+      "default-namespace": [
+        "default"
+      ],
+      "summary": {
+        "engine-name": "Spark",
+        "engineVersion": "3.3.2"
+      },
+      "representations": [
+        {
+          "type": "sql",
+          "sql": "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM 
events\nGROUP BY 2",
+          "dialect": "spark"
+        }
+      ]
+    }
+  ],
+  "schemas": [
+    {
+      "schema-id": 1,
+      "type": "struct",
+      "fields": [
+        {
+          "id": 1,
+          "name": "event_count",
+          "required": false,
+          "type": "int",
+          "doc": "Count of events"
+        },
+        {
+          "id": 2,
+          "name": "event_date",
+          "required": false,
+          "type": "date"
+        }
+      ]
+    }
+  ],
+  "version-log": [
+    {
+      "timestamp-ms": 1573518431292,
+      "version-id": 1
+    }
+  ]
+}
\ No newline at end of file
diff --git a/crates/iceberg/testdata/view_metadata/ViewMetadataV1Valid.json 
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1Valid.json
new file mode 100644
index 0000000..5011a80
--- /dev/null
+++ b/crates/iceberg/testdata/view_metadata/ViewMetadataV1Valid.json
@@ -0,0 +1,58 @@
+{
+  "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+  "format-version": 1,
+  "location": "s3://bucket/warehouse/default.db/event_agg",
+  "current-version-id": 1,
+  "properties": {
+    "comment": "Daily event counts"
+  },
+  "versions": [
+    {
+      "version-id": 1,
+      "timestamp-ms": 1573518431292,
+      "schema-id": 1,
+      "default-catalog": "prod",
+      "default-namespace": [
+        "default"
+      ],
+      "summary": {
+        "engine-name": "Spark",
+        "engineVersion": "3.3.2"
+      },
+      "representations": [
+        {
+          "type": "sql",
+          "sql": "SELECT\n    COUNT(1), CAST(event_ts AS DATE)\nFROM 
events\nGROUP BY 2",
+          "dialect": "spark"
+        }
+      ]
+    }
+  ],
+  "schemas": [
+    {
+      "schema-id": 1,
+      "type": "struct",
+      "fields": [
+        {
+          "id": 1,
+          "name": "event_count",
+          "required": false,
+          "type": "int",
+          "doc": "Count of events"
+        },
+        {
+          "id": 2,
+          "name": "event_date",
+          "required": false,
+          "type": "date"
+        }
+      ]
+    }
+  ],
+  "version-log": [
+    {
+      "timestamp-ms": 1573518431292,
+      "version-id": 1
+    }
+  ]
+}
\ No newline at end of file

Reply via email to