This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ab4f69a View Spec implementation (#331)
ab4f69a is described below
commit ab4f69a64115fc66edff0fa0a5c5032c53e138e7
Author: Christian <[email protected]>
AuthorDate: Sun Jul 28 16:02:22 2024 +0200
View Spec implementation (#331)
* Add support for ViewSpec
* Fix typos
* Fix typos
* clippy is always right
* Add tests
* Remove new_view_version test function
* Remove append_version
* View Representations Struct
* ViewRepresentation case insensitive
* Add fallible methods for ViewRepresentationsBuilder
* Add tests for fallible ViewRepresentationsBuilder methods
* Introduce ViewVersionId as i32
* Iterator for &'a ViewRepresentations
* Improve comments
Co-authored-by: Renjie Liu <[email protected]>
* Add test_view_metadata_v1_file_valid
* Fix view_version iter
* Remove ViewRepresentationsBuilder
* Fix comment
* Timestamp error handling
* Fallible Timestamp Conversion from Millis
* Fix Initial view Version = 1
* Cleanup
* Hide ViewMetadata iter() type
* timestamp_ms_to_utc -> error.rs
* TableMetadata timestamp conversion -> utility function
* Improve error context
* timestamp_ms_to_utc: LocalResult::None -> DataInvalid
* Fix obsolete comment
* ViewRepresentation::SqlViewRepresentation -> ::Sql
* Fix broken clippy from rebase
---------
Co-authored-by: Renjie Liu <[email protected]>
---
Cargo.toml | 3 +-
crates/catalog/rest/src/catalog.rs | 14 +-
crates/iceberg/src/catalog/mod.rs | 27 +-
crates/iceberg/src/error.rs | 24 +
crates/iceberg/src/lib.rs | 2 +-
crates/iceberg/src/spec/mod.rs | 4 +
crates/iceberg/src/spec/table_metadata.rs | 28 +-
crates/iceberg/src/spec/view_metadata.rs | 728 +++++++++++++++++++++
crates/iceberg/src/spec/view_version.rs | 313 +++++++++
.../ViewMetadataUnsupportedVersion.json | 58 ++
.../ViewMetadataV1CurrentVersionNotFound.json | 58 ++
.../ViewMetadataV1MissingCurrentVersion.json | 57 ++
.../view_metadata/ViewMetadataV1MissingSchema.json | 56 ++
.../ViewMetadataV1SchemaNotFound.json | 58 ++
.../view_metadata/ViewMetadataV1Valid.json | 58 ++
15 files changed, 1474 insertions(+), 14 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index c4f8482..60307bd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -85,7 +85,8 @@ tempfile = "3.8"
tokio = { version = "1", default-features = false }
typed-builder = "^0.19"
url = "2"
-uuid = "1.6.1"
+urlencoding = "2"
+uuid = { version = "1.6.1", features = ["v7"] }
volo-thrift = "0.10"
hive_metastore = "0.1.0"
tera = "1"
diff --git a/crates/catalog/rest/src/catalog.rs
b/crates/catalog/rest/src/catalog.rs
index aab615c..30f2e29 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -1321,7 +1321,7 @@ mod tests {
);
assert_eq!(
Utc.timestamp_millis_opt(1646787054459).unwrap(),
- table.metadata().last_updated_ms()
+ table.metadata().last_updated_timestamp().unwrap()
);
assert_eq!(
vec![&Arc::new(
@@ -1511,7 +1511,11 @@ mod tests {
);
assert_eq!(
1657810967051,
- table.metadata().last_updated_ms().timestamp_millis()
+ table
+ .metadata()
+ .last_updated_timestamp()
+ .unwrap()
+ .timestamp_millis()
);
assert_eq!(
vec![&Arc::new(
@@ -1682,7 +1686,11 @@ mod tests {
);
assert_eq!(
1657810967051,
- table.metadata().last_updated_ms().timestamp_millis()
+ table
+ .metadata()
+ .last_updated_timestamp()
+ .unwrap()
+ .timestamp_millis()
);
assert_eq!(
vec![&Arc::new(
diff --git a/crates/iceberg/src/catalog/mod.rs
b/crates/iceberg/src/catalog/mod.rs
index 5c63e1e..bc98772 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -29,7 +29,7 @@ use uuid::Uuid;
use crate::spec::{
FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder,
TableMetadataBuilder,
- UnboundPartitionSpec,
+ UnboundPartitionSpec, ViewRepresentations,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
@@ -439,6 +439,31 @@ impl TableUpdate {
}
}
+/// ViewCreation represents the creation of a view in the catalog.
+#[derive(Debug, TypedBuilder)]
+pub struct ViewCreation {
+ /// The name of the view.
+ pub name: String,
+ /// The view's base location; used to create metadata file locations
+ pub location: String,
+ /// Representations for the view.
+ pub representations: ViewRepresentations,
+ /// The schema of the view.
+ pub schema: Schema,
+ /// The properties of the view.
+ #[builder(default)]
+ pub properties: HashMap<String, String>,
+ /// The default namespace to use when a reference in the SELECT is a
single identifier
+ pub default_namespace: NamespaceIdent,
+ /// Default catalog to use when a reference in the SELECT does not contain
a catalog
+ #[builder(default)]
+ pub default_catalog: Option<String>,
+ /// A string to string map of summary metadata about the version
+ /// Typical keys are "engine-name" and "engine-version"
+ #[builder(default)]
+ pub summary: HashMap<String, String>,
+}
+
#[cfg(test)]
mod tests {
use std::collections::HashMap;
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index 6270b43..6f7fd7c 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -19,6 +19,8 @@ use std::backtrace::{Backtrace, BacktraceStatus};
use std::fmt;
use std::fmt::{Debug, Display, Formatter};
+use chrono::{DateTime, TimeZone as _, Utc};
+
/// Result that is a wrapper of `Result<T, iceberg::Error>`
pub type Result<T> = std::result::Result<T, Error>;
@@ -331,6 +333,28 @@ define_from_err!(
define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
+/// Converts a timestamp in milliseconds to `DateTime<Utc>`, handling errors.
+///
+/// # Arguments
+///
+/// * `timestamp_ms` - The timestamp in milliseconds to convert.
+///
+/// # Returns
+///
+/// This function returns a `Result<DateTime<Utc>, Error>` which is `Ok` with
the `DateTime<Utc>` if the conversion is successful,
+/// or an `Err` with an appropriate error if the timestamp is ambiguous or
invalid.
+pub(crate) fn timestamp_ms_to_utc(timestamp_ms: i64) -> Result<DateTime<Utc>> {
+ match Utc.timestamp_millis_opt(timestamp_ms) {
+ chrono::LocalResult::Single(t) => Ok(t),
+ chrono::LocalResult::Ambiguous(_, _) => Err(Error::new(
+ ErrorKind::Unexpected,
+ "Ambiguous timestamp with two possible results",
+ )),
+ chrono::LocalResult::None => Err(Error::new(ErrorKind::DataInvalid,
"Invalid timestamp")),
+ }
+ .map_err(|e| e.with_context("timestamp value", timestamp_ms.to_string()))
+}
+
/// Helper macro to check arguments.
///
///
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 35d5932..9682fa1 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -29,7 +29,7 @@ mod catalog;
pub use catalog::{
Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation,
TableIdent, TableRequirement,
- TableUpdate,
+ TableUpdate, ViewCreation,
};
pub mod table;
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index 199fc4a..793f00d 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -27,6 +27,8 @@ mod sort;
mod table_metadata;
mod transform;
mod values;
+mod view_metadata;
+mod view_version;
pub use datatypes::*;
pub use manifest::*;
@@ -38,3 +40,5 @@ pub use sort::*;
pub use table_metadata::*;
pub use transform::*;
pub use values::*;
+pub use view_metadata::*;
+pub use view_version::*;
diff --git a/crates/iceberg/src/spec/table_metadata.rs
b/crates/iceberg/src/spec/table_metadata.rs
index d9a09d8..53bcabb 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -24,7 +24,7 @@ use std::fmt::{Display, Formatter};
use std::sync::Arc;
use _serde::TableMetadataEnum;
-use chrono::{DateTime, TimeZone, Utc};
+use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;
@@ -33,7 +33,7 @@ use super::snapshot::{Snapshot, SnapshotReference,
SnapshotRetention};
use super::{
PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, SnapshotRef,
SortOrder, SortOrderRef,
};
-use crate::error::Result;
+use crate::error::{timestamp_ms_to_utc, Result};
use crate::{Error, ErrorKind, TableCreation};
static MAIN_BRANCH: &str = "main";
@@ -143,8 +143,14 @@ impl TableMetadata {
/// Returns last updated time.
#[inline]
- pub fn last_updated_ms(&self) -> DateTime<Utc> {
- Utc.timestamp_millis_opt(self.last_updated_ms).unwrap()
+ pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
+ timestamp_ms_to_utc(self.last_updated_ms)
+ }
+
+ /// Returns last updated time in milliseconds.
+ #[inline]
+ pub fn last_updated_ms(&self) -> i64 {
+ self.last_updated_ms
}
/// Returns schemas
@@ -328,7 +334,7 @@ impl TableMetadataBuilder {
let table_metadata = TableMetadata {
format_version: FormatVersion::V2,
- table_uuid: Uuid::new_v4(),
+ table_uuid: Uuid::now_v7(),
location: location.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
@@ -472,7 +478,7 @@ pub(super) mod _serde {
/// Helper to serialize and deserialize the format version.
#[derive(Debug, PartialEq, Eq)]
- pub(super) struct VersionNumber<const V: u8>;
+ pub(crate) struct VersionNumber<const V: u8>;
impl Serialize for TableMetadata {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
@@ -903,8 +909,14 @@ pub struct SnapshotLog {
impl SnapshotLog {
/// Returns the last updated timestamp as a DateTime<Utc> with millisecond
precision
- pub fn timestamp(self) -> DateTime<Utc> {
- Utc.timestamp_millis_opt(self.timestamp_ms).unwrap()
+ pub fn timestamp(self) -> Result<DateTime<Utc>> {
+ timestamp_ms_to_utc(self.timestamp_ms)
+ }
+
+ /// Returns the timestamp in milliseconds
+ #[inline]
+ pub fn timestamp_ms(&self) -> i64 {
+ self.timestamp_ms
}
}
diff --git a/crates/iceberg/src/spec/view_metadata.rs
b/crates/iceberg/src/spec/view_metadata.rs
new file mode 100644
index 0000000..cc46f07
--- /dev/null
+++ b/crates/iceberg/src/spec/view_metadata.rs
@@ -0,0 +1,728 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the [view
metadata](https://iceberg.apache.org/view-spec/#view-metadata).
+//! The main struct here is [ViewMetadata] which defines the data for a view.
+
+use std::cmp::Ordering;
+use std::collections::HashMap;
+use std::fmt::{Display, Formatter};
+use std::sync::Arc;
+
+use _serde::ViewMetadataEnum;
+use chrono::{DateTime, MappedLocalTime, TimeZone, Utc};
+use serde::{Deserialize, Serialize};
+use serde_repr::{Deserialize_repr, Serialize_repr};
+use uuid::Uuid;
+
+use super::view_version::{ViewVersion, ViewVersionId, ViewVersionRef};
+use super::{SchemaId, SchemaRef};
+use crate::catalog::ViewCreation;
+use crate::error::Result;
+
+/// Reference to [`ViewMetadata`].
+pub type ViewMetadataRef = Arc<ViewMetadata>;
+
+pub(crate) static INITIAL_VIEW_VERSION_ID: i32 = 1;
+
+#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
+#[serde(try_from = "ViewMetadataEnum", into = "ViewMetadataEnum")]
+/// Fields for the version 1 of the view metadata.
+///
+/// We assume that this data structure is always valid, so accessors panic if an
invariant is violated.
+/// We check the validity of this data structure when constructing.
+pub struct ViewMetadata {
+ /// Integer Version for the format.
+ pub(crate) format_version: ViewFormatVersion,
+ /// A UUID that identifies the view, generated when the view is created.
+ pub(crate) view_uuid: Uuid,
+ /// The view's base location; used to create metadata file locations
+ pub(crate) location: String,
+ /// ID of the current version of the view (version-id)
+ pub(crate) current_version_id: ViewVersionId,
+ /// A list of known versions of the view
+ pub(crate) versions: HashMap<ViewVersionId, ViewVersionRef>,
+ /// A list of version log entries with the timestamp and version-id for
every
+ /// change to current-version-id
+ pub(crate) version_log: Vec<ViewVersionLog>,
+ /// A list of schemas, stored as objects with schema-id.
+ pub(crate) schemas: HashMap<i32, SchemaRef>,
+ /// A string to string map of view properties.
+ /// Properties are used for metadata such as comment and for settings that
+ /// affect view maintenance. This is not intended to be used for arbitrary
metadata.
+ pub(crate) properties: HashMap<String, String>,
+}
+
+impl ViewMetadata {
+ /// Returns the format version of this metadata.
+ #[inline]
+ pub fn format_version(&self) -> ViewFormatVersion {
+ self.format_version
+ }
+
+ /// Returns the UUID of this view.
+ #[inline]
+ pub fn uuid(&self) -> Uuid {
+ self.view_uuid
+ }
+
+ /// Returns the view's base location.
+ #[inline]
+ pub fn location(&self) -> &str {
+ self.location.as_str()
+ }
+
+ /// Returns the id of the current view version.
+ #[inline]
+ pub fn current_version_id(&self) -> ViewVersionId {
+ self.current_version_id
+ }
+
+ /// Returns an iterator over all known view versions, in arbitrary `HashMap` order.
+ #[inline]
+ pub fn versions(&self) -> impl Iterator<Item = &ViewVersionRef> {
+ self.versions.values()
+ }
+
+ /// Looks up a view version by id; returns `None` if no version has that id.
+ #[inline]
+ pub fn version_by_id(&self, version_id: ViewVersionId) ->
Option<&ViewVersionRef> {
+ self.versions.get(&version_id)
+ }
+
+ /// Returns the current view version. Panics if `current_version_id` is missing from `versions`.
+ #[inline]
+ pub fn current_version(&self) -> &ViewVersionRef {
+ self.versions
+ .get(&self.current_version_id)
+ .expect("Current version id set, but not found in view versions")
+ }
+
+ /// Returns an iterator over all schemas, in arbitrary `HashMap` order.
+ #[inline]
+ pub fn schemas_iter(&self) -> impl Iterator<Item = &SchemaRef> {
+ self.schemas.values()
+ }
+
+ /// Looks up a schema by id; returns `None` if no schema has that id.
+ #[inline]
+ pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> {
+ self.schemas.get(&schema_id)
+ }
+
+ /// Returns the schema of the current version. Panics if that schema id is missing from `schemas`.
+ #[inline]
+ pub fn current_schema(&self) -> &SchemaRef {
+ let schema_id = self.current_version().schema_id();
+ self.schema_by_id(schema_id)
+ .expect("Current schema id set, but not found in view metadata")
+ }
+
+ /// Returns the properties of the view.
+ #[inline]
+ pub fn properties(&self) -> &HashMap<String, String> {
+ &self.properties
+ }
+
+ /// Returns the view's version log as a slice of `ViewVersionLog` entries.
+ #[inline]
+ pub fn history(&self) -> &[ViewVersionLog] {
+ &self.version_log
+ }
+}
+
+/// Manipulating view metadata.
+pub struct ViewMetadataBuilder(ViewMetadata);
+
+impl ViewMetadataBuilder {
+ /// Creates a new view metadata builder from the given view metadata.
+ pub fn new(origin: ViewMetadata) -> Self {
+ Self(origin)
+ }
+
+ /// Creates a new view metadata builder from the given view creation.
+ pub fn from_view_creation(view_creation: ViewCreation) -> Result<Self> {
+ let ViewCreation {
+ location,
+ schema,
+ properties,
+ name: _,
+ representations,
+ default_catalog,
+ default_namespace,
+ summary,
+ } = view_creation;
+ let initial_version_id = super::INITIAL_VIEW_VERSION_ID;
+ let version = ViewVersion::builder()
+ .with_default_catalog(default_catalog)
+ .with_default_namespace(default_namespace)
+ .with_representations(representations)
+ .with_schema_id(schema.schema_id())
+ .with_summary(summary)
+ .with_timestamp_ms(Utc::now().timestamp_millis())
+ .with_version_id(initial_version_id)
+ .build();
+
+ let versions = HashMap::from_iter(vec![(initial_version_id,
version.into())]);
+
+ let view_metadata = ViewMetadata {
+ format_version: ViewFormatVersion::V1,
+ view_uuid: Uuid::now_v7(),
+ location,
+ current_version_id: initial_version_id,
+ versions,
+ version_log: Vec::new(),
+ schemas: HashMap::from_iter(vec![(schema.schema_id(),
Arc::new(schema))]),
+ properties,
+ };
+
+ Ok(Self(view_metadata))
+ }
+
+ /// Changes uuid of view metadata.
+ pub fn assign_uuid(mut self, uuid: Uuid) -> Result<Self> {
+ self.0.view_uuid = uuid;
+ Ok(self)
+ }
+
+ /// Returns the new view metadata after changes.
+ pub fn build(self) -> Result<ViewMetadata> {
+ Ok(self.0)
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// A version log entry: records which version `current-version-id` was set to, and when.
+pub struct ViewVersionLog {
+ /// ID that current-version-id was set to
+ version_id: ViewVersionId,
+ /// Timestamp when the view's current-version-id was updated (ms from
epoch)
+ timestamp_ms: i64,
+}
+
+impl ViewVersionLog {
+ #[inline]
+ /// Creates a new view version log entry; `timestamp` is stored unchanged in `timestamp_ms`.
+ pub fn new(version_id: ViewVersionId, timestamp: i64) -> Self {
+ Self {
+ version_id,
+ timestamp_ms: timestamp,
+ }
+ }
+
+ /// Returns the version id recorded by this entry.
+ #[inline]
+ pub fn version_id(&self) -> ViewVersionId {
+ self.version_id
+ }
+
+ /// Returns the timestamp in milliseconds from epoch.
+ #[inline]
+ pub fn timestamp_ms(&self) -> i64 {
+ self.timestamp_ms
+ }
+
+ /// Consumes the entry and returns its timestamp as a `MappedLocalTime<DateTime<Utc>>` with millisecond
precision.
+ pub fn timestamp(self) -> MappedLocalTime<DateTime<Utc>> {
+ Utc.timestamp_millis_opt(self.timestamp_ms)
+ }
+}
+
+pub(super) mod _serde {
+ /// This is a helper module that defines types to help with
serialization/deserialization.
+ /// For deserialization the input first gets read into the
[ViewMetadataV1] struct
+ /// and then converted into the [ViewMetadata] struct. Serialization works
the other way around.
+ /// [ViewMetadataV1] is an internal struct that is only used for
serialization and deserialization.
+ use std::{collections::HashMap, sync::Arc};
+
+ use serde::{Deserialize, Serialize};
+ use uuid::Uuid;
+
+ use super::{ViewFormatVersion, ViewVersionId, ViewVersionLog};
+ use crate::spec::schema::_serde::SchemaV2;
+ use crate::spec::table_metadata::_serde::VersionNumber;
+ use crate::spec::view_version::_serde::ViewVersionV1;
+ use crate::spec::{ViewMetadata, ViewVersion};
+ use crate::{Error, ErrorKind};
+
+ #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+ #[serde(untagged)]
+ pub(super) enum ViewMetadataEnum {
+ V1(ViewMetadataV1),
+ }
+
+ #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+ #[serde(rename_all = "kebab-case")]
+ /// Defines the structure of a v1 view metadata for
serialization/deserialization
+ pub(super) struct ViewMetadataV1 {
+ pub format_version: VersionNumber<1>,
+ pub(super) view_uuid: Uuid,
+ pub(super) location: String,
+ pub(super) current_version_id: ViewVersionId,
+ pub(super) versions: Vec<ViewVersionV1>,
+ pub(super) version_log: Vec<ViewVersionLog>,
+ pub(super) schemas: Vec<SchemaV2>,
+ pub(super) properties: Option<std::collections::HashMap<String,
String>>,
+ }
+
+ impl Serialize for ViewMetadata {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where S: serde::Serializer {
+ // we must do a clone here
+ let metadata_enum: ViewMetadataEnum =
+ self.clone().try_into().map_err(serde::ser::Error::custom)?;
+
+ metadata_enum.serialize(serializer)
+ }
+ }
+
+ impl TryFrom<ViewMetadataEnum> for ViewMetadata {
+ type Error = Error;
+ fn try_from(value: ViewMetadataEnum) -> Result<Self, Error> {
+ match value {
+ ViewMetadataEnum::V1(value) => value.try_into(),
+ }
+ }
+ }
+
+ impl TryFrom<ViewMetadata> for ViewMetadataEnum {
+ type Error = Error;
+ fn try_from(value: ViewMetadata) -> Result<Self, Error> {
+ Ok(match value.format_version {
+ ViewFormatVersion::V1 => ViewMetadataEnum::V1(value.into()),
+ })
+ }
+ }
+
+ impl TryFrom<ViewMetadataV1> for ViewMetadata {
+ type Error = Error;
+ fn try_from(value: ViewMetadataV1) -> Result<Self, self::Error> {
+ let schemas = HashMap::from_iter(
+ value
+ .schemas
+ .into_iter()
+ .map(|schema| Ok((schema.schema_id,
Arc::new(schema.try_into()?))))
+ .collect::<Result<Vec<_>, Error>>()?,
+ );
+ let versions = HashMap::from_iter(
+ value
+ .versions
+ .into_iter()
+ .map(|x| Ok((x.version_id,
Arc::new(ViewVersion::from(x)))))
+ .collect::<Result<Vec<_>, Error>>()?,
+ );
+ // Make sure the current version exists and that its schema is present
+ let current_version =
+ versions
+ .get(&value.current_version_id)
+ .ok_or(self::Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "No version exists with the current version id
{}.",
+ value.current_version_id
+ ),
+ ))?;
+ if !schemas.contains_key(&current_version.schema_id()) {
+ return Err(self::Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "No schema exists with the schema id {}.",
+ current_version.schema_id()
+ ),
+ ));
+ }
+
+ Ok(ViewMetadata {
+ format_version: ViewFormatVersion::V1,
+ view_uuid: value.view_uuid,
+ location: value.location,
+ schemas,
+ properties: value.properties.unwrap_or_default(),
+ current_version_id: value.current_version_id,
+ versions,
+ version_log: value.version_log,
+ })
+ }
+ }
+
+ impl From<ViewMetadata> for ViewMetadataV1 {
+ fn from(v: ViewMetadata) -> Self {
+ let schemas = v
+ .schemas
+ .into_values()
+ .map(|x| {
+ Arc::try_unwrap(x)
+ .unwrap_or_else(|schema| schema.as_ref().clone())
+ .into()
+ })
+ .collect();
+ let versions = v
+ .versions
+ .into_values()
+ .map(|x| {
+ Arc::try_unwrap(x)
+ .unwrap_or_else(|version| version.as_ref().clone())
+ .into()
+ })
+ .collect();
+ ViewMetadataV1 {
+ format_version: VersionNumber::<1>,
+ view_uuid: v.view_uuid,
+ location: v.location,
+ schemas,
+ properties: Some(v.properties),
+ current_version_id: v.current_version_id,
+ versions,
+ version_log: v.version_log,
+ }
+ }
+ }
+}
+
+#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
+#[repr(u8)]
+/// Iceberg format version
+pub enum ViewFormatVersion {
+ /// Iceberg view spec version 1
+ V1 = 1u8,
+}
+
+impl PartialOrd for ViewFormatVersion {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl Ord for ViewFormatVersion {
+ fn cmp(&self, other: &Self) -> Ordering {
+ (*self as u8).cmp(&(*other as u8))
+ }
+}
+
+impl Display for ViewFormatVersion {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ ViewFormatVersion::V1 => write!(f, "v1"),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashMap;
+ use std::fs;
+ use std::sync::Arc;
+
+ use anyhow::Result;
+ use pretty_assertions::assert_eq;
+ use uuid::Uuid;
+
+ use super::{ViewFormatVersion, ViewMetadataBuilder, ViewVersionLog};
+ use crate::spec::{
+ NestedField, PrimitiveType, Schema, SqlViewRepresentation, Type,
ViewMetadata,
+ ViewRepresentations, ViewVersion,
+ };
+ use crate::{NamespaceIdent, ViewCreation};
+
+ fn check_view_metadata_serde(json: &str, expected_type: ViewMetadata) {
+ let desered_type: ViewMetadata = serde_json::from_str(json).unwrap();
+ assert_eq!(desered_type, expected_type);
+
+ let sered_json = serde_json::to_string(&expected_type).unwrap();
+ let parsed_json_value =
serde_json::from_str::<ViewMetadata>(&sered_json).unwrap();
+
+ assert_eq!(parsed_json_value, desered_type);
+ }
+
+ fn get_test_view_metadata(file_name: &str) -> ViewMetadata {
+ let path = format!("testdata/view_metadata/{}", file_name);
+ let metadata: String = fs::read_to_string(path).unwrap();
+
+ serde_json::from_str(&metadata).unwrap()
+ }
+
+ #[test]
+ fn test_view_data_v1() {
+ let data = r#"
+ {
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version" : 1,
+ "location" : "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id" : 1,
+ "properties" : {
+ "comment" : "Daily event counts"
+ },
+ "versions" : [ {
+ "version-id" : 1,
+ "timestamp-ms" : 1573518431292,
+ "schema-id" : 1,
+ "default-catalog" : "prod",
+ "default-namespace" : [ "default" ],
+ "summary" : {
+ "engine-name" : "Spark",
+ "engineVersion" : "3.3.2"
+ },
+ "representations" : [ {
+ "type" : "sql",
+ "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM
events\nGROUP BY 2",
+ "dialect" : "spark"
+ } ]
+ } ],
+ "schemas": [ {
+ "schema-id": 1,
+ "type" : "struct",
+ "fields" : [ {
+ "id" : 1,
+ "name" : "event_count",
+ "required" : false,
+ "type" : "int",
+ "doc" : "Count of events"
+ } ]
+ } ],
+ "version-log" : [ {
+ "timestamp-ms" : 1573518431292,
+ "version-id" : 1
+ } ]
+ }
+ "#;
+
+ let schema = Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![Arc::new(
+ NestedField::optional(1, "event_count",
Type::Primitive(PrimitiveType::Int))
+ .with_doc("Count of events"),
+ )])
+ .build()
+ .unwrap();
+ let version = ViewVersion::builder()
+ .with_version_id(1)
+ .with_timestamp_ms(1573518431292)
+ .with_schema_id(1)
+ .with_default_catalog("prod".to_string().into())
+
.with_default_namespace(NamespaceIdent::from_vec(vec!["default".to_string()]).unwrap())
+ .with_summary(HashMap::from_iter(vec![
+ ("engineVersion".to_string(), "3.3.2".to_string()),
+ ("engine-name".to_string(), "Spark".to_string()),
+ ]))
+
.with_representations(ViewRepresentations(vec![SqlViewRepresentation {
+ sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM
events\nGROUP BY 2"
+ .to_string(),
+ dialect: "spark".to_string(),
+ }
+ .into()]))
+ .build();
+
+ let expected = ViewMetadata {
+ format_version: ViewFormatVersion::V1,
+ view_uuid:
Uuid::parse_str("fa6506c3-7681-40c8-86dc-e36561f83385").unwrap(),
+ location: "s3://bucket/warehouse/default.db/event_agg".to_string(),
+ current_version_id: 1,
+ versions: HashMap::from_iter(vec![(1, Arc::new(version))]),
+ version_log: vec![ViewVersionLog {
+ timestamp_ms: 1573518431292,
+ version_id: 1,
+ }],
+ schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
+ properties: HashMap::from_iter(vec![(
+ "comment".to_string(),
+ "Daily event counts".to_string(),
+ )]),
+ };
+
+ check_view_metadata_serde(data, expected);
+ }
+
+ #[test]
+ fn test_invalid_view_uuid() -> Result<()> {
+ let data = r#"
+ {
+ "format-version" : 1,
+ "view-uuid": "xxxx"
+ }
+ "#;
+ assert!(serde_json::from_str::<ViewMetadata>(data).is_err());
+ Ok(())
+ }
+
+ #[test]
+ fn test_view_builder_from_view_creation() {
+ let representations = ViewRepresentations(vec![SqlViewRepresentation {
+ sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM
events\nGROUP BY 2"
+ .to_string(),
+ dialect: "spark".to_string(),
+ }
+ .into()]);
+ let creation = ViewCreation::builder()
+ .location("s3://bucket/warehouse/default.db/event_agg".to_string())
+ .name("view".to_string())
+ .schema(Schema::builder().build().unwrap())
+
.default_namespace(NamespaceIdent::from_vec(vec!["default".to_string()]).unwrap())
+ .representations(representations)
+ .build();
+
+ let metadata = ViewMetadataBuilder::from_view_creation(creation)
+ .unwrap()
+ .build()
+ .unwrap();
+
+ assert_eq!(
+ metadata.location(),
+ "s3://bucket/warehouse/default.db/event_agg"
+ );
+ assert_eq!(metadata.current_version_id(), 1);
+ assert_eq!(metadata.versions().count(), 1);
+ assert_eq!(metadata.schemas_iter().count(), 1);
+ assert_eq!(metadata.properties().len(), 0);
+ }
+
+ #[test]
+ fn test_view_metadata_v1_file_valid() {
+ let metadata =
+
fs::read_to_string("testdata/view_metadata/ViewMetadataV1Valid.json").unwrap();
+
+ let schema = Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ Arc::new(
+ NestedField::optional(1, "event_count",
Type::Primitive(PrimitiveType::Int))
+ .with_doc("Count of events"),
+ ),
+ Arc::new(NestedField::optional(
+ 2,
+ "event_date",
+ Type::Primitive(PrimitiveType::Date),
+ )),
+ ])
+ .build()
+ .unwrap();
+
+ let version = ViewVersion::builder()
+ .with_version_id(1)
+ .with_timestamp_ms(1573518431292)
+ .with_schema_id(1)
+ .with_default_catalog("prod".to_string().into())
+
.with_default_namespace(NamespaceIdent::from_vec(vec!["default".to_string()]).unwrap())
+ .with_summary(HashMap::from_iter(vec![
+ ("engineVersion".to_string(), "3.3.2".to_string()),
+ ("engine-name".to_string(), "Spark".to_string()),
+ ]))
+
.with_representations(ViewRepresentations(vec![SqlViewRepresentation {
+ sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM
events\nGROUP BY 2"
+ .to_string(),
+ dialect: "spark".to_string(),
+ }
+ .into()]))
+ .build();
+
+ let expected = ViewMetadata {
+ format_version: ViewFormatVersion::V1,
+ view_uuid:
Uuid::parse_str("fa6506c3-7681-40c8-86dc-e36561f83385").unwrap(),
+ location: "s3://bucket/warehouse/default.db/event_agg".to_string(),
+ current_version_id: 1,
+ versions: HashMap::from_iter(vec![(1, Arc::new(version))]),
+ version_log: vec![ViewVersionLog {
+ timestamp_ms: 1573518431292,
+ version_id: 1,
+ }],
+ schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
+ properties: HashMap::from_iter(vec![(
+ "comment".to_string(),
+ "Daily event counts".to_string(),
+ )]),
+ };
+
+ check_view_metadata_serde(&metadata, expected);
+ }
+
+ #[test]
+ fn test_view_builder_assign_uuid() {
+ let metadata = get_test_view_metadata("ViewMetadataV1Valid.json");
+ let metadata_builder = ViewMetadataBuilder::new(metadata);
+ let uuid = Uuid::new_v4();
+ let metadata =
metadata_builder.assign_uuid(uuid).unwrap().build().unwrap();
+ assert_eq!(metadata.uuid(), uuid);
+ }
+
+ #[test]
+ fn test_view_metadata_v1_unsupported_version() {
+ let metadata =
+
fs::read_to_string("testdata/view_metadata/ViewMetadataUnsupportedVersion.json")
+ .unwrap();
+
+ let desered: Result<ViewMetadata, serde_json::Error> =
serde_json::from_str(&metadata);
+
+ assert_eq!(
+ desered.unwrap_err().to_string(),
+ "data did not match any variant of untagged enum ViewMetadataEnum"
+ )
+ }
+
+ #[test]
+ fn test_view_metadata_v1_version_not_found() {
+ let metadata =
+
fs::read_to_string("testdata/view_metadata/ViewMetadataV1CurrentVersionNotFound.json")
+ .unwrap();
+
+ let desered: Result<ViewMetadata, serde_json::Error> =
serde_json::from_str(&metadata);
+
+ assert_eq!(
+ desered.unwrap_err().to_string(),
+ "DataInvalid => No version exists with the current version id 2."
+ )
+ }
+
+ #[test]
+ fn test_view_metadata_v1_schema_not_found() {
+ let metadata =
+
fs::read_to_string("testdata/view_metadata/ViewMetadataV1SchemaNotFound.json").unwrap();
+
+ let desered: Result<ViewMetadata, serde_json::Error> =
serde_json::from_str(&metadata);
+
+ assert_eq!(
+ desered.unwrap_err().to_string(),
+ "DataInvalid => No schema exists with the schema id 2."
+ )
+ }
+
+ #[test]
+ fn test_view_metadata_v1_missing_schema_for_version() {
+ let metadata =
+
fs::read_to_string("testdata/view_metadata/ViewMetadataV1MissingSchema.json").unwrap();
+
+ let desered: Result<ViewMetadata, serde_json::Error> =
serde_json::from_str(&metadata);
+
+ assert_eq!(
+ desered.unwrap_err().to_string(),
+ "data did not match any variant of untagged enum ViewMetadataEnum"
+ )
+ }
+
+ #[test]
+ fn test_view_metadata_v1_missing_current_version() {
+ let metadata =
+
fs::read_to_string("testdata/view_metadata/ViewMetadataV1MissingCurrentVersion.json")
+ .unwrap();
+
+ let desered: Result<ViewMetadata, serde_json::Error> =
serde_json::from_str(&metadata);
+
+ assert_eq!(
+ desered.unwrap_err().to_string(),
+ "data did not match any variant of untagged enum ViewMetadataEnum"
+ )
+ }
+}
diff --git a/crates/iceberg/src/spec/view_version.rs
b/crates/iceberg/src/spec/view_version.rs
new file mode 100644
index 0000000..30686b5
--- /dev/null
+++ b/crates/iceberg/src/spec/view_version.rs
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*!
+ * View Versions!
+ */
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use _serde::ViewVersionV1;
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use typed_builder::TypedBuilder;
+
+use super::view_metadata::ViewVersionLog;
+use crate::catalog::NamespaceIdent;
+use crate::error::{timestamp_ms_to_utc, Result};
+use crate::spec::{SchemaId, SchemaRef, ViewMetadata};
+use crate::{Error, ErrorKind};
+
+/// Reference to [`ViewVersion`].
+///
+/// This is an [`Arc`], so cloning the reference does not copy the underlying
+/// view version.
+pub type ViewVersionRef = Arc<ViewVersion>;
+
+/// Alias for the integer type used for view version ids.
+///
+/// View version ids are 32-bit signed integers (`i32`).
+pub type ViewVersionId = i32;
+
+/// A view version represents the definition of a view at a specific point in time.
+///
+/// Serialization and deserialization go through [`ViewVersionV1`] (see the
+/// `serde(from, into)` attribute). The derived builder exposes one setter per
+/// field, each prefixed with `with_`.
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, TypedBuilder)]
+#[serde(from = "ViewVersionV1", into = "ViewVersionV1")]
+#[builder(field_defaults(setter(prefix = "with_")))]
+pub struct ViewVersion {
+    /// A unique ID of this version (a [`ViewVersionId`], i.e. an `i32`)
+    version_id: ViewVersionId,
+    /// ID of the schema for the view version
+    schema_id: SchemaId,
+    /// Timestamp when the version was created (ms from epoch)
+    timestamp_ms: i64,
+    /// A string to string map of summary metadata about the version
+    summary: HashMap<String, String>,
+    /// A list of representations for the view definition.
+    representations: ViewRepresentations,
+    /// Catalog name to use when a reference in the SELECT does not contain a catalog
+    #[builder(default = None)]
+    default_catalog: Option<String>,
+    /// Namespace to use when a reference in the SELECT is a single identifier
+    default_namespace: NamespaceIdent,
+}
+
+impl ViewVersion {
+    /// Get the version id of this view version.
+    #[inline]
+    pub fn version_id(&self) -> ViewVersionId {
+        self.version_id
+    }
+
+    /// Get the schema id of this view version.
+    #[inline]
+    pub fn schema_id(&self) -> SchemaId {
+        self.schema_id
+    }
+
+    /// Get the timestamp of when the view version was created.
+    ///
+    /// # Errors
+    ///
+    /// Propagates the error returned by [`timestamp_ms_to_utc`] when the stored
+    /// millisecond value cannot be converted.
+    #[inline]
+    pub fn timestamp(&self) -> Result<DateTime<Utc>> {
+        timestamp_ms_to_utc(self.timestamp_ms)
+    }
+
+    /// Get the timestamp of when the view version was created in milliseconds since epoch
+    #[inline]
+    pub fn timestamp_ms(&self) -> i64 {
+        self.timestamp_ms
+    }
+
+    /// Get summary of the view version
+    #[inline]
+    pub fn summary(&self) -> &HashMap<String, String> {
+        &self.summary
+    }
+
+    /// Get this view's representations
+    #[inline]
+    pub fn representations(&self) -> &ViewRepresentations {
+        &self.representations
+    }
+
+    /// Get the default catalog for this view version, if one is set
+    #[inline]
+    pub fn default_catalog(&self) -> Option<&String> {
+        self.default_catalog.as_ref()
+    }
+
+    /// Get the default namespace to use when a reference in the SELECT is a single identifier
+    #[inline]
+    pub fn default_namespace(&self) -> &NamespaceIdent {
+        &self.default_namespace
+    }
+
+    /// Get the schema of this view version.
+    ///
+    /// Looks up [`Self::schema_id`] in `view_metadata` and returns a clone of
+    /// the matching [`SchemaRef`].
+    ///
+    /// # Errors
+    ///
+    /// Returns an [`ErrorKind::DataInvalid`] error when `view_metadata` holds no
+    /// schema with this version's schema id.
+    pub fn schema(&self, view_metadata: &ViewMetadata) -> Result<SchemaRef> {
+        view_metadata
+            .schema_by_id(self.schema_id())
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Schema with id {} not found", self.schema_id()),
+                )
+            })
+            .cloned()
+    }
+
+    /// Retrieve the history log entry for this view version.
+    ///
+    /// The entry is built from this version's id and creation timestamp.
+    // NOTE(review): `dead_code` is allowed because no caller is visible in this
+    // file; confirm whether the allowance is still needed.
+    #[allow(dead_code)]
+    pub(crate) fn log(&self) -> ViewVersionLog {
+        ViewVersionLog::new(self.version_id, self.timestamp_ms)
+    }
+}
+
+/// A list of view representations.
+///
+/// Newtype around a `Vec<ViewRepresentation>`; the inner vector is only
+/// directly accessible from within this crate (`pub(crate)`).
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
+pub struct ViewRepresentations(pub(crate) Vec<ViewRepresentation>);
+
+impl ViewRepresentations {
+    /// Number of representations held in this list.
+    #[inline]
+    pub fn len(&self) -> usize {
+        let Self(items) = self;
+        items.len()
+    }
+
+    /// Returns `true` when the list holds no representations.
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        let Self(items) = self;
+        items.is_empty()
+    }
+
+    /// Borrowing iterator over the representations, in the order they are stored.
+    pub fn iter(&self) -> impl Iterator<Item = &'_ ViewRepresentation> {
+        let Self(items) = self;
+        items.iter()
+    }
+}
+
+/// Consuming iteration: yields each owned [`ViewRepresentation`] in stored order.
+impl IntoIterator for ViewRepresentations {
+    type Item = ViewRepresentation;
+    type IntoIter = std::vec::IntoIter<ViewRepresentation>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        let Self(items) = self;
+        items.into_iter()
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(tag = "type")]
+/// View definitions can be represented in multiple ways.
+/// Representations are documented ways to express a view definition.
+///
+/// Serialized as an internally tagged enum: the `type` field selects the variant.
+// ToDo: Make unique per Dialect
+pub enum ViewRepresentation {
+ #[serde(rename = "sql")]
+ /// The SQL representation stores the view definition as a SQL SELECT,
+ /// with metadata such as the SQL dialect. Its `type` tag is `"sql"`.
+ Sql(SqlViewRepresentation),
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// The SQL representation stores the view definition as a SQL SELECT,
+/// with metadata such as the SQL dialect.
+///
+/// Both fields are public and are (de)serialized under the keys `sql` and
+/// `dialect`.
+pub struct SqlViewRepresentation {
+ #[serde(rename = "sql")]
+ /// The SQL SELECT statement that defines the view.
+ pub sql: String,
+ #[serde(rename = "dialect")]
+ /// The dialect of the sql SELECT statement (e.g., "trino" or "spark")
+ pub dialect: String,
+}
+
+pub(super) mod _serde {
+ /// This is a helper module that defines types to help with
serialization/deserialization.
+ /// For deserialization the input first gets read into the
[`ViewVersionV1`] struct.
+ /// and then converted into the [Snapshot] struct. Serialization works the
other way around.
+ /// [ViewVersionV1] are internal struct that are only used for
serialization and deserialization.
+ use serde::{Deserialize, Serialize};
+
+ use super::{ViewRepresentation, ViewRepresentations, ViewVersion};
+ use crate::catalog::NamespaceIdent;
+
+ #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
+ #[serde(rename_all = "kebab-case")]
+ /// Defines the structure of a v1 view version for
serialization/deserialization
+ pub(crate) struct ViewVersionV1 {
+ pub version_id: i32,
+ pub schema_id: i32,
+ pub timestamp_ms: i64,
+ pub summary: std::collections::HashMap<String, String>,
+ pub representations: Vec<ViewRepresentation>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub default_catalog: Option<String>,
+ pub default_namespace: NamespaceIdent,
+ }
+
+ impl From<ViewVersionV1> for ViewVersion {
+ fn from(v1: ViewVersionV1) -> Self {
+ ViewVersion {
+ version_id: v1.version_id,
+ schema_id: v1.schema_id,
+ timestamp_ms: v1.timestamp_ms,
+ summary: v1.summary,
+ representations: ViewRepresentations(v1.representations),
+ default_catalog: v1.default_catalog,
+ default_namespace: v1.default_namespace,
+ }
+ }
+ }
+
+ impl From<ViewVersion> for ViewVersionV1 {
+ fn from(v1: ViewVersion) -> Self {
+ ViewVersionV1 {
+ version_id: v1.version_id,
+ schema_id: v1.schema_id,
+ timestamp_ms: v1.timestamp_ms,
+ summary: v1.summary,
+ representations: v1.representations.0,
+ default_catalog: v1.default_catalog,
+ default_namespace: v1.default_namespace,
+ }
+ }
+ }
+}
+
+/// Wraps a [`SqlViewRepresentation`] in the `Sql` variant.
+impl From<SqlViewRepresentation> for ViewRepresentation {
+    fn from(sql: SqlViewRepresentation) -> Self {
+        Self::Sql(sql)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use chrono::{TimeZone, Utc};
+
+    use crate::spec::view_version::ViewVersion;
+    use crate::spec::view_version::_serde::ViewVersionV1;
+    use crate::spec::ViewRepresentations;
+
+    /// Deserializes a v1 view version from JSON, checks that serializing it
+    /// again yields the same JSON, and verifies every accessor.
+    #[test]
+    fn view_version() {
+        let record = serde_json::json!(
+        {
+            "version-id" : 1,
+            "timestamp-ms" : 1573518431292i64,
+            "schema-id" : 1,
+            "default-catalog" : "prod",
+            "default-namespace" : [ "default" ],
+            "summary" : {
+                "engine-name" : "Spark",
+                "engineVersion" : "3.3.2"
+            },
+            "representations" : [ {
+                "type" : "sql",
+                "sql" : "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
+                "dialect" : "spark"
+            } ]
+        }
+        );
+
+        let result: ViewVersion = serde_json::from_value::<ViewVersionV1>(record.clone())
+            .unwrap()
+            .into();
+
+        // Roundtrip
+        assert_eq!(serde_json::to_value(result.clone()).unwrap(), record);
+
+        assert_eq!(result.version_id(), 1);
+        assert_eq!(
+            result.timestamp().unwrap(),
+            Utc.timestamp_millis_opt(1573518431292).unwrap()
+        );
+        assert_eq!(result.schema_id(), 1);
+        assert_eq!(result.default_catalog, Some("prod".to_string()));
+        assert_eq!(result.summary(), &{
+            let mut map = std::collections::HashMap::new();
+            map.insert("engine-name".to_string(), "Spark".to_string());
+            map.insert("engineVersion".to_string(), "3.3.2".to_string());
+            map
+        });
+        assert_eq!(
+            result.representations().to_owned(),
+            ViewRepresentations(vec![super::ViewRepresentation::Sql(
+                super::SqlViewRepresentation {
+                    sql: "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2"
+                        .to_string(),
+                    dialect: "spark".to_string(),
+                },
+            )])
+        );
+        assert_eq!(
+            result.default_namespace.inner(),
+            vec!["default".to_string()]
+        );
+    }
+}
diff --git
a/crates/iceberg/testdata/view_metadata/ViewMetadataUnsupportedVersion.json
b/crates/iceberg/testdata/view_metadata/ViewMetadataUnsupportedVersion.json
new file mode 100644
index 0000000..c5627b8
--- /dev/null
+++ b/crates/iceberg/testdata/view_metadata/ViewMetadataUnsupportedVersion.json
@@ -0,0 +1,58 @@
+{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 2,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 1,
+ "properties": {
+ "comment": "Daily event counts"
+ },
+ "versions": [
+ {
+ "version-id": 1,
+ "timestamp-ms": 1573518431292,
+ "schema-id": 1,
+ "default-catalog": "prod",
+ "default-namespace": [
+ "default"
+ ],
+ "summary": {
+ "engine-name": "Spark",
+ "engineVersion": "3.3.2"
+ },
+ "representations": [
+ {
+ "type": "sql",
+ "sql": "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
+ "dialect": "spark"
+ }
+ ]
+ }
+ ],
+ "schemas": [
+ {
+ "schema-id": 1,
+ "type": "struct",
+ "fields": [
+ {
+ "id": 1,
+ "name": "event_count",
+ "required": false,
+ "type": "int",
+ "doc": "Count of events"
+ },
+ {
+ "id": 2,
+ "name": "event_date",
+ "required": false,
+ "type": "date"
+ }
+ ]
+ }
+ ],
+ "version-log": [
+ {
+ "timestamp-ms": 1573518431292,
+ "version-id": 1
+ }
+ ]
+}
\ No newline at end of file
diff --git
a/crates/iceberg/testdata/view_metadata/ViewMetadataV1CurrentVersionNotFound.json
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1CurrentVersionNotFound.json
new file mode 100644
index 0000000..4ba94ca
--- /dev/null
+++
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1CurrentVersionNotFound.json
@@ -0,0 +1,58 @@
+{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 2,
+ "properties": {
+ "comment": "Daily event counts"
+ },
+ "versions": [
+ {
+ "version-id": 1,
+ "timestamp-ms": 1573518431292,
+ "schema-id": 1,
+ "default-catalog": "prod",
+ "default-namespace": [
+ "default"
+ ],
+ "summary": {
+ "engine-name": "Spark",
+ "engineVersion": "3.3.2"
+ },
+ "representations": [
+ {
+ "type": "sql",
+ "sql": "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
+ "dialect": "spark"
+ }
+ ]
+ }
+ ],
+ "schemas": [
+ {
+ "schema-id": 1,
+ "type": "struct",
+ "fields": [
+ {
+ "id": 1,
+ "name": "event_count",
+ "required": false,
+ "type": "int",
+ "doc": "Count of events"
+ },
+ {
+ "id": 2,
+ "name": "event_date",
+ "required": false,
+ "type": "date"
+ }
+ ]
+ }
+ ],
+ "version-log": [
+ {
+ "timestamp-ms": 1573518431292,
+ "version-id": 1
+ }
+ ]
+}
\ No newline at end of file
diff --git
a/crates/iceberg/testdata/view_metadata/ViewMetadataV1MissingCurrentVersion.json
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1MissingCurrentVersion.json
new file mode 100644
index 0000000..c210881
--- /dev/null
+++
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1MissingCurrentVersion.json
@@ -0,0 +1,57 @@
+{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "properties": {
+ "comment": "Daily event counts"
+ },
+ "versions": [
+ {
+ "version-id": 1,
+ "timestamp-ms": 1573518431292,
+ "schema-id": 1,
+ "default-catalog": "prod",
+ "default-namespace": [
+ "default"
+ ],
+ "summary": {
+ "engine-name": "Spark",
+ "engineVersion": "3.3.2"
+ },
+ "representations": [
+ {
+ "type": "sql",
+ "sql": "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
+ "dialect": "spark"
+ }
+ ]
+ }
+ ],
+ "schemas": [
+ {
+ "schema-id": 1,
+ "type": "struct",
+ "fields": [
+ {
+ "id": 1,
+ "name": "event_count",
+ "required": false,
+ "type": "int",
+ "doc": "Count of events"
+ },
+ {
+ "id": 2,
+ "name": "event_date",
+ "required": false,
+ "type": "date"
+ }
+ ]
+ }
+ ],
+ "version-log": [
+ {
+ "timestamp-ms": 1573518431292,
+ "version-id": 1
+ }
+ ]
+}
\ No newline at end of file
diff --git
a/crates/iceberg/testdata/view_metadata/ViewMetadataV1MissingSchema.json
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1MissingSchema.json
new file mode 100644
index 0000000..b5b454b
--- /dev/null
+++ b/crates/iceberg/testdata/view_metadata/ViewMetadataV1MissingSchema.json
@@ -0,0 +1,56 @@
+{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "properties": {
+ "comment": "Daily event counts"
+ },
+ "versions": [
+ {
+ "version-id": 1,
+ "timestamp-ms": 1573518431292,
+ "default-catalog": "prod",
+ "default-namespace": [
+ "default"
+ ],
+ "summary": {
+ "engine-name": "Spark",
+ "engineVersion": "3.3.2"
+ },
+ "representations": [
+ {
+ "type": "sql",
+ "sql": "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
+ "dialect": "spark"
+ }
+ ]
+ }
+ ],
+ "schemas": [
+ {
+ "schema-id": 1,
+ "type": "struct",
+ "fields": [
+ {
+ "id": 1,
+ "name": "event_count",
+ "required": false,
+ "type": "int",
+ "doc": "Count of events"
+ },
+ {
+ "id": 2,
+ "name": "event_date",
+ "required": false,
+ "type": "date"
+ }
+ ]
+ }
+ ],
+ "version-log": [
+ {
+ "timestamp-ms": 1573518431292,
+ "version-id": 1
+ }
+ ]
+}
\ No newline at end of file
diff --git
a/crates/iceberg/testdata/view_metadata/ViewMetadataV1SchemaNotFound.json
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1SchemaNotFound.json
new file mode 100644
index 0000000..0026d22
--- /dev/null
+++ b/crates/iceberg/testdata/view_metadata/ViewMetadataV1SchemaNotFound.json
@@ -0,0 +1,58 @@
+{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 1,
+ "properties": {
+ "comment": "Daily event counts"
+ },
+ "versions": [
+ {
+ "version-id": 1,
+ "timestamp-ms": 1573518431292,
+ "schema-id": 2,
+ "default-catalog": "prod",
+ "default-namespace": [
+ "default"
+ ],
+ "summary": {
+ "engine-name": "Spark",
+ "engineVersion": "3.3.2"
+ },
+ "representations": [
+ {
+ "type": "sql",
+ "sql": "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
+ "dialect": "spark"
+ }
+ ]
+ }
+ ],
+ "schemas": [
+ {
+ "schema-id": 1,
+ "type": "struct",
+ "fields": [
+ {
+ "id": 1,
+ "name": "event_count",
+ "required": false,
+ "type": "int",
+ "doc": "Count of events"
+ },
+ {
+ "id": 2,
+ "name": "event_date",
+ "required": false,
+ "type": "date"
+ }
+ ]
+ }
+ ],
+ "version-log": [
+ {
+ "timestamp-ms": 1573518431292,
+ "version-id": 1
+ }
+ ]
+}
\ No newline at end of file
diff --git a/crates/iceberg/testdata/view_metadata/ViewMetadataV1Valid.json
b/crates/iceberg/testdata/view_metadata/ViewMetadataV1Valid.json
new file mode 100644
index 0000000..5011a80
--- /dev/null
+++ b/crates/iceberg/testdata/view_metadata/ViewMetadataV1Valid.json
@@ -0,0 +1,58 @@
+{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/warehouse/default.db/event_agg",
+ "current-version-id": 1,
+ "properties": {
+ "comment": "Daily event counts"
+ },
+ "versions": [
+ {
+ "version-id": 1,
+ "timestamp-ms": 1573518431292,
+ "schema-id": 1,
+ "default-catalog": "prod",
+ "default-namespace": [
+ "default"
+ ],
+ "summary": {
+ "engine-name": "Spark",
+ "engineVersion": "3.3.2"
+ },
+ "representations": [
+ {
+ "type": "sql",
+ "sql": "SELECT\n COUNT(1), CAST(event_ts AS DATE)\nFROM events\nGROUP BY 2",
+ "dialect": "spark"
+ }
+ ]
+ }
+ ],
+ "schemas": [
+ {
+ "schema-id": 1,
+ "type": "struct",
+ "fields": [
+ {
+ "id": 1,
+ "name": "event_count",
+ "required": false,
+ "type": "int",
+ "doc": "Count of events"
+ },
+ {
+ "id": 2,
+ "name": "event_date",
+ "required": false,
+ "type": "date"
+ }
+ ]
+ }
+ ],
+ "version-log": [
+ {
+ "timestamp-ms": 1573518431292,
+ "version-id": 1
+ }
+ ]
+}
\ No newline at end of file