This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0d6fae0 refactor(catalog/rest): Split http client logic to seperate
mod (#423)
0d6fae0 is described below
commit 0d6fae04e2e488383be554dbdedcf54cb0d5b220
Author: Xuanwo <[email protected]>
AuthorDate: Fri Jun 28 10:55:26 2024 +0800
refactor(catalog/rest): Split http client logic to seperate mod (#423)
Signed-off-by: Xuanwo <[email protected]>
---
crates/catalog/rest/src/catalog.rs | 147 ++++++-------------------------------
crates/catalog/rest/src/client.rs | 124 +++++++++++++++++++++++++++++++
crates/catalog/rest/src/lib.rs | 1 +
3 files changed, 149 insertions(+), 123 deletions(-)
diff --git a/crates/catalog/rest/src/catalog.rs
b/crates/catalog/rest/src/catalog.rs
index efea9cb..2e5b1b1 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -23,14 +23,14 @@ use std::str::FromStr;
use async_trait::async_trait;
use itertools::Itertools;
use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
-use reqwest::{Client, Request, Response, StatusCode, Url};
-use serde::de::DeserializeOwned;
+use reqwest::{Method, StatusCode, Url};
use typed_builder::TypedBuilder;
use urlencoding::encode;
use crate::catalog::_serde::{
CommitTableRequest, CommitTableResponse, CreateTableRequest,
LoadTableResponse,
};
+use crate::client::HttpClient;
use iceberg::io::FileIO;
use iceberg::table::Table;
use iceberg::Result;
@@ -162,10 +162,7 @@ impl RestCatalogConfig {
fn try_create_rest_client(&self) -> Result<HttpClient> {
// TODO: We will add ssl config, sigv4 later
let headers = self.http_headers()?;
-
- Ok(HttpClient(
- Client::builder().default_headers(headers).build()?,
- ))
+ HttpClient::try_create(headers)
}
fn optional_oauth_params(&self) -> HashMap<&str, &str> {
@@ -185,97 +182,6 @@ impl RestCatalogConfig {
}
}
-#[derive(Debug)]
-struct HttpClient(Client);
-
-impl HttpClient {
- async fn query<
- R: DeserializeOwned,
- E: DeserializeOwned + Into<Error>,
- const SUCCESS_CODE: u16,
- >(
- &self,
- request: Request,
- ) -> Result<R> {
- let resp = self.0.execute(request).await?;
-
- if resp.status().as_u16() == SUCCESS_CODE {
- let text = resp.bytes().await?;
- Ok(serde_json::from_slice::<R>(&text).map_err(|e| {
- Error::new(
- ErrorKind::Unexpected,
- "Failed to parse response from rest catalog server!",
- )
- .with_context("json", String::from_utf8_lossy(&text))
- .with_source(e)
- })?)
- } else {
- let code = resp.status();
- let text = resp.bytes().await?;
- let e = serde_json::from_slice::<E>(&text).map_err(|e| {
- Error::new(
- ErrorKind::Unexpected,
- "Failed to parse response from rest catalog server!",
- )
- .with_context("json", String::from_utf8_lossy(&text))
- .with_context("code", code.to_string())
- .with_source(e)
- })?;
- Err(e.into())
- }
- }
-
- async fn execute<E: DeserializeOwned + Into<Error>, const SUCCESS_CODE:
u16>(
- &self,
- request: Request,
- ) -> Result<()> {
- let resp = self.0.execute(request).await?;
-
- if resp.status().as_u16() == SUCCESS_CODE {
- Ok(())
- } else {
- let code = resp.status();
- let text = resp.bytes().await?;
- let e = serde_json::from_slice::<E>(&text).map_err(|e| {
- Error::new(
- ErrorKind::Unexpected,
- "Failed to parse response from rest catalog server!",
- )
- .with_context("json", String::from_utf8_lossy(&text))
- .with_context("code", code.to_string())
- .with_source(e)
- })?;
- Err(e.into())
- }
- }
-
- /// More generic logic handling for special cases like head.
- async fn do_execute<R, E: DeserializeOwned + Into<Error>>(
- &self,
- request: Request,
- handler: impl FnOnce(&Response) -> Option<R>,
- ) -> Result<R> {
- let resp = self.0.execute(request).await?;
-
- if let Some(ret) = handler(&resp) {
- Ok(ret)
- } else {
- let code = resp.status();
- let text = resp.bytes().await?;
- let e = serde_json::from_slice::<E>(&text).map_err(|e| {
- Error::new(
- ErrorKind::Unexpected,
- "Failed to parse response from rest catalog server!",
- )
- .with_context("code", code.to_string())
- .with_context("json", String::from_utf8_lossy(&text))
- .with_source(e)
- })?;
- Err(e.into())
- }
- }
-}
-
/// Rest catalog implementation.
#[derive(Debug)]
pub struct RestCatalog {
@@ -290,7 +196,9 @@ impl Catalog for RestCatalog {
&self,
parent: Option<&NamespaceIdent>,
) -> Result<Vec<NamespaceIdent>> {
- let mut request = self.client.0.get(self.config.namespaces_endpoint());
+ let mut request = self
+ .client
+ .request(Method::GET, self.config.namespaces_endpoint());
if let Some(ns) = parent {
request = request.query(&[("parent", ns.encode_in_url())]);
}
@@ -314,8 +222,7 @@ impl Catalog for RestCatalog {
) -> Result<Namespace> {
let request = self
.client
- .0
- .post(self.config.namespaces_endpoint())
+ .request(Method::POST, self.config.namespaces_endpoint())
.json(&NamespaceSerde {
namespace: namespace.as_ref().clone(),
properties: Some(properties),
@@ -334,8 +241,7 @@ impl Catalog for RestCatalog {
async fn get_namespace(&self, namespace: &NamespaceIdent) ->
Result<Namespace> {
let request = self
.client
- .0
- .get(self.config.namespace_endpoint(namespace))
+ .request(Method::GET, self.config.namespace_endpoint(namespace))
.build()?;
let resp = self
@@ -364,8 +270,7 @@ impl Catalog for RestCatalog {
async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result<bool> {
let request = self
.client
- .0
- .head(self.config.namespace_endpoint(ns))
+ .request(Method::HEAD, self.config.namespace_endpoint(ns))
.build()?;
self.client
@@ -381,8 +286,7 @@ impl Catalog for RestCatalog {
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
let request = self
.client
- .0
- .delete(self.config.namespace_endpoint(namespace))
+ .request(Method::DELETE, self.config.namespace_endpoint(namespace))
.build()?;
self.client
@@ -394,8 +298,7 @@ impl Catalog for RestCatalog {
async fn list_tables(&self, namespace: &NamespaceIdent) ->
Result<Vec<TableIdent>> {
let request = self
.client
- .0
- .get(self.config.tables_endpoint(namespace))
+ .request(Method::GET, self.config.tables_endpoint(namespace))
.build()?;
let resp = self
@@ -416,8 +319,7 @@ impl Catalog for RestCatalog {
let request = self
.client
- .0
- .post(self.config.tables_endpoint(namespace))
+ .request(Method::POST, self.config.tables_endpoint(namespace))
.json(&CreateTableRequest {
name: creation.name,
location: creation.location,
@@ -460,8 +362,7 @@ impl Catalog for RestCatalog {
async fn load_table(&self, table: &TableIdent) -> Result<Table> {
let request = self
.client
- .0
- .get(self.config.table_endpoint(table))
+ .request(Method::GET, self.config.table_endpoint(table))
.build()?;
let resp = self
@@ -487,8 +388,7 @@ impl Catalog for RestCatalog {
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
let request = self
.client
- .0
- .delete(self.config.table_endpoint(table))
+ .request(Method::DELETE, self.config.table_endpoint(table))
.build()?;
self.client
@@ -500,8 +400,7 @@ impl Catalog for RestCatalog {
async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
let request = self
.client
- .0
- .head(self.config.table_endpoint(table))
+ .request(Method::HEAD, self.config.table_endpoint(table))
.build()?;
self.client
@@ -517,8 +416,7 @@ impl Catalog for RestCatalog {
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) ->
Result<()> {
let request = self
.client
- .0
- .post(self.config.rename_table_endpoint())
+ .request(Method::POST, self.config.rename_table_endpoint())
.json(&RenameTableRequest {
source: src.clone(),
destination: dest.clone(),
@@ -534,8 +432,10 @@ impl Catalog for RestCatalog {
async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
let request = self
.client
- .0
- .post(self.config.table_endpoint(commit.identifier()))
+ .request(
+ Method::POST,
+ self.config.table_endpoint(commit.identifier()),
+ )
.json(&CommitTableRequest {
identifier: commit.identifier().clone(),
requirements: commit.take_requirements(),
@@ -594,8 +494,7 @@ impl RestCatalog {
params.extend(optional_oauth_params);
let req = self
.client
- .0
- .post(self.config.get_token_endpoint())
+ .request(Method::POST, self.config.get_token_endpoint())
.form(¶ms)
.build()?;
let res = self
@@ -617,7 +516,9 @@ impl RestCatalog {
}
async fn update_config(&mut self) -> Result<()> {
- let mut request = self.client.0.get(self.config.config_endpoint());
+ let mut request = self
+ .client
+ .request(Method::GET, self.config.config_endpoint());
if let Some(warehouse_location) = &self.config.warehouse {
request = request.query(&[("warehouse", warehouse_location)]);
diff --git a/crates/catalog/rest/src/client.rs
b/crates/catalog/rest/src/client.rs
new file mode 100644
index 0000000..dbfd9de
--- /dev/null
+++ b/crates/catalog/rest/src/client.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iceberg::Result;
+use iceberg::{Error, ErrorKind};
+use reqwest::header::HeaderMap;
+use reqwest::{Client, IntoUrl, Method, Request, RequestBuilder, Response};
+use serde::de::DeserializeOwned;
+
+#[derive(Debug)]
+pub(crate) struct HttpClient(Client);
+
+impl HttpClient {
+ pub fn try_create(default_headers: HeaderMap) -> Result<Self> {
+ Ok(HttpClient(
+ Client::builder().default_headers(default_headers).build()?,
+ ))
+ }
+
+ #[inline]
+ pub fn request<U: IntoUrl>(&self, method: Method, url: U) ->
RequestBuilder {
+ self.0.request(method, url)
+ }
+
+ pub async fn query<
+ R: DeserializeOwned,
+ E: DeserializeOwned + Into<Error>,
+ const SUCCESS_CODE: u16,
+ >(
+ &self,
+ request: Request,
+ ) -> Result<R> {
+ let resp = self.0.execute(request).await?;
+
+ if resp.status().as_u16() == SUCCESS_CODE {
+ let text = resp.bytes().await?;
+ Ok(serde_json::from_slice::<R>(&text).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Failed to parse response from rest catalog server!",
+ )
+ .with_context("json", String::from_utf8_lossy(&text))
+ .with_source(e)
+ })?)
+ } else {
+ let code = resp.status();
+ let text = resp.bytes().await?;
+ let e = serde_json::from_slice::<E>(&text).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Failed to parse response from rest catalog server!",
+ )
+ .with_context("json", String::from_utf8_lossy(&text))
+ .with_context("code", code.to_string())
+ .with_source(e)
+ })?;
+ Err(e.into())
+ }
+ }
+
+ pub async fn execute<E: DeserializeOwned + Into<Error>, const
SUCCESS_CODE: u16>(
+ &self,
+ request: Request,
+ ) -> Result<()> {
+ let resp = self.0.execute(request).await?;
+
+ if resp.status().as_u16() == SUCCESS_CODE {
+ Ok(())
+ } else {
+ let code = resp.status();
+ let text = resp.bytes().await?;
+ let e = serde_json::from_slice::<E>(&text).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Failed to parse response from rest catalog server!",
+ )
+ .with_context("json", String::from_utf8_lossy(&text))
+ .with_context("code", code.to_string())
+ .with_source(e)
+ })?;
+ Err(e.into())
+ }
+ }
+
+ /// More generic logic handling for special cases like head.
+ pub async fn do_execute<R, E: DeserializeOwned + Into<Error>>(
+ &self,
+ request: Request,
+ handler: impl FnOnce(&Response) -> Option<R>,
+ ) -> Result<R> {
+ let resp = self.0.execute(request).await?;
+
+ if let Some(ret) = handler(&resp) {
+ Ok(ret)
+ } else {
+ let code = resp.status();
+ let text = resp.bytes().await?;
+ let e = serde_json::from_slice::<E>(&text).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Failed to parse response from rest catalog server!",
+ )
+ .with_context("code", code.to_string())
+ .with_context("json", String::from_utf8_lossy(&text))
+ .with_source(e)
+ })?;
+ Err(e.into())
+ }
+ }
+}
diff --git a/crates/catalog/rest/src/lib.rs b/crates/catalog/rest/src/lib.rs
index 023fe7a..6b0a784 100644
--- a/crates/catalog/rest/src/lib.rs
+++ b/crates/catalog/rest/src/lib.rs
@@ -20,4 +20,5 @@
#![deny(missing_docs)]
mod catalog;
+mod client;
pub use catalog::*;