This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch fix-object-store
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 363a70ef29d3870596c7cdc853472cee43d99c5e
Author: Xuanwo <[email protected]>
AuthorDate: Thu Apr 13 19:24:17 2023 +0800

    refactor: Polish the implementation of webhdfs
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/webhdfs/backend.rs | 333 +++++++++++++----------------------
 core/src/services/webhdfs/message.rs |  21 +--
 core/src/services/webhdfs/pager.rs   |  12 +-
 core/src/services/webhdfs/writer.rs  |   2 +-
 4 files changed, 133 insertions(+), 235 deletions(-)

diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs
index 01f9f4c3..4e10f980 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -19,15 +19,12 @@ use core::fmt::Debug;
 use std::collections::HashMap;
 
 use async_trait::async_trait;
-use bytes::Buf;
 use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
-use http::response::Parts;
 use http::Request;
 use http::Response;
 use http::StatusCode;
 use log::debug;
-use log::error;
 use tokio::sync::OnceCell;
 
 use super::error::parse_error;
@@ -76,6 +73,7 @@ const WEBHDFS_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:9870";
 /// # Examples
 ///
 /// ## Via Builder
+///
 /// ```no_run
 /// use std::sync::Arc;
 ///
@@ -114,13 +112,10 @@ pub struct WebhdfsBuilder {
 
 impl Debug for WebhdfsBuilder {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let mut ds = f.debug_struct("Builder");
-        ds.field("root", &self.root)
-            .field("endpoint", &self.endpoint);
-        if self.delegation.is_some() {
-            ds.field("delegation", &"<redacted>");
-        }
-        ds.finish()
+        f.debug_struct("Builder")
+            .field("root", &self.root)
+            .field("endpoint", &self.endpoint)
+            .finish_non_exhaustive()
     }
 }
 
@@ -128,15 +123,14 @@ impl WebhdfsBuilder {
     /// Set the working directory of this backend
     ///
     /// All operations will happen under this root
+    ///
     /// # Note
+    ///
     /// The root will be automatically created if not exists.
-    /// If the root is occupied by a file, building of directory will fail
     pub fn root(&mut self, root: &str) -> &mut Self {
-        self.root = if root.is_empty() {
-            None
-        } else {
-            Some(root.to_string())
-        };
+        if !root.is_empty() {
+            self.root = Some(root.to_string())
+        }
         self
     }
 
@@ -161,6 +155,7 @@ impl WebhdfsBuilder {
 
     /// Set the delegation token of this backend,
     /// used for authentication
+    ///
     /// # Note
     /// The builder prefers using delegation token over username.
     /// If both are set, delegation token will be used.
@@ -172,15 +167,6 @@ impl WebhdfsBuilder {
     }
 }
 
-impl WebhdfsBuilder {
-    fn auth_str(&mut self) -> Option<String> {
-        if let Some(dt) = self.delegation.take() {
-            return Some(format!("delegation_token={dt}"));
-        }
-        None
-    }
-}
-
 impl Builder for WebhdfsBuilder {
     const SCHEME: Scheme = Scheme::Webhdfs;
     type Accessor = WebhdfsBackend;
@@ -188,28 +174,22 @@ impl Builder for WebhdfsBuilder {
 
     fn from_map(map: HashMap<String, String>) -> Self {
         let mut builder = WebhdfsBuilder::default();
-        for (k, v) in map.iter() {
-            let v = v.as_str();
-            match k.as_str() {
-                "root" => builder.root(v),
-                "endpoint" => builder.endpoint(v),
-                "delegation" => builder.delegation(v),
-                _ => continue,
-            };
-        }
+        map.get("root").map(|v| builder.root(v));
+        map.get("endpoint").map(|v| builder.endpoint(v));
+        map.get("delegation").map(|v| builder.delegation(v));
 
         builder
     }
 
     /// build the backend
     ///
-    /// # Note:
+    /// # Note
+    ///
     /// when building backend, the built backend will check if the root directory
     /// exits.
     /// if the directory does not exits, the directory will be automatically created
-    /// if the root path is occupied by a file, a failure will be returned
     fn build(&mut self) -> Result<Self::Accessor> {
-        debug!("building backend: {:?}", self);
+        debug!("start building backend: {:?}", self);
 
         let root = normalize_root(&self.root.take().unwrap_or_default());
         debug!("backend use root {root}");
@@ -227,19 +207,21 @@ impl Builder for WebhdfsBuilder {
         };
         debug!("backend use endpoint {}", endpoint);
 
-        let auth = self.auth_str();
+        let auth = self
+            .delegation
+            .take()
+            .map(|dt| format!("delegation_token={dt}"));
+
         let client = HttpClient::new()?;
 
         let backend = WebhdfsBackend {
-            root: root.clone(),
+            root,
             endpoint,
             auth,
             client,
             root_checker: OnceCell::new(),
        };
 
-        debug!("checking working directory: {}", root);
-
         Ok(backend)
     }
 }
@@ -249,14 +231,17 @@ impl Builder for WebhdfsBuilder {
 pub struct WebhdfsBackend {
     root: String,
     endpoint: String,
-    pub client: HttpClient,
     auth: Option<String>,
     root_checker: OnceCell<()>,
+
+    pub client: HttpClient,
 }
 
 impl WebhdfsBackend {
-    // create object or make a directory
-    pub async fn webhdfs_create_object_req(
+    /// create object or make a directory
+    ///
+    /// TODO: we should split it into mkdir and create
+    pub async fn webhdfs_create_object_request(
         &self,
         path: &str,
         size: Option<usize>,
@@ -307,7 +292,11 @@ impl WebhdfsBackend {
         re_builder.body(body).map_err(new_request_build_error)
     }
 
-    async fn webhdfs_open_req(&self, path: &str, range: &BytesRange) -> Result<Request<AsyncBody>> {
+    async fn webhdfs_open_request(
+        &self,
+        path: &str,
+        range: &BytesRange,
+    ) -> Result<Request<AsyncBody>> {
         let p = build_abs_path(&self.root, path);
         let mut url = format!(
             "{}/webhdfs/v1/{}?op=OPEN",
@@ -315,40 +304,23 @@ impl WebhdfsBackend {
             percent_encode_path(&p),
         );
         if let Some(auth) = &self.auth {
-            url += format!("&{auth}").as_str();
+            url += &format!("&{auth}");
         }
 
-        // make a Webhdfs compatible bytes range
-        //
-        // Webhdfs does not support read from end
-        // have to solve manually
-        let range = match (range.offset(), range.size()) {
-            // avoiding reading the whole file
-            (None, Some(size)) => {
-                debug!("converting bytes range to webhdfs compatible");
-                let status = self.stat(path, OpStat::default()).await?;
-                let total_size = status.into_metadata().content_length();
-                let offset = total_size - size;
-                BytesRange::new(Some(offset), Some(size))
-            }
-            _ => *range,
-        };
-
-        let (offset, size) = (range.offset(), range.size());
+        if !range.is_full() {
+            // Webhdfs does not support read from end
+            if range.offset().is_none() && range.size().is_some() {
+                return Err(Error::new(
+                    ErrorKind::Unsupported,
+                    "webhdfs doesn't support read with suffix range",
+                ));
+            };
 
-        match (offset, size) {
-            (Some(offset), Some(size)) => {
-                url += format!("&offset={offset}&length={size}").as_str();
+            if let Some(offset) = range.offset() {
+                url += &format!("&offset={offset}");
             }
-            (Some(offset), None) => {
-                url += format!("&offset={offset}").as_str();
-            }
-            (None, None) => {
-                // read all, do nothing
-            }
-            (None, Some(_)) => {
-                // already handled
-                unreachable!()
+            if let Some(size) = range.size() {
+                url += &format!("&length={size}")
             }
         }
 
@@ -359,7 +331,7 @@ impl WebhdfsBackend {
         Ok(req)
     }
 
-    fn webhdfs_list_status_req(&self, path: &str) -> Result<Request<AsyncBody>> {
+    fn webhdfs_list_status_request(&self, path: &str) -> Result<Request<AsyncBody>> {
         let p = build_abs_path(&self.root, path);
         let mut url = format!(
             "{}/webhdfs/v1/{}?op=LISTSTATUS",
@@ -370,62 +342,52 @@ impl WebhdfsBackend {
             url += format!("&{auth}").as_str();
         }
 
-        let req = Request::get(&url);
-        let req = req
+        let req = Request::get(&url)
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
         Ok(req)
     }
-}
 
-impl WebhdfsBackend {
-    /// get object from webhdfs
-    ///
-    /// # Notes
-    ///
-    /// looks like webhdfs doesn't support range request from file end.
-    /// so if we want to read the tail of object, the whole object should be transferred.
-    async fn webhdfs_get_object(
+    async fn webhdfs_read_file(
        &self,
         path: &str,
         range: BytesRange,
     ) -> Result<Response<IncomingAsyncBody>> {
-        let req = self.webhdfs_open_req(path, &range).await?;
+        let req = self.webhdfs_open_request(path, &range).await?;
         let resp = self.client.send(req).await?;
 
-        // this should be a 307 redirect
+        // webhdfs namenode will redirect us to datanode for data transfer.
         if resp.status() != StatusCode::TEMPORARY_REDIRECT {
             return Err(parse_error(resp).await?);
         }
 
-        let re_url = self.follow_redirect(resp)?;
-        let re_req = Request::get(&re_url)
+        let location = self.follow_redirect(resp)?;
+        let req = Request::get(&location)
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
-        self.client.send(re_req).await
+        self.client.send(req).await
     }
 
-    async fn webhdfs_status_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+    async fn webhdfs_get_file_status(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
         let mut url = format!(
             "{}/webhdfs/v1/{}?op=GETFILESTATUS",
             self.endpoint,
             percent_encode_path(&p),
         );
-        debug!("webhdfs status url: {}", url);
+
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
 
-        let req = Request::get(&url);
-        let req = req
+        let req = Request::get(&url)
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
         self.client.send(req).await
     }
 
-    async fn webhdfs_delete_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+    async fn webhdfs_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
         let mut url = format!(
             "{}/webhdfs/v1/{}?op=DELETE&recursive=false",
@@ -436,94 +398,56 @@ impl WebhdfsBackend {
             url += format!("&{auth}").as_str();
         }
 
-        let req = Request::delete(&url);
-        let req = req
+        let req = Request::delete(&url)
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
         self.client.send(req).await
     }
-}
 
-impl WebhdfsBackend {
     /// get redirect destination from 307 TEMPORARY_REDIRECT http response
     fn follow_redirect(&self, resp: Response<IncomingAsyncBody>) -> Result<String> {
-        let loc = match parse_location(resp.headers())? {
-            Some(p) => {
-                if !p.starts_with('/') {
-                    // is not relative path
-                    p.to_string()
-                } else {
-                    // is relative path
-                    // prefix with endpoint url
-                    let url = self.endpoint.clone();
-                    format!("{url}/{p}")
-                }
-            }
-            None => {
-                let err = Error::new(
-                    ErrorKind::Unexpected,
-                    "redirection fail: no location header",
-                );
-                return Err(err);
-            }
-        };
-        Ok(loc)
-    }
-
-    fn consume_success_mkdir(&self, path: &str, parts: Parts, body: &str) -> Result<RpCreate> {
-        let mkdir_rsp = serde_json::from_str::<BooleanResp>(body).map_err(|e| {
-            Error::new(ErrorKind::Unexpected, "cannot parse mkdir response")
-                .set_temporary()
-                .with_context("service", Scheme::Webhdfs)
-                .with_context("response", format!("{parts:?}"))
-                .set_source(e)
+        let location = parse_location(resp.headers())?.ok_or({
+            Error::new(
+                ErrorKind::Unexpected,
+                "webhdfs expect to have redirect location but got none",
+            )
         })?;
 
-        if mkdir_rsp.boolean {
-            Ok(RpCreate::default())
+        let location = if location.starts_with('/') {
+            // location starts with `/` means it's a relative path to current
+            // endpoint. We should prepend the endpoint to it so that we can
+            // send request to the correct location.
+            format!("{}/{location}", self.endpoint)
         } else {
-            Err(Error::new(
-                ErrorKind::Unexpected,
-                &format!("mkdir failed: {path}"),
-            ))
-        }
+            location.to_string()
+        };
+
+        Ok(location)
    }
 
     async fn check_root(&self) -> Result<()> {
-        let resp = self.webhdfs_status_object("/").await?;
+        let resp = self.webhdfs_get_file_status("/").await?;
         match resp.status() {
             StatusCode::OK => {
-                let body_bs = resp.into_body().bytes().await?;
-
-                let file_status = serde_json::from_reader::<_, FileStatusWrapper>(body_bs.reader())
-                    .map_err(|e| {
-                        Error::new(ErrorKind::Unexpected, "cannot parse returned json")
-                            .with_context("service", Scheme::Webhdfs)
-                            .set_source(e)
-                    })?
+                let bs = resp.into_body().bytes().await?;
+
+                let file_status = serde_json::from_slice::<FileStatusWrapper>(&bs)
+                    .map_err(new_json_deserialize_error)?
                     .file_status;
-                match file_status.ty {
-                    FileStatusType::File => {
-                        error!("working directory is occupied!");
-                        return Err(Error::new(ErrorKind::ConfigInvalid, "root is occupied!")
-                            .with_context("service", Scheme::Webhdfs));
-                    }
-                    FileStatusType::Directory => {
-                        debug!("working directory exists, do nothing");
-                    }
+                if file_status.ty == FileStatusType::File {
+                    return Err(Error::new(
+                        ErrorKind::ConfigInvalid,
+                        "root path must be dir",
+                    ));
                 }
             }
-
             StatusCode::NOT_FOUND => {
-                debug!("working directory does not exists, creating...");
                 self.create("/", OpCreate::new(EntryMode::DIR)).await?;
             }
-
             _ => return Err(parse_error(resp).await?),
         }
-
-        debug!("working directory is ready!");
         Ok(())
     }
 }
@@ -549,19 +473,9 @@ impl Accessor for WebhdfsBackend {
     }
 
     /// Create a file or directory
-    async fn create(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
-        // if the path ends with '/', it will be treated as a directory
-        // otherwise, it will be treated as a file
-        let path = if args.mode().is_file() && path.ends_with('/') {
-            path.trim_end_matches('/').to_owned()
-        } else if args.mode().is_dir() && !path.ends_with('/') {
-            path.to_owned() + "/"
-        } else {
-            path.to_owned()
-        };
-
+    async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
         let req = self
-            .webhdfs_create_object_req(&path, Some(0), None, AsyncBody::Empty)
+            .webhdfs_create_object_request(path, Some(0), None, AsyncBody::Empty)
             .await?;
 
         let resp = self.client.send(req).await?;
@@ -574,15 +488,19 @@ impl Accessor for WebhdfsBackend {
         // the redirection should be done automatically.
         match status {
             StatusCode::CREATED | StatusCode::OK => {
-                if !path.ends_with('/') {
-                    // create file's http resp could be ignored
-                    resp.into_body().consume().await?;
-                    return Ok(RpCreate::default());
+                let bs = resp.into_body().bytes().await?;
+
+                let resp = serde_json::from_slice::<BooleanResp>(&bs)
+                    .map_err(new_json_deserialize_error)?;
+
+                if resp.boolean {
+                    Ok(RpCreate::default())
+                } else {
+                    Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "webhdfs create dir failed",
+                    ))
                 }
-                let (parts, body) = resp.into_parts();
-                let bs = body.bytes().await?;
-                let s = String::from_utf8_lossy(&bs);
-                self.consume_success_mkdir(&path, parts, &s)
             }
             _ => Err(parse_error(resp).await?),
         }
@@ -590,14 +508,12 @@ impl Accessor for WebhdfsBackend {
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
         let range = args.range();
-        let resp = self.webhdfs_get_object(path, range).await?;
+        let resp = self.webhdfs_read_file(path, range).await?;
         match resp.status() {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let meta = parse_into_metadata(path, resp.headers())?;
                 Ok((RpRead::with_metadata(meta), resp.into_body()))
             }
-            StatusCode::NOT_FOUND => Err(Error::new(ErrorKind::NotFound, "object not found")
-                .with_context("service", Scheme::Webhdfs)),
             _ => Err(parse_error(resp).await?),
         }
     }
@@ -622,28 +538,25 @@ impl Accessor for WebhdfsBackend {
             .get_or_try_init(|| async { self.check_root().await })
             .await?;
 
-        let resp = self.webhdfs_status_object(path).await?;
+        let resp = self.webhdfs_get_file_status(path).await?;
 
         let status = resp.status();
 
         match status {
             StatusCode::OK => {
-                debug!("stat object: {} ok", path);
-                let mut meta = parse_into_metadata(path, resp.headers())?;
-                let body_bs = resp.into_body().bytes().await?;
-
-                let file_status = serde_json::from_reader::<_, FileStatusWrapper>(body_bs.reader())
-                    .map_err(|e| {
-                        Error::new(ErrorKind::Unexpected, "cannot parse returned json")
-                            .with_context("service", Scheme::Webhdfs)
-                            .set_source(e)
-                    })?
+                let bs = resp.into_body().bytes().await?;
+
+                let file_status = serde_json::from_slice::<FileStatusWrapper>(&bs)
+                    .map_err(new_json_deserialize_error)?
                     .file_status;
-
-                debug!("file status: {:?}", file_status);
-                let status_meta: Metadata = file_status.try_into()?;
-                // is ok to unwrap here
-                // all metadata field of status meta is present and checked by `TryFrom`
-                meta.set_last_modified(status_meta.last_modified().unwrap())
-                    .set_content_length(status_meta.content_length());
+
+                let meta = match file_status.ty {
+                    FileStatusType::Directory => Metadata::new(EntryMode::DIR),
+                    FileStatusType::File => Metadata::new(EntryMode::FILE)
+                        .with_content_length(file_status.length)
+                        .with_last_modified(parse_datetime_from_from_timestamp_millis(
+                            file_status.modification_time,
+                        )?),
+                };
+
                 Ok(RpStat::new(meta))
             }
@@ -652,7 +565,8 @@ impl Accessor for WebhdfsBackend {
     }
 
     async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
-        let resp = self.webhdfs_delete_object(path).await?;
+        let resp = self.webhdfs_delete(path).await?;
+
         match resp.status() {
             StatusCode::OK => {
                 resp.into_body().consume().await?;
@@ -664,21 +578,16 @@
 
     async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Pager)> {
         let path = path.trim_end_matches('/');
-        let req = self.webhdfs_list_status_req(path)?;
+        let req = self.webhdfs_list_status_request(path)?;
         let resp = self.client.send(req).await?;
         match resp.status() {
             StatusCode::OK => {
-                let body_bs = resp.into_body().bytes().await?;
-                let mut file_statuses =
-                    serde_json::from_reader::<_, FileStatusesWrapper>(body_bs.reader())
-                        .map_err(|e| {
-                            Error::new(ErrorKind::Unexpected, "cannot parse returned json")
-                                .with_context("service", Scheme::Webhdfs)
-                                .set_source(e)
-                        })?
-                        .file_statuses
-                        .file_status;
+                let bs = resp.into_body().bytes().await?;
+                let file_statuses = serde_json::from_slice::<FileStatusesWrapper>(&bs)
+                    .map_err(new_json_deserialize_error)?
+                    .file_statuses
+                    .file_status;
 
                 let objects = WebhdfsPager::new(path, file_statuses);
                 Ok((RpList::default(), objects))

diff --git a/core/src/services/webhdfs/message.rs b/core/src/services/webhdfs/message.rs
index 9abc4385..e5008830 100644
--- a/core/src/services/webhdfs/message.rs
+++ b/core/src/services/webhdfs/message.rs
@@ -15,13 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! HTTP response messages
+//! WebHDFS response messages
 
 use serde::Deserialize;
 
-use crate::raw::*;
-use crate::*;
-
 #[derive(Debug, Deserialize)]
 pub(super) struct BooleanResp {
     pub boolean: bool,
@@ -56,22 +53,6 @@ pub struct FileStatus {
     pub ty: FileStatusType,
 }
 
-impl TryFrom<FileStatus> for Metadata {
-    type Error = Error;
-    fn try_from(value: FileStatus) -> Result<Self> {
-        let mut meta = match value.ty {
-            FileStatusType::Directory => Metadata::new(EntryMode::DIR),
-            FileStatusType::File => Metadata::new(EntryMode::FILE),
-        };
-
-        meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
-            value.modification_time,
-        )?)
-            .set_content_length(value.length);
-        Ok(meta)
-    }
-}
-
 #[derive(Debug, Deserialize, PartialEq, Eq)]
 #[serde(rename_all = "UPPERCASE")]
 pub enum FileStatusType {

diff --git a/core/src/services/webhdfs/pager.rs b/core/src/services/webhdfs/pager.rs
index e0881ca1..e58330cd 100644
--- a/core/src/services/webhdfs/pager.rs
+++ b/core/src/services/webhdfs/pager.rs
@@ -17,7 +17,7 @@
 
 use async_trait::async_trait;
 
-use super::message::FileStatus;
+use super::message::{FileStatus, FileStatusType};
 use crate::raw::*;
 use crate::*;
 
@@ -51,7 +51,15 @@ impl oio::Page for WebhdfsPager {
                 format!("{}/{}", self.path, status.path_suffix)
             };
 
-            let meta: Metadata = status.try_into()?;
+            let meta = match status.ty {
+                FileStatusType::Directory => Metadata::new(EntryMode::DIR),
+                FileStatusType::File => Metadata::new(EntryMode::FILE)
+                    .with_content_length(status.length)
+                    .with_last_modified(parse_datetime_from_from_timestamp_millis(
+                        status.modification_time,
+                    )?),
+            };
+
             if meta.mode().is_file() {
                 path = path.trim_end_matches('/').to_string();
             }

diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index 8fea27a6..1bdf7bd9 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -43,7 +43,7 @@ impl oio::Write for WebhdfsWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let req = self
             .backend
-            .webhdfs_create_object_req(
+            .webhdfs_create_object_request(
                 &self.path,
                 Some(bs.len()),
                 self.op.content_type(),
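The OPEN range handling is the main behavioral change in this patch: WebHDFS only understands offset and length query parameters, so a suffix range (a size with no offset, i.e. "the last N bytes") has no direct encoding and is now rejected as Unsupported instead of being emulated with an extra GETFILESTATUS round trip. A minimal standalone sketch of that translation, with BytesRange reduced to a plain (Option<u64>, Option<u64>) pair and open_query as a hypothetical helper name:

fn open_query(offset: Option<u64>, size: Option<u64>) -> Result<String, String> {
    // Suffix range: webhdfs cannot express "read the last N bytes".
    if offset.is_none() && size.is_some() {
        return Err("webhdfs doesn't support read with suffix range".to_string());
    }

    let mut query = String::new();
    if let Some(offset) = offset {
        query += &format!("&offset={offset}");
    }
    if let Some(size) = size {
        query += &format!("&length={size}");
    }
    Ok(query)
}

fn main() {
    assert_eq!(open_query(Some(16), Some(32)).unwrap(), "&offset=16&length=32");
    assert_eq!(open_query(None, None).unwrap(), ""); // full read: no parameters
    assert!(open_query(None, Some(100)).is_err()); // suffix range: rejected
}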
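The simplified follow_redirect reads the Location header of the namenode's 307 response and, when it is a relative path, prefixes it with the configured endpoint before the request is re-sent to the datanode. A string-level sketch under that reading, where resolve_redirect is a hypothetical stand-in for the method and plain strings replace the crate's http types:

fn resolve_redirect(endpoint: &str, location: Option<&str>) -> Result<String, String> {
    // A 307 without a Location header is treated as an unexpected error.
    let location = location
        .ok_or_else(|| "webhdfs expect to have redirect location but got none".to_string())?;

    Ok(if location.starts_with('/') {
        // Relative path to the current endpoint: prepend it.
        format!("{endpoint}/{location}")
    } else {
        // Absolute URL (the usual case, pointing at a datanode): use as-is.
        location.to_string()
    })
}

fn main() {
    // A typical datanode redirect (hypothetical hosts and path).
    let dst = resolve_redirect(
        "http://127.0.0.1:9870",
        Some("http://datanode-1:9864/webhdfs/v1/tmp/f?op=OPEN"),
    );
    assert_eq!(dst.unwrap(), "http://datanode-1:9864/webhdfs/v1/tmp/f?op=OPEN");
}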
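On the create path, WebHDFS answers MKDIRS (and boolean-style operations generally) with a JSON body of the form {"boolean":true}, and that flag is now all the backend inspects before reporting success. A self-contained sketch of the check, reusing the same derive shape as the BooleanResp struct in message.rs; created_ok is a hypothetical helper name:

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct BooleanResp {
    boolean: bool,
}

fn created_ok(body: &[u8]) -> Result<(), String> {
    let resp: BooleanResp =
        serde_json::from_slice(body).map_err(|e| format!("deserialize response failed: {e}"))?;
    if resp.boolean {
        Ok(())
    } else {
        Err("webhdfs create dir failed".to_string())
    }
}

fn main() {
    assert!(created_ok(br#"{"boolean": true}"#).is_ok());
    assert!(created_ok(br#"{"boolean": false}"#).is_err());
}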
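Dropping TryFrom<FileStatus> for Metadata leaves the same mapping inlined at both call sites (stat and the pager): a directory becomes a bare DIR entry, while a file carries its length and modification_time (milliseconds since the epoch). A condensed sketch of that mapping with simplified stand-ins for the crate's Metadata and EntryMode types:

#[derive(Debug, PartialEq)]
enum EntryMode {
    File,
    Dir,
}

enum FileStatusType {
    Directory,
    File,
}

struct FileStatus {
    length: u64,
    modification_time: i64, // milliseconds since the epoch, as WebHDFS reports it
    ty: FileStatusType,
}

struct Metadata {
    mode: EntryMode,
    content_length: u64,
    last_modified_ms: Option<i64>,
}

fn metadata_of(status: &FileStatus) -> Metadata {
    match status.ty {
        // Directories expose no length or mtime here.
        FileStatusType::Directory => Metadata {
            mode: EntryMode::Dir,
            content_length: 0,
            last_modified_ms: None,
        },
        // Files keep both, mirroring the inlined match arms in the diff.
        FileStatusType::File => Metadata {
            mode: EntryMode::File,
            content_length: status.length,
            last_modified_ms: Some(status.modification_time),
        },
    }
}

fn main() {
    let meta = metadata_of(&FileStatus {
        length: 42,
        modification_time: 1_681_385_057_000,
        ty: FileStatusType::File,
    });
    assert_eq!(meta.mode, EntryMode::File);
    assert_eq!(meta.content_length, 42);
    assert!(meta.last_modified_ms.is_some());
}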
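Finally, a hedged usage sketch matching the builder docs touched above; it assumes the opendal public API around the time of this commit (Operator::new(...) followed by finish()), and the root and endpoint values are illustrative:

use opendal::services::Webhdfs;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    let mut builder = Webhdfs::default();
    // The root will be created automatically if it does not exist.
    builder.root("/tmp/opendal");
    builder.endpoint("http://127.0.0.1:9870");

    let op = Operator::new(builder)?.finish();
    op.write("hello.txt", "hello, webhdfs!").await?;
    Ok(())
}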
