This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new f17bf30 feat: Initial version of rest catalog. (#78)
f17bf30 is described below
commit f17bf305c9e3ea43addaeee51236ecf8ffd3629f
Author: Renjie Liu <[email protected]>
AuthorDate: Sun Oct 29 01:58:53 2023 +0800
feat: Initial version of rest catalog. (#78)
* feat: Add rest catalog
* feat: Initial checkin of rest catalog
* Add tests
* Fix typo
* Fix
* Fix comments
* Fix comment
* Move rest catalog to another crate
* Remove unused deps
* Rename to iceberg-catalog-rest
* Fix
* Fix comments
* Fix comments
---
Cargo.toml | 2 +-
crates/{iceberg => catalog/rest}/Cargo.toml | 35 +-
crates/catalog/rest/src/catalog.rs | 887 ++++++++++++++++++++++++++++
crates/catalog/rest/src/lib.rs | 23 +
crates/iceberg/Cargo.toml | 3 +
crates/iceberg/src/catalog/mod.rs | 47 +-
crates/iceberg/src/error.rs | 12 +
crates/iceberg/src/rest.rs | 22 +
8 files changed, 998 insertions(+), 33 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 34eef8d..e8e0473 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,4 +17,4 @@
[workspace]
resolver = "2"
-members = ["crates/*"]
+members = ["crates/catalog/*", "crates/iceberg"]
diff --git a/crates/iceberg/Cargo.toml b/crates/catalog/rest/Cargo.toml
similarity index 65%
copy from crates/iceberg/Cargo.toml
copy to crates/catalog/rest/Cargo.toml
index 9f2d4a8..0531a51 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/catalog/rest/Cargo.toml
@@ -16,45 +16,26 @@
# under the License.
[package]
-name = "iceberg"
+name = "iceberg-catalog-rest"
version = "0.1.0"
edition = "2021"
categories = ["database"]
-description = "Apache Iceberg Rust implementation"
+description = "Apache Iceberg Rust REST API"
repository = "https://github.com/apache/iceberg-rust"
license = "Apache-2.0"
-keywords = ["iceberg"]
+keywords = ["iceberg", "rest", "catalog"]
[dependencies]
-anyhow = "1.0.72"
-apache-avro = "0.16"
-arrow-arith = { version = ">=46" }
-arrow-array = { version = ">=46" }
-arrow-schema = { version = ">=46" }
async-trait = "0.1"
-bimap = "0.6"
-bitvec = "1.0.1"
-chrono = "0.4"
-derive_builder = "0.12.0"
-either = "1"
-futures = "0.3"
-itertools = "0.11"
-lazy_static = "1"
-murmur3 = "0.5.2"
-once_cell = "1"
-opendal = "0.41"
-ordered-float = "4.0.0"
-rust_decimal = "1.31.0"
+iceberg = { path = "../../iceberg" }
+reqwest = { version = "^0.11", features = ["json"] }
serde = { version = "^1.0", features = ["rc"] }
-serde_bytes = "0.11.8"
serde_derive = "^1.0"
serde_json = "^1.0"
-serde_repr = "0.1.16"
-url = "2"
-uuid = "1.4.1"
+typed-builder = "^0.17"
+urlencoding = "2"
[dev-dependencies]
-pretty_assertions = "1.4.0"
-tempdir = "0.3"
+mockito = "^1"
tokio = { version = "1", features = ["macros"] }
diff --git a/crates/catalog/rest/src/catalog.rs
b/crates/catalog/rest/src/catalog.rs
new file mode 100644
index 0000000..f267437
--- /dev/null
+++ b/crates/catalog/rest/src/catalog.rs
@@ -0,0 +1,887 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains the REST catalog implementation.
+
+use std::collections::HashMap;
+
+use async_trait::async_trait;
+use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
+use reqwest::{Client, Request};
+use serde::de::DeserializeOwned;
+use typed_builder::TypedBuilder;
+use urlencoding::encode;
+
+use iceberg::table::Table;
+use iceberg::Result;
+use iceberg::{
+ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit,
TableCreation, TableIdent,
+};
+
+use self::_serde::{
+ CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse,
ListTableResponse,
+ NamespaceSerde, RenameTableRequest, NO_CONTENT, OK,
+};
+
+/// Value sent in the `x-client-version` request header (named as the REST spec version).
+const ICEBERG_REST_SPEC_VERSION: &str = "1.14";
+/// Version of this crate, taken from Cargo at compile time; used in the `User-Agent` header.
+const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
+/// API version path segment; every endpoint URL is built under `{uri}/v1`.
+const PATH_V1: &str = "v1";
+
+/// Rest catalog configuration.
+///
+/// Built with the generated `RestCatalogConfig::builder()`; only `uri` is required.
+#[derive(Debug, TypedBuilder)]
+pub struct RestCatalogConfig {
+    /// Base URI of the REST catalog server; endpoints are built as `{uri}/v1/...`.
+    uri: String,
+    /// Optional warehouse location, sent as the `warehouse` query parameter when the
+    /// catalog config is fetched.
+    #[builder(default, setter(strip_option))]
+    warehouse: Option<String>,
+
+    /// Catalog properties. `RestCatalog::new` merges these with the server's
+    /// `defaults` (lower precedence) and `overrides` (higher precedence).
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+impl RestCatalogConfig {
+    /// Builds `{uri}/v1/{segments...}`.
+    ///
+    /// Trailing slashes on `uri` are trimmed first, so `http://host` and
+    /// `http://host/` both yield `http://host/v1/...` instead of a `//v1` path.
+    fn url_for(&self, segments: &[&str]) -> String {
+        let mut url = self.uri.trim_end_matches('/').to_string();
+        url.push('/');
+        url.push_str(PATH_V1);
+        for segment in segments {
+            url.push('/');
+            url.push_str(segment);
+        }
+        url
+    }
+
+    fn config_endpoint(&self) -> String {
+        self.url_for(&["config"])
+    }
+
+    fn namespaces_endpoint(&self) -> String {
+        self.url_for(&["namespaces"])
+    }
+
+    fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String {
+        self.url_for(&["namespaces", ns.encode_in_url().as_str()])
+    }
+
+    fn tables_endpoint(&self, ns: &NamespaceIdent) -> String {
+        self.url_for(&["namespaces", ns.encode_in_url().as_str(), "tables"])
+    }
+
+    fn rename_table_endpoint(&self) -> String {
+        self.url_for(&["tables", "rename"])
+    }
+
+    fn table_endpoint(&self, table: &TableIdent) -> String {
+        self.url_for(&[
+            "namespaces",
+            table.namespace.encode_in_url().as_str(),
+            "tables",
+            // The table name becomes a single path segment, so it is percent-encoded.
+            &*encode(&table.name),
+        ])
+    }
+
+    /// Builds the HTTP client with the default headers sent on every request.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the underlying `reqwest` client cannot be built.
+    fn try_create_rest_client(&self) -> Result<HttpClient> {
+        //TODO: We will add oauth, ssl config, sigv4 later
+        let user_agent = HeaderValue::from_str(&format!("iceberg-rs/{}", CARGO_PKG_VERSION))
+            .expect("crate version is a valid ASCII header value");
+
+        let headers = HeaderMap::from_iter([
+            (
+                header::CONTENT_TYPE,
+                HeaderValue::from_static("application/json"),
+            ),
+            (
+                HeaderName::from_static("x-client-version"),
+                HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION),
+            ),
+            (header::USER_AGENT, user_agent),
+        ]);
+
+        Ok(HttpClient(
+            Client::builder().default_headers(headers).build()?,
+        ))
+    }
+}
+
+/// Thin wrapper around a `reqwest` [`Client`] that decodes REST catalog responses.
+struct HttpClient(Client);
+
+impl HttpClient {
+    /// Sends `request` and deserializes the JSON body as `R` when the response status
+    /// equals `SUCCESS_CODE`.
+    ///
+    /// # Errors
+    ///
+    /// Fails on transport errors and undecodable bodies. Any other status is decoded
+    /// as the error payload `E` and converted into an [`Error`].
+    async fn query<
+        R: DeserializeOwned,
+        E: DeserializeOwned + Into<Error>,
+        const SUCCESS_CODE: u16,
+    >(
+        &self,
+        request: Request,
+    ) -> Result<R> {
+        let resp = self.0.execute(request).await?;
+        let status = resp.status();
+        let body = resp.bytes().await?;
+
+        if status.as_u16() == SUCCESS_CODE {
+            parse_json_body::<R>(&body)
+        } else {
+            Err(error_from_body::<E>(status, &body))
+        }
+    }
+
+    /// Sends `request` and succeeds, without reading the body, when the response
+    /// status equals `SUCCESS_CODE`.
+    ///
+    /// # Errors
+    ///
+    /// Fails on transport errors. Any other status is decoded as the error payload `E`
+    /// and converted into an [`Error`].
+    async fn execute<E: DeserializeOwned + Into<Error>, const SUCCESS_CODE: u16>(
+        &self,
+        request: Request,
+    ) -> Result<()> {
+        let resp = self.0.execute(request).await?;
+        let status = resp.status();
+
+        if status.as_u16() == SUCCESS_CODE {
+            return Ok(());
+        }
+
+        let body = resp.bytes().await?;
+        Err(error_from_body::<E>(status, &body))
+    }
+}
+
+/// Deserializes a JSON response body, attaching the raw text as context on failure.
+fn parse_json_body<T: DeserializeOwned>(body: &[u8]) -> Result<T> {
+    serde_json::from_slice::<T>(body).map_err(|e| {
+        Error::new(
+            ErrorKind::Unexpected,
+            "Failed to parse response from rest catalog server!",
+        )
+        .with_context("json", String::from_utf8_lossy(body))
+        .with_source(e)
+    })
+}
+
+/// Converts a response that had an unexpected status into an [`Error`].
+///
+/// The body is decoded as the error payload `E`. If that fails (for example the body
+/// is empty or not JSON), the parse error is returned with the HTTP status attached,
+/// so the status the server actually sent is not lost.
+fn error_from_body<E: DeserializeOwned + Into<Error>>(
+    status: reqwest::StatusCode,
+    body: &[u8],
+) -> Error {
+    match parse_json_body::<E>(body) {
+        Ok(payload) => payload.into(),
+        Err(parse_error) => parse_error.with_context("status", status.to_string()),
+    }
+}
+
+/// Rest catalog implementation.
+///
+/// Construct it with [`RestCatalog::new`], which fetches the server's config first.
+pub struct RestCatalog {
+    /// Client configuration; after `new`, `props` holds the merged catalog properties.
+    config: RestCatalogConfig,
+    /// HTTP client built from `config` by `try_create_rest_client`.
+    client: HttpClient,
+}
+
+#[async_trait]
+impl Catalog for RestCatalog {
+    /// List namespaces in the catalog, optionally only those under `parent`.
+    async fn list_namespaces(
+        &self,
+        parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let mut request = self.client.0.get(self.config.namespaces_endpoint());
+        if let Some(ns) = parent {
+            // `query` percent-encodes the value itself, so pass the raw levels joined
+            // with the 0x1F separator; `encode_in_url` would encode them twice.
+            request = request.query(&[("parent", ns.as_ref().join("\u{1F}"))]);
+        }
+
+        let resp = self
+            .client
+            .query::<ListNamespaceResponse, ErrorResponse, OK>(request.build()?)
+            .await?;
+
+        resp.namespaces
+            .into_iter()
+            .map(NamespaceIdent::from_vec)
+            .collect::<Result<Vec<NamespaceIdent>>>()
+    }
+
+    /// Create a new namespace inside the catalog.
+    async fn create_namespace(
+        &self,
+        namespace: &NamespaceIdent,
+        properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        let request = self
+            .client
+            .0
+            .post(self.config.namespaces_endpoint())
+            .json(&NamespaceSerde {
+                namespace: namespace.as_ref().clone(),
+                properties: Some(properties),
+            })
+            .build()?;
+
+        let resp = self
+            .client
+            .query::<NamespaceSerde, ErrorResponse, OK>(request)
+            .await?;
+
+        Namespace::try_from(resp)
+    }
+
+    /// Get a namespace information from the catalog.
+    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
+        let request = self
+            .client
+            .0
+            .get(self.config.namespace_endpoint(namespace))
+            .build()?;
+
+        let resp = self
+            .client
+            .query::<NamespaceSerde, ErrorResponse, OK>(request)
+            .await?;
+        Namespace::try_from(resp)
+    }
+
+    /// Update a namespace inside the catalog.
+    ///
+    /// # Behavior
+    ///
+    /// The properties must be the full set of namespace.
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<()> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Updating namespace not supported yet!",
+        ))
+    }
+
+    /// Check if a namespace exists in the catalog.
+    ///
+    /// Returns `Ok(false)` when the server answers `404 Not Found`.
+    async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
+        let request = self
+            .client
+            .0
+            .head(self.config.namespace_endpoint(ns))
+            .build()?;
+
+        check_exists(&self.client, request).await
+    }
+
+    /// Drop a namespace from the catalog.
+    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
+        let request = self
+            .client
+            .0
+            .delete(self.config.namespace_endpoint(namespace))
+            .build()?;
+
+        self.client.execute::<ErrorResponse, NO_CONTENT>(request).await
+    }
+
+    /// List tables from namespace.
+    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
+        let request = self
+            .client
+            .0
+            .get(self.config.tables_endpoint(namespace))
+            .build()?;
+
+        let resp = self
+            .client
+            .query::<ListTableResponse, ErrorResponse, OK>(request)
+            .await?;
+
+        Ok(resp.identifiers)
+    }
+
+    /// Create a new table inside the namespace.
+    async fn create_table(
+        &self,
+        _namespace: &NamespaceIdent,
+        _creation: TableCreation,
+    ) -> Result<Table> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Creating table not supported yet!",
+        ))
+    }
+
+    /// Load table from the catalog.
+    async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Loading table not supported yet!",
+        ))
+    }
+
+    /// Drop a table from the catalog.
+    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
+        let request = self
+            .client
+            .0
+            .delete(self.config.table_endpoint(table))
+            .build()?;
+
+        self.client.execute::<ErrorResponse, NO_CONTENT>(request).await
+    }
+
+    /// Check if a table exists in the catalog.
+    ///
+    /// Returns `Ok(false)` when the server answers `404 Not Found`.
+    async fn stat_table(&self, table: &TableIdent) -> Result<bool> {
+        let request = self
+            .client
+            .0
+            .head(self.config.table_endpoint(table))
+            .build()?;
+
+        check_exists(&self.client, request).await
+    }
+
+    /// Rename a table in the catalog.
+    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
+        let request = self
+            .client
+            .0
+            .post(self.config.rename_table_endpoint())
+            .json(&RenameTableRequest {
+                source: src.clone(),
+                destination: dest.clone(),
+            })
+            .build()?;
+
+        self.client.execute::<ErrorResponse, NO_CONTENT>(request).await
+    }
+
+    /// Update a table to the catalog.
+    ///
+    /// Not implemented yet; returns a `FeatureUnsupported` error instead of panicking.
+    async fn update_table(&self, _table: &TableIdent, _commit: TableCommit) -> Result<Table> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Updating table not supported yet!",
+        ))
+    }
+
+    /// Update multiple tables to the catalog as an atomic operation.
+    ///
+    /// Not implemented yet; returns a `FeatureUnsupported` error instead of panicking.
+    async fn update_tables(&self, _tables: &[(TableIdent, TableCommit)]) -> Result<()> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Updating tables not supported yet!",
+        ))
+    }
+}
+
+/// Runs a `HEAD` existence check.
+///
+/// `204 No Content` means the entity exists and `404 Not Found` means it does not.
+/// Any other status becomes an error, decoded from the body when one is present.
+async fn check_exists(client: &HttpClient, request: Request) -> Result<bool> {
+    let resp = client.0.execute(request).await?;
+    let status = resp.status();
+
+    if status.as_u16() == NO_CONTENT {
+        return Ok(true);
+    }
+    if status == reqwest::StatusCode::NOT_FOUND {
+        return Ok(false);
+    }
+
+    let body = resp.bytes().await?;
+    if body.is_empty() {
+        // HEAD responses carry no body, so there is usually no error payload to decode.
+        return Err(Error::new(
+            ErrorKind::Unexpected,
+            "Unexpected response from rest catalog server!",
+        )
+        .with_context("status", status.to_string()));
+    }
+
+    let e = serde_json::from_slice::<ErrorResponse>(&body).map_err(|e| {
+        Error::new(
+            ErrorKind::Unexpected,
+            "Failed to parse response from rest catalog server!",
+        )
+        .with_context("json", String::from_utf8_lossy(&body))
+        .with_source(e)
+    })?;
+    Err(e.into())
+}
+
+impl RestCatalog {
+    /// Creates a rest catalog from config.
+    ///
+    /// Fetches the server's config endpoint, merges the returned properties into
+    /// `config.props`, then builds the HTTP client again from the updated config.
+    pub async fn new(config: RestCatalogConfig) -> Result<Self> {
+        let bootstrap_client = config.try_create_rest_client()?;
+        let mut catalog = Self {
+            config,
+            client: bootstrap_client,
+        };
+
+        catalog.update_config().await?;
+        // `try_create_rest_client` does not read `props` yet (see its TODO); rebuilding
+        // here is presumably groundwork for settings derived from the merged config.
+        catalog.client = catalog.config.try_create_rest_client()?;
+
+        Ok(catalog)
+    }
+
+    /// Fetches the server-side catalog config and merges it into `self.config.props`.
+    ///
+    /// Precedence, lowest to highest: server `defaults`, client-supplied props,
+    /// server `overrides`.
+    async fn update_config(&mut self) -> Result<()> {
+        let base = self.client.0.get(self.config.config_endpoint());
+        let request = match &self.config.warehouse {
+            Some(warehouse_location) => base.query(&[("warehouse", warehouse_location)]),
+            None => base,
+        };
+
+        let CatalogConfig {
+            overrides,
+            defaults,
+        } = self
+            .client
+            .query::<CatalogConfig, ErrorResponse, OK>(request.build()?)
+            .await?;
+
+        let mut merged = defaults;
+        merged.extend(self.config.props.clone());
+        merged.extend(overrides);
+        self.config.props = merged;
+
+        Ok(())
+    }
+}
+
+/// Requests and responses for rest api.
+mod _serde {
+    use std::collections::HashMap;
+
+    use serde_derive::{Deserialize, Serialize};
+
+    use iceberg::{Error, ErrorKind, Namespace, TableIdent};
+
+    /// Success status expected from endpoints whose response carries a JSON body.
+    pub(super) const OK: u16 = 200u16;
+    /// Success status expected from endpoints whose response carries no body.
+    pub(super) const NO_CONTENT: u16 = 204u16;
+
+    /// Response body of the config endpoint.
+    #[derive(Clone, Debug, Serialize, Deserialize)]
+    pub(super) struct CatalogConfig {
+        /// Properties that take precedence over client-supplied properties.
+        pub(super) overrides: HashMap<String, String>,
+        /// Properties used when the client does not supply a value.
+        pub(super) defaults: HashMap<String, String>,
+    }
+
+    /// Error response body: an [`ErrorModel`] wrapped in an `error` field.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct ErrorResponse {
+        error: ErrorModel,
+    }
+
+    impl From<ErrorResponse> for Error {
+        fn from(resp: ErrorResponse) -> Error {
+            resp.error.into()
+        }
+    }
+
+    /// Error details reported by the server.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct ErrorModel {
+        /// Human-readable error message.
+        pub(super) message: String,
+        /// Error type name reported by the server.
+        pub(super) r#type: String,
+        /// Numeric error code reported by the server.
+        pub(super) code: u16,
+        /// Optional stack trace lines.
+        pub(super) stack: Option<Vec<String>>,
+    }
+
+    impl From<ErrorModel> for Error {
+        fn from(value: ErrorModel) -> Self {
+            let error = Error::new(ErrorKind::DataInvalid, value.message)
+                .with_context("type", value.r#type)
+                .with_context("code", value.code.to_string());
+
+            match value.stack {
+                Some(stack) => error.with_context("stack", stack.join("\n")),
+                None => error,
+            }
+        }
+    }
+
+    /// OAuth error body (`error`, `error_description`, `error_uri`).
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct OAuthError {
+        pub(super) error: String,
+        pub(super) error_description: Option<String>,
+        pub(super) error_uri: Option<String>,
+    }
+
+    impl From<OAuthError> for Error {
+        fn from(value: OAuthError) -> Self {
+            let mut error = Error::new(
+                ErrorKind::DataInvalid,
+                format!("OAuthError: {}", value.error),
+            );
+
+            if let Some(desc) = value.error_description {
+                error = error.with_context("description", desc);
+            }
+
+            if let Some(uri) = value.error_uri {
+                error = error.with_context("uri", uri);
+            }
+
+            error
+        }
+    }
+
+    /// Namespace as sent to and received from the server: its levels plus properties.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct NamespaceSerde {
+        pub(super) namespace: Vec<String>,
+        pub(super) properties: Option<HashMap<String, String>>,
+    }
+
+    impl TryFrom<NamespaceSerde> for super::Namespace {
+        type Error = Error;
+
+        /// Fails if the namespace level list is empty; missing properties become empty.
+        fn try_from(value: NamespaceSerde) -> std::result::Result<Self, Self::Error> {
+            Ok(super::Namespace::with_properties(
+                super::NamespaceIdent::from_vec(value.namespace)?,
+                value.properties.unwrap_or_default(),
+            ))
+        }
+    }
+
+    impl From<&Namespace> for NamespaceSerde {
+        fn from(value: &Namespace) -> Self {
+            Self {
+                namespace: value.name().as_ref().clone(),
+                properties: Some(value.properties().clone()),
+            }
+        }
+    }
+
+    /// Response body of the list-namespaces endpoint; each entry is one namespace's levels.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct ListNamespaceResponse {
+        pub(super) namespaces: Vec<Vec<String>>,
+    }
+
+    /// Request body for updating namespace properties (not used yet in this file).
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct UpdateNamespacePropsRequest {
+        removals: Option<Vec<String>>,
+        updates: Option<HashMap<String, String>>,
+    }
+
+    /// Response body for updating namespace properties (not used yet in this file).
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct UpdateNamespacePropsResponse {
+        updated: Vec<String>,
+        removed: Vec<String>,
+        missing: Option<Vec<String>>,
+    }
+
+    /// Response body of the list-tables endpoint.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct ListTableResponse {
+        pub(super) identifiers: Vec<TableIdent>,
+    }
+
+    /// Request body of the rename-table endpoint.
+    #[derive(Debug, Serialize, Deserialize)]
+    pub(super) struct RenameTableRequest {
+        pub(super) source: TableIdent,
+        pub(super) destination: TableIdent,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use mockito::{Mock, Server, ServerGuard};
+
+    use super::*;
+
+    /// Registers a `GET /v1/config` mock whose `overrides` set `warehouse`.
+    async fn create_config_mock(server: &mut ServerGuard) -> Mock {
+        server
+            .mock("GET", "/v1/config")
+            .with_status(200)
+            .with_body(
+                r#"{
+                "overrides": {
+                    "warehouse": "s3://iceberg-catalog"
+                },
+                "defaults": {}
+            }"#,
+            )
+            .create_async()
+            .await
+    }
+
+    /// Builds a catalog pointed at `server`; this issues the config request.
+    async fn create_catalog(server: &ServerGuard) -> RestCatalog {
+        RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
+            .await
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_update_config() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let catalog = create_catalog(&server).await;
+
+        assert_eq!(
+            catalog.config.props.get("warehouse"),
+            Some(&"s3://iceberg-catalog".to_string())
+        );
+
+        config_mock.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_list_namespace() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let list_ns_mock = server
+            .mock("GET", "/v1/namespaces")
+            .with_body(
+                r#"{
+                "namespaces": [
+                    ["ns1", "ns11"],
+                    ["ns2"]
+                ]
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let catalog = create_catalog(&server).await;
+
+        let namespaces = catalog.list_namespaces(None).await.unwrap();
+
+        let expected_ns = vec![
+            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
+            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
+        ];
+
+        assert_eq!(expected_ns, namespaces);
+
+        config_mock.assert_async().await;
+        list_ns_mock.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_create_namespace() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let create_ns_mock = server
+            .mock("POST", "/v1/namespaces")
+            .with_body(
+                r#"{
+                "namespace": [ "ns1", "ns11"],
+                "properties" : {
+                    "key1": "value1"
+                }
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let catalog = create_catalog(&server).await;
+
+        let namespaces = catalog
+            .create_namespace(
+                &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
+                HashMap::from([("key1".to_string(), "value1".to_string())]),
+            )
+            .await
+            .unwrap();
+
+        let expected_ns = Namespace::with_properties(
+            NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(),
+            HashMap::from([("key1".to_string(), "value1".to_string())]),
+        );
+
+        assert_eq!(expected_ns, namespaces);
+
+        config_mock.assert_async().await;
+        create_ns_mock.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_get_namespace() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let get_ns_mock = server
+            .mock("GET", "/v1/namespaces/ns1")
+            .with_body(
+                r#"{
+                "namespace": [ "ns1"],
+                "properties" : {
+                    "key1": "value1"
+                }
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let catalog = create_catalog(&server).await;
+
+        let namespaces = catalog
+            .get_namespace(&NamespaceIdent::new("ns1".to_string()))
+            .await
+            .unwrap();
+
+        let expected_ns = Namespace::with_properties(
+            NamespaceIdent::new("ns1".to_string()),
+            HashMap::from([("key1".to_string(), "value1".to_string())]),
+        );
+
+        assert_eq!(expected_ns, namespaces);
+
+        config_mock.assert_async().await;
+        get_ns_mock.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_namespace_exists() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let get_ns_mock = server
+            .mock("HEAD", "/v1/namespaces/ns1")
+            .with_status(204)
+            .create_async()
+            .await;
+
+        let catalog = create_catalog(&server).await;
+
+        assert!(catalog
+            .namespace_exists(&NamespaceIdent::new("ns1".to_string()))
+            .await
+            .unwrap());
+
+        config_mock.assert_async().await;
+        get_ns_mock.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_drop_namespace() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let drop_ns_mock = server
+            .mock("DELETE", "/v1/namespaces/ns1")
+            .with_status(204)
+            .create_async()
+            .await;
+
+        let catalog = create_catalog(&server).await;
+
+        catalog
+            .drop_namespace(&NamespaceIdent::new("ns1".to_string()))
+            .await
+            .unwrap();
+
+        config_mock.assert_async().await;
+        drop_ns_mock.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_list_tables() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let list_tables_mock = server
+            .mock("GET", "/v1/namespaces/ns1/tables")
+            .with_status(200)
+            .with_body(
+                r#"{
+                "identifiers": [
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table1"
+                    },
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table2"
+                    }
+                ]
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let catalog = create_catalog(&server).await;
+
+        let tables = catalog
+            .list_tables(&NamespaceIdent::new("ns1".to_string()))
+            .await
+            .unwrap();
+
+        let expected_tables = vec![
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
+        ];
+
+        assert_eq!(tables, expected_tables);
+
+        config_mock.assert_async().await;
+        list_tables_mock.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_drop_tables() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let delete_table_mock = server
+            .mock("DELETE", "/v1/namespaces/ns1/tables/table1")
+            .with_status(204)
+            .create_async()
+            .await;
+
+        let catalog = create_catalog(&server).await;
+
+        catalog
+            .drop_table(&TableIdent::new(
+                NamespaceIdent::new("ns1".to_string()),
+                "table1".to_string(),
+            ))
+            .await
+            .unwrap();
+
+        config_mock.assert_async().await;
+        delete_table_mock.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_check_table_exists() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let check_table_exists_mock = server
+            .mock("HEAD", "/v1/namespaces/ns1/tables/table1")
+            .with_status(204)
+            .create_async()
+            .await;
+
+        let catalog = create_catalog(&server).await;
+
+        assert!(catalog
+            .stat_table(&TableIdent::new(
+                NamespaceIdent::new("ns1".to_string()),
+                "table1".to_string(),
+            ))
+            .await
+            .unwrap());
+
+        config_mock.assert_async().await;
+        check_table_exists_mock.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_rename_table() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let rename_table_mock = server
+            .mock("POST", "/v1/tables/rename")
+            .with_status(204)
+            .create_async()
+            .await;
+
+        let catalog = create_catalog(&server).await;
+
+        catalog
+            .rename_table(
+                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()),
+                &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()),
+            )
+            .await
+            .unwrap();
+
+        config_mock.assert_async().await;
+        rename_table_mock.assert_async().await;
+    }
+}
diff --git a/crates/catalog/rest/src/lib.rs b/crates/catalog/rest/src/lib.rs
new file mode 100644
index 0000000..023fe7a
--- /dev/null
+++ b/crates/catalog/rest/src/lib.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Iceberg REST API implementation.
+
+#![deny(missing_docs)]
+
+mod catalog;
+pub use catalog::*;
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 9f2d4a8..c7caa24 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -41,10 +41,12 @@ either = "1"
futures = "0.3"
itertools = "0.11"
lazy_static = "1"
+log = "^0.4"
murmur3 = "0.5.2"
once_cell = "1"
opendal = "0.41"
ordered-float = "4.0.0"
+reqwest = { version = "^0.11", features = ["json"] }
rust_decimal = "1.31.0"
serde = { version = "^1.0", features = ["rc"] }
serde_bytes = "0.11.8"
@@ -52,6 +54,7 @@ serde_derive = "^1.0"
serde_json = "^1.0"
serde_repr = "0.1.16"
url = "2"
+urlencoding = "2"
uuid = "1.4.1"
[dev-dependencies]
diff --git a/crates/iceberg/src/catalog/mod.rs
b/crates/iceberg/src/catalog/mod.rs
index f50d78c..d58eaa2 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -17,11 +17,15 @@
//! Catalog API for Apache Iceberg
+use serde_derive::{Deserialize, Serialize};
+use urlencoding::encode;
+
use crate::spec::{PartitionSpec, Schema, SortOrder};
use crate::table::Table;
-use crate::Result;
+use crate::{Error, ErrorKind, Result};
use async_trait::async_trait;
use std::collections::HashMap;
+use std::ops::Deref;
/// The catalog API for Iceberg Rust.
#[async_trait]
@@ -40,6 +44,9 @@ pub trait Catalog {
/// Get a namespace information from the catalog.
async fn get_namespace(&self, namespace: &NamespaceIdent) ->
Result<Namespace>;
+    /// Check if a namespace exists in the catalog.
+    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool>;
+
/// Update a namespace inside the catalog.
///
/// # Behavior
@@ -88,6 +95,8 @@ pub trait Catalog {
/// The namespace identifier is a list of strings, where each string is a
/// component of the namespace. It's catalog implementer's responsibility to
/// handle the namespace identifier correctly.
+
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct NamespaceIdent(Vec<String>);
impl NamespaceIdent {
@@ -97,8 +106,24 @@ impl NamespaceIdent {
}
     /// Create a multi-level namespace identifier from vector.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error of kind `ErrorKind::DataInvalid` if `names` is empty.
-    pub fn from_vec(names: Vec<String>) -> Self {
-        Self(names)
+    pub fn from_vec(names: Vec<String>) -> Result<Self> {
+        if names.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Namespace identifier can't be empty!",
+            ));
+        }
+        Ok(Self(names))
+    }
+
+    /// Returns the namespace levels joined with the `0x1F` unit separator and then
+    /// percent-encoded, for use as a single URL path segment.
+    pub fn encode_in_url(&self) -> String {
+        encode(&self.as_ref().join("\u{1F}")).into_owned()
+    }
+
+    /// Consumes the identifier and returns the owned vector of namespace levels.
+    pub fn inner(self) -> Vec<String> {
+        self.0
     }
}
@@ -108,7 +133,16 @@ impl AsRef<Vec<String>> for NamespaceIdent {
}
}
+/// Lets a `NamespaceIdent` be used directly as a slice of its levels
+/// (e.g. `ns.len()`, `ns.iter()`, `ns.join(..)`).
+impl Deref for NamespaceIdent {
+    type Target = [String];
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
/// Namespace represents a namespace in the catalog.
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Namespace {
name: NamespaceIdent,
properties: HashMap<String, String>,
@@ -137,9 +171,12 @@ impl Namespace {
}
 /// TableIdent represents the identifier of a table in the catalog.
+///
+/// With the serde derives below it (de)serializes as
+/// `{"namespace": ["ns1", ...], "name": "table1"}`.
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
 pub struct TableIdent {
-    namespace: NamespaceIdent,
-    name: String,
+    /// Namespace of the table.
+    pub namespace: NamespaceIdent,
+    /// Table name, without the namespace levels.
+    pub name: String,
 }
impl TableIdent {
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index e4ae576..8948c7d 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -307,6 +307,18 @@ define_from_err!(
"Failed to parse url"
);
+// Conversion for HTTP client failures from `reqwest`, presumably a generated
+// `From<reqwest::Error> for Error` impl so `?` works on reqwest results.
+define_from_err!(
+    reqwest::Error,
+    ErrorKind::Unexpected,
+    "Failed to execute http request"
+);
+
+// Conversion for JSON (de)serialization failures from `serde_json`, presumably a
+// generated `From<serde_json::Error> for Error` impl.
+define_from_err!(
+    serde_json::Error,
+    ErrorKind::DataInvalid,
+    "Failed to parse json string"
+);
+
/// Helper macro to check arguments.
///
///
diff --git a/crates/iceberg/src/rest.rs b/crates/iceberg/src/rest.rs
new file mode 100644
index 0000000..f20e961
--- /dev/null
+++ b/crates/iceberg/src/rest.rs
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides a REST catalog implementation.
+
+// NOTE(review): this stub duplicates `RestCatalog` from the new `iceberg-catalog-rest`
+// crate. The diffstat shows no change to `crates/iceberg/src/lib.rs`, so this file is
+// presumably never declared as a module or compiled — TODO confirm and remove it.
+/// Placeholder REST catalog type; it has no methods in this file.
+pub struct RestCatalog {
+    /// Presumably the catalog server URL; it is never read in this file.
+    url: String,
+}