This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch add-user-for-webhdfs
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 81af05b19d12f9f9ef5ad554c3cec92628c03c1b
Author: Xuanwo <[email protected]>
AuthorDate: Wed Jan 22 11:42:07 2025 +0800

    feat(services/webdfs): Add user.name support for webhdfs
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/webhdfs/backend.rs | 50 +++++++++++++++++++++++++++++++++---
 core/src/services/webhdfs/config.rs  |  3 +++
 2 files changed, 49 insertions(+), 4 deletions(-)

diff --git a/core/src/services/webhdfs/backend.rs 
b/core/src/services/webhdfs/backend.rs
index 75d2240b1..0b1961e5a 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -101,6 +101,16 @@ impl WebhdfsBuilder {
         self
     }
 
+    /// Set the username of this backend,
+    /// used for authentication
+    ///
+    pub fn username(mut self, username: &str) -> Self {
+        if !username.is_empty() {
+            self.config.user_name = Some(username.to_string());
+        }
+        self
+    }
+
     /// Set the delegation token of this backend,
     /// used for authentication
     ///
@@ -179,6 +189,7 @@ impl Builder for WebhdfsBuilder {
         let backend = WebhdfsBackend {
             root,
             endpoint,
+            user_name: self.config.user_name,
             auth,
             client,
             root_checker: OnceCell::new(),
@@ -195,6 +206,7 @@ impl Builder for WebhdfsBuilder {
 pub struct WebhdfsBackend {
     root: String,
     endpoint: String,
+    user_name: Option<String>,
     auth: Option<String>,
     root_checker: OnceCell<()>,
 
@@ -212,6 +224,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -220,6 +235,7 @@ impl WebhdfsBackend {
 
         req.body(Buffer::new()).map_err(new_request_build_error)
     }
+
     /// create object
     pub async fn webhdfs_create_object_request(
         &self,
@@ -235,6 +251,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -277,6 +296,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -311,7 +333,9 @@ impl WebhdfsBackend {
             percent_encode_path(&from),
             percent_encode_path(&to)
         );
-
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -330,7 +354,9 @@ impl WebhdfsBackend {
         body: Buffer,
     ) -> Result<Request<Buffer>> {
         let mut url = location.to_string();
-
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -362,7 +388,9 @@ impl WebhdfsBackend {
             percent_encode_path(&p),
             percent_encode_path(&sources),
         );
-
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -379,6 +407,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -404,6 +435,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -429,6 +463,9 @@ impl WebhdfsBackend {
         if !start_after.is_empty() {
             url += format!("&startAfter={}", start_after).as_str();
         }
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -455,7 +492,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
-
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -474,6 +513,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
diff --git a/core/src/services/webhdfs/config.rs 
b/core/src/services/webhdfs/config.rs
index 168ea7d0f..03dadfcaf 100644
--- a/core/src/services/webhdfs/config.rs
+++ b/core/src/services/webhdfs/config.rs
@@ -30,6 +30,8 @@ pub struct WebhdfsConfig {
     pub root: Option<String>,
     /// Endpoint for webhdfs.
     pub endpoint: Option<String>,
+    /// Name of the user for webhdfs.
+    pub user_name: Option<String>,
     /// Delegation token for webhdfs.
     pub delegation: Option<String>,
     /// Disable batch listing
@@ -43,6 +45,7 @@ impl Debug for WebhdfsConfig {
         f.debug_struct("WebhdfsConfig")
             .field("root", &self.root)
             .field("endpoint", &self.endpoint)
+            .field("user_name", &self.user_name)
             .field("atomic_write_dir", &self.atomic_write_dir)
             .finish_non_exhaustive()
     }

Reply via email to