This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 84f7d1f11 refactor(services/webhdfs): Refactor raw request send in
writer and backend (#6113)
84f7d1f11 is described below
commit 84f7d1f110eaa5e725bdd8b17ba6db2c15c26d4c
Author: Jorge Hermo <[email protected]>
AuthorDate: Mon Apr 28 07:05:14 2025 +0200
refactor(services/webhdfs): Refactor raw request send in writer and backend
(#6113)
---
core/src/services/webhdfs/backend.rs | 6 +-
core/src/services/webhdfs/core.rs | 148 ++++++++++++++++++-----------------
core/src/services/webhdfs/lister.rs | 4 +-
core/src/services/webhdfs/writer.rs | 30 +++----
4 files changed, 90 insertions(+), 98 deletions(-)
diff --git a/core/src/services/webhdfs/backend.rs
b/core/src/services/webhdfs/backend.rs
index 40faab27b..fd2de26e2 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -272,12 +272,9 @@ impl Access for WebhdfsBackend {
/// Create a file or directory
async fn create_dir(&self, path: &str, _: OpCreateDir) ->
Result<RpCreateDir> {
- let req = self.core.webhdfs_create_dir_request(path)?;
-
- let resp = self.info().http_client().send(req).await?;
+ let resp = self.core.webhdfs_create_dir(path).await?;
let status = resp.status();
-
// WebHDFS's has a two-step create/append to prevent clients to send
out
// data before creating it.
// According to the redirect policy of `reqwest` HTTP Client we are
using,
@@ -339,7 +336,6 @@ impl Access for WebhdfsBackend {
let resp = self.core.webhdfs_read_file(path, args.range()).await?;
let status = resp.status();
-
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
Ok((RpRead::default(), resp.into_body()))
diff --git a/core/src/services/webhdfs/core.rs
b/core/src/services/webhdfs/core.rs
index 99605a7f9..c87d14a9d 100644
--- a/core/src/services/webhdfs/core.rs
+++ b/core/src/services/webhdfs/core.rs
@@ -54,7 +54,7 @@ impl Debug for WebhdfsCore {
}
impl WebhdfsCore {
- pub fn webhdfs_create_dir_request(&self, path: &str) ->
Result<Request<Buffer>> {
+ pub async fn webhdfs_create_dir(&self, path: &str) ->
Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path);
let mut url = format!(
@@ -71,19 +71,22 @@ impl WebhdfsCore {
let req = Request::put(&url);
- req.extension(Operation::CreateDir)
+ let req = req
+ .extension(Operation::CreateDir)
.body(Buffer::new())
- .map_err(new_request_build_error)
+ .map_err(new_request_build_error)?;
+
+ self.info.http_client().send(req).await
}
/// create object
- pub async fn webhdfs_create_object_request(
+ pub async fn webhdfs_create_object(
&self,
path: &str,
size: Option<u64>,
args: &OpWrite,
body: Buffer,
- ) -> Result<Request<Buffer>> {
+ ) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path);
let mut url = format!(
@@ -132,10 +135,34 @@ impl WebhdfsCore {
.body(body)
.map_err(new_request_build_error)?;
- Ok(req)
+ self.info.http_client().send(req).await
}
- pub async fn webhdfs_init_append_request(&self, path: &str) ->
Result<String> {
+ pub async fn webhdfs_rename_object(&self, from: &str, to: &str) ->
Result<Response<Buffer>> {
+ let from = build_abs_path(&self.root, from);
+ let to = build_rooted_abs_path(&self.root, to);
+
+ let mut url = format!(
+ "{}/webhdfs/v1/{}?op=RENAME&destination={}",
+ self.endpoint,
+ percent_encode_path(&from),
+ percent_encode_path(&to)
+ );
+ if let Some(user) = &self.user_name {
+ url += format!("&user.name={user}").as_str();
+ }
+ if let Some(auth) = &self.auth {
+ url += &format!("&{auth}");
+ }
+
+ let req = Request::put(&url)
+ .body(Buffer::new())
+ .map_err(new_request_build_error)?;
+
+ self.info.http_client().send(req).await
+ }
+
+ async fn webhdfs_init_append(&self, path: &str) -> Result<String> {
let p = build_abs_path(&self.root, path);
let mut url = format!(
"{}/webhdfs/v1/{}?op=APPEND&noredirect=true",
@@ -170,38 +197,13 @@ impl WebhdfsCore {
}
}
- pub async fn webhdfs_rename_object(&self, from: &str, to: &str) ->
Result<Response<Buffer>> {
- let from = build_abs_path(&self.root, from);
- let to = build_rooted_abs_path(&self.root, to);
-
- let mut url = format!(
- "{}/webhdfs/v1/{}?op=RENAME&destination={}",
- self.endpoint,
- percent_encode_path(&from),
- percent_encode_path(&to)
- );
- if let Some(user) = &self.user_name {
- url += format!("&user.name={user}").as_str();
- }
- if let Some(auth) = &self.auth {
- url += &format!("&{auth}");
- }
-
- let req = Request::put(&url)
- .extension(Operation::Rename)
- .body(Buffer::new())
- .map_err(new_request_build_error)?;
-
- self.info.http_client().send(req).await
- }
-
- pub fn webhdfs_append_request(
+ pub async fn webhdfs_append(
&self,
- location: &str,
+ path: &str,
size: u64,
body: Buffer,
- ) -> Result<Request<Buffer>> {
- let mut url = location.to_string();
+ ) -> Result<Response<Buffer>> {
+ let mut url = self.webhdfs_init_append(path).await?;
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
@@ -213,17 +215,20 @@ impl WebhdfsCore {
req = req.header(CONTENT_LENGTH, size.to_string());
- req.extension(Operation::Write)
+ let req = req
+ .extension(Operation::Write)
.body(body)
- .map_err(new_request_build_error)
+ .map_err(new_request_build_error)?;
+
+ self.info.http_client().send(req).await
}
/// CONCAT will concat sources to the path
- pub fn webhdfs_concat_request(
+ pub async fn webhdfs_concat(
&self,
path: &str,
sources: Vec<String>,
- ) -> Result<Request<Buffer>> {
+ ) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path);
let sources = sources
@@ -247,15 +252,18 @@ impl WebhdfsCore {
let req = Request::post(url);
- req.extension(Operation::Write)
+ let req = req
+ .extension(Operation::Write)
.body(Buffer::new())
- .map_err(new_request_build_error)
+ .map_err(new_request_build_error)?;
+
+ self.info.http_client().send(req).await
}
- fn webhdfs_open_request(&self, path: &str, range: &BytesRange) ->
Result<Request<Buffer>> {
+ pub async fn webhdfs_list_status(&self, path: &str) ->
Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path);
let mut url = format!(
- "{}/webhdfs/v1/{}?op=OPEN",
+ "{}/webhdfs/v1/{}?op=LISTSTATUS",
self.endpoint,
percent_encode_path(&p),
);
@@ -263,31 +271,30 @@ impl WebhdfsCore {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
- url += &format!("&{auth}");
- }
-
- if !range.is_full() {
- url += &format!("&offset={}", range.offset());
- if let Some(size) = range.size() {
- url += &format!("&length={size}")
- }
+ url += format!("&{auth}").as_str();
}
let req = Request::get(&url)
- .extension(Operation::Read)
.body(Buffer::new())
.map_err(new_request_build_error)?;
-
- Ok(req)
+ self.info.http_client().send(req).await
}
- pub async fn webhdfs_list_status_request(&self, path: &str) ->
Result<Response<Buffer>> {
+ pub async fn webhdfs_list_status_batch(
+ &self,
+ path: &str,
+ start_after: &str,
+ ) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path);
+
let mut url = format!(
- "{}/webhdfs/v1/{}?op=LISTSTATUS",
+ "{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH",
self.endpoint,
percent_encode_path(&p),
);
+ if !start_after.is_empty() {
+ url += format!("&startAfter={}", start_after).as_str();
+ }
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
@@ -296,39 +303,38 @@ impl WebhdfsCore {
}
let req = Request::get(&url)
- .extension(Operation::List)
.body(Buffer::new())
.map_err(new_request_build_error)?;
self.info.http_client().send(req).await
}
- pub async fn webhdfs_list_status_batch_request(
- &self,
- path: &str,
- start_after: &str,
- ) -> Result<Response<Buffer>> {
+ fn webhdfs_open_request(&self, path: &str, range: &BytesRange) ->
Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);
-
let mut url = format!(
- "{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH",
+ "{}/webhdfs/v1/{}?op=OPEN",
self.endpoint,
percent_encode_path(&p),
);
- if !start_after.is_empty() {
- url += format!("&startAfter={}", start_after).as_str();
- }
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
- url += format!("&{auth}").as_str();
+ url += &format!("&{auth}");
+ }
+
+ if !range.is_full() {
+ url += &format!("&offset={}", range.offset());
+ if let Some(size) = range.size() {
+ url += &format!("&length={size}")
+ }
}
let req = Request::get(&url)
- .extension(Operation::List)
+ .extension(Operation::Read)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.info.http_client().send(req).await
+
+ Ok(req)
}
pub async fn webhdfs_read_file(
diff --git a/core/src/services/webhdfs/lister.rs
b/core/src/services/webhdfs/lister.rs
index 319ae0e3f..1d8228d79 100644
--- a/core/src/services/webhdfs/lister.rs
+++ b/core/src/services/webhdfs/lister.rs
@@ -43,7 +43,7 @@ impl WebhdfsLister {
impl oio::PageList for WebhdfsLister {
async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
let file_status = if self.core.disable_list_batch {
- let resp =
self.core.webhdfs_list_status_request(&self.path).await?;
+ let resp = self.core.webhdfs_list_status(&self.path).await?;
match resp.status() {
StatusCode::OK => {
ctx.done = true;
@@ -67,7 +67,7 @@ impl oio::PageList for WebhdfsLister {
} else {
let resp = self
.core
- .webhdfs_list_status_batch_request(&self.path, &ctx.token)
+ .webhdfs_list_status_batch(&self.path, &ctx.token)
.await?;
match resp.status() {
StatusCode::OK => {
diff --git a/core/src/services/webhdfs/writer.rs
b/core/src/services/webhdfs/writer.rs
index 573cdcc77..fa5c335cf 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -45,13 +45,11 @@ impl WebhdfsWriter {
impl oio::BlockWrite for WebhdfsWriter {
async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
- let req = self
+ let resp = self
.core
- .webhdfs_create_object_request(&self.path, Some(size), &self.op,
body)
+ .webhdfs_create_object(&self.path, Some(size), &self.op, body)
.await?;
- let resp = self.core.info.http_client().send(req).await?;
-
let status = resp.status();
match status {
StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()),
@@ -66,9 +64,9 @@ impl oio::BlockWrite for WebhdfsWriter {
"write multi is not supported when atomic is not set",
));
};
- let req = self
+ let resp = self
.core
- .webhdfs_create_object_request(
+ .webhdfs_create_object(
&format!("{}{}", atomic_write_dir, block_id),
Some(size),
&self.op,
@@ -76,8 +74,6 @@ impl oio::BlockWrite for WebhdfsWriter {
)
.await?;
- let resp = self.core.info.http_client().send(req).await?;
-
let status = resp.status();
match status {
StatusCode::CREATED | StatusCode::OK => Ok(()),
@@ -99,18 +95,16 @@ impl oio::BlockWrite for WebhdfsWriter {
.map(|s| format!("{}{}", atomic_write_dir, s))
.collect();
// concat blocks
- let req = self.core.webhdfs_concat_request(&first_block_id,
sources)?;
-
- let resp = self.core.info.http_client().send(req).await?;
+ let resp = self.core.webhdfs_concat(&first_block_id,
sources).await?;
let status = resp.status();
-
if status != StatusCode::OK {
return Err(parse_error(resp));
}
}
// delete the path file
let resp = self.core.webhdfs_delete(&self.path).await?;
+
let status = resp.status();
if status != StatusCode::OK {
return Err(parse_error(resp));
@@ -123,7 +117,6 @@ impl oio::BlockWrite for WebhdfsWriter {
.await?;
let status = resp.status();
-
match status {
StatusCode::OK => Ok(Metadata::default()),
_ => Err(parse_error(resp)),
@@ -145,6 +138,7 @@ impl oio::BlockWrite for WebhdfsWriter {
impl oio::AppendWrite for WebhdfsWriter {
async fn offset(&self) -> Result<u64> {
let resp = self.core.webhdfs_get_file_status(&self.path).await?;
+
let status = resp.status();
match status {
StatusCode::OK => {
@@ -156,14 +150,12 @@ impl oio::AppendWrite for WebhdfsWriter {
Ok(file_status.length)
}
StatusCode::NOT_FOUND => {
- let req = self
+ let resp = self
.core
- .webhdfs_create_object_request(&self.path, None, &self.op,
Buffer::new())
+ .webhdfs_create_object(&self.path, None, &self.op,
Buffer::new())
.await?;
- let resp = self.core.info.http_client().send(req).await?;
let status = resp.status();
-
match status {
StatusCode::CREATED | StatusCode::OK => Ok(0),
_ => Err(parse_error(resp)),
@@ -174,9 +166,7 @@ impl oio::AppendWrite for WebhdfsWriter {
}
async fn append(&self, _offset: u64, size: u64, body: Buffer) ->
Result<Metadata> {
- let location =
self.core.webhdfs_init_append_request(&self.path).await?;
- let req = self.core.webhdfs_append_request(&location, size, body)?;
- let resp = self.core.info.http_client().send(req).await?;
+ let resp = self.core.webhdfs_append(&self.path, size, body).await?;
let status = resp.status();
match status {