This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e26bda3  feat: Implement load table api. (#89)
e26bda3 is described below

commit e26bda3fc2f6df7dde49a6cf7a5f3faafe49313b
Author: Renjie Liu <[email protected]>
AuthorDate: Wed Nov 8 16:57:02 2023 +0800

    feat: Implement load table api. (#89)
    
    * Init commit
    
    * Some comments
    
    * Done
    
    * Fix format
    
    * fix clippy
    
    * Fix fmt
    
    * sort
    
    * Use error response
    
    * Fix
    
    * Fix comments
    
    * fmt
    
    * fmt
    
    * Fix
    
    * Rename
---
 crates/catalog/rest/Cargo.toml                     |   2 +
 crates/catalog/rest/src/catalog.rs                 | 238 ++++++++++++++-
 .../catalog/rest/testdata/load_table_response.json |  68 +++++
 crates/iceberg/Cargo.toml                          |   1 +
 crates/iceberg/src/catalog/mod.rs                  |  35 ++-
 crates/iceberg/src/io.rs                           |  53 +++-
 crates/iceberg/src/spec/partition.rs               |   3 +
 crates/iceberg/src/spec/schema.rs                  |   4 +-
 crates/iceberg/src/spec/snapshot.rs                |   6 +-
 crates/iceberg/src/spec/sort.rs                    |   3 +
 crates/iceberg/src/spec/table_metadata.rs          | 336 ++++++++++++++-------
 crates/iceberg/src/table.rs                        |  25 +-
 12 files changed, 636 insertions(+), 138 deletions(-)

diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml
index c44f08b..86bb71c 100644
--- a/crates/catalog/rest/Cargo.toml
+++ b/crates/catalog/rest/Cargo.toml
@@ -28,6 +28,7 @@ keywords = ["iceberg", "rest", "catalog"]
 
 [dependencies]
 async-trait = "0.1"
+chrono = "0.4"
 iceberg = { path = "../../iceberg" }
 reqwest = { version = "^0.11", features = ["json"] }
 serde = { version = "^1.0", features = ["rc"] }
@@ -35,6 +36,7 @@ serde_derive = "^1.0"
 serde_json = "^1.0"
 typed-builder = "^0.18"
 urlencoding = "2"
+uuid = { version = "1.5.0", features = ["v4"] }
 
 [dev-dependencies]
 mockito = "^1"
diff --git a/crates/catalog/rest/src/catalog.rs 
b/crates/catalog/rest/src/catalog.rs
index f267437..599f405 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -26,6 +26,8 @@ use serde::de::DeserializeOwned;
 use typed_builder::TypedBuilder;
 use urlencoding::encode;
 
+use crate::catalog::_serde::LoadTableResponse;
+use iceberg::io::{FileIO, FileIOBuilder};
 use iceberg::table::Table;
 use iceberg::Result;
 use iceberg::{
@@ -33,8 +35,8 @@ use iceberg::{
 };
 
 use self::_serde::{
-    CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse, 
ListTableResponse,
-    NamespaceSerde, RenameTableRequest, NO_CONTENT, OK,
+    CatalogConfig, ErrorResponse, ListNamespaceResponse, ListTableResponse, 
NamespaceSerde,
+    RenameTableRequest, NO_CONTENT, OK,
 };
 
 const ICEBERG_REST_SPEC_VERSION: &str = "1.14";
@@ -195,7 +197,7 @@ impl Catalog for RestCatalog {
 
         let resp = self
             .client
-            .query::<ListNamespaceResponse, ErrorModel, OK>(request.build()?)
+            .query::<ListNamespaceResponse, ErrorResponse, 
OK>(request.build()?)
             .await?;
 
         resp.namespaces
@@ -222,7 +224,7 @@ impl Catalog for RestCatalog {
 
         let resp = self
             .client
-            .query::<NamespaceSerde, ErrorModel, OK>(request)
+            .query::<NamespaceSerde, ErrorResponse, OK>(request)
             .await?;
 
         Namespace::try_from(resp)
@@ -238,7 +240,7 @@ impl Catalog for RestCatalog {
 
         let resp = self
             .client
-            .query::<NamespaceSerde, ErrorModel, OK>(request)
+            .query::<NamespaceSerde, ErrorResponse, OK>(request)
             .await?;
         Namespace::try_from(resp)
     }
@@ -267,7 +269,7 @@ impl Catalog for RestCatalog {
             .build()?;
 
         self.client
-            .execute::<ErrorModel, NO_CONTENT>(request)
+            .execute::<ErrorResponse, NO_CONTENT>(request)
             .await
             .map(|_| true)
     }
@@ -280,7 +282,9 @@ impl Catalog for RestCatalog {
             .delete(self.config.namespace_endpoint(namespace))
             .build()?;
 
-        self.client.execute::<ErrorModel, NO_CONTENT>(request).await
+        self.client
+            .execute::<ErrorResponse, NO_CONTENT>(request)
+            .await
     }
 
     /// List tables from namespace.
@@ -293,7 +297,7 @@ impl Catalog for RestCatalog {
 
         let resp = self
             .client
-            .query::<ListTableResponse, ErrorModel, OK>(request)
+            .query::<ListTableResponse, ErrorResponse, OK>(request)
             .await?;
 
         Ok(resp.identifiers)
@@ -312,11 +316,43 @@ impl Catalog for RestCatalog {
     }
 
     /// Load table from the catalog.
-    async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "Creating table not supported yet!",
-        ))
+    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
+        let request = self
+            .client
+            .0
+            .get(self.config.table_endpoint(table))
+            .build()?;
+
+        let resp = self
+            .client
+            .query::<LoadTableResponse, ErrorResponse, OK>(request)
+            .await?;
+
+        let mut props = self.config.props.clone();
+        if let Some(config) = resp.config {
+            props.extend(config);
+        }
+
+        let file_io = match self
+            .config
+            .warehouse
+            .as_ref()
+            .or_else(|| resp.metadata_location.as_ref())
+        {
+            Some(url) => FileIO::from_path(url)?.with_props(props).build()?,
+            None => FileIOBuilder::new("s3").with_props(props).build()?,
+        };
+
+        let table_builder = Table::builder()
+            .identifier(table.clone())
+            .file_io(file_io)
+            .metadata(resp.metadata);
+
+        if let Some(metadata_location) = resp.metadata_location {
+            Ok(table_builder.metadata_location(metadata_location).build())
+        } else {
+            Ok(table_builder.build())
+        }
     }
 
     /// Drop a table from the catalog.
@@ -327,7 +363,9 @@ impl Catalog for RestCatalog {
             .delete(self.config.table_endpoint(table))
             .build()?;
 
-        self.client.execute::<ErrorModel, NO_CONTENT>(request).await
+        self.client
+            .execute::<ErrorResponse, NO_CONTENT>(request)
+            .await
     }
 
     /// Check if a table exists in the catalog.
@@ -339,7 +377,7 @@ impl Catalog for RestCatalog {
             .build()?;
 
         self.client
-            .execute::<ErrorModel, NO_CONTENT>(request)
+            .execute::<ErrorResponse, NO_CONTENT>(request)
             .await
             .map(|_| true)
     }
@@ -356,7 +394,9 @@ impl Catalog for RestCatalog {
             })
             .build()?;
 
-        self.client.execute::<ErrorModel, NO_CONTENT>(request).await
+        self.client
+            .execute::<ErrorResponse, NO_CONTENT>(request)
+            .await
     }
 
     /// Update a table to the catalog.
@@ -412,10 +452,12 @@ mod _serde {
 
     use serde_derive::{Deserialize, Serialize};
 
+    use iceberg::spec::TableMetadata;
     use iceberg::{Error, ErrorKind, Namespace, TableIdent};
 
     pub(super) const OK: u16 = 200u16;
     pub(super) const NO_CONTENT: u16 = 204u16;
+
     #[derive(Clone, Debug, Serialize, Deserialize)]
     pub(super) struct CatalogConfig {
         pub(super) overrides: HashMap<String, String>,
@@ -534,11 +576,26 @@ mod _serde {
         pub(super) source: TableIdent,
         pub(super) destination: TableIdent,
     }
+
+    #[derive(Debug, Deserialize)]
+    #[serde(rename_all = "kebab-case")]
+    pub(super) struct LoadTableResponse {
+        pub(super) metadata_location: Option<String>,
+        pub(super) metadata: TableMetadata,
+        pub(super) config: Option<HashMap<String, String>>,
+    }
 }
 
 #[cfg(test)]
 mod tests {
+    use iceberg::spec::ManifestListLocation::ManifestListFile;
+    use iceberg::spec::{
+        FormatVersion, NestedField, Operation, PrimitiveType, Schema, 
Snapshot, SnapshotLog,
+        SortOrder, Summary, Type,
+    };
     use mockito::{Mock, Server, ServerGuard};
+    use std::sync::Arc;
+    use uuid::uuid;
 
     use super::*;
 
@@ -884,4 +941,153 @@ mod tests {
         config_mock.assert_async().await;
         rename_table_mock.assert_async().await;
     }
+
+    #[tokio::test]
+    async fn test_load_table() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let rename_table_mock = server
+            .mock("GET", "/v1/namespaces/ns1/tables/test1")
+            .with_status(200)
+            .with_body_from_file(format!(
+                "{}/testdata/{}",
+                env!("CARGO_MANIFEST_DIR"),
+                "load_table_response.json"
+            ))
+            .create_async()
+            .await;
+
+        let catalog = 
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
+            .await
+            .unwrap();
+
+        let table = catalog
+            .load_table(&TableIdent::new(
+                NamespaceIdent::new("ns1".to_string()),
+                "test1".to_string(),
+            ))
+            .await
+            .unwrap();
+
+        assert_eq!(
+            &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
+            table.identifier()
+        );
+        
assert_eq!("s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
 table.metadata_location().unwrap());
+        assert_eq!(FormatVersion::V1, table.metadata().format_version());
+        assert_eq!("s3://warehouse/database/table", 
table.metadata().location());
+        assert_eq!(
+            uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
+            table.metadata().uuid()
+        );
+        assert_eq!(1646787054459, table.metadata().last_updated_ms());
+        assert_eq!(
+            vec![&Arc::new(
+                Schema::builder()
+                    .with_fields(vec![
+                        NestedField::optional(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                        NestedField::optional(2, "data", 
Type::Primitive(PrimitiveType::String))
+                            .into(),
+                    ])
+                    .build()
+                    .unwrap()
+            )],
+            table.metadata().schemas_iter().collect::<Vec<_>>()
+        );
+        assert_eq!(
+            &HashMap::from([
+                ("owner".to_string(), "bryan".to_string()),
+                (
+                    "write.metadata.compression-codec".to_string(),
+                    "gzip".to_string()
+                )
+            ]),
+            table.metadata().properties()
+        );
+        assert_eq!(vec![&Arc::new(Snapshot::builder()
+            .with_snapshot_id(3497810964824022504)
+            .with_timestamp_ms(1646787054459)
+            
.with_manifest_list(ManifestListFile("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro".to_string()))
+            .with_sequence_number(0)
+            .with_schema_id(0)
+            .with_summary(Summary {
+                operation: Operation::Append,
+                other: HashMap::from_iter([
+                                          ("spark.app.id", 
"local-1646787004168"),
+                                          ("added-data-files", "1"),
+                                          ("added-records", "1"),
+                                          ("added-files-size", "697"),
+                                          ("changed-partition-count", "1"),
+                                          ("total-records", "1"),
+                                          ("total-files-size", "697"),
+                                          ("total-data-files", "1"),
+                                          ("total-delete-files", "0"),
+                                          ("total-position-deletes", "0"),
+                                          ("total-equality-deletes", "0")
+                ].iter().map(|p|(p.0.to_string(), p.1.to_string())))
+            }).build().unwrap()
+        )], table.metadata().snapshots().collect::<Vec<_>>());
+        assert_eq!(
+            &[SnapshotLog {
+                timestamp_ms: 1646787054459,
+                snapshot_id: 3497810964824022504
+            }],
+            table.metadata().history()
+        );
+        assert_eq!(
+            vec![&Arc::new(SortOrder {
+                order_id: 0,
+                fields: vec![]
+            })],
+            table.metadata().sort_orders_iter().collect::<Vec<_>>()
+        );
+
+        config_mock.assert_async().await;
+        rename_table_mock.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_load_table_404() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let rename_table_mock = server
+            .mock("GET", "/v1/namespaces/ns1/tables/test1")
+            .with_status(404)
+            .with_body(r#"
+{
+    "error": {
+        "message": "Table does not exist: ns1.test1 in warehouse 
8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
+        "type": "NoSuchNamespaceErrorException",
+        "code": 404
+    }
+}
+            "#)
+            .create_async()
+            .await;
+
+        let catalog = 
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
+            .await
+            .unwrap();
+
+        let table = catalog
+            .load_table(&TableIdent::new(
+                NamespaceIdent::new("ns1".to_string()),
+                "test1".to_string(),
+            ))
+            .await;
+
+        assert!(table.is_err());
+        assert!(table
+            .err()
+            .unwrap()
+            .message()
+            .contains("Table does not exist"));
+
+        config_mock.assert_async().await;
+        rename_table_mock.assert_async().await;
+    }
 }
diff --git a/crates/catalog/rest/testdata/load_table_response.json 
b/crates/catalog/rest/testdata/load_table_response.json
new file mode 100644
index 0000000..012f0e9
--- /dev/null
+++ b/crates/catalog/rest/testdata/load_table_response.json
@@ -0,0 +1,68 @@
+{
+  "metadata-location": 
"s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
+  "metadata": {
+    "format-version": 1,
+    "table-uuid": "b55d9dda-6561-423a-8bfc-787980ce421f",
+    "location": "s3://warehouse/database/table",
+    "last-updated-ms": 1646787054459,
+    "last-column-id": 2,
+    "schema": {
+      "type": "struct",
+      "schema-id": 0,
+      "fields": [
+        {"id": 1, "name": "id", "required": false, "type": "int"},
+        {"id": 2, "name": "data", "required": false, "type": "string"}
+      ]
+    },
+    "current-schema-id": 0,
+    "schemas": [
+      {
+        "type": "struct",
+        "schema-id": 0,
+        "fields": [
+          {"id": 1, "name": "id", "required": false, "type": "int"},
+          {"id": 2, "name": "data", "required": false, "type": "string"}
+        ]
+      }
+    ],
+    "partition-spec": [],
+    "default-spec-id": 0,
+    "partition-specs": [{"spec-id": 0, "fields": []}],
+    "last-partition-id": 999,
+    "default-sort-order-id": 0,
+    "sort-orders": [{"order-id": 0, "fields": []}],
+    "properties": {"owner": "bryan", "write.metadata.compression-codec": 
"gzip"},
+    "current-snapshot-id": 3497810964824022504,
+    "refs": {"main": {"snapshot-id": 3497810964824022504, "type": "branch"}},
+    "snapshots": [
+      {
+        "snapshot-id": 3497810964824022504,
+        "timestamp-ms": 1646787054459,
+        "summary": {
+          "operation": "append",
+          "spark.app.id": "local-1646787004168",
+          "added-data-files": "1",
+          "added-records": "1",
+          "added-files-size": "697",
+          "changed-partition-count": "1",
+          "total-records": "1",
+          "total-files-size": "697",
+          "total-data-files": "1",
+          "total-delete-files": "0",
+          "total-position-deletes": "0",
+          "total-equality-deletes": "0"
+        },
+        "manifest-list": 
"s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro",
+        "schema-id": 0
+      }
+    ],
+    "snapshot-log": [{"timestamp-ms": 1646787054459, "snapshot-id": 
3497810964824022504}],
+    "metadata-log": [
+      {
+        "timestamp-ms": 1646787031514,
+        "metadata-file": 
"s3://warehouse/database/table/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json"
+      }
+    ]
+  },
+  "config": {"client.factory": 
"io.tabular.iceberg.catalog.TabularAwsClientFactory", "region": "us-west-2"}
+}
\ No newline at end of file
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index c7caa24..871304c 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -53,6 +53,7 @@ serde_bytes = "0.11.8"
 serde_derive = "^1.0"
 serde_json = "^1.0"
 serde_repr = "0.1.16"
+typed-builder = "^0.17"
 url = "2"
 urlencoding = "2"
 uuid = "1.4.1"
diff --git a/crates/iceberg/src/catalog/mod.rs 
b/crates/iceberg/src/catalog/mod.rs
index d58eaa2..3a582a1 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -95,7 +95,6 @@ pub trait Catalog {
 /// The namespace identifier is a list of strings, where each string is a
 /// component of the namespace. It's catalog implementer's responsibility to
 /// handle the namespace identifier correctly.
-
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
 pub struct NamespaceIdent(Vec<String>);
 
@@ -116,6 +115,11 @@ impl NamespaceIdent {
         Ok(Self(names))
     }
 
+    /// Try to create namespace identifier from an iterator of strings.
+    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> 
Result<Self> {
+        Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
+    }
+
     /// Returns url encoded format.
     pub fn encode_in_url(&self) -> String {
         encode(&self.as_ref().join("\u{1F}")).to_string()
@@ -194,6 +198,20 @@ impl TableIdent {
     pub fn name(&self) -> &str {
         &self.name
     }
+
+    /// Try to create table identifier from an iterator of strings.
+    pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) -> 
Result<Self> {
+        let mut vec: Vec<String> = iter.into_iter().map(|s| 
s.to_string()).collect();
+        let table_name = vec.pop().ok_or_else(|| {
+            Error::new(ErrorKind::DataInvalid, "Table identifier can't be 
empty!")
+        })?;
+        let namespace_ident = NamespaceIdent::from_vec(vec)?;
+
+        Ok(Self {
+            namespace: namespace_ident,
+            name: table_name,
+        })
+    }
 }
 
 /// TableCreation represents the creation of a table in the catalog.
@@ -256,3 +274,18 @@ pub enum TableRequirement {
 ///
 /// TODO: we should fill with UpgradeFormatVersionUpdate, AddSchemaUpdate and 
so on.
 pub enum TableUpdate {}
+
+#[cfg(test)]
+mod tests {
+    use crate::{NamespaceIdent, TableIdent};
+
+    #[test]
+    fn test_create_table_id() {
+        let table_id = TableIdent {
+            namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
+            name: "t1".to_string(),
+        };
+
+        assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", 
"t1"]).unwrap());
+    }
+}
diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io.rs
index 9d55aac..2444fda 100644
--- a/crates/iceberg/src/io.rs
+++ b/crates/iceberg/src/io.rs
@@ -29,6 +29,16 @@
 //!     .unwrap();
 //! ```
 //!
+//! Or you can pass a path and let `FileIO` infer the scheme for you:
+//! ```rust
+//! use iceberg::io::{FileIO, S3_REGION};
+//! let file_io = FileIO::from_path("s3://bucket/a")
+//!     .unwrap()
+//!     .with_prop(S3_REGION, "us-east-1")
+//!     .build()
+//!     .unwrap();
+//! ```
+//!
 //! # How to use `FileIO`
 //!
 //! Currently `FileIO` provides simple methods for file operations:
@@ -68,6 +78,7 @@ static S3_CONFIG_MAPPING: Lazy<HashMap<&'static str, &'static 
str>> = Lazy::new(
 });
 
 const DEFAULT_ROOT_PATH: &str = "/";
+
 /// FileIO implementation, used to manipulate files in underlying storage.
 ///
 /// # Note
@@ -132,6 +143,29 @@ impl FileIOBuilder {
 }
 
 impl FileIO {
+    /// Try to infer file io scheme from path.
+    ///
+    /// If it's a valid url, for example `http://example.org`, the url scheme will be used.
+    /// If it's not a valid url, we try to interpret it as a file path.
+    ///
+    /// Otherwise a parsing error is returned.
+    pub fn from_path(path: impl AsRef<str>) -> Result<FileIOBuilder> {
+        let url = Url::parse(path.as_ref())
+            .map_err(Error::from)
+            .or_else(|e| {
+                Url::from_file_path(path.as_ref()).map_err(|_| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Input is neither a valid url nor path",
+                    )
+                    .with_context("input", path.as_ref().to_string())
+                    .with_source(e)
+                })
+            })?;
+
+        Ok(FileIOBuilder::new(url.scheme()))
+    }
+
     /// Deletes file.
     pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
         let (op, relative_path) = self.inner.create_operator(&path)?;
@@ -351,7 +385,6 @@ impl Storage {
 
 #[cfg(test)]
 mod tests {
-
     use std::io::Write;
 
     use std::{fs::File, path::Path};
@@ -453,4 +486,22 @@ mod tests {
 
         assert_eq!(content, &read_content);
     }
+
+    #[test]
+    fn test_create_file_from_path() {
+        let io = FileIO::from_path("/tmp/a").unwrap();
+        assert_eq!("file", io.scheme_str.unwrap().as_str());
+
+        let io = FileIO::from_path("file:/tmp/b").unwrap();
+        assert_eq!("file", io.scheme_str.unwrap().as_str());
+
+        let io = FileIO::from_path("file:///tmp/c").unwrap();
+        assert_eq!("file", io.scheme_str.unwrap().as_str());
+
+        let io = FileIO::from_path("s3://bucket/a").unwrap();
+        assert_eq!("s3", io.scheme_str.unwrap().as_str());
+
+        let io = FileIO::from_path("tmp/||c");
+        assert!(io.is_err());
+    }
 }
diff --git a/crates/iceberg/src/spec/partition.rs 
b/crates/iceberg/src/spec/partition.rs
index 505f248..c5ea8f3 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -19,9 +19,12 @@
  * Partitioning
 */
 use serde::{Deserialize, Serialize};
+use std::sync::Arc;
 
 use super::transform::Transform;
 
+/// Reference to [`PartitionSpec`].
+pub type PartitionSpecRef = Arc<PartitionSpec>;
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
 #[serde(rename_all = "kebab-case")]
 /// Partition fields capture the transform from table data to partition values.
diff --git a/crates/iceberg/src/spec/schema.rs 
b/crates/iceberg/src/spec/schema.rs
index cef2dcc..d2204be 100644
--- a/crates/iceberg/src/spec/schema.rs
+++ b/crates/iceberg/src/spec/schema.rs
@@ -28,10 +28,12 @@ use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
 use std::fmt::{Display, Formatter};
-use std::sync::OnceLock;
+use std::sync::{Arc, OnceLock};
 
 use _serde::SchemaEnum;
 
+/// Reference to [`Schema`].
+pub type SchemaRef = Arc<Schema>;
 const DEFAULT_SCHEMA_ID: i32 = 0;
 
 /// Defines schema in iceberg.
diff --git a/crates/iceberg/src/spec/snapshot.rs 
b/crates/iceberg/src/spec/snapshot.rs
index d4e941b..a04bb99 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -18,12 +18,14 @@
 /*!
  * Snapshots
 */
-use std::collections::HashMap;
-
 use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::sync::Arc;
 
 use super::table_metadata::SnapshotLog;
 
+/// Reference to [`Snapshot`].
+pub type SnapshotRef = Arc<Snapshot>;
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
 #[serde(rename_all = "lowercase")]
 /// The operation field is used by some operations, like snapshot expiration, 
to skip processing certain snapshots.
diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs
index 357e68f..dcb19d3 100644
--- a/crates/iceberg/src/spec/sort.rs
+++ b/crates/iceberg/src/spec/sort.rs
@@ -19,9 +19,12 @@
  * Sorting
 */
 use serde::{Deserialize, Serialize};
+use std::sync::Arc;
 
 use super::transform::Transform;
 
+/// Reference to [`SortOrder`].
+pub type SortOrderRef = Arc<SortOrder>;
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
 /// Sort direction in a partition, either ascending or descending
 pub enum SortDirection {
diff --git a/crates/iceberg/src/spec/table_metadata.rs 
b/crates/iceberg/src/spec/table_metadata.rs
index ccd58f2..6abe959 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -20,19 +20,14 @@ Defines the [table 
metadata](https://iceberg.apache.org/spec/#table-metadata).
 The main struct here is [TableMetadataV2] which defines the data for a table.
 */
 
-use std::{collections::HashMap, sync::Arc};
-
 use serde::{Deserialize, Serialize};
 use serde_repr::{Deserialize_repr, Serialize_repr};
+use std::{collections::HashMap, sync::Arc};
 use uuid::Uuid;
 
-use crate::{Error, ErrorKind};
-
 use super::{
-    partition::PartitionSpec,
-    schema::Schema,
     snapshot::{Snapshot, SnapshotReference, SnapshotRetention},
-    sort::SortOrder,
+    PartitionSpecRef, SchemaRef, SnapshotRef, SortOrderRef,
 };
 
 use _serde::TableMetadataEnum;
@@ -44,6 +39,9 @@ static DEFAULT_SORT_ORDER_ID: i64 = 0;
 #[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)]
 #[serde(try_from = "TableMetadataEnum", into = "TableMetadataEnum")]
 /// Fields for the version 2 of the table metadata.
+///
+/// We assume that this data structure is always valid, so we panic when an inconsistency is found.
+/// The validity of this data structure is checked when it is constructed.
 pub struct TableMetadata {
     /// Integer Version for the format.
     format_version: FormatVersion,
@@ -58,11 +56,11 @@ pub struct TableMetadata {
     /// An integer; the highest assigned column ID for the table.
     last_column_id: i32,
     /// A list of schemas, stored as objects with schema-id.
-    schemas: HashMap<i32, Arc<Schema>>,
+    schemas: HashMap<i32, SchemaRef>,
     /// ID of the table’s current schema.
     current_schema_id: i32,
     /// A list of partition specs, stored as full partition spec objects.
-    partition_specs: HashMap<i32, PartitionSpec>,
+    partition_specs: HashMap<i32, PartitionSpecRef>,
     /// ID of the “current” spec that writers should use by default.
     default_spec_id: i32,
     /// An integer; the highest assigned partition field ID across all 
partition specs for the table.
@@ -78,7 +76,7 @@ pub struct TableMetadata {
     /// data files exist in the file system. A data file must not be deleted
     /// from the file system until the last snapshot in which it was listed is
     /// garbage collected.
-    snapshots: Option<HashMap<i64, Arc<Snapshot>>>,
+    snapshots: HashMap<i64, SnapshotRef>,
     /// A list (optional) of timestamp and snapshot ID pairs that encodes 
changes
     /// to the current snapshot for the table. Each time the 
current-snapshot-id
     /// is changed, a new entry should be added with the last-updated-ms
@@ -96,7 +94,7 @@ pub struct TableMetadata {
     metadata_log: Vec<MetadataLog>,
 
     /// A list of sort orders, stored as full sort order objects.
-    sort_orders: HashMap<i64, SortOrder>,
+    sort_orders: HashMap<i64, SortOrderRef>,
     /// Default sort order id of the table. Note that this could be used by
     /// writers, but is not used when reading because reads use the specs
     /// stored in manifest files.
@@ -109,52 +107,141 @@ pub struct TableMetadata {
 }
 
 impl TableMetadata {
+    /// Returns format version of this metadata.
+    #[inline]
+    pub fn format_version(&self) -> FormatVersion {
+        self.format_version
+    }
+
+    /// Returns uuid of current table.
+    #[inline]
+    pub fn uuid(&self) -> Uuid {
+        self.table_uuid
+    }
+
+    /// Returns table location.
+    #[inline]
+    pub fn location(&self) -> &str {
+        self.location.as_str()
+    }
+
+    /// Returns last sequence number.
+    #[inline]
+    pub fn last_sequence_number(&self) -> i64 {
+        self.last_sequence_number
+    }
+
+    /// Returns last updated time.
+    #[inline]
+    pub fn last_updated_ms(&self) -> i64 {
+        self.last_updated_ms
+    }
+
+    /// Returns schemas
+    #[inline]
+    pub fn schemas_iter(&self) -> impl Iterator<Item = &SchemaRef> {
+        self.schemas.values()
+    }
+
+    /// Lookup schema by id.
+    #[inline]
+    pub fn schema_by_id(&self, schema_id: i32) -> Option<&SchemaRef> {
+        self.schemas.get(&schema_id)
+    }
+
     /// Get current schema
     #[inline]
-    pub fn current_schema(&self) -> Result<Arc<Schema>, Error> {
-        self.schemas
-            .get(&self.current_schema_id)
-            .ok_or_else(|| {
-                Error::new(
-                    ErrorKind::DataInvalid,
-                    format!("Schema id {} not found!", self.current_schema_id),
-                )
-            })
-            .cloned()
+    pub fn current_schema(&self) -> &SchemaRef {
+        self.schema_by_id(self.current_schema_id)
+            .expect("Current schema id set, but not found in table metadata")
     }
+
+    /// Returns all partition specs.
+    #[inline]
+    pub fn partition_specs_iter(&self) -> impl Iterator<Item = 
&PartitionSpecRef> {
+        self.partition_specs.values()
+    }
+
+    /// Lookup partition spec by id.
+    #[inline]
+    pub fn partition_spec_by_id(&self, spec_id: i32) -> 
Option<&PartitionSpecRef> {
+        self.partition_specs.get(&spec_id)
+    }
+
     /// Get default partition spec
     #[inline]
-    pub fn default_partition_spec(&self) -> Result<&PartitionSpec, Error> {
-        self.partition_specs
-            .get(&self.default_spec_id)
-            .ok_or_else(|| {
-                Error::new(
-                    ErrorKind::DataInvalid,
-                    format!("Partition spec id {} not found!", 
self.default_spec_id),
-                )
-            })
+    pub fn default_partition_spec(&self) -> Option<&PartitionSpecRef> {
+        // Look up the configured id; only `DEFAULT_SPEC_ID` may be absent.
+        let spec = self.partition_spec_by_id(self.default_spec_id);
+        if self.default_spec_id == DEFAULT_SPEC_ID {
+            spec
+        } else {
+            let msg = "Default partition spec id set, but not found in table metadata";
+            Some(spec.expect(msg))
+        }
+    }
+
+    /// Returns all snapshots
+    #[inline]
+    pub fn snapshots(&self) -> impl Iterator<Item = &SnapshotRef> {
+        self.snapshots.values()
+    }
+
+    /// Lookup snapshot by id.
+    #[inline]
+    pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
+        self.snapshots.get(&snapshot_id)
+    }
+
+    /// Returns snapshot history.
+    #[inline]
+    pub fn history(&self) -> &[SnapshotLog] {
+        &self.snapshot_log
     }
 
     /// Get current snapshot
     #[inline]
-    pub fn current_snapshot(&self) -> Result<Option<Arc<Snapshot>>, Error> {
-        match (&self.current_snapshot_id, &self.snapshots) {
-            (Some(snapshot_id), Some(snapshots)) => 
Ok(snapshots.get(snapshot_id).cloned()),
-            (Some(-1), None) => Ok(None),
-            (None, None) => Ok(None),
-            (Some(_), None) => Err(Error::new(
-                ErrorKind::DataInvalid,
-                "Snapshot id is provided but there are no 
snapshots".to_string(),
-            )),
-            (None, Some(_)) => Err(Error::new(
-                ErrorKind::DataInvalid,
-                "There are snapshots but no snapshot id is 
provided".to_string(),
-            )),
+    pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
+        self.current_snapshot_id.map(|s| {
+            self.snapshot_by_id(s)
+                .expect("Current snapshot id has been set, but doesn't exist 
in metadata")
+        })
+    }
+
+    /// Return all sort orders.
+    #[inline]
+    pub fn sort_orders_iter(&self) -> impl Iterator<Item = &SortOrderRef> {
+        self.sort_orders.values()
+    }
+
+    /// Lookup sort order by id.
+    #[inline]
+    pub fn sort_order_by_id(&self, sort_order_id: i64) -> 
Option<&SortOrderRef> {
+        self.sort_orders.get(&sort_order_id)
+    }
+
+    /// Returns the default sort order.
+    #[inline]
+    pub fn default_sort_order(&self) -> Option<&SortOrderRef> {
+        if self.default_sort_order_id == DEFAULT_SORT_ORDER_ID {
+            self.sort_orders.get(&DEFAULT_SORT_ORDER_ID)
+        } else {
+            Some(
+                self.sort_orders
+                    .get(&self.default_sort_order_id)
+                    .expect("Default order id has been set, but not found in 
table metadata!"),
+            )
         }
     }
 
+    /// Returns properties of table.
+    #[inline]
+    pub fn properties(&self) -> &HashMap<String, String> {
+        &self.properties
+    }
+
     /// Append snapshot to table
-    pub fn append_snapshot(&mut self, snapshot: Snapshot) -> Result<(), Error> 
{
+    pub fn append_snapshot(&mut self, snapshot: Snapshot) {
         self.last_updated_ms = snapshot.timestamp();
         self.last_sequence_number = snapshot.sequence_number();
 
@@ -174,25 +261,9 @@ impl TableMetadata {
                 )
             });
 
-        if let Some(snapshots) = &mut self.snapshots {
-            self.snapshot_log.push(snapshot.log());
-            snapshots.insert(snapshot.snapshot_id(), Arc::new(snapshot));
-        } else {
-            if !self.snapshot_log.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::DataInvalid,
-                    "Snapshot logs is empty while snapshots is not!",
-                ));
-            }
-
-            self.snapshot_log = vec![snapshot.log()];
-            self.snapshots = Some(HashMap::from_iter(vec![(
-                snapshot.snapshot_id(),
-                Arc::new(snapshot),
-            )]));
-        }
-
-        Ok(())
+        self.snapshot_log.push(snapshot.log());
+        self.snapshots
+            .insert(snapshot.snapshot_id(), Arc::new(snapshot));
     }
 }
 
@@ -207,6 +278,7 @@ pub(super) mod _serde {
     use serde::{Deserialize, Serialize};
     use uuid::Uuid;
 
+    use crate::spec::Snapshot;
     use crate::{
         spec::{
             schema::_serde::{SchemaV1, SchemaV2},
@@ -376,23 +448,32 @@ pub(super) mod _serde {
                 }?,
                 schemas,
                 partition_specs: HashMap::from_iter(
-                    value.partition_specs.into_iter().map(|x| (x.spec_id, x)),
+                    value
+                        .partition_specs
+                        .into_iter()
+                        .map(|x| (x.spec_id, Arc::new(x))),
                 ),
                 default_spec_id: value.default_spec_id,
                 last_partition_id: value.last_partition_id,
                 properties: value.properties.unwrap_or_default(),
                 current_snapshot_id,
-                snapshots: value.snapshots.map(|snapshots| {
-                    HashMap::from_iter(
-                        snapshots
-                            .into_iter()
-                            .map(|x| (x.snapshot_id, Arc::new(x.into()))),
-                    )
-                }),
+                snapshots: value
+                    .snapshots
+                    .map(|snapshots| {
+                        HashMap::from_iter(
+                            snapshots
+                                .into_iter()
+                                .map(|x| (x.snapshot_id, Arc::new(x.into()))),
+                        )
+                    })
+                    .unwrap_or_default(),
                 snapshot_log: value.snapshot_log.unwrap_or_default(),
                 metadata_log: value.metadata_log.unwrap_or_default(),
                 sort_orders: HashMap::from_iter(
-                    value.sort_orders.into_iter().map(|x| (x.order_id, x)),
+                    value
+                        .sort_orders
+                        .into_iter()
+                        .map(|x| (x.order_id, Arc::new(x))),
                 ),
                 default_sort_order_id: value.default_sort_order_id,
                 refs: value.refs.unwrap_or_else(|| {
@@ -453,7 +534,7 @@ pub(super) mod _serde {
                         }]
                     })
                     .into_iter()
-                    .map(|x| (x.spec_id, x)),
+                    .map(|x| (x.spec_id, Arc::new(x))),
             );
             Ok(TableMetadata {
                 format_version: FormatVersion::V1,
@@ -490,13 +571,14 @@ pub(super) mod _serde {
                                 .collect::<Result<Vec<_>, Error>>()?,
                         ))
                     })
-                    .transpose()?,
+                    .transpose()?
+                    .unwrap_or_default(),
                 snapshot_log: value.snapshot_log.unwrap_or_default(),
                 metadata_log: value.metadata_log.unwrap_or_default(),
                 sort_orders: match value.sort_orders {
-                    Some(sort_orders) => {
-                        HashMap::from_iter(sort_orders.into_iter().map(|x| (x.order_id, x)))
-                    }
+                    Some(sort_orders) => HashMap::from_iter(
+                        sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
+                    ),
                     None => HashMap::new(),
                 },
                 default_sort_order_id: value.default_sort_order_id.unwrap_or(DEFAULT_SORT_ORDER_ID),
@@ -534,7 +616,11 @@ pub(super) mod _serde {
                     })
                     .collect(),
                 current_schema_id: v.current_schema_id,
-                partition_specs: v.partition_specs.into_values().collect(),
+                partition_specs: v
+                    .partition_specs
+                    .into_values()
+                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
+                    .collect(),
                 default_spec_id: v.default_spec_id,
                 last_partition_id: v.last_partition_id,
                 properties: if v.properties.is_empty() {
@@ -543,16 +629,20 @@ pub(super) mod _serde {
                     Some(v.properties)
                 },
                 current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
-                snapshots: v.snapshots.map(|snapshots| {
-                    snapshots
-                        .into_values()
-                        .map(|x| {
-                            Arc::try_unwrap(x)
-                                .unwrap_or_else(|snapshot| snapshot.as_ref().clone())
-                                .into()
-                        })
-                        .collect()
-                }),
+                snapshots: if v.snapshots.is_empty() {
+                    None
+                } else {
+                    Some(
+                        v.snapshots
+                            .into_values()
+                            .map(|x| {
+                                Arc::try_unwrap(x)
+                                    .unwrap_or_else(|snapshot| snapshot.as_ref().clone())
+                                    .into()
+                            })
+                            .collect(),
+                    )
+                },
                 snapshot_log: if v.snapshot_log.is_empty() {
                     None
                 } else {
@@ -563,7 +653,11 @@ pub(super) mod _serde {
                 } else {
                     Some(v.metadata_log)
                 },
-                sort_orders: v.sort_orders.into_values().collect(),
+                sort_orders: v
+                    .sort_orders
+                    .into_values()
+                    .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
+                    .collect(),
                 default_sort_order_id: v.default_sort_order_id,
                 refs: Some(v.refs),
             }
@@ -601,7 +695,12 @@ pub(super) mod _serde {
                     .get(&v.default_spec_id)
                     .map(|x| x.fields.clone())
                     .unwrap_or_default(),
-                partition_specs: Some(v.partition_specs.into_values().collect()),
+                partition_specs: Some(
+                    v.partition_specs
+                        .into_values()
+                        .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone()))
+                        .collect(),
+                ),
                 default_spec_id: Some(v.default_spec_id),
                 last_partition_id: Some(v.last_partition_id),
                 properties: if v.properties.is_empty() {
@@ -610,16 +709,16 @@ pub(super) mod _serde {
                     Some(v.properties)
                 },
                 current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
-                snapshots: v.snapshots.map(|snapshots| {
-                    snapshots
-                        .into_values()
-                        .map(|x| {
-                            Arc::try_unwrap(x)
-                                .unwrap_or_else(|snapshot| snapshot.as_ref().clone())
-                                .into()
-                        })
-                        .collect()
-                }),
+                snapshots: if v.snapshots.is_empty() {
+                    None
+                } else {
+                    Some(
+                        v.snapshots
+                            .into_values()
+                            .map(|x| Snapshot::clone(&x).into())
+                            .collect(),
+                    )
+                },
                 snapshot_log: if v.snapshot_log.is_empty() {
                     None
                 } else {
@@ -630,14 +729,19 @@ pub(super) mod _serde {
                 } else {
                     Some(v.metadata_log)
                 },
-                sort_orders: Some(v.sort_orders.into_values().collect()),
+                sort_orders: Some(
+                    v.sort_orders
+                        .into_values()
+                        .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s| s.as_ref().clone()))
+                        .collect(),
+                ),
                 default_sort_order_id: Some(v.default_sort_order_id),
             }
         }
     }
 }
 
-#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
+#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
 #[repr(u8)]
 /// Iceberg format version
 pub enum FormatVersion {
@@ -778,12 +882,12 @@ mod tests {
             last_column_id: 1,
             schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
             current_schema_id: 1,
-            partition_specs: HashMap::from_iter(vec![(1, partition_spec)]),
+            partition_specs: HashMap::from_iter(vec![(1, partition_spec.into())]),
             default_spec_id: 1,
             last_partition_id: 1000,
             default_sort_order_id: 0,
             sort_orders: HashMap::from_iter(vec![]),
-            snapshots: None,
+            snapshots: HashMap::default(),
             current_snapshot_id: None,
             last_sequence_number: 1,
             properties: HashMap::from_iter(vec![(
@@ -948,12 +1052,12 @@ mod tests {
             last_column_id: 5,
             schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
             current_schema_id: 0,
-            partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
+            partition_specs: HashMap::from_iter(vec![(0, partition_spec.into())]),
             default_spec_id: 0,
             last_partition_id: 1000,
             default_sort_order_id: 0,
-            sort_orders: HashMap::from_iter(vec![(0, sort_order)]),
-            snapshots: Some(HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))])),
+            sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
+            snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]),
             current_snapshot_id: Some(638933773299822130),
             last_sequence_number: 0,
             properties: HashMap::from_iter(vec![("owner".to_string(),"root".to_string())]),
@@ -1093,15 +1197,15 @@ mod tests {
             last_column_id: 3,
             schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, Arc::new(schema2))]),
             current_schema_id: 1,
-            partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
+            partition_specs: HashMap::from_iter(vec![(0, partition_spec.into())]),
             default_spec_id: 0,
             last_partition_id: 1000,
             default_sort_order_id: 3,
-            sort_orders: HashMap::from_iter(vec![(3, sort_order)]),
-            snapshots: Some(HashMap::from_iter(vec![
+            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
+            snapshots: HashMap::from_iter(vec![
                 (3051729675574597004, Arc::new(snapshot1)),
                 (3055729675574597004, Arc::new(snapshot2)),
-            ])),
+            ]),
             current_snapshot_id: Some(3055729675574597004),
             last_sequence_number: 34,
             properties: HashMap::new(),
@@ -1194,12 +1298,12 @@ mod tests {
             last_column_id: 3,
             schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
             current_schema_id: 0,
-            partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
+            partition_specs: HashMap::from_iter(vec![(0, partition_spec.into())]),
             default_spec_id: 0,
             last_partition_id: 1000,
             default_sort_order_id: 3,
-            sort_orders: HashMap::from_iter(vec![(3, sort_order)]),
-            snapshots: None,
+            sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
+            snapshots: HashMap::default(),
             current_snapshot_id: None,
             last_sequence_number: 34,
             properties: HashMap::new(),
@@ -1256,12 +1360,12 @@ mod tests {
             last_column_id: 3,
             schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
             current_schema_id: 0,
-            partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
+            partition_specs: HashMap::from_iter(vec![(0, partition_spec.into())]),
             default_spec_id: 0,
             last_partition_id: 0,
             default_sort_order_id: 0,
             sort_orders: HashMap::new(),
-            snapshots: Some(HashMap::new()),
+            snapshots: HashMap::new(),
             current_snapshot_id: None,
             last_sequence_number: 0,
             properties: HashMap::new(),
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index ebe6753..981ac8b 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -17,10 +17,33 @@
 
 //! Table API for Apache Iceberg
 
+use crate::io::FileIO;
 use crate::spec::TableMetadata;
+use crate::TableIdent;
+use typed_builder::TypedBuilder;
 
 /// Table represents a table in the catalog.
+#[derive(TypedBuilder)]
 pub struct Table {
-    metadata_location: String,
+    file_io: FileIO,
+    #[builder(default, setter(strip_option))]
+    metadata_location: Option<String>,
     metadata: TableMetadata,
+    identifier: TableIdent,
+}
+
+impl Table {
+    /// Returns table identifier.
+    pub fn identifier(&self) -> &TableIdent {
+        &self.identifier
+    }
+    /// Returns current metadata.
+    pub fn metadata(&self) -> &TableMetadata {
+        &self.metadata
+    }
+
+    /// Returns current metadata location.
+    pub fn metadata_location(&self) -> Option<&str> {
+        self.metadata_location.as_deref()
+    }
 }


Reply via email to