This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new e8293428 refactor: Polish the implementation of webhdfs (#1935)
e8293428 is described below
commit e82934283f43f229acf76d151d0eedd6379d6078
Author: Xuanwo <[email protected]>
AuthorDate: Thu Apr 13 19:41:48 2023 +0800
refactor: Polish the implementation of webhdfs (#1935)
* Fix file could have trailing /
Signed-off-by: Xuanwo <[email protected]>
* refactor: Polish the implementation of webhdfs
Signed-off-by: Xuanwo <[email protected]>
* Make clippy happy
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/services/webhdfs/backend.rs | 333 +++++++++++++----------------------
core/src/services/webhdfs/message.rs | 21 +--
core/src/services/webhdfs/pager.rs | 15 +-
core/src/services/webhdfs/writer.rs | 2 +-
4 files changed, 136 insertions(+), 235 deletions(-)
diff --git a/core/src/services/webhdfs/backend.rs
b/core/src/services/webhdfs/backend.rs
index 3bd32631..295ef755 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -19,15 +19,12 @@ use core::fmt::Debug;
use std::collections::HashMap;
use async_trait::async_trait;
-use bytes::Buf;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
-use http::response::Parts;
use http::Request;
use http::Response;
use http::StatusCode;
use log::debug;
-use log::error;
use tokio::sync::OnceCell;
use super::error::parse_error;
@@ -76,6 +73,7 @@ const WEBHDFS_DEFAULT_ENDPOINT: &str =
"http://127.0.0.1:9870";
/// # Examples
///
/// ## Via Builder
+///
/// ```no_run
/// use std::sync::Arc;
///
@@ -114,13 +112,10 @@ pub struct WebhdfsBuilder {
impl Debug for WebhdfsBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- let mut ds = f.debug_struct("Builder");
- ds.field("root", &self.root)
- .field("endpoint", &self.endpoint);
- if self.delegation.is_some() {
- ds.field("delegation", &"<redacted>");
- }
- ds.finish()
+ f.debug_struct("Builder")
+ .field("root", &self.root)
+ .field("endpoint", &self.endpoint)
+ .finish_non_exhaustive()
}
}
@@ -128,15 +123,14 @@ impl WebhdfsBuilder {
/// Set the working directory of this backend
///
/// All operations will happen under this root
+ ///
/// # Note
+ ///
/// The root will be automatically created if not exists.
- /// If the root is occupied by a file, building of directory will fail
pub fn root(&mut self, root: &str) -> &mut Self {
- self.root = if root.is_empty() {
- None
- } else {
- Some(root.to_string())
- };
+ if !root.is_empty() {
+ self.root = Some(root.to_string())
+ }
self
}
@@ -161,6 +155,7 @@ impl WebhdfsBuilder {
/// Set the delegation token of this backend,
/// used for authentication
+ ///
/// # Note
/// The builder prefers using delegation token over username.
/// If both are set, delegation token will be used.
@@ -172,15 +167,6 @@ impl WebhdfsBuilder {
}
}
-impl WebhdfsBuilder {
- fn auth_str(&mut self) -> Option<String> {
- if let Some(dt) = self.delegation.take() {
- return Some(format!("delegation_token={dt}"));
- }
- None
- }
-}
-
impl Builder for WebhdfsBuilder {
const SCHEME: Scheme = Scheme::Webhdfs;
type Accessor = WebhdfsBackend;
@@ -188,28 +174,22 @@ impl Builder for WebhdfsBuilder {
fn from_map(map: HashMap<String, String>) -> Self {
let mut builder = WebhdfsBuilder::default();
- for (k, v) in map.iter() {
- let v = v.as_str();
- match k.as_str() {
- "root" => builder.root(v),
- "endpoint" => builder.endpoint(v),
- "delegation" => builder.delegation(v),
- _ => continue,
- };
- }
+ map.get("root").map(|v| builder.root(v));
+ map.get("endpoint").map(|v| builder.endpoint(v));
+ map.get("delegation").map(|v| builder.delegation(v));
builder
}
/// build the backend
///
- /// # Note:
+ /// # Note
+ ///
/// when building backend, the built backend will check if the root
directory
/// exits.
/// if the directory does not exits, the directory will be automatically
created
- /// if the root path is occupied by a file, a failure will be returned
fn build(&mut self) -> Result<Self::Accessor> {
- debug!("building backend: {:?}", self);
+ debug!("start building backend: {:?}", self);
let root = normalize_root(&self.root.take().unwrap_or_default());
debug!("backend use root {root}");
@@ -227,19 +207,21 @@ impl Builder for WebhdfsBuilder {
};
debug!("backend use endpoint {}", endpoint);
- let auth = self.auth_str();
+ let auth = self
+ .delegation
+ .take()
+ .map(|dt| format!("delegation_token={dt}"));
+
let client = HttpClient::new()?;
let backend = WebhdfsBackend {
- root: root.clone(),
+ root,
endpoint,
auth,
client,
root_checker: OnceCell::new(),
};
- debug!("checking working directory: {}", root);
-
Ok(backend)
}
}
@@ -249,14 +231,17 @@ impl Builder for WebhdfsBuilder {
pub struct WebhdfsBackend {
root: String,
endpoint: String,
- pub client: HttpClient,
auth: Option<String>,
root_checker: OnceCell<()>,
+
+ pub client: HttpClient,
}
impl WebhdfsBackend {
- // create object or make a directory
- pub async fn webhdfs_create_object_req(
+ /// create object or make a directory
+ ///
+ /// TODO: we should split it into mkdir and create
+ pub async fn webhdfs_create_object_request(
&self,
path: &str,
size: Option<usize>,
@@ -307,7 +292,11 @@ impl WebhdfsBackend {
re_builder.body(body).map_err(new_request_build_error)
}
- async fn webhdfs_open_req(&self, path: &str, range: &BytesRange) ->
Result<Request<AsyncBody>> {
+ async fn webhdfs_open_request(
+ &self,
+ path: &str,
+ range: &BytesRange,
+ ) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let mut url = format!(
"{}/webhdfs/v1/{}?op=OPEN",
@@ -315,40 +304,23 @@ impl WebhdfsBackend {
percent_encode_path(&p),
);
if let Some(auth) = &self.auth {
- url += format!("&{auth}").as_str();
+ url += &format!("&{auth}");
}
- // make a Webhdfs compatible bytes range
- //
- // Webhdfs does not support read from end
- // have to solve manually
- let range = match (range.offset(), range.size()) {
- // avoiding reading the whole file
- (None, Some(size)) => {
- debug!("converting bytes range to webhdfs compatible");
- let status = self.stat(path, OpStat::default()).await?;
- let total_size = status.into_metadata().content_length();
- let offset = total_size - size;
- BytesRange::new(Some(offset), Some(size))
- }
- _ => *range,
- };
-
- let (offset, size) = (range.offset(), range.size());
+ if !range.is_full() {
+ // Webhdfs does not support read from end
+ if range.offset().is_none() && range.size().is_some() {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ "webhdfs doesn't support read with suffix range",
+ ));
+ };
- match (offset, size) {
- (Some(offset), Some(size)) => {
- url += format!("&offset={offset}&length={size}").as_str();
+ if let Some(offset) = range.offset() {
+ url += &format!("&offset={offset}");
}
- (Some(offset), None) => {
- url += format!("&offset={offset}").as_str();
- }
- (None, None) => {
- // read all, do nothing
- }
- (None, Some(_)) => {
- // already handled
- unreachable!()
+ if let Some(size) = range.size() {
+ url += &format!("&length={size}")
}
}
@@ -359,7 +331,7 @@ impl WebhdfsBackend {
Ok(req)
}
- fn webhdfs_list_status_req(&self, path: &str) ->
Result<Request<AsyncBody>> {
+ fn webhdfs_list_status_request(&self, path: &str) ->
Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let mut url = format!(
"{}/webhdfs/v1/{}?op=LISTSTATUS",
@@ -370,62 +342,52 @@ impl WebhdfsBackend {
url += format!("&{auth}").as_str();
}
- let req = Request::get(&url);
- let req = req
+ let req = Request::get(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
Ok(req)
}
-}
-impl WebhdfsBackend {
- /// get object from webhdfs
- ///
- /// # Notes
- ///
- /// looks like webhdfs doesn't support range request from file end.
- /// so if we want to read the tail of object, the whole object should be
transferred.
- async fn webhdfs_get_object(
+ async fn webhdfs_read_file(
&self,
path: &str,
range: BytesRange,
) -> Result<Response<IncomingAsyncBody>> {
- let req = self.webhdfs_open_req(path, &range).await?;
+ let req = self.webhdfs_open_request(path, &range).await?;
let resp = self.client.send(req).await?;
- // this should be a 307 redirect
+ // webhdfs namenode will redirect us to datanode for data transfer.
if resp.status() != StatusCode::TEMPORARY_REDIRECT {
return Err(parse_error(resp).await?);
}
- let re_url = self.follow_redirect(resp)?;
- let re_req = Request::get(&re_url)
+ let location = self.follow_redirect(resp)?;
+ let req = Request::get(&location)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send(re_req).await
+ self.client.send(req).await
}
- async fn webhdfs_status_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ async fn webhdfs_get_file_status(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
let mut url = format!(
"{}/webhdfs/v1/{}?op=GETFILESTATUS",
self.endpoint,
percent_encode_path(&p),
);
- debug!("webhdfs status url: {}", url);
+
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
- let req = Request::get(&url);
- let req = req
+ let req = Request::get(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
self.client.send(req).await
}
- async fn webhdfs_delete_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ async fn webhdfs_delete(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
let mut url = format!(
"{}/webhdfs/v1/{}?op=DELETE&recursive=false",
@@ -436,94 +398,56 @@ impl WebhdfsBackend {
url += format!("&{auth}").as_str();
}
- let req = Request::delete(&url);
- let req = req
+ let req = Request::delete(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
self.client.send(req).await
}
-}
-impl WebhdfsBackend {
/// get redirect destination from 307 TEMPORARY_REDIRECT http response
fn follow_redirect(&self, resp: Response<IncomingAsyncBody>) ->
Result<String> {
- let loc = match parse_location(resp.headers())? {
- Some(p) => {
- if !p.starts_with('/') {
- // is not relative path
- p.to_string()
- } else {
- // is relative path
- // prefix with endpoint url
- let url = self.endpoint.clone();
- format!("{url}/{p}")
- }
- }
- None => {
- let err = Error::new(
- ErrorKind::Unexpected,
- "redirection fail: no location header",
- );
- return Err(err);
- }
- };
- Ok(loc)
- }
-
- fn consume_success_mkdir(&self, path: &str, parts: Parts, body: &str) ->
Result<RpCreate> {
- let mkdir_rsp = serde_json::from_str::<BooleanResp>(body).map_err(|e| {
- Error::new(ErrorKind::Unexpected, "cannot parse mkdir response")
- .set_temporary()
- .with_context("service", Scheme::Webhdfs)
- .with_context("response", format!("{parts:?}"))
- .set_source(e)
+ let location = parse_location(resp.headers())?.ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "webhdfs expect to have redirect location but got none",
+ )
})?;
- if mkdir_rsp.boolean {
- Ok(RpCreate::default())
+ let location = if location.starts_with('/') {
+ // location starts with `/` means it's a relative path to current
+ // endpoint. We should prepend the endpoint to it so that we can
+ // send request to the correct location.
+ format!("{}/{location}", self.endpoint)
} else {
- Err(Error::new(
- ErrorKind::Unexpected,
- &format!("mkdir failed: {path}"),
- ))
- }
+ location.to_string()
+ };
+
+ Ok(location)
}
async fn check_root(&self) -> Result<()> {
- let resp = self.webhdfs_status_object("/").await?;
+ let resp = self.webhdfs_get_file_status("/").await?;
match resp.status() {
StatusCode::OK => {
- let body_bs = resp.into_body().bytes().await?;
-
- let file_status = serde_json::from_reader::<_,
FileStatusWrapper>(body_bs.reader())
- .map_err(|e| {
- Error::new(ErrorKind::Unexpected, "cannot parse
returned json")
- .with_context("service", Scheme::Webhdfs)
- .set_source(e)
- })?
+ let bs = resp.into_body().bytes().await?;
+
+ let file_status =
serde_json::from_slice::<FileStatusWrapper>(&bs)
+ .map_err(new_json_deserialize_error)?
.file_status;
- match file_status.ty {
- FileStatusType::File => {
- error!("working directory is occupied!");
- return Err(Error::new(ErrorKind::ConfigInvalid, "root
is occupied!")
- .with_context("service", Scheme::Webhdfs));
- }
- FileStatusType::Directory => {
- debug!("working directory exists, do nothing");
- }
+ if file_status.ty == FileStatusType::File {
+ return Err(Error::new(
+ ErrorKind::ConfigInvalid,
+ "root path must be dir",
+ ));
}
}
-
StatusCode::NOT_FOUND => {
- debug!("working directory does not exists, creating...");
self.create("/", OpCreate::new(EntryMode::DIR)).await?;
}
-
_ => return Err(parse_error(resp).await?),
}
- debug!("working directory is ready!");
Ok(())
}
}
@@ -549,19 +473,9 @@ impl Accessor for WebhdfsBackend {
}
/// Create a file or directory
- async fn create(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
- // if the path ends with '/', it will be treated as a directory
- // otherwise, it will be treated as a file
- let path = if args.mode().is_file() && path.ends_with('/') {
- path.trim_end_matches('/').to_owned()
- } else if args.mode().is_dir() && !path.ends_with('/') {
- path.to_owned() + "/"
- } else {
- path.to_owned()
- };
-
+ async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
let req = self
- .webhdfs_create_object_req(&path, Some(0), None, AsyncBody::Empty)
+ .webhdfs_create_object_request(path, Some(0), None,
AsyncBody::Empty)
.await?;
let resp = self.client.send(req).await?;
@@ -574,15 +488,19 @@ impl Accessor for WebhdfsBackend {
// the redirection should be done automatically.
match status {
StatusCode::CREATED | StatusCode::OK => {
- if !path.ends_with('/') {
- // create file's http resp could be ignored
- resp.into_body().consume().await?;
- return Ok(RpCreate::default());
+ let bs = resp.into_body().bytes().await?;
+
+ let resp = serde_json::from_slice::<BooleanResp>(&bs)
+ .map_err(new_json_deserialize_error)?;
+
+ if resp.boolean {
+ Ok(RpCreate::default())
+ } else {
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ "webhdfs create dir failed",
+ ))
}
- let (parts, body) = resp.into_parts();
- let bs = body.bytes().await?;
- let s = String::from_utf8_lossy(&bs);
- self.consume_success_mkdir(&path, parts, &s)
}
_ => Err(parse_error(resp).await?),
}
@@ -590,14 +508,12 @@ impl Accessor for WebhdfsBackend {
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
let range = args.range();
- let resp = self.webhdfs_get_object(path, range).await?;
+ let resp = self.webhdfs_read_file(path, range).await?;
match resp.status() {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let meta = parse_into_metadata(path, resp.headers())?;
Ok((RpRead::with_metadata(meta), resp.into_body()))
}
- StatusCode::NOT_FOUND => Err(Error::new(ErrorKind::NotFound,
"object not found")
- .with_context("service", Scheme::Webhdfs)),
_ => Err(parse_error(resp).await?),
}
}
@@ -622,28 +538,25 @@ impl Accessor for WebhdfsBackend {
.get_or_try_init(|| async { self.check_root().await })
.await?;
- let resp = self.webhdfs_status_object(path).await?;
+ let resp = self.webhdfs_get_file_status(path).await?;
let status = resp.status();
match status {
StatusCode::OK => {
- debug!("stat object: {} ok", path);
- let mut meta = parse_into_metadata(path, resp.headers())?;
- let body_bs = resp.into_body().bytes().await?;
-
- let file_status = serde_json::from_reader::<_,
FileStatusWrapper>(body_bs.reader())
- .map_err(|e| {
- Error::new(ErrorKind::Unexpected, "cannot parse
returned json")
- .with_context("service", Scheme::Webhdfs)
- .set_source(e)
- })?
+ let bs = resp.into_body().bytes().await?;
+
+ let file_status =
serde_json::from_slice::<FileStatusWrapper>(&bs)
+ .map_err(new_json_deserialize_error)?
.file_status;
- debug!("file status: {:?}", file_status);
- let status_meta: Metadata = file_status.try_into()?;
- // is ok to unwrap here
- // all metadata field of status meta is present and checked by
`TryFrom`
- meta.set_last_modified(status_meta.last_modified().unwrap())
- .set_content_length(status_meta.content_length());
+ let meta = match file_status.ty {
+ FileStatusType::Directory => Metadata::new(EntryMode::DIR),
+ FileStatusType::File => Metadata::new(EntryMode::FILE)
+ .with_content_length(file_status.length)
+
.with_last_modified(parse_datetime_from_from_timestamp_millis(
+ file_status.modification_time,
+ )?),
+ };
+
Ok(RpStat::new(meta))
}
@@ -652,7 +565,8 @@ impl Accessor for WebhdfsBackend {
}
async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
- let resp = self.webhdfs_delete_object(path).await?;
+ let resp = self.webhdfs_delete(path).await?;
+
match resp.status() {
StatusCode::OK => {
resp.into_body().consume().await?;
@@ -664,21 +578,16 @@ impl Accessor for WebhdfsBackend {
async fn list(&self, path: &str, _: OpList) -> Result<(RpList,
Self::Pager)> {
let path = path.trim_end_matches('/');
- let req = self.webhdfs_list_status_req(path)?;
+ let req = self.webhdfs_list_status_request(path)?;
let resp = self.client.send(req).await?;
match resp.status() {
StatusCode::OK => {
- let body_bs = resp.into_body().bytes().await?;
- let file_statuses =
- serde_json::from_reader::<_,
FileStatusesWrapper>(body_bs.reader())
- .map_err(|e| {
- Error::new(ErrorKind::Unexpected, "cannot parse
returned json")
- .with_context("service", Scheme::Webhdfs)
- .set_source(e)
- })?
- .file_statuses
- .file_status;
+ let bs = resp.into_body().bytes().await?;
+ let file_statuses =
serde_json::from_slice::<FileStatusesWrapper>(&bs)
+ .map_err(new_json_deserialize_error)?
+ .file_statuses
+ .file_status;
let objects = WebhdfsPager::new(path, file_statuses);
Ok((RpList::default(), objects))
diff --git a/core/src/services/webhdfs/message.rs
b/core/src/services/webhdfs/message.rs
index 9abc4385..e5008830 100644
--- a/core/src/services/webhdfs/message.rs
+++ b/core/src/services/webhdfs/message.rs
@@ -15,13 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-//! HTTP response messages
+//! WebHDFS response messages
use serde::Deserialize;
-use crate::raw::*;
-use crate::*;
-
#[derive(Debug, Deserialize)]
pub(super) struct BooleanResp {
pub boolean: bool,
@@ -56,22 +53,6 @@ pub struct FileStatus {
pub ty: FileStatusType,
}
-impl TryFrom<FileStatus> for Metadata {
- type Error = Error;
- fn try_from(value: FileStatus) -> Result<Self> {
- let mut meta = match value.ty {
- FileStatusType::Directory => Metadata::new(EntryMode::DIR),
- FileStatusType::File => Metadata::new(EntryMode::FILE),
- };
-
- meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
- value.modification_time,
- )?)
- .set_content_length(value.length);
- Ok(meta)
- }
-}
-
#[derive(Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "UPPERCASE")]
pub enum FileStatusType {
diff --git a/core/src/services/webhdfs/pager.rs
b/core/src/services/webhdfs/pager.rs
index 17fc626a..e58330cd 100644
--- a/core/src/services/webhdfs/pager.rs
+++ b/core/src/services/webhdfs/pager.rs
@@ -17,7 +17,7 @@
use async_trait::async_trait;
-use super::message::FileStatus;
+use super::message::{FileStatus, FileStatusType};
use crate::raw::*;
use crate::*;
@@ -51,7 +51,18 @@ impl oio::Page for WebhdfsPager {
format!("{}/{}", self.path, status.path_suffix)
};
- let meta: Metadata = status.try_into()?;
+ let meta = match status.ty {
+ FileStatusType::Directory => Metadata::new(EntryMode::DIR),
+ FileStatusType::File => Metadata::new(EntryMode::FILE)
+ .with_content_length(status.length)
+
.with_last_modified(parse_datetime_from_from_timestamp_millis(
+ status.modification_time,
+ )?),
+ };
+
+ if meta.mode().is_file() {
+ path = path.trim_end_matches('/').to_string();
+ }
if meta.mode().is_dir() {
path += "/"
}
diff --git a/core/src/services/webhdfs/writer.rs
b/core/src/services/webhdfs/writer.rs
index 8fea27a6..1bdf7bd9 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -43,7 +43,7 @@ impl oio::Write for WebhdfsWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let req = self
.backend
- .webhdfs_create_object_req(
+ .webhdfs_create_object_request(
&self.path,
Some(bs.len()),
self.op.content_type(),