This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 84f7d1f11 refactor(services/webhdfs): Refactor raw request send in writer and backend (#6113)
84f7d1f11 is described below

commit 84f7d1f110eaa5e725bdd8b17ba6db2c15c26d4c
Author: Jorge Hermo <[email protected]>
AuthorDate: Mon Apr 28 07:05:14 2025 +0200

    refactor(services/webhdfs): Refactor raw request send in writer and backend (#6113)
---
 core/src/services/webhdfs/backend.rs |   6 +-
 core/src/services/webhdfs/core.rs    | 148 ++++++++++++++++++-----------------
 core/src/services/webhdfs/lister.rs  |   4 +-
 core/src/services/webhdfs/writer.rs  |  30 +++----
 4 files changed, 90 insertions(+), 98 deletions(-)

diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs
index 40faab27b..fd2de26e2 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -272,12 +272,9 @@ impl Access for WebhdfsBackend {
 
     /// Create a file or directory
     async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
-        let req = self.core.webhdfs_create_dir_request(path)?;
-
-        let resp = self.info().http_client().send(req).await?;
+        let resp = self.core.webhdfs_create_dir(path).await?;
 
         let status = resp.status();
-
         // WebHDFS's has a two-step create/append to prevent clients to send out
         // data before creating it.
         // According to the redirect policy of `reqwest` HTTP Client we are using,
@@ -339,7 +336,6 @@ impl Access for WebhdfsBackend {
         let resp = self.core.webhdfs_read_file(path, args.range()).await?;
 
         let status = resp.status();
-
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 Ok((RpRead::default(), resp.into_body()))
diff --git a/core/src/services/webhdfs/core.rs b/core/src/services/webhdfs/core.rs
index 99605a7f9..c87d14a9d 100644
--- a/core/src/services/webhdfs/core.rs
+++ b/core/src/services/webhdfs/core.rs
@@ -54,7 +54,7 @@ impl Debug for WebhdfsCore {
 }
 
 impl WebhdfsCore {
-    pub fn webhdfs_create_dir_request(&self, path: &str) -> Result<Request<Buffer>> {
+    pub async fn webhdfs_create_dir(&self, path: &str) -> Result<Response<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
         let mut url = format!(
@@ -71,19 +71,22 @@ impl WebhdfsCore {
 
         let req = Request::put(&url);
 
-        req.extension(Operation::CreateDir)
+        let req = req
+            .extension(Operation::CreateDir)
             .body(Buffer::new())
-            .map_err(new_request_build_error)
+            .map_err(new_request_build_error)?;
+
+        self.info.http_client().send(req).await
     }
 
     /// create object
-    pub async fn webhdfs_create_object_request(
+    pub async fn webhdfs_create_object(
         &self,
         path: &str,
         size: Option<u64>,
         args: &OpWrite,
         body: Buffer,
-    ) -> Result<Request<Buffer>> {
+    ) -> Result<Response<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
         let mut url = format!(
@@ -132,10 +135,34 @@ impl WebhdfsCore {
             .body(body)
             .map_err(new_request_build_error)?;
 
-        Ok(req)
+        self.info.http_client().send(req).await
     }
 
-    pub async fn webhdfs_init_append_request(&self, path: &str) -> Result<String> {
+    pub async fn webhdfs_rename_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
+        let from = build_abs_path(&self.root, from);
+        let to = build_rooted_abs_path(&self.root, to);
+
+        let mut url = format!(
+            "{}/webhdfs/v1/{}?op=RENAME&destination={}",
+            self.endpoint,
+            percent_encode_path(&from),
+            percent_encode_path(&to)
+        );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
+        if let Some(auth) = &self.auth {
+            url += &format!("&{auth}");
+        }
+
+        let req = Request::put(&url)
+            .body(Buffer::new())
+            .map_err(new_request_build_error)?;
+
+        self.info.http_client().send(req).await
+    }
+
+    async fn webhdfs_init_append(&self, path: &str) -> Result<String> {
         let p = build_abs_path(&self.root, path);
         let mut url = format!(
             "{}/webhdfs/v1/{}?op=APPEND&noredirect=true",
@@ -170,38 +197,13 @@ impl WebhdfsCore {
         }
     }
 
-    pub async fn webhdfs_rename_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> {
-        let from = build_abs_path(&self.root, from);
-        let to = build_rooted_abs_path(&self.root, to);
-
-        let mut url = format!(
-            "{}/webhdfs/v1/{}?op=RENAME&destination={}",
-            self.endpoint,
-            percent_encode_path(&from),
-            percent_encode_path(&to)
-        );
-        if let Some(user) = &self.user_name {
-            url += format!("&user.name={user}").as_str();
-        }
-        if let Some(auth) = &self.auth {
-            url += &format!("&{auth}");
-        }
-
-        let req = Request::put(&url)
-            .extension(Operation::Rename)
-            .body(Buffer::new())
-            .map_err(new_request_build_error)?;
-
-        self.info.http_client().send(req).await
-    }
-
-    pub fn webhdfs_append_request(
+    pub async fn webhdfs_append(
         &self,
-        location: &str,
+        path: &str,
         size: u64,
         body: Buffer,
-    ) -> Result<Request<Buffer>> {
-        let mut url = location.to_string();
+    ) -> Result<Response<Buffer>> {
+        let mut url = self.webhdfs_init_append(path).await?;
         if let Some(user) = &self.user_name {
             url += format!("&user.name={user}").as_str();
         }
@@ -213,17 +215,20 @@ impl WebhdfsCore {
 
         req = req.header(CONTENT_LENGTH, size.to_string());
 
-        req.extension(Operation::Write)
+        let req = req
+            .extension(Operation::Write)
             .body(body)
-            .map_err(new_request_build_error)
+            .map_err(new_request_build_error)?;
+
+        self.info.http_client().send(req).await
     }
 
     /// CONCAT will concat sources to the path
-    pub fn webhdfs_concat_request(
+    pub async fn webhdfs_concat(
         &self,
         path: &str,
         sources: Vec<String>,
-    ) -> Result<Request<Buffer>> {
+    ) -> Result<Response<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
         let sources = sources
@@ -247,15 +252,18 @@ impl WebhdfsCore {
 
         let req = Request::post(url);
 
-        req.extension(Operation::Write)
+        let req = req
+            .extension(Operation::Write)
             .body(Buffer::new())
-            .map_err(new_request_build_error)
+            .map_err(new_request_build_error)?;
+
+        self.info.http_client().send(req).await
     }
 
-    fn webhdfs_open_request(&self, path: &str, range: &BytesRange) -> Result<Request<Buffer>> {
+    pub async fn webhdfs_list_status(&self, path: &str) -> Result<Response<Buffer>> {
         let p = build_abs_path(&self.root, path);
         let mut url = format!(
-            "{}/webhdfs/v1/{}?op=OPEN",
+            "{}/webhdfs/v1/{}?op=LISTSTATUS",
             self.endpoint,
             percent_encode_path(&p),
         );
@@ -263,31 +271,30 @@ impl WebhdfsCore {
             url += format!("&user.name={user}").as_str();
         }
         if let Some(auth) = &self.auth {
-            url += &format!("&{auth}");
-        }
-
-        if !range.is_full() {
-            url += &format!("&offset={}", range.offset());
-            if let Some(size) = range.size() {
-                url += &format!("&length={size}")
-            }
+            url += format!("&{auth}").as_str();
         }
 
         let req = Request::get(&url)
-            .extension(Operation::Read)
             .body(Buffer::new())
             .map_err(new_request_build_error)?;
-
-        Ok(req)
+        self.info.http_client().send(req).await
     }
 
-    pub async fn webhdfs_list_status_request(&self, path: &str) -> Result<Response<Buffer>> {
+    pub async fn webhdfs_list_status_batch(
+        &self,
+        path: &str,
+        start_after: &str,
+    ) -> Result<Response<Buffer>> {
         let p = build_abs_path(&self.root, path);
+
         let mut url = format!(
-            "{}/webhdfs/v1/{}?op=LISTSTATUS",
+            "{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH",
             self.endpoint,
             percent_encode_path(&p),
         );
+        if !start_after.is_empty() {
+            url += format!("&startAfter={}", start_after).as_str();
+        }
         if let Some(user) = &self.user_name {
             url += format!("&user.name={user}").as_str();
         }
@@ -296,39 +303,38 @@ impl WebhdfsCore {
         }
 
         let req = Request::get(&url)
-            .extension(Operation::List)
             .body(Buffer::new())
             .map_err(new_request_build_error)?;
         self.info.http_client().send(req).await
     }
 
-    pub async fn webhdfs_list_status_batch_request(
-        &self,
-        path: &str,
-        start_after: &str,
-    ) -> Result<Response<Buffer>> {
+    fn webhdfs_open_request(&self, path: &str, range: &BytesRange) -> Result<Request<Buffer>> {
         let p = build_abs_path(&self.root, path);
-
         let mut url = format!(
-            "{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH",
+            "{}/webhdfs/v1/{}?op=OPEN",
             self.endpoint,
             percent_encode_path(&p),
         );
-        if !start_after.is_empty() {
-            url += format!("&startAfter={}", start_after).as_str();
-        }
         if let Some(user) = &self.user_name {
             url += format!("&user.name={user}").as_str();
         }
         if let Some(auth) = &self.auth {
-            url += format!("&{auth}").as_str();
+            url += &format!("&{auth}");
+        }
+
+        if !range.is_full() {
+            url += &format!("&offset={}", range.offset());
+            if let Some(size) = range.size() {
+                url += &format!("&length={size}")
+            }
         }
 
         let req = Request::get(&url)
-            .extension(Operation::List)
+            .extension(Operation::Read)
             .body(Buffer::new())
             .map_err(new_request_build_error)?;
-        self.info.http_client().send(req).await
+
+        Ok(req)
     }
 
     pub async fn webhdfs_read_file(
diff --git a/core/src/services/webhdfs/lister.rs b/core/src/services/webhdfs/lister.rs
index 319ae0e3f..1d8228d79 100644
--- a/core/src/services/webhdfs/lister.rs
+++ b/core/src/services/webhdfs/lister.rs
@@ -43,7 +43,7 @@ impl WebhdfsLister {
 impl oio::PageList for WebhdfsLister {
     async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let file_status = if self.core.disable_list_batch {
-            let resp = self.core.webhdfs_list_status_request(&self.path).await?;
+            let resp = self.core.webhdfs_list_status(&self.path).await?;
             match resp.status() {
                 StatusCode::OK => {
                     ctx.done = true;
@@ -67,7 +67,7 @@ impl oio::PageList for WebhdfsLister {
         } else {
             let resp = self
                 .core
-                .webhdfs_list_status_batch_request(&self.path, &ctx.token)
+                .webhdfs_list_status_batch(&self.path, &ctx.token)
                 .await?;
             match resp.status() {
                 StatusCode::OK => {
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index 573cdcc77..fa5c335cf 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -45,13 +45,11 @@ impl WebhdfsWriter {
 
 impl oio::BlockWrite for WebhdfsWriter {
     async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
-        let req = self
+        let resp = self
             .core
-            .webhdfs_create_object_request(&self.path, Some(size), &self.op, body)
+            .webhdfs_create_object(&self.path, Some(size), &self.op, body)
             .await?;
 
-        let resp = self.core.info.http_client().send(req).await?;
-
         let status = resp.status();
         match status {
             StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()),
@@ -66,9 +64,9 @@ impl oio::BlockWrite for WebhdfsWriter {
                 "write multi is not supported when atomic is not set",
             ));
         };
-        let req = self
+        let resp = self
             .core
-            .webhdfs_create_object_request(
+            .webhdfs_create_object(
                 &format!("{}{}", atomic_write_dir, block_id),
                 Some(size),
                 &self.op,
@@ -76,8 +74,6 @@ impl oio::BlockWrite for WebhdfsWriter {
             )
             .await?;
 
-        let resp = self.core.info.http_client().send(req).await?;
-
         let status = resp.status();
         match status {
             StatusCode::CREATED | StatusCode::OK => Ok(()),
@@ -99,18 +95,16 @@ impl oio::BlockWrite for WebhdfsWriter {
                 .map(|s| format!("{}{}", atomic_write_dir, s))
                 .collect();
             // concat blocks
-            let req = self.core.webhdfs_concat_request(&first_block_id, sources)?;
-
-            let resp = self.core.info.http_client().send(req).await?;
+            let resp = self.core.webhdfs_concat(&first_block_id, sources).await?;
 
             let status = resp.status();
-
             if status != StatusCode::OK {
                 return Err(parse_error(resp));
             }
         }
         // delete the path file
         let resp = self.core.webhdfs_delete(&self.path).await?;
+
         let status = resp.status();
         if status != StatusCode::OK {
             return Err(parse_error(resp));
@@ -123,7 +117,6 @@ impl oio::BlockWrite for WebhdfsWriter {
             .await?;
 
         let status = resp.status();
-
         match status {
             StatusCode::OK => Ok(Metadata::default()),
             _ => Err(parse_error(resp)),
@@ -145,6 +138,7 @@ impl oio::BlockWrite for WebhdfsWriter {
 impl oio::AppendWrite for WebhdfsWriter {
     async fn offset(&self) -> Result<u64> {
         let resp = self.core.webhdfs_get_file_status(&self.path).await?;
+
         let status = resp.status();
         match status {
             StatusCode::OK => {
@@ -156,14 +150,12 @@ impl oio::AppendWrite for WebhdfsWriter {
                 Ok(file_status.length)
             }
             StatusCode::NOT_FOUND => {
-                let req = self
+                let resp = self
                     .core
-                    .webhdfs_create_object_request(&self.path, None, &self.op, Buffer::new())
+                    .webhdfs_create_object(&self.path, None, &self.op, Buffer::new())
                     .await?;
 
-                let resp = self.core.info.http_client().send(req).await?;
                 let status = resp.status();
-
                 match status {
                     StatusCode::CREATED | StatusCode::OK => Ok(0),
                     _ => Err(parse_error(resp)),
@@ -174,9 +166,7 @@ impl oio::AppendWrite for WebhdfsWriter {
     }
 
     async fn append(&self, _offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
-        let location = self.core.webhdfs_init_append_request(&self.path).await?;
-        let req = self.core.webhdfs_append_request(&location, size, body)?;
-        let resp = self.core.info.http_client().send(req).await?;
+        let resp = self.core.webhdfs_append(&self.path, size, body).await?;
 
         let status = resp.status();
         match status {

Reply via email to