This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e26bda3 feat: Implement load table api. (#89)
e26bda3 is described below
commit e26bda3fc2f6df7dde49a6cf7a5f3faafe49313b
Author: Renjie Liu <[email protected]>
AuthorDate: Wed Nov 8 16:57:02 2023 +0800
feat: Implement load table api. (#89)
* Init commit
* Some comments
* Done
* Fix format
* fix clippy
* Fix fmt
* sort
* Use error response
* Fix
* Fix comments
* fmt
* fmt
* Fix
* Rename
---
crates/catalog/rest/Cargo.toml | 2 +
crates/catalog/rest/src/catalog.rs | 238 ++++++++++++++-
.../catalog/rest/testdata/load_table_response.json | 68 +++++
crates/iceberg/Cargo.toml | 1 +
crates/iceberg/src/catalog/mod.rs | 35 ++-
crates/iceberg/src/io.rs | 53 +++-
crates/iceberg/src/spec/partition.rs | 3 +
crates/iceberg/src/spec/schema.rs | 4 +-
crates/iceberg/src/spec/snapshot.rs | 6 +-
crates/iceberg/src/spec/sort.rs | 3 +
crates/iceberg/src/spec/table_metadata.rs | 336 ++++++++++++++-------
crates/iceberg/src/table.rs | 25 +-
12 files changed, 636 insertions(+), 138 deletions(-)
diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml
index c44f08b..86bb71c 100644
--- a/crates/catalog/rest/Cargo.toml
+++ b/crates/catalog/rest/Cargo.toml
@@ -28,6 +28,7 @@ keywords = ["iceberg", "rest", "catalog"]
[dependencies]
async-trait = "0.1"
+chrono = "0.4"
iceberg = { path = "../../iceberg" }
reqwest = { version = "^0.11", features = ["json"] }
serde = { version = "^1.0", features = ["rc"] }
@@ -35,6 +36,7 @@ serde_derive = "^1.0"
serde_json = "^1.0"
typed-builder = "^0.18"
urlencoding = "2"
+uuid = { version = "1.5.0", features = ["v4"] }
[dev-dependencies]
mockito = "^1"
diff --git a/crates/catalog/rest/src/catalog.rs
b/crates/catalog/rest/src/catalog.rs
index f267437..599f405 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -26,6 +26,8 @@ use serde::de::DeserializeOwned;
use typed_builder::TypedBuilder;
use urlencoding::encode;
+use crate::catalog::_serde::LoadTableResponse;
+use iceberg::io::{FileIO, FileIOBuilder};
use iceberg::table::Table;
use iceberg::Result;
use iceberg::{
@@ -33,8 +35,8 @@ use iceberg::{
};
use self::_serde::{
- CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse,
ListTableResponse,
- NamespaceSerde, RenameTableRequest, NO_CONTENT, OK,
+ CatalogConfig, ErrorResponse, ListNamespaceResponse, ListTableResponse,
NamespaceSerde,
+ RenameTableRequest, NO_CONTENT, OK,
};
const ICEBERG_REST_SPEC_VERSION: &str = "1.14";
@@ -195,7 +197,7 @@ impl Catalog for RestCatalog {
let resp = self
.client
- .query::<ListNamespaceResponse, ErrorModel, OK>(request.build()?)
+ .query::<ListNamespaceResponse, ErrorResponse,
OK>(request.build()?)
.await?;
resp.namespaces
@@ -222,7 +224,7 @@ impl Catalog for RestCatalog {
let resp = self
.client
- .query::<NamespaceSerde, ErrorModel, OK>(request)
+ .query::<NamespaceSerde, ErrorResponse, OK>(request)
.await?;
Namespace::try_from(resp)
@@ -238,7 +240,7 @@ impl Catalog for RestCatalog {
let resp = self
.client
- .query::<NamespaceSerde, ErrorModel, OK>(request)
+ .query::<NamespaceSerde, ErrorResponse, OK>(request)
.await?;
Namespace::try_from(resp)
}
@@ -267,7 +269,7 @@ impl Catalog for RestCatalog {
.build()?;
self.client
- .execute::<ErrorModel, NO_CONTENT>(request)
+ .execute::<ErrorResponse, NO_CONTENT>(request)
.await
.map(|_| true)
}
@@ -280,7 +282,9 @@ impl Catalog for RestCatalog {
.delete(self.config.namespace_endpoint(namespace))
.build()?;
- self.client.execute::<ErrorModel, NO_CONTENT>(request).await
+ self.client
+ .execute::<ErrorResponse, NO_CONTENT>(request)
+ .await
}
/// List tables from namespace.
@@ -293,7 +297,7 @@ impl Catalog for RestCatalog {
let resp = self
.client
- .query::<ListTableResponse, ErrorModel, OK>(request)
+ .query::<ListTableResponse, ErrorResponse, OK>(request)
.await?;
Ok(resp.identifiers)
@@ -312,11 +316,43 @@ impl Catalog for RestCatalog {
}
/// Load table from the catalog.
- async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
- Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "Creating table not supported yet!",
- ))
+ async fn load_table(&self, table: &TableIdent) -> Result<Table> {
+ let request = self
+ .client
+ .0
+ .get(self.config.table_endpoint(table))
+ .build()?;
+
+ let resp = self
+ .client
+ .query::<LoadTableResponse, ErrorResponse, OK>(request)
+ .await?;
+
+ let mut props = self.config.props.clone();
+ if let Some(config) = resp.config {
+ props.extend(config);
+ }
+
+ let file_io = match self
+ .config
+ .warehouse
+ .as_ref()
+ .or_else(|| resp.metadata_location.as_ref())
+ {
+ Some(url) => FileIO::from_path(url)?.with_props(props).build()?,
+ None => FileIOBuilder::new("s3").with_props(props).build()?,
+ };
+
+ let table_builder = Table::builder()
+ .identifier(table.clone())
+ .file_io(file_io)
+ .metadata(resp.metadata);
+
+ if let Some(metadata_location) = resp.metadata_location {
+ Ok(table_builder.metadata_location(metadata_location).build())
+ } else {
+ Ok(table_builder.build())
+ }
}
/// Drop a table from the catalog.
@@ -327,7 +363,9 @@ impl Catalog for RestCatalog {
.delete(self.config.table_endpoint(table))
.build()?;
- self.client.execute::<ErrorModel, NO_CONTENT>(request).await
+ self.client
+ .execute::<ErrorResponse, NO_CONTENT>(request)
+ .await
}
/// Check if a table exists in the catalog.
@@ -339,7 +377,7 @@ impl Catalog for RestCatalog {
.build()?;
self.client
- .execute::<ErrorModel, NO_CONTENT>(request)
+ .execute::<ErrorResponse, NO_CONTENT>(request)
.await
.map(|_| true)
}
@@ -356,7 +394,9 @@ impl Catalog for RestCatalog {
})
.build()?;
- self.client.execute::<ErrorModel, NO_CONTENT>(request).await
+ self.client
+ .execute::<ErrorResponse, NO_CONTENT>(request)
+ .await
}
/// Update a table to the catalog.
@@ -412,10 +452,12 @@ mod _serde {
use serde_derive::{Deserialize, Serialize};
+ use iceberg::spec::TableMetadata;
use iceberg::{Error, ErrorKind, Namespace, TableIdent};
pub(super) const OK: u16 = 200u16;
pub(super) const NO_CONTENT: u16 = 204u16;
+
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(super) struct CatalogConfig {
pub(super) overrides: HashMap<String, String>,
@@ -534,11 +576,26 @@ mod _serde {
pub(super) source: TableIdent,
pub(super) destination: TableIdent,
}
+
+ #[derive(Debug, Deserialize)]
+ #[serde(rename_all = "kebab-case")]
+ pub(super) struct LoadTableResponse {
+ pub(super) metadata_location: Option<String>,
+ pub(super) metadata: TableMetadata,
+ pub(super) config: Option<HashMap<String, String>>,
+ }
}
#[cfg(test)]
mod tests {
+ use iceberg::spec::ManifestListLocation::ManifestListFile;
+ use iceberg::spec::{
+ FormatVersion, NestedField, Operation, PrimitiveType, Schema,
Snapshot, SnapshotLog,
+ SortOrder, Summary, Type,
+ };
use mockito::{Mock, Server, ServerGuard};
+ use std::sync::Arc;
+ use uuid::uuid;
use super::*;
@@ -884,4 +941,153 @@ mod tests {
config_mock.assert_async().await;
rename_table_mock.assert_async().await;
}
+
+ #[tokio::test]
+ async fn test_load_table() {
+ let mut server = Server::new_async().await;
+
+ let config_mock = create_config_mock(&mut server).await;
+
+ let rename_table_mock = server
+ .mock("GET", "/v1/namespaces/ns1/tables/test1")
+ .with_status(200)
+ .with_body_from_file(format!(
+ "{}/testdata/{}",
+ env!("CARGO_MANIFEST_DIR"),
+ "load_table_response.json"
+ ))
+ .create_async()
+ .await;
+
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
+ .await
+ .unwrap();
+
+ let table = catalog
+ .load_table(&TableIdent::new(
+ NamespaceIdent::new("ns1".to_string()),
+ "test1".to_string(),
+ ))
+ .await
+ .unwrap();
+
+ assert_eq!(
+ &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
+ table.identifier()
+ );
+
assert_eq!("s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
table.metadata_location().unwrap());
+ assert_eq!(FormatVersion::V1, table.metadata().format_version());
+ assert_eq!("s3://warehouse/database/table",
table.metadata().location());
+ assert_eq!(
+ uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
+ table.metadata().uuid()
+ );
+ assert_eq!(1646787054459, table.metadata().last_updated_ms());
+ assert_eq!(
+ vec![&Arc::new(
+ Schema::builder()
+ .with_fields(vec![
+ NestedField::optional(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(2, "data",
Type::Primitive(PrimitiveType::String))
+ .into(),
+ ])
+ .build()
+ .unwrap()
+ )],
+ table.metadata().schemas_iter().collect::<Vec<_>>()
+ );
+ assert_eq!(
+ &HashMap::from([
+ ("owner".to_string(), "bryan".to_string()),
+ (
+ "write.metadata.compression-codec".to_string(),
+ "gzip".to_string()
+ )
+ ]),
+ table.metadata().properties()
+ );
+ assert_eq!(vec![&Arc::new(Snapshot::builder()
+ .with_snapshot_id(3497810964824022504)
+ .with_timestamp_ms(1646787054459)
+
.with_manifest_list(ManifestListFile("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro".to_string()))
+ .with_sequence_number(0)
+ .with_schema_id(0)
+ .with_summary(Summary {
+ operation: Operation::Append,
+ other: HashMap::from_iter([
+ ("spark.app.id",
"local-1646787004168"),
+ ("added-data-files", "1"),
+ ("added-records", "1"),
+ ("added-files-size", "697"),
+ ("changed-partition-count", "1"),
+ ("total-records", "1"),
+ ("total-files-size", "697"),
+ ("total-data-files", "1"),
+ ("total-delete-files", "0"),
+ ("total-position-deletes", "0"),
+ ("total-equality-deletes", "0")
+ ].iter().map(|p|(p.0.to_string(), p.1.to_string())))
+ }).build().unwrap()
+ )], table.metadata().snapshots().collect::<Vec<_>>());
+ assert_eq!(
+ &[SnapshotLog {
+ timestamp_ms: 1646787054459,
+ snapshot_id: 3497810964824022504
+ }],
+ table.metadata().history()
+ );
+ assert_eq!(
+ vec![&Arc::new(SortOrder {
+ order_id: 0,
+ fields: vec![]
+ })],
+ table.metadata().sort_orders_iter().collect::<Vec<_>>()
+ );
+
+ config_mock.assert_async().await;
+ rename_table_mock.assert_async().await;
+ }
+
+ #[tokio::test]
+ async fn test_load_table_404() {
+ let mut server = Server::new_async().await;
+
+ let config_mock = create_config_mock(&mut server).await;
+
+ let rename_table_mock = server
+ .mock("GET", "/v1/namespaces/ns1/tables/test1")
+ .with_status(404)
+ .with_body(r#"
+{
+ "error": {
+ "message": "Table does not exist: ns1.test1 in warehouse
8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
+ "type": "NoSuchNamespaceErrorException",
+ "code": 404
+ }
+}
+ "#)
+ .create_async()
+ .await;
+
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
+ .await
+ .unwrap();
+
+ let table = catalog
+ .load_table(&TableIdent::new(
+ NamespaceIdent::new("ns1".to_string()),
+ "test1".to_string(),
+ ))
+ .await;
+
+ assert!(table.is_err());
+ assert!(table
+ .err()
+ .unwrap()
+ .message()
+ .contains("Table does not exist"));
+
+ config_mock.assert_async().await;
+ rename_table_mock.assert_async().await;
+ }
}
diff --git a/crates/catalog/rest/testdata/load_table_response.json
b/crates/catalog/rest/testdata/load_table_response.json
new file mode 100644
index 0000000..012f0e9
--- /dev/null
+++ b/crates/catalog/rest/testdata/load_table_response.json
@@ -0,0 +1,68 @@
+{
+ "metadata-location":
"s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
+ "metadata": {
+ "format-version": 1,
+ "table-uuid": "b55d9dda-6561-423a-8bfc-787980ce421f",
+ "location": "s3://warehouse/database/table",
+ "last-updated-ms": 1646787054459,
+ "last-column-id": 2,
+ "schema": {
+ "type": "struct",
+ "schema-id": 0,
+ "fields": [
+ {"id": 1, "name": "id", "required": false, "type": "int"},
+ {"id": 2, "name": "data", "required": false, "type": "string"}
+ ]
+ },
+ "current-schema-id": 0,
+ "schemas": [
+ {
+ "type": "struct",
+ "schema-id": 0,
+ "fields": [
+ {"id": 1, "name": "id", "required": false, "type": "int"},
+ {"id": 2, "name": "data", "required": false, "type": "string"}
+ ]
+ }
+ ],
+ "partition-spec": [],
+ "default-spec-id": 0,
+ "partition-specs": [{"spec-id": 0, "fields": []}],
+ "last-partition-id": 999,
+ "default-sort-order-id": 0,
+ "sort-orders": [{"order-id": 0, "fields": []}],
+ "properties": {"owner": "bryan", "write.metadata.compression-codec":
"gzip"},
+ "current-snapshot-id": 3497810964824022504,
+ "refs": {"main": {"snapshot-id": 3497810964824022504, "type": "branch"}},
+ "snapshots": [
+ {
+ "snapshot-id": 3497810964824022504,
+ "timestamp-ms": 1646787054459,
+ "summary": {
+ "operation": "append",
+ "spark.app.id": "local-1646787004168",
+ "added-data-files": "1",
+ "added-records": "1",
+ "added-files-size": "697",
+ "changed-partition-count": "1",
+ "total-records": "1",
+ "total-files-size": "697",
+ "total-data-files": "1",
+ "total-delete-files": "0",
+ "total-position-deletes": "0",
+ "total-equality-deletes": "0"
+ },
+ "manifest-list":
"s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro",
+ "schema-id": 0
+ }
+ ],
+ "snapshot-log": [{"timestamp-ms": 1646787054459, "snapshot-id":
3497810964824022504}],
+ "metadata-log": [
+ {
+ "timestamp-ms": 1646787031514,
+ "metadata-file":
"s3://warehouse/database/table/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json"
+ }
+ ]
+ },
+ "config": {"client.factory":
"io.tabular.iceberg.catalog.TabularAwsClientFactory", "region": "us-west-2"}
+}
\ No newline at end of file
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index c7caa24..871304c 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -53,6 +53,7 @@ serde_bytes = "0.11.8"
serde_derive = "^1.0"
serde_json = "^1.0"
serde_repr = "0.1.16"
+typed-builder = "^0.17"
url = "2"
urlencoding = "2"
uuid = "1.4.1"
diff --git a/crates/iceberg/src/catalog/mod.rs
b/crates/iceberg/src/catalog/mod.rs
index d58eaa2..3a582a1 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -95,7 +95,6 @@ pub trait Catalog {
/// The namespace identifier is a list of strings, where each string is a
/// component of the namespace. It's catalog implementer's responsibility to
/// handle the namespace identifier correctly.
-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct NamespaceIdent(Vec<String>);
@@ -116,6 +115,11 @@ impl NamespaceIdent {
Ok(Self(names))
}
+ /// Try to create a namespace identifier from an iterator of strings.
+ pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) ->
Result<Self> {
+ Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect())
+ }
+
/// Returns url encoded format.
pub fn encode_in_url(&self) -> String {
encode(&self.as_ref().join("\u{1F}")).to_string()
@@ -194,6 +198,20 @@ impl TableIdent {
pub fn name(&self) -> &str {
&self.name
}
+
+ /// Try to create a table identifier from an iterator of strings.
+ pub fn from_strs(iter: impl IntoIterator<Item = impl ToString>) ->
Result<Self> {
+ let mut vec: Vec<String> = iter.into_iter().map(|s|
s.to_string()).collect();
+ let table_name = vec.pop().ok_or_else(|| {
+ Error::new(ErrorKind::DataInvalid, "Table identifier can't be
empty!")
+ })?;
+ let namespace_ident = NamespaceIdent::from_vec(vec)?;
+
+ Ok(Self {
+ namespace: namespace_ident,
+ name: table_name,
+ })
+ }
}
/// TableCreation represents the creation of a table in the catalog.
@@ -256,3 +274,18 @@ pub enum TableRequirement {
///
/// TODO: we should fill with UpgradeFormatVersionUpdate, AddSchemaUpdate and
so on.
pub enum TableUpdate {}
+
+#[cfg(test)]
+mod tests {
+ use crate::{NamespaceIdent, TableIdent};
+
+ #[test]
+ fn test_create_table_id() {
+ let table_id = TableIdent {
+ namespace: NamespaceIdent::from_strs(vec!["ns1"]).unwrap(),
+ name: "t1".to_string(),
+ };
+
+ assert_eq!(table_id, TableIdent::from_strs(vec!["ns1",
"t1"]).unwrap());
+ }
+}
diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io.rs
index 9d55aac..2444fda 100644
--- a/crates/iceberg/src/io.rs
+++ b/crates/iceberg/src/io.rs
@@ -29,6 +29,16 @@
//! .unwrap();
//! ```
//!
+//! Or you can pass a path and let `FileIO` infer the scheme for you:
+//! ```rust
+//! use iceberg::io::{FileIO, S3_REGION};
+//! let file_io = FileIO::from_path("s3://bucket/a")
+//! .unwrap()
+//! .with_prop(S3_REGION, "us-east-1")
+//! .build()
+//! .unwrap();
+//! ```
+//!
//! # How to use `FileIO`
//!
//! Currently `FileIO` provides simple methods for file operations:
@@ -68,6 +78,7 @@ static S3_CONFIG_MAPPING: Lazy<HashMap<&'static str, &'static
str>> = Lazy::new(
});
const DEFAULT_ROOT_PATH: &str = "/";
+
/// FileIO implementation, used to manipulate files in underlying storage.
///
/// # Note
@@ -132,6 +143,29 @@ impl FileIOBuilder {
}
impl FileIO {
+ /// Try to infer file io scheme from path.
+ ///
+ /// If `path` is a valid URL (for example `http://example.org`), its scheme is used.
+ /// If it is not a valid URL, it is interpreted as a file path instead.
+ ///
+ /// Returns an error if the input is neither a valid URL nor a valid file path.
+ pub fn from_path(path: impl AsRef<str>) -> Result<FileIOBuilder> {
+ let url = Url::parse(path.as_ref())
+ .map_err(Error::from)
+ .or_else(|e| {
+ Url::from_file_path(path.as_ref()).map_err(|_| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Input is neither a valid url nor path",
+ )
+ .with_context("input", path.as_ref().to_string())
+ .with_source(e)
+ })
+ })?;
+
+ Ok(FileIOBuilder::new(url.scheme()))
+ }
+
/// Deletes file.
pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
let (op, relative_path) = self.inner.create_operator(&path)?;
@@ -351,7 +385,6 @@ impl Storage {
#[cfg(test)]
mod tests {
-
use std::io::Write;
use std::{fs::File, path::Path};
@@ -453,4 +486,22 @@ mod tests {
assert_eq!(content, &read_content);
}
+
+ #[test]
+ fn test_create_file_from_path() {
+ let io = FileIO::from_path("/tmp/a").unwrap();
+ assert_eq!("file", io.scheme_str.unwrap().as_str());
+
+ let io = FileIO::from_path("file:/tmp/b").unwrap();
+ assert_eq!("file", io.scheme_str.unwrap().as_str());
+
+ let io = FileIO::from_path("file:///tmp/c").unwrap();
+ assert_eq!("file", io.scheme_str.unwrap().as_str());
+
+ let io = FileIO::from_path("s3://bucket/a").unwrap();
+ assert_eq!("s3", io.scheme_str.unwrap().as_str());
+
+ let io = FileIO::from_path("tmp/||c");
+ assert!(io.is_err());
+ }
}
diff --git a/crates/iceberg/src/spec/partition.rs
b/crates/iceberg/src/spec/partition.rs
index 505f248..c5ea8f3 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -19,9 +19,12 @@
* Partitioning
*/
use serde::{Deserialize, Serialize};
+use std::sync::Arc;
use super::transform::Transform;
+/// Reference to [`PartitionSpec`].
+pub type PartitionSpecRef = Arc<PartitionSpec>;
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
/// Partition fields capture the transform from table data to partition values.
diff --git a/crates/iceberg/src/spec/schema.rs
b/crates/iceberg/src/spec/schema.rs
index cef2dcc..d2204be 100644
--- a/crates/iceberg/src/spec/schema.rs
+++ b/crates/iceberg/src/spec/schema.rs
@@ -28,10 +28,12 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
-use std::sync::OnceLock;
+use std::sync::{Arc, OnceLock};
use _serde::SchemaEnum;
+/// Reference to [`Schema`].
+pub type SchemaRef = Arc<Schema>;
const DEFAULT_SCHEMA_ID: i32 = 0;
/// Defines schema in iceberg.
diff --git a/crates/iceberg/src/spec/snapshot.rs
b/crates/iceberg/src/spec/snapshot.rs
index d4e941b..a04bb99 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -18,12 +18,14 @@
/*!
* Snapshots
*/
-use std::collections::HashMap;
-
use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::sync::Arc;
use super::table_metadata::SnapshotLog;
+/// Reference to [`Snapshot`].
+pub type SnapshotRef = Arc<Snapshot>;
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase")]
/// The operation field is used by some operations, like snapshot expiration,
to skip processing certain snapshots.
diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs
index 357e68f..dcb19d3 100644
--- a/crates/iceberg/src/spec/sort.rs
+++ b/crates/iceberg/src/spec/sort.rs
@@ -19,9 +19,12 @@
* Sorting
*/
use serde::{Deserialize, Serialize};
+use std::sync::Arc;
use super::transform::Transform;
+/// Reference to [`SortOrder`].
+pub type SortOrderRef = Arc<SortOrder>;
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
/// Sort direction in a partition, either ascending or descending
pub enum SortDirection {
diff --git a/crates/iceberg/src/spec/table_metadata.rs
b/crates/iceberg/src/spec/table_metadata.rs
index ccd58f2..6abe959 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -20,19 +20,14 @@ Defines the [table
metadata](https://iceberg.apache.org/spec/#table-metadata).
The main struct here is [TableMetadataV2] which defines the data for a table.
*/
-use std::{collections::HashMap, sync::Arc};
-
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
+use std::{collections::HashMap, sync::Arc};
use uuid::Uuid;
-use crate::{Error, ErrorKind};
-
use super::{
- partition::PartitionSpec,
- schema::Schema,
snapshot::{Snapshot, SnapshotReference, SnapshotRetention},
- sort::SortOrder,
+ PartitionSpecRef, SchemaRef, SnapshotRef, SortOrderRef,
};
use _serde::TableMetadataEnum;
@@ -44,6 +39,9 @@ static DEFAULT_SORT_ORDER_ID: i64 = 0;
#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum", into = "TableMetadataEnum")]
/// Fields for the version 2 of the table metadata.
+///
+/// This data structure is assumed to always be valid: its validity is checked
+/// when it is constructed, so accessors panic if an invariant turns out to be
+/// violated afterwards.
pub struct TableMetadata {
/// Integer Version for the format.
format_version: FormatVersion,
@@ -58,11 +56,11 @@ pub struct TableMetadata {
/// An integer; the highest assigned column ID for the table.
last_column_id: i32,
/// A list of schemas, stored as objects with schema-id.
- schemas: HashMap<i32, Arc<Schema>>,
+ schemas: HashMap<i32, SchemaRef>,
/// ID of the table’s current schema.
current_schema_id: i32,
/// A list of partition specs, stored as full partition spec objects.
- partition_specs: HashMap<i32, PartitionSpec>,
+ partition_specs: HashMap<i32, PartitionSpecRef>,
/// ID of the “current” spec that writers should use by default.
default_spec_id: i32,
/// An integer; the highest assigned partition field ID across all
partition specs for the table.
@@ -78,7 +76,7 @@ pub struct TableMetadata {
/// data files exist in the file system. A data file must not be deleted
/// from the file system until the last snapshot in which it was listed is
/// garbage collected.
- snapshots: Option<HashMap<i64, Arc<Snapshot>>>,
+ snapshots: HashMap<i64, SnapshotRef>,
/// A list (optional) of timestamp and snapshot ID pairs that encodes
changes
/// to the current snapshot for the table. Each time the
current-snapshot-id
/// is changed, a new entry should be added with the last-updated-ms
@@ -96,7 +94,7 @@ pub struct TableMetadata {
metadata_log: Vec<MetadataLog>,
/// A list of sort orders, stored as full sort order objects.
- sort_orders: HashMap<i64, SortOrder>,
+ sort_orders: HashMap<i64, SortOrderRef>,
/// Default sort order id of the table. Note that this could be used by
/// writers, but is not used when reading because reads use the specs
/// stored in manifest files.
@@ -109,52 +107,141 @@ pub struct TableMetadata {
}
impl TableMetadata {
+ /// Returns format version of this metadata.
+ #[inline]
+ pub fn format_version(&self) -> FormatVersion {
+ self.format_version
+ }
+
+ /// Returns uuid of current table.
+ #[inline]
+ pub fn uuid(&self) -> Uuid {
+ self.table_uuid
+ }
+
+ /// Returns table location.
+ #[inline]
+ pub fn location(&self) -> &str {
+ self.location.as_str()
+ }
+
+ /// Returns last sequence number.
+ #[inline]
+ pub fn last_sequence_number(&self) -> i64 {
+ self.last_sequence_number
+ }
+
+ /// Returns last updated time.
+ #[inline]
+ pub fn last_updated_ms(&self) -> i64 {
+ self.last_updated_ms
+ }
+
+ /// Returns schemas
+ #[inline]
+ pub fn schemas_iter(&self) -> impl Iterator<Item = &SchemaRef> {
+ self.schemas.values()
+ }
+
+ /// Lookup schema by id.
+ #[inline]
+ pub fn schema_by_id(&self, schema_id: i32) -> Option<&SchemaRef> {
+ self.schemas.get(&schema_id)
+ }
+
/// Get current schema
#[inline]
- pub fn current_schema(&self) -> Result<Arc<Schema>, Error> {
- self.schemas
- .get(&self.current_schema_id)
- .ok_or_else(|| {
- Error::new(
- ErrorKind::DataInvalid,
- format!("Schema id {} not found!", self.current_schema_id),
- )
- })
- .cloned()
+ pub fn current_schema(&self) -> &SchemaRef {
+ self.schema_by_id(self.current_schema_id)
+ .expect("Current schema id set, but not found in table metadata")
}
+
+ /// Returns all partition specs.
+ #[inline]
+ pub fn partition_specs_iter(&self) -> impl Iterator<Item =
&PartitionSpecRef> {
+ self.partition_specs.values()
+ }
+
+ /// Lookup partition spec by id.
+ #[inline]
+ pub fn partition_spec_by_id(&self, spec_id: i32) ->
Option<&PartitionSpecRef> {
+ self.partition_specs.get(&spec_id)
+ }
+
/// Get default partition spec
#[inline]
- pub fn default_partition_spec(&self) -> Result<&PartitionSpec, Error> {
- self.partition_specs
- .get(&self.default_spec_id)
- .ok_or_else(|| {
- Error::new(
- ErrorKind::DataInvalid,
- format!("Partition spec id {} not found!",
self.default_spec_id),
- )
- })
+    pub fn default_partition_spec(&self) -> Option<&PartitionSpecRef> {
+        if self.default_spec_id == DEFAULT_SPEC_ID {
+            // A miss for the implicit default id is reported as `None` rather than a panic.
+            self.partition_spec_by_id(DEFAULT_SPEC_ID)
+        } else {
+            // A non-default id was recorded in the metadata, so it must resolve: look up
+            // that id (not `DEFAULT_SPEC_ID`) and treat a miss as a broken invariant.
+            Some(
+                self.partition_spec_by_id(self.default_spec_id)
+                    .expect("Default partition spec id set, but not found in table metadata"),
+            )
+        }
+    }
+
+ /// Returns all snapshots
+ #[inline]
+ pub fn snapshots(&self) -> impl Iterator<Item = &SnapshotRef> {
+ self.snapshots.values()
+ }
+
+ /// Lookup snapshot by id.
+ #[inline]
+ pub fn snapshot_by_id(&self, snapshot_id: i64) -> Option<&SnapshotRef> {
+ self.snapshots.get(&snapshot_id)
+ }
+
+ /// Returns snapshot history.
+ #[inline]
+ pub fn history(&self) -> &[SnapshotLog] {
+ &self.snapshot_log
}
/// Get current snapshot
#[inline]
- pub fn current_snapshot(&self) -> Result<Option<Arc<Snapshot>>, Error> {
- match (&self.current_snapshot_id, &self.snapshots) {
- (Some(snapshot_id), Some(snapshots)) =>
Ok(snapshots.get(snapshot_id).cloned()),
- (Some(-1), None) => Ok(None),
- (None, None) => Ok(None),
- (Some(_), None) => Err(Error::new(
- ErrorKind::DataInvalid,
- "Snapshot id is provided but there are no
snapshots".to_string(),
- )),
- (None, Some(_)) => Err(Error::new(
- ErrorKind::DataInvalid,
- "There are snapshots but no snapshot id is
provided".to_string(),
- )),
+ pub fn current_snapshot(&self) -> Option<&SnapshotRef> {
+ self.current_snapshot_id.map(|s| {
+ self.snapshot_by_id(s)
+ .expect("Current snapshot id has been set, but doesn't exist
in metadata")
+ })
+ }
+
+ /// Return all sort orders.
+ #[inline]
+ pub fn sort_orders_iter(&self) -> impl Iterator<Item = &SortOrderRef> {
+ self.sort_orders.values()
+ }
+
+ /// Lookup sort order by id.
+ #[inline]
+ pub fn sort_order_by_id(&self, sort_order_id: i64) ->
Option<&SortOrderRef> {
+ self.sort_orders.get(&sort_order_id)
+ }
+
+ /// Returns the default sort order, if any.
+ #[inline]
+ pub fn default_sort_order(&self) -> Option<&SortOrderRef> {
+ if self.default_sort_order_id == DEFAULT_SORT_ORDER_ID {
+ self.sort_orders.get(&DEFAULT_SORT_ORDER_ID)
+ } else {
+ Some(
+ self.sort_orders
+ .get(&self.default_sort_order_id)
+ .expect("Default order id has been set, but not found in
table metadata!"),
+ )
}
}
+ /// Returns properties of table.
+ #[inline]
+ pub fn properties(&self) -> &HashMap<String, String> {
+ &self.properties
+ }
+
/// Append snapshot to table
- pub fn append_snapshot(&mut self, snapshot: Snapshot) -> Result<(), Error>
{
+ pub fn append_snapshot(&mut self, snapshot: Snapshot) {
self.last_updated_ms = snapshot.timestamp();
self.last_sequence_number = snapshot.sequence_number();
@@ -174,25 +261,9 @@ impl TableMetadata {
)
});
- if let Some(snapshots) = &mut self.snapshots {
- self.snapshot_log.push(snapshot.log());
- snapshots.insert(snapshot.snapshot_id(), Arc::new(snapshot));
- } else {
- if !self.snapshot_log.is_empty() {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- "Snapshot logs is empty while snapshots is not!",
- ));
- }
-
- self.snapshot_log = vec![snapshot.log()];
- self.snapshots = Some(HashMap::from_iter(vec![(
- snapshot.snapshot_id(),
- Arc::new(snapshot),
- )]));
- }
-
- Ok(())
+ self.snapshot_log.push(snapshot.log());
+ self.snapshots
+ .insert(snapshot.snapshot_id(), Arc::new(snapshot));
}
}
@@ -207,6 +278,7 @@ pub(super) mod _serde {
use serde::{Deserialize, Serialize};
use uuid::Uuid;
+ use crate::spec::Snapshot;
use crate::{
spec::{
schema::_serde::{SchemaV1, SchemaV2},
@@ -376,23 +448,32 @@ pub(super) mod _serde {
}?,
schemas,
partition_specs: HashMap::from_iter(
- value.partition_specs.into_iter().map(|x| (x.spec_id, x)),
+ value
+ .partition_specs
+ .into_iter()
+ .map(|x| (x.spec_id, Arc::new(x))),
),
default_spec_id: value.default_spec_id,
last_partition_id: value.last_partition_id,
properties: value.properties.unwrap_or_default(),
current_snapshot_id,
- snapshots: value.snapshots.map(|snapshots| {
- HashMap::from_iter(
- snapshots
- .into_iter()
- .map(|x| (x.snapshot_id, Arc::new(x.into()))),
- )
- }),
+ snapshots: value
+ .snapshots
+ .map(|snapshots| {
+ HashMap::from_iter(
+ snapshots
+ .into_iter()
+ .map(|x| (x.snapshot_id, Arc::new(x.into()))),
+ )
+ })
+ .unwrap_or_default(),
snapshot_log: value.snapshot_log.unwrap_or_default(),
metadata_log: value.metadata_log.unwrap_or_default(),
sort_orders: HashMap::from_iter(
- value.sort_orders.into_iter().map(|x| (x.order_id, x)),
+ value
+ .sort_orders
+ .into_iter()
+ .map(|x| (x.order_id, Arc::new(x))),
),
default_sort_order_id: value.default_sort_order_id,
refs: value.refs.unwrap_or_else(|| {
@@ -453,7 +534,7 @@ pub(super) mod _serde {
}]
})
.into_iter()
- .map(|x| (x.spec_id, x)),
+ .map(|x| (x.spec_id, Arc::new(x))),
);
Ok(TableMetadata {
format_version: FormatVersion::V1,
@@ -490,13 +571,14 @@ pub(super) mod _serde {
.collect::<Result<Vec<_>, Error>>()?,
))
})
- .transpose()?,
+ .transpose()?
+ .unwrap_or_default(),
snapshot_log: value.snapshot_log.unwrap_or_default(),
metadata_log: value.metadata_log.unwrap_or_default(),
sort_orders: match value.sort_orders {
- Some(sort_orders) => {
- HashMap::from_iter(sort_orders.into_iter().map(|x|
(x.order_id, x)))
- }
+ Some(sort_orders) => HashMap::from_iter(
+ sort_orders.into_iter().map(|x| (x.order_id,
Arc::new(x))),
+ ),
None => HashMap::new(),
},
default_sort_order_id:
value.default_sort_order_id.unwrap_or(DEFAULT_SORT_ORDER_ID),
@@ -534,7 +616,11 @@ pub(super) mod _serde {
})
.collect(),
current_schema_id: v.current_schema_id,
- partition_specs: v.partition_specs.into_values().collect(),
+ partition_specs: v
+ .partition_specs
+ .into_values()
+ .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s|
s.as_ref().clone()))
+ .collect(),
default_spec_id: v.default_spec_id,
last_partition_id: v.last_partition_id,
properties: if v.properties.is_empty() {
@@ -543,16 +629,20 @@ pub(super) mod _serde {
Some(v.properties)
},
current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
- snapshots: v.snapshots.map(|snapshots| {
- snapshots
- .into_values()
- .map(|x| {
- Arc::try_unwrap(x)
- .unwrap_or_else(|snapshot|
snapshot.as_ref().clone())
- .into()
- })
- .collect()
- }),
+ snapshots: if v.snapshots.is_empty() {
+ None
+ } else {
+ Some(
+ v.snapshots
+ .into_values()
+ .map(|x| {
+ Arc::try_unwrap(x)
+ .unwrap_or_else(|snapshot|
snapshot.as_ref().clone())
+ .into()
+ })
+ .collect(),
+ )
+ },
snapshot_log: if v.snapshot_log.is_empty() {
None
} else {
@@ -563,7 +653,11 @@ pub(super) mod _serde {
} else {
Some(v.metadata_log)
},
- sort_orders: v.sort_orders.into_values().collect(),
+ sort_orders: v
+ .sort_orders
+ .into_values()
+ .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s|
s.as_ref().clone()))
+ .collect(),
default_sort_order_id: v.default_sort_order_id,
refs: Some(v.refs),
}
@@ -601,7 +695,12 @@ pub(super) mod _serde {
.get(&v.default_spec_id)
.map(|x| x.fields.clone())
.unwrap_or_default(),
- partition_specs:
Some(v.partition_specs.into_values().collect()),
+ partition_specs: Some(
+ v.partition_specs
+ .into_values()
+ .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s|
s.as_ref().clone()))
+ .collect(),
+ ),
default_spec_id: Some(v.default_spec_id),
last_partition_id: Some(v.last_partition_id),
properties: if v.properties.is_empty() {
@@ -610,16 +709,16 @@ pub(super) mod _serde {
Some(v.properties)
},
current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
- snapshots: v.snapshots.map(|snapshots| {
- snapshots
- .into_values()
- .map(|x| {
- Arc::try_unwrap(x)
- .unwrap_or_else(|snapshot|
snapshot.as_ref().clone())
- .into()
- })
- .collect()
- }),
+ snapshots: if v.snapshots.is_empty() {
+ None
+ } else {
+ Some(
+ v.snapshots
+ .into_values()
+ .map(|x| Snapshot::clone(&x).into())
+ .collect(),
+ )
+ },
snapshot_log: if v.snapshot_log.is_empty() {
None
} else {
@@ -630,14 +729,19 @@ pub(super) mod _serde {
} else {
Some(v.metadata_log)
},
- sort_orders: Some(v.sort_orders.into_values().collect()),
+ sort_orders: Some(
+ v.sort_orders
+ .into_values()
+ .map(|s| Arc::try_unwrap(s).unwrap_or_else(|s|
s.as_ref().clone()))
+ .collect(),
+ ),
default_sort_order_id: Some(v.default_sort_order_id),
}
}
}
}
-#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
+#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
#[repr(u8)]
/// Iceberg format version
pub enum FormatVersion {
@@ -778,12 +882,12 @@ mod tests {
last_column_id: 1,
schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
current_schema_id: 1,
- partition_specs: HashMap::from_iter(vec![(1, partition_spec)]),
+ partition_specs: HashMap::from_iter(vec![(1,
partition_spec.into())]),
default_spec_id: 1,
last_partition_id: 1000,
default_sort_order_id: 0,
sort_orders: HashMap::from_iter(vec![]),
- snapshots: None,
+ snapshots: HashMap::default(),
current_snapshot_id: None,
last_sequence_number: 1,
properties: HashMap::from_iter(vec![(
@@ -948,12 +1052,12 @@ mod tests {
last_column_id: 5,
schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
current_schema_id: 0,
- partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
+ partition_specs: HashMap::from_iter(vec![(0,
partition_spec.into())]),
default_spec_id: 0,
last_partition_id: 1000,
default_sort_order_id: 0,
- sort_orders: HashMap::from_iter(vec![(0, sort_order)]),
- snapshots: Some(HashMap::from_iter(vec![(638933773299822130,
Arc::new(snapshot))])),
+ sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
+ snapshots: HashMap::from_iter(vec![(638933773299822130,
Arc::new(snapshot))]),
current_snapshot_id: Some(638933773299822130),
last_sequence_number: 0,
properties:
HashMap::from_iter(vec![("owner".to_string(),"root".to_string())]),
@@ -1093,15 +1197,15 @@ mod tests {
last_column_id: 3,
schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1,
Arc::new(schema2))]),
current_schema_id: 1,
- partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
+ partition_specs: HashMap::from_iter(vec![(0,
partition_spec.into())]),
default_spec_id: 0,
last_partition_id: 1000,
default_sort_order_id: 3,
- sort_orders: HashMap::from_iter(vec![(3, sort_order)]),
- snapshots: Some(HashMap::from_iter(vec![
+ sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
+ snapshots: HashMap::from_iter(vec![
(3051729675574597004, Arc::new(snapshot1)),
(3055729675574597004, Arc::new(snapshot2)),
- ])),
+ ]),
current_snapshot_id: Some(3055729675574597004),
last_sequence_number: 34,
properties: HashMap::new(),
@@ -1194,12 +1298,12 @@ mod tests {
last_column_id: 3,
schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
current_schema_id: 0,
- partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
+ partition_specs: HashMap::from_iter(vec![(0,
partition_spec.into())]),
default_spec_id: 0,
last_partition_id: 1000,
default_sort_order_id: 3,
- sort_orders: HashMap::from_iter(vec![(3, sort_order)]),
- snapshots: None,
+ sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
+ snapshots: HashMap::default(),
current_snapshot_id: None,
last_sequence_number: 34,
properties: HashMap::new(),
@@ -1256,12 +1360,12 @@ mod tests {
last_column_id: 3,
schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
current_schema_id: 0,
- partition_specs: HashMap::from_iter(vec![(0, partition_spec)]),
+ partition_specs: HashMap::from_iter(vec![(0,
partition_spec.into())]),
default_spec_id: 0,
last_partition_id: 0,
default_sort_order_id: 0,
sort_orders: HashMap::new(),
- snapshots: Some(HashMap::new()),
+ snapshots: HashMap::new(),
current_snapshot_id: None,
last_sequence_number: 0,
properties: HashMap::new(),
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index ebe6753..981ac8b 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -17,10 +17,33 @@
//! Table API for Apache Iceberg
+use crate::io::FileIO;
use crate::spec::TableMetadata;
+use crate::TableIdent;
+use typed_builder::TypedBuilder;
/// Table represents a table in the catalog.
+#[derive(TypedBuilder)]
pub struct Table {
- metadata_location: String,
+ file_io: FileIO,
+ #[builder(default, setter(strip_option))]
+ metadata_location: Option<String>,
metadata: TableMetadata,
+ identifier: TableIdent,
+}
+
+impl Table {
+ /// Returns table identifier.
+ pub fn identifier(&self) -> &TableIdent {
+ &self.identifier
+ }
+ /// Returns current metadata.
+ pub fn metadata(&self) -> &TableMetadata {
+ &self.metadata
+ }
+
+ /// Returns current metadata location.
+ pub fn metadata_location(&self) -> Option<&str> {
+ self.metadata_location.as_deref()
+ }
}