This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new c3b5364 refactor(catalogs/rest): Split user config and runtime config
(#431)
c3b5364 is described below
commit c3b5364ef0789632e77602cec1bbf6f9c8a17739
Author: Xuanwo <[email protected]>
AuthorDate: Thu Jul 4 12:50:50 2024 +0800
refactor(catalogs/rest): Split user config and runtime config (#431)
* refactor(catalogs/rest): Split user config and runtime config
Signed-off-by: Xuanwo <[email protected]>
* Sort cargo
Signed-off-by: Xuanwo <[email protected]>
* Fix unit tests
Signed-off-by: Xuanwo <[email protected]>
* Remove default feature of tokio
Signed-off-by: Xuanwo <[email protected]>
* return error here
Signed-off-by: Xuanwo <[email protected]>
* Return error if cred doesn't exist
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
Cargo.toml | 12 +-
crates/catalog/rest/Cargo.toml | 2 +
crates/catalog/rest/src/catalog.rs | 762 +++++++++++--------------
crates/catalog/rest/src/client.rs | 178 +++++-
crates/catalog/rest/src/lib.rs | 2 +
crates/catalog/rest/src/types.rs | 189 ++++++
crates/catalog/rest/tests/rest_catalog_test.rs | 2 +-
crates/examples/src/rest_catalog_namespace.rs | 2 +-
crates/examples/src/rest_catalog_table.rs | 2 +-
9 files changed, 688 insertions(+), 463 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 0c2bf9c..106aa8a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,11 +18,11 @@
[workspace]
resolver = "2"
members = [
- "crates/catalog/*",
- "crates/examples",
- "crates/iceberg",
- "crates/integrations/*",
- "crates/test_utils",
+ "crates/catalog/*",
+ "crates/examples",
+ "crates/iceberg",
+ "crates/integrations/*",
+ "crates/test_utils",
]
[workspace.package]
@@ -81,7 +81,7 @@ serde_json = "^1.0"
serde_repr = "0.1.16"
serde_with = "3.4.0"
tempfile = "3.8"
-tokio = { version = "1", features = ["macros"] }
+tokio = { version = "1", default-features = false }
typed-builder = "^0.18"
url = "2"
urlencoding = "2"
diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml
index 7abe9c8..6492340 100644
--- a/crates/catalog/rest/Cargo.toml
+++ b/crates/catalog/rest/Cargo.toml
@@ -32,6 +32,7 @@ keywords = ["iceberg", "rest", "catalog"]
# async-trait = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
+http = "1.1.0"
iceberg = { workspace = true }
itertools = { workspace = true }
log = "0.4.20"
@@ -39,6 +40,7 @@ reqwest = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
+tokio = { workspace = true, features = ["sync"] }
typed-builder = { workspace = true }
urlencoding = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
diff --git a/crates/catalog/rest/src/catalog.rs
b/crates/catalog/rest/src/catalog.rs
index 2e5b1b1..a43862a 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -24,13 +24,16 @@ use async_trait::async_trait;
use itertools::Itertools;
use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
use reqwest::{Method, StatusCode, Url};
+use tokio::sync::OnceCell;
use typed_builder::TypedBuilder;
use urlencoding::encode;
-use crate::catalog::_serde::{
- CommitTableRequest, CommitTableResponse, CreateTableRequest,
LoadTableResponse,
-};
use crate::client::HttpClient;
+use crate::types::{
+ CatalogConfig, CommitTableRequest, CommitTableResponse,
CreateTableRequest, ErrorResponse,
+ ListNamespaceResponse, ListTableResponse, LoadTableResponse,
NamespaceSerde,
+ RenameTableRequest, NO_CONTENT, OK,
+};
use iceberg::io::FileIO;
use iceberg::table::Table;
use iceberg::Result;
@@ -38,17 +41,12 @@ use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit,
TableCreation, TableIdent,
};
-use self::_serde::{
- CatalogConfig, ErrorResponse, ListNamespaceResponse, ListTableResponse,
NamespaceSerde,
- RenameTableRequest, TokenResponse, NO_CONTENT, OK,
-};
-
const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
const PATH_V1: &str = "v1";
/// Rest catalog configuration.
-#[derive(Debug, TypedBuilder)]
+#[derive(Clone, Debug, TypedBuilder)]
pub struct RestCatalogConfig {
uri: String,
#[builder(default, setter(strip_option))]
@@ -71,7 +69,7 @@ impl RestCatalogConfig {
[&self.uri, PATH_V1, "config"].join("/")
}
- fn get_token_endpoint(&self) -> String {
+ pub(crate) fn get_token_endpoint(&self) -> String {
if let Some(auth_url) = self.props.get("rest.authorization-url") {
auth_url.to_string()
} else {
@@ -104,7 +102,41 @@ impl RestCatalogConfig {
])
}
- fn http_headers(&self) -> Result<HeaderMap> {
+ /// Get the token from the config.
+ ///
+ /// Client will use `token` to send requests if exists.
+ pub(crate) fn token(&self) -> Option<String> {
+ self.props.get("token").cloned()
+ }
+
+ /// Get the credentials from the config. Client will use `credential`
+ /// to fetch a new token if exists.
+ ///
+ /// ## Output
+ ///
+ /// - `None`: No credential is set.
+ /// - `Some((None, client_secret))`: No client_id is set, use client_secret
directly.
+ /// - `Some((Some(client_id), client_secret))`: Both client_id and
client_secret are set.
+ pub(crate) fn credential(&self) -> Option<(Option<String>, String)> {
+ let cred = self.props.get("credential")?;
+
+ match cred.split_once(':') {
+ Some((client_id, client_secret)) => {
+ Some((Some(client_id.to_string()), client_secret.to_string()))
+ }
+ None => Some((None, cred.to_string())),
+ }
+ }
+
+ /// Get the extra headers from config.
+ ///
+ /// We will include:
+ ///
+ /// - `content-type`
+ /// - `x-client-version`
+ /// - `user-agent`
+ /// - all headers specified by `header.xxx` in props.
+ pub(crate) fn extra_headers(&self) -> Result<HeaderMap> {
let mut headers = HeaderMap::from_iter([
(
header::CONTENT_TYPE,
@@ -120,73 +152,160 @@ impl RestCatalogConfig {
),
]);
- if let Some(token) = self.props.get("token") {
+ for (key, value) in self
+ .props
+ .iter()
+ .filter(|(k, _)| k.starts_with("header."))
+ // The unwrap here is safe since we are filtering the keys
+ .map(|(k, v)| (k.strip_prefix("header.").unwrap(), v))
+ {
headers.insert(
- header::AUTHORIZATION,
- HeaderValue::from_str(&format!("Bearer {token}")).map_err(|e| {
+ HeaderName::from_str(key).map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
- "Invalid token received from catalog server!",
+ format!("Invalid header name: {key}"),
+ )
+ .with_source(e)
+ })?,
+ HeaderValue::from_str(value).map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid header value: {value}"),
)
.with_source(e)
})?,
);
}
- for (key, value) in self.props.iter() {
- if let Some(stripped_key) = key.strip_prefix("header.") {
- // Avoid overwriting default headers
- if !headers.contains_key(stripped_key) {
- headers.insert(
- HeaderName::from_str(stripped_key).map_err(|e| {
- Error::new(
- ErrorKind::DataInvalid,
- format!("Invalid header name:
{stripped_key}!"),
- )
- .with_source(e)
- })?,
- HeaderValue::from_str(value).map_err(|e| {
- Error::new(
- ErrorKind::DataInvalid,
- format!("Invalid header value: {value}!"),
- )
- .with_source(e)
- })?,
- );
- }
- }
- }
Ok(headers)
}
- fn try_create_rest_client(&self) -> Result<HttpClient> {
- // TODO: We will add ssl config, sigv4 later
- let headers = self.http_headers()?;
- HttpClient::try_create(headers)
- }
+ /// Get the optional oauth params from the config.
+ pub(crate) fn extra_oauth_params(&self) -> HashMap<String, String> {
+ let mut params = HashMap::new();
- fn optional_oauth_params(&self) -> HashMap<&str, &str> {
- let mut optional_oauth_param = HashMap::new();
if let Some(scope) = self.props.get("scope") {
- optional_oauth_param.insert("scope", scope.as_str());
+ params.insert("scope".to_string(), scope.to_string());
} else {
- optional_oauth_param.insert("scope", "catalog");
+ params.insert("scope".to_string(), "catalog".to_string());
}
- let set_of_optional_params = ["audience", "resource"];
- for param_name in set_of_optional_params.iter() {
- if let Some(value) = self.props.get(*param_name) {
- optional_oauth_param.insert(param_name.to_owned(), value);
+
+ let optional_params = ["audience", "resource"];
+ for param_name in optional_params {
+ if let Some(value) = self.props.get(param_name) {
+ params.insert(param_name.to_string(), value.to_string());
}
}
- optional_oauth_param
+ params
+ }
+
+ /// Merge the config with the given config fetched from rest server.
+ pub(crate) fn merge_with_config(mut self, mut config: CatalogConfig) ->
Self {
+ if let Some(uri) = config.overrides.remove("uri") {
+ self.uri = uri;
+ }
+
+ let mut props = config.defaults;
+ props.extend(self.props);
+ props.extend(config.overrides);
+
+ self.props = props;
+ self
}
}
+#[derive(Debug)]
+struct RestContext {
+ client: HttpClient,
+
+ /// Runtime config is fetched from rest server and stored here.
+ ///
+ /// It could be different from the user config.
+ config: RestCatalogConfig,
+}
+
+impl RestContext {}
+
/// Rest catalog implementation.
#[derive(Debug)]
pub struct RestCatalog {
- config: RestCatalogConfig,
- client: HttpClient,
+ /// User config is stored as-is and is never changed.
+ ///
+ /// It could be different from the config fetched from the server and
used at runtime.
+ user_config: RestCatalogConfig,
+ ctx: OnceCell<RestContext>,
+}
+
+impl RestCatalog {
+ /// Creates a rest catalog from config.
+ pub fn new(config: RestCatalogConfig) -> Self {
+ Self {
+ user_config: config,
+ ctx: OnceCell::new(),
+ }
+ }
+
+ /// Get the context from the catalog.
+ async fn context(&self) -> Result<&RestContext> {
+ self.ctx
+ .get_or_try_init(|| async {
+ let catalog_config =
RestCatalog::load_config(&self.user_config).await?;
+ let config =
self.user_config.clone().merge_with_config(catalog_config);
+ let client = HttpClient::new(&config)?;
+
+ Ok(RestContext { config, client })
+ })
+ .await
+ }
+
+ /// Load the runtime config from the server by user_config.
+ ///
+ /// It's required for a rest catalog to update its config after creation.
+ async fn load_config(user_config: &RestCatalogConfig) ->
Result<CatalogConfig> {
+ let client = HttpClient::new(user_config)?;
+
+ let mut request = client.request(Method::GET,
user_config.config_endpoint());
+
+ if let Some(warehouse_location) = &user_config.warehouse {
+ request = request.query(&[("warehouse", warehouse_location)]);
+ }
+
+ let config = client
+ .query::<CatalogConfig, ErrorResponse, OK>(request.build()?)
+ .await?;
+ Ok(config)
+ }
+
+ async fn load_file_io(
+ &self,
+ metadata_location: Option<&str>,
+ extra_config: Option<HashMap<String, String>>,
+ ) -> Result<FileIO> {
+ let mut props = self.context().await?.config.props.clone();
+ if let Some(config) = extra_config {
+ props.extend(config);
+ }
+
+ // If the warehouse is a logical identifier instead of a URL we don't
want
+ // to raise an exception
+ let warehouse_path = match
self.context().await?.config.warehouse.as_deref() {
+ Some(url) if Url::parse(url).is_ok() => Some(url),
+ Some(_) => None,
+ None => None,
+ };
+
+ let file_io = match warehouse_path.or(metadata_location) {
+ Some(url) => FileIO::from_path(url)?.with_props(props).build()?,
+ None => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Unable to load file io, neither warehouse nor metadata
location is set!",
+ ))?
+ }
+ };
+
+ Ok(file_io)
+ }
}
#[async_trait]
@@ -196,14 +315,17 @@ impl Catalog for RestCatalog {
&self,
parent: Option<&NamespaceIdent>,
) -> Result<Vec<NamespaceIdent>> {
- let mut request = self
- .client
- .request(Method::GET, self.config.namespaces_endpoint());
+ let mut request = self.context().await?.client.request(
+ Method::GET,
+ self.context().await?.config.namespaces_endpoint(),
+ );
if let Some(ns) = parent {
request = request.query(&[("parent", ns.encode_in_url())]);
}
let resp = self
+ .context()
+ .await?
.client
.query::<ListNamespaceResponse, ErrorResponse,
OK>(request.build()?)
.await?;
@@ -221,8 +343,13 @@ impl Catalog for RestCatalog {
properties: HashMap<String, String>,
) -> Result<Namespace> {
let request = self
+ .context()
+ .await?
.client
- .request(Method::POST, self.config.namespaces_endpoint())
+ .request(
+ Method::POST,
+ self.context().await?.config.namespaces_endpoint(),
+ )
.json(&NamespaceSerde {
namespace: namespace.as_ref().clone(),
properties: Some(properties),
@@ -230,6 +357,8 @@ impl Catalog for RestCatalog {
.build()?;
let resp = self
+ .context()
+ .await?
.client
.query::<NamespaceSerde, ErrorResponse, OK>(request)
.await?;
@@ -240,11 +369,18 @@ impl Catalog for RestCatalog {
/// Get a namespace information from the catalog.
async fn get_namespace(&self, namespace: &NamespaceIdent) ->
Result<Namespace> {
let request = self
+ .context()
+ .await?
.client
- .request(Method::GET, self.config.namespace_endpoint(namespace))
+ .request(
+ Method::GET,
+ self.context().await?.config.namespace_endpoint(namespace),
+ )
.build()?;
let resp = self
+ .context()
+ .await?
.client
.query::<NamespaceSerde, ErrorResponse, OK>(request)
.await?;
@@ -269,11 +405,18 @@ impl Catalog for RestCatalog {
async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
let request = self
+ .context()
+ .await?
.client
- .request(Method::HEAD, self.config.namespace_endpoint(ns))
+ .request(
+ Method::HEAD,
+ self.context().await?.config.namespace_endpoint(ns),
+ )
.build()?;
- self.client
+ self.context()
+ .await?
+ .client
.do_execute::<bool, ErrorResponse>(request, |resp| match
resp.status() {
StatusCode::NO_CONTENT => Some(true),
StatusCode::NOT_FOUND => Some(false),
@@ -285,11 +428,18 @@ impl Catalog for RestCatalog {
/// Drop a namespace from the catalog.
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
let request = self
+ .context()
+ .await?
.client
- .request(Method::DELETE, self.config.namespace_endpoint(namespace))
+ .request(
+ Method::DELETE,
+ self.context().await?.config.namespace_endpoint(namespace),
+ )
.build()?;
- self.client
+ self.context()
+ .await?
+ .client
.execute::<ErrorResponse, NO_CONTENT>(request)
.await
}
@@ -297,11 +447,18 @@ impl Catalog for RestCatalog {
/// List tables from namespace.
async fn list_tables(&self, namespace: &NamespaceIdent) ->
Result<Vec<TableIdent>> {
let request = self
+ .context()
+ .await?
.client
- .request(Method::GET, self.config.tables_endpoint(namespace))
+ .request(
+ Method::GET,
+ self.context().await?.config.tables_endpoint(namespace),
+ )
.build()?;
let resp = self
+ .context()
+ .await?
.client
.query::<ListTableResponse, ErrorResponse, OK>(request)
.await?;
@@ -318,8 +475,13 @@ impl Catalog for RestCatalog {
let table_ident = TableIdent::new(namespace.clone(),
creation.name.clone());
let request = self
+ .context()
+ .await?
.client
- .request(Method::POST, self.config.tables_endpoint(namespace))
+ .request(
+ Method::POST,
+ self.context().await?.config.tables_endpoint(namespace),
+ )
.json(&CreateTableRequest {
name: creation.name,
location: creation.location,
@@ -337,11 +499,15 @@ impl Catalog for RestCatalog {
.build()?;
let resp = self
+ .context()
+ .await?
.client
.query::<LoadTableResponse, ErrorResponse, OK>(request)
.await?;
- let file_io = self.load_file_io(resp.metadata_location.as_deref(),
resp.config)?;
+ let file_io = self
+ .load_file_io(resp.metadata_location.as_deref(), resp.config)
+ .await?;
let table = Table::builder()
.identifier(table_ident)
@@ -361,16 +527,25 @@ impl Catalog for RestCatalog {
/// Load table from the catalog.
async fn load_table(&self, table: &TableIdent) -> Result<Table> {
let request = self
+ .context()
+ .await?
.client
- .request(Method::GET, self.config.table_endpoint(table))
+ .request(
+ Method::GET,
+ self.context().await?.config.table_endpoint(table),
+ )
.build()?;
let resp = self
+ .context()
+ .await?
.client
.query::<LoadTableResponse, ErrorResponse, OK>(request)
.await?;
- let file_io = self.load_file_io(resp.metadata_location.as_deref(),
resp.config)?;
+ let file_io = self
+ .load_file_io(resp.metadata_location.as_deref(), resp.config)
+ .await?;
let table_builder = Table::builder()
.identifier(table.clone())
@@ -387,11 +562,18 @@ impl Catalog for RestCatalog {
/// Drop a table from the catalog.
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
let request = self
+ .context()
+ .await?
.client
- .request(Method::DELETE, self.config.table_endpoint(table))
+ .request(
+ Method::DELETE,
+ self.context().await?.config.table_endpoint(table),
+ )
.build()?;
- self.client
+ self.context()
+ .await?
+ .client
.execute::<ErrorResponse, NO_CONTENT>(request)
.await
}
@@ -399,11 +581,18 @@ impl Catalog for RestCatalog {
/// Check if a table exists in the catalog.
async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
let request = self
+ .context()
+ .await?
.client
- .request(Method::HEAD, self.config.table_endpoint(table))
+ .request(
+ Method::HEAD,
+ self.context().await?.config.table_endpoint(table),
+ )
.build()?;
- self.client
+ self.context()
+ .await?
+ .client
.do_execute::<bool, ErrorResponse>(request, |resp| match
resp.status() {
StatusCode::NO_CONTENT => Some(true),
StatusCode::NOT_FOUND => Some(false),
@@ -415,15 +604,22 @@ impl Catalog for RestCatalog {
/// Rename a table in the catalog.
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) ->
Result<()> {
let request = self
+ .context()
+ .await?
.client
- .request(Method::POST, self.config.rename_table_endpoint())
+ .request(
+ Method::POST,
+ self.context().await?.config.rename_table_endpoint(),
+ )
.json(&RenameTableRequest {
source: src.clone(),
destination: dest.clone(),
})
.build()?;
- self.client
+ self.context()
+ .await?
+ .client
.execute::<ErrorResponse, NO_CONTENT>(request)
.await
}
@@ -431,10 +627,15 @@ impl Catalog for RestCatalog {
/// Update table.
async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
let request = self
+ .context()
+ .await?
.client
.request(
Method::POST,
- self.config.table_endpoint(commit.identifier()),
+ self.context()
+ .await?
+ .config
+ .table_endpoint(commit.identifier()),
)
.json(&CommitTableRequest {
identifier: commit.identifier().clone(),
@@ -444,11 +645,15 @@ impl Catalog for RestCatalog {
.build()?;
let resp = self
+ .context()
+ .await?
.client
.query::<CommitTableResponse, ErrorResponse, OK>(request)
.await?;
- let file_io = self.load_file_io(Some(&resp.metadata_location), None)?;
+ let file_io = self
+ .load_file_io(Some(&resp.metadata_location), None)
+ .await?;
Ok(Table::builder()
.identifier(commit.identifier().clone())
.file_io(file_io)
@@ -458,295 +663,6 @@ impl Catalog for RestCatalog {
}
}
-impl RestCatalog {
- /// Creates a rest catalog from config.
- pub async fn new(config: RestCatalogConfig) -> Result<Self> {
- let mut catalog = Self {
- client: config.try_create_rest_client()?,
- config,
- };
- catalog.fetch_access_token().await?;
- catalog.client = catalog.config.try_create_rest_client()?;
- catalog.update_config().await?;
- catalog.client = catalog.config.try_create_rest_client()?;
-
- Ok(catalog)
- }
-
- async fn fetch_access_token(&mut self) -> Result<()> {
- if self.config.props.contains_key("token") {
- return Ok(());
- }
- if let Some(credential) = self.config.props.get("credential") {
- let (client_id, client_secret) = if credential.contains(':') {
- let (client_id, client_secret) =
credential.split_once(':').unwrap();
- (Some(client_id), client_secret)
- } else {
- (None, credential.as_str())
- };
- let mut params = HashMap::with_capacity(4);
- params.insert("grant_type", "client_credentials");
- if let Some(client_id) = client_id {
- params.insert("client_id", client_id);
- }
- params.insert("client_secret", client_secret);
- let optional_oauth_params = self.config.optional_oauth_params();
- params.extend(optional_oauth_params);
- let req = self
- .client
- .request(Method::POST, self.config.get_token_endpoint())
- .form(¶ms)
- .build()?;
- let res = self
- .client
- .query::<TokenResponse, ErrorResponse, OK>(req)
- .await
- .map_err(|e| {
- Error::new(
- ErrorKind::Unexpected,
- "Failed to fetch access token from catalog server!",
- )
- .with_source(e)
- })?;
- let token = res.access_token;
- self.config.props.insert("token".to_string(), token);
- }
-
- Ok(())
- }
-
- async fn update_config(&mut self) -> Result<()> {
- let mut request = self
- .client
- .request(Method::GET, self.config.config_endpoint());
-
- if let Some(warehouse_location) = &self.config.warehouse {
- request = request.query(&[("warehouse", warehouse_location)]);
- }
-
- let mut config = self
- .client
- .query::<CatalogConfig, ErrorResponse, OK>(request.build()?)
- .await?;
-
- let mut props = config.defaults;
- props.extend(self.config.props.clone());
- if let Some(uri) = config.overrides.remove("uri") {
- self.config.uri = uri;
- }
- props.extend(config.overrides);
-
- self.config.props = props;
-
- Ok(())
- }
-
- fn load_file_io(
- &self,
- metadata_location: Option<&str>,
- extra_config: Option<HashMap<String, String>>,
- ) -> Result<FileIO> {
- let mut props = self.config.props.clone();
- if let Some(config) = extra_config {
- props.extend(config);
- }
-
- // If the warehouse is a logical identifier instead of a URL we don't
want
- // to raise an exception
- let warehouse_path = match self.config.warehouse.as_deref() {
- Some(url) if Url::parse(url).is_ok() => Some(url),
- Some(_) => None,
- None => None,
- };
-
- let file_io = match warehouse_path.or(metadata_location) {
- Some(url) => FileIO::from_path(url)?.with_props(props).build()?,
- None => {
- return Err(Error::new(
- ErrorKind::Unexpected,
- "Unable to load file io, neither warehouse nor metadata
location is set!",
- ))?
- }
- };
-
- Ok(file_io)
- }
-}
-
-/// Requests and responses for rest api.
-mod _serde {
- use std::collections::HashMap;
-
- use serde_derive::{Deserialize, Serialize};
-
- use iceberg::spec::{Schema, SortOrder, TableMetadata,
UnboundPartitionSpec};
- use iceberg::{Error, ErrorKind, Namespace, TableIdent, TableRequirement,
TableUpdate};
-
- pub(super) const OK: u16 = 200u16;
- pub(super) const NO_CONTENT: u16 = 204u16;
-
- #[derive(Clone, Debug, Serialize, Deserialize)]
- pub(super) struct CatalogConfig {
- pub(super) overrides: HashMap<String, String>,
- pub(super) defaults: HashMap<String, String>,
- }
-
- #[derive(Debug, Serialize, Deserialize)]
- pub(super) struct ErrorResponse {
- error: ErrorModel,
- }
-
- impl From<ErrorResponse> for Error {
- fn from(resp: ErrorResponse) -> Error {
- resp.error.into()
- }
- }
-
- #[derive(Debug, Serialize, Deserialize)]
- pub(super) struct ErrorModel {
- pub(super) message: String,
- pub(super) r#type: String,
- pub(super) code: u16,
- pub(super) stack: Option<Vec<String>>,
- }
-
- impl From<ErrorModel> for Error {
- fn from(value: ErrorModel) -> Self {
- let mut error = Error::new(ErrorKind::DataInvalid, value.message)
- .with_context("type", value.r#type)
- .with_context("code", format!("{}", value.code));
-
- if let Some(stack) = value.stack {
- error = error.with_context("stack", stack.join("\n"));
- }
-
- error
- }
- }
-
- #[derive(Debug, Serialize, Deserialize)]
- pub(super) struct OAuthError {
- pub(super) error: String,
- pub(super) error_description: Option<String>,
- pub(super) error_uri: Option<String>,
- }
-
- impl From<OAuthError> for Error {
- fn from(value: OAuthError) -> Self {
- let mut error = Error::new(
- ErrorKind::DataInvalid,
- format!("OAuthError: {}", value.error),
- );
-
- if let Some(desc) = value.error_description {
- error = error.with_context("description", desc);
- }
-
- if let Some(uri) = value.error_uri {
- error = error.with_context("uri", uri);
- }
-
- error
- }
- }
-
- #[derive(Debug, Serialize, Deserialize)]
- pub(super) struct TokenResponse {
- pub(super) access_token: String,
- pub(super) token_type: String,
- pub(super) expires_in: Option<u64>,
- pub(super) issued_token_type: Option<String>,
- }
-
- #[derive(Debug, Serialize, Deserialize)]
- pub(super) struct NamespaceSerde {
- pub(super) namespace: Vec<String>,
- pub(super) properties: Option<HashMap<String, String>>,
- }
-
- impl TryFrom<NamespaceSerde> for super::Namespace {
- type Error = Error;
- fn try_from(value: NamespaceSerde) -> std::result::Result<Self,
Self::Error> {
- Ok(super::Namespace::with_properties(
- super::NamespaceIdent::from_vec(value.namespace)?,
- value.properties.unwrap_or_default(),
- ))
- }
- }
-
- impl From<&Namespace> for NamespaceSerde {
- fn from(value: &Namespace) -> Self {
- Self {
- namespace: value.name().as_ref().clone(),
- properties: Some(value.properties().clone()),
- }
- }
- }
-
- #[derive(Debug, Serialize, Deserialize)]
- pub(super) struct ListNamespaceResponse {
- pub(super) namespaces: Vec<Vec<String>>,
- }
-
- #[derive(Debug, Serialize, Deserialize)]
- pub(super) struct UpdateNamespacePropsRequest {
- removals: Option<Vec<String>>,
- updates: Option<HashMap<String, String>>,
- }
-
- #[derive(Debug, Serialize, Deserialize)]
- pub(super) struct UpdateNamespacePropsResponse {
- updated: Vec<String>,
- removed: Vec<String>,
- missing: Option<Vec<String>>,
- }
-
- #[derive(Debug, Serialize, Deserialize)]
- pub(super) struct ListTableResponse {
- pub(super) identifiers: Vec<TableIdent>,
- }
-
- #[derive(Debug, Serialize, Deserialize)]
- pub(super) struct RenameTableRequest {
- pub(super) source: TableIdent,
- pub(super) destination: TableIdent,
- }
-
- #[derive(Debug, Deserialize)]
- #[serde(rename_all = "kebab-case")]
- pub(super) struct LoadTableResponse {
- pub(super) metadata_location: Option<String>,
- pub(super) metadata: TableMetadata,
- pub(super) config: Option<HashMap<String, String>>,
- }
-
- #[derive(Debug, Serialize, Deserialize)]
- #[serde(rename_all = "kebab-case")]
- pub(super) struct CreateTableRequest {
- pub(super) name: String,
- pub(super) location: Option<String>,
- pub(super) schema: Schema,
- pub(super) partition_spec: Option<UnboundPartitionSpec>,
- pub(super) write_order: Option<SortOrder>,
- pub(super) stage_create: Option<bool>,
- pub(super) properties: Option<HashMap<String, String>>,
- }
-
- #[derive(Debug, Serialize, Deserialize)]
- pub(super) struct CommitTableRequest {
- pub(super) identifier: TableIdent,
- pub(super) requirements: Vec<TableRequirement>,
- pub(super) updates: Vec<TableUpdate>,
- }
-
- #[derive(Debug, Serialize, Deserialize)]
- #[serde(rename_all = "kebab-case")]
- pub(super) struct CommitTableResponse {
- pub(super) metadata_location: String,
- pub(super) metadata: TableMetadata,
- }
-}
-
#[cfg(test)]
mod tests {
use chrono::{TimeZone, Utc};
@@ -783,12 +699,16 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
assert_eq!(
- catalog.config.props.get("warehouse"),
+ catalog
+ .context()
+ .await
+ .unwrap()
+ .config
+ .props
+ .get("warehouse"),
Some(&"s3://iceberg-catalog".to_string())
);
@@ -827,6 +747,7 @@ mod tests {
"expires_in": 86400
}"#,
)
+ .expect(2)
.create_async()
.await
}
@@ -845,16 +766,12 @@ mod tests {
.uri(server.url())
.props(props)
.build(),
- )
- .await
- .unwrap();
+ );
+ let token = catalog.context().await.unwrap().client.token().await;
oauth_mock.assert_async().await;
config_mock.assert_async().await;
- assert_eq!(
- catalog.config.props.get("token"),
- Some(&"ey000000000000".to_string())
- );
+ assert_eq!(token, Some("ey000000000000".to_string()));
}
#[tokio::test]
@@ -884,6 +801,7 @@ mod tests {
"expires_in": 86400
}"#,
)
+ .expect(2)
.create_async()
.await;
@@ -894,16 +812,13 @@ mod tests {
.uri(server.url())
.props(props)
.build(),
- )
- .await
- .unwrap();
+ );
+
+ let token = catalog.context().await.unwrap().client.token().await;
oauth_mock.assert_async().await;
config_mock.assert_async().await;
- assert_eq!(
- catalog.config.props.get("token"),
- Some(&"ey000000000000".to_string())
- );
+ assert_eq!(token, Some("ey000000000000".to_string()));
}
#[tokio::test]
@@ -916,7 +831,7 @@ mod tests {
.uri(server.url())
.props(props)
.build();
- let headers: HeaderMap = config.http_headers().unwrap();
+ let headers: HeaderMap = config.extra_headers().unwrap();
let expected_headers = HeaderMap::from_iter([
(
@@ -953,12 +868,12 @@ mod tests {
.uri(server.url())
.props(props)
.build();
- let headers: HeaderMap = config.http_headers().unwrap();
+ let headers: HeaderMap = config.extra_headers().unwrap();
let expected_headers = HeaderMap::from_iter([
(
header::CONTENT_TYPE,
- HeaderValue::from_static("application/json"),
+ HeaderValue::from_static("application/yaml"),
),
(
HeaderName::from_static("x-client-version"),
@@ -997,16 +912,13 @@ mod tests {
.uri(server.url())
.props(props)
.build(),
- )
- .await
- .unwrap();
+ );
+
+ let token = catalog.context().await.unwrap().client.token().await;
oauth_mock.assert_async().await;
config_mock.assert_async().await;
- assert_eq!(
- catalog.config.props.get("token"),
- Some(&"ey000000000000".to_string())
- );
+ assert_eq!(token, Some("ey000000000000".to_string()));
}
#[tokio::test]
@@ -1044,9 +956,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
let _namespaces = catalog.list_namespaces(None).await.unwrap();
@@ -1073,9 +983,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
let namespaces = catalog.list_namespaces(None).await.unwrap();
@@ -1109,9 +1017,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
let namespaces = catalog
.create_namespace(
@@ -1151,9 +1057,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
let namespaces = catalog
.get_namespace(&NamespaceIdent::new("ns1".to_string()))
@@ -1183,9 +1087,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
assert!(catalog
.namespace_exists(&NamespaceIdent::new("ns1".to_string()))
@@ -1208,9 +1110,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
catalog
.drop_namespace(&NamespaceIdent::new("ns1".to_string()))
@@ -1247,9 +1147,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
let tables = catalog
.list_tables(&NamespaceIdent::new("ns1".to_string()))
@@ -1279,9 +1177,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
catalog
.drop_table(&TableIdent::new(
@@ -1307,9 +1203,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
assert!(catalog
.table_exists(&TableIdent::new(
@@ -1335,9 +1229,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
catalog
.rename_table(
@@ -1368,9 +1260,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
let table = catalog
.load_table(&TableIdent::new(
@@ -1481,9 +1371,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
let table = catalog
.load_table(&TableIdent::new(
@@ -1520,9 +1408,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
let table_creation = TableCreation::builder()
.name("test1".to_string())
@@ -1662,9 +1548,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
let table_creation = TableCreation::builder()
.name("test1".to_string())
@@ -1717,9 +1601,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
let table1 = {
let file = File::open(format!(
@@ -1839,9 +1721,7 @@ mod tests {
.create_async()
.await;
- let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
- .await
- .unwrap();
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
let table1 = {
let file = File::open(format!(
diff --git a/crates/catalog/rest/src/client.rs
b/crates/catalog/rest/src/client.rs
index dbfd9de..43e14c7 100644
--- a/crates/catalog/rest/src/client.rs
+++ b/crates/catalog/rest/src/client.rs
@@ -15,25 +15,171 @@
// specific language governing permissions and limitations
// under the License.
+use crate::types::{ErrorResponse, TokenResponse, OK};
+use crate::RestCatalogConfig;
use iceberg::Result;
use iceberg::{Error, ErrorKind};
use reqwest::header::HeaderMap;
use reqwest::{Client, IntoUrl, Method, Request, RequestBuilder, Response};
use serde::de::DeserializeOwned;
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+use std::sync::Mutex;
-#[derive(Debug)]
-pub(crate) struct HttpClient(Client);
+pub(crate) struct HttpClient {
+ client: Client,
+
+ /// The token to be used for authentication.
+ ///
+ /// It's possible to fetch the token from the server while needed.
+ token: Mutex<Option<String>>,
+ /// The token endpoint to be used for authentication.
+ token_endpoint: String,
+ /// The credential to be used for authentication.
+ credential: Option<(Option<String>, String)>,
+ /// Extra headers to be added to each request.
+ extra_headers: HeaderMap,
+ /// Extra oauth parameters to be added to each authentication request.
+ extra_oauth_params: HashMap<String, String>,
+}
+
+impl Debug for HttpClient {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("HttpClient")
+ .field("client", &self.client)
+ .field("extra_headers", &self.extra_headers)
+ .finish_non_exhaustive()
+ }
+}
impl HttpClient {
- pub fn try_create(default_headers: HeaderMap) -> Result<Self> {
- Ok(HttpClient(
- Client::builder().default_headers(default_headers).build()?,
- ))
+ pub fn new(cfg: &RestCatalogConfig) -> Result<Self> {
+ Ok(HttpClient {
+ client: Client::new(),
+
+ token: Mutex::new(cfg.token()),
+ token_endpoint: cfg.get_token_endpoint(),
+ credential: cfg.credential(),
+ extra_headers: cfg.extra_headers()?,
+ extra_oauth_params: cfg.extra_oauth_params(),
+ })
+ }
+
+ /// This API is testing only to assert the token.
+ #[cfg(test)]
+ pub(crate) async fn token(&self) -> Option<String> {
+ let mut req = self
+ .request(Method::GET, &self.token_endpoint)
+ .build()
+ .unwrap();
+ self.authenticate(&mut req).await.ok();
+ self.token.lock().unwrap().clone()
+ }
+
+ /// Authenticate the request by filling token.
+ ///
+ /// - If neither token nor credential is provided, this method will do nothing.
+ /// - If only credential is provided, this method will try to fetch token from the server.
+ /// - If token is provided, this method will use the token directly.
+ ///
+ /// # TODO
+ ///
+ /// Support refreshing token while needed.
+ async fn authenticate(&self, req: &mut Request) -> Result<()> {
+ // Clone the token from lock without holding the lock for entire function.
+ let token = { self.token.lock().expect("lock poison").clone() };
+
+ if self.credential.is_none() && token.is_none() {
+ return Ok(());
+ }
+
+ // Use token if provided.
+ if let Some(token) = &token {
+ req.headers_mut().insert(
+ http::header::AUTHORIZATION,
+ format!("Bearer {token}").parse().map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Invalid token received from catalog server!",
+ )
+ .with_source(e)
+ })?,
+ );
+ return Ok(());
+ }
+
+ // Credential must exist here.
+ let (client_id, client_secret) =
self.credential.as_ref().ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Credential must be provided for authentication",
+ )
+ })?;
+
+ let mut params = HashMap::with_capacity(4);
+ params.insert("grant_type", "client_credentials");
+ if let Some(client_id) = client_id {
+ params.insert("client_id", client_id);
+ }
+ params.insert("client_secret", client_secret);
+ params.extend(
+ self.extra_oauth_params
+ .iter()
+ .map(|(k, v)| (k.as_str(), v.as_str())),
+ );
+
+ let auth_req = self
+ .client
+ .request(Method::POST, &self.token_endpoint)
+ .form(&params)
+ .build()?;
+ let auth_resp = self.client.execute(auth_req).await?;
+
+ let auth_res: TokenResponse = if auth_resp.status().as_u16() == OK {
+ let text = auth_resp.bytes().await?;
+ Ok(serde_json::from_slice(&text).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Failed to parse response from rest catalog server!",
+ )
+ .with_context("json", String::from_utf8_lossy(&text))
+ .with_source(e)
+ })?)
+ } else {
+ let code = auth_resp.status();
+ let text = auth_resp.bytes().await?;
+ let e: ErrorResponse = serde_json::from_slice(&text).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Failed to parse response from rest catalog server!",
+ )
+ .with_context("json", String::from_utf8_lossy(&text))
+ .with_context("code", code.to_string())
+ .with_source(e)
+ })?;
+ Err(Error::from(e))
+ }?;
+ let token = auth_res.access_token;
+ // Update token.
+ *self.token.lock().expect("lock poison") = Some(token.clone());
+ // Insert token in request.
+ req.headers_mut().insert(
+ http::header::AUTHORIZATION,
+ format!("Bearer {token}").parse().map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Invalid token received from catalog server!",
+ )
+ .with_source(e)
+ })?,
+ );
+
+ Ok(())
}
#[inline]
pub fn request<U: IntoUrl>(&self, method: Method, url: U) ->
RequestBuilder {
- self.0.request(method, url)
+ self.client.request(method, url)
}
pub async fn query<
@@ -42,9 +188,11 @@ impl HttpClient {
const SUCCESS_CODE: u16,
>(
&self,
- request: Request,
+ mut request: Request,
) -> Result<R> {
- let resp = self.0.execute(request).await?;
+ self.authenticate(&mut request).await?;
+
+ let resp = self.client.execute(request).await?;
if resp.status().as_u16() == SUCCESS_CODE {
let text = resp.bytes().await?;
@@ -74,9 +222,11 @@ impl HttpClient {
pub async fn execute<E: DeserializeOwned + Into<Error>, const
SUCCESS_CODE: u16>(
&self,
- request: Request,
+ mut request: Request,
) -> Result<()> {
- let resp = self.0.execute(request).await?;
+ self.authenticate(&mut request).await?;
+
+ let resp = self.client.execute(request).await?;
if resp.status().as_u16() == SUCCESS_CODE {
Ok(())
@@ -99,10 +249,12 @@ impl HttpClient {
/// More generic logic handling for special cases like head.
pub async fn do_execute<R, E: DeserializeOwned + Into<Error>>(
&self,
- request: Request,
+ mut request: Request,
handler: impl FnOnce(&Response) -> Option<R>,
) -> Result<R> {
- let resp = self.0.execute(request).await?;
+ self.authenticate(&mut request).await?;
+
+ let resp = self.client.execute(request).await?;
if let Some(ret) = handler(&resp) {
Ok(ret)
diff --git a/crates/catalog/rest/src/lib.rs b/crates/catalog/rest/src/lib.rs
index 6b0a784..f94ee87 100644
--- a/crates/catalog/rest/src/lib.rs
+++ b/crates/catalog/rest/src/lib.rs
@@ -21,4 +21,6 @@
mod catalog;
mod client;
+mod types;
+
pub use catalog::*;
diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs
new file mode 100644
index 0000000..c8d704b
--- /dev/null
+++ b/crates/catalog/rest/src/types.rs
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use serde_derive::{Deserialize, Serialize};
+
+use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
+use iceberg::{
+ Error, ErrorKind, Namespace, NamespaceIdent, TableIdent, TableRequirement,
TableUpdate,
+};
+
+pub(super) const OK: u16 = 200u16;
+pub(super) const NO_CONTENT: u16 = 204u16;
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub(super) struct CatalogConfig {
+ pub(super) overrides: HashMap<String, String>,
+ pub(super) defaults: HashMap<String, String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct ErrorResponse {
+ error: ErrorModel,
+}
+
+impl From<ErrorResponse> for Error {
+ fn from(resp: ErrorResponse) -> Error {
+ resp.error.into()
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct ErrorModel {
+ pub(super) message: String,
+ pub(super) r#type: String,
+ pub(super) code: u16,
+ pub(super) stack: Option<Vec<String>>,
+}
+
+impl From<ErrorModel> for Error {
+ fn from(value: ErrorModel) -> Self {
+ let mut error = Error::new(ErrorKind::DataInvalid, value.message)
+ .with_context("type", value.r#type)
+ .with_context("code", format!("{}", value.code));
+
+ if let Some(stack) = value.stack {
+ error = error.with_context("stack", stack.join("\n"));
+ }
+
+ error
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct OAuthError {
+ pub(super) error: String,
+ pub(super) error_description: Option<String>,
+ pub(super) error_uri: Option<String>,
+}
+
+impl From<OAuthError> for Error {
+ fn from(value: OAuthError) -> Self {
+ let mut error = Error::new(
+ ErrorKind::DataInvalid,
+ format!("OAuthError: {}", value.error),
+ );
+
+ if let Some(desc) = value.error_description {
+ error = error.with_context("description", desc);
+ }
+
+ if let Some(uri) = value.error_uri {
+ error = error.with_context("uri", uri);
+ }
+
+ error
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct TokenResponse {
+ pub(super) access_token: String,
+ pub(super) token_type: String,
+ pub(super) expires_in: Option<u64>,
+ pub(super) issued_token_type: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct NamespaceSerde {
+ pub(super) namespace: Vec<String>,
+ pub(super) properties: Option<HashMap<String, String>>,
+}
+
+impl TryFrom<NamespaceSerde> for Namespace {
+ type Error = Error;
+ fn try_from(value: NamespaceSerde) -> std::result::Result<Self,
Self::Error> {
+ Ok(Namespace::with_properties(
+ NamespaceIdent::from_vec(value.namespace)?,
+ value.properties.unwrap_or_default(),
+ ))
+ }
+}
+
+impl From<&Namespace> for NamespaceSerde {
+ fn from(value: &Namespace) -> Self {
+ Self {
+ namespace: value.name().as_ref().clone(),
+ properties: Some(value.properties().clone()),
+ }
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct ListNamespaceResponse {
+ pub(super) namespaces: Vec<Vec<String>>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct UpdateNamespacePropsRequest {
+ removals: Option<Vec<String>>,
+ updates: Option<HashMap<String, String>>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct UpdateNamespacePropsResponse {
+ updated: Vec<String>,
+ removed: Vec<String>,
+ missing: Option<Vec<String>>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct ListTableResponse {
+ pub(super) identifiers: Vec<TableIdent>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct RenameTableRequest {
+ pub(super) source: TableIdent,
+ pub(super) destination: TableIdent,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub(super) struct LoadTableResponse {
+ pub(super) metadata_location: Option<String>,
+ pub(super) metadata: TableMetadata,
+ pub(super) config: Option<HashMap<String, String>>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub(super) struct CreateTableRequest {
+ pub(super) name: String,
+ pub(super) location: Option<String>,
+ pub(super) schema: Schema,
+ pub(super) partition_spec: Option<UnboundPartitionSpec>,
+ pub(super) write_order: Option<SortOrder>,
+ pub(super) stage_create: Option<bool>,
+ pub(super) properties: Option<HashMap<String, String>>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct CommitTableRequest {
+ pub(super) identifier: TableIdent,
+ pub(super) requirements: Vec<TableRequirement>,
+ pub(super) updates: Vec<TableUpdate>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub(super) struct CommitTableResponse {
+ pub(super) metadata_location: String,
+ pub(super) metadata: TableMetadata,
+}
diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs
b/crates/catalog/rest/tests/rest_catalog_test.rs
index 205428d..3c22241 100644
--- a/crates/catalog/rest/tests/rest_catalog_test.rs
+++ b/crates/catalog/rest/tests/rest_catalog_test.rs
@@ -59,7 +59,7 @@ async fn set_test_fixture(func: &str) -> TestFixture {
let config = RestCatalogConfig::builder()
.uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
.build();
- let rest_catalog = RestCatalog::new(config).await.unwrap();
+ let rest_catalog = RestCatalog::new(config);
TestFixture {
_docker_compose: docker_compose,
diff --git a/crates/examples/src/rest_catalog_namespace.rs
b/crates/examples/src/rest_catalog_namespace.rs
index 0a3b00b..3716899 100644
--- a/crates/examples/src/rest_catalog_namespace.rs
+++ b/crates/examples/src/rest_catalog_namespace.rs
@@ -27,7 +27,7 @@ async fn main() {
.uri("http://localhost:8080".to_string())
.build();
- let catalog = RestCatalog::new(config).await.unwrap();
+ let catalog = RestCatalog::new(config);
// ANCHOR_END: create_catalog
// ANCHOR: list_all_namespace
diff --git a/crates/examples/src/rest_catalog_table.rs
b/crates/examples/src/rest_catalog_table.rs
index 9fb3dd7..f25ce45 100644
--- a/crates/examples/src/rest_catalog_table.rs
+++ b/crates/examples/src/rest_catalog_table.rs
@@ -27,7 +27,7 @@ async fn main() {
.uri("http://localhost:8080".to_string())
.build();
- let catalog = RestCatalog::new(config).await.unwrap();
+ let catalog = RestCatalog::new(config);
// ANCHOR: create_table
let table_id = TableIdent::from_strs(["default", "t1"]).unwrap();