This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch add-user-for-webhdfs in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 81af05b19d12f9f9ef5ad554c3cec92628c03c1b Author: Xuanwo <[email protected]> AuthorDate: Wed Jan 22 11:42:07 2025 +0800 feat(services/webdfs): Add user.name support for webhdfs Signed-off-by: Xuanwo <[email protected]> --- core/src/services/webhdfs/backend.rs | 50 +++++++++++++++++++++++++++++++++--- core/src/services/webhdfs/config.rs | 3 +++ 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index 75d2240b1..0b1961e5a 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -101,6 +101,16 @@ impl WebhdfsBuilder { self } + /// Set the username of this backend, + /// used for authentication + /// + pub fn username(mut self, username: &str) -> Self { + if !username.is_empty() { + self.config.user_name = Some(username.to_string()); + } + self + } + /// Set the delegation token of this backend, /// used for authentication /// @@ -179,6 +189,7 @@ impl Builder for WebhdfsBuilder { let backend = WebhdfsBackend { root, endpoint, + user_name: self.config.user_name, auth, client, root_checker: OnceCell::new(), @@ -195,6 +206,7 @@ impl Builder for WebhdfsBuilder { pub struct WebhdfsBackend { root: String, endpoint: String, + user_name: Option<String>, auth: Option<String>, root_checker: OnceCell<()>, @@ -212,6 +224,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } @@ -220,6 +235,7 @@ impl WebhdfsBackend { req.body(Buffer::new()).map_err(new_request_build_error) } + /// create object pub async fn webhdfs_create_object_request( &self, @@ -235,6 +251,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } @@ -277,6 +296,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += &format!("&{auth}"); } @@ -311,7 +333,9 @@ impl WebhdfsBackend { percent_encode_path(&from), percent_encode_path(&to) ); - + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += &format!("&{auth}"); } @@ -330,7 +354,9 @@ impl WebhdfsBackend { body: Buffer, ) -> Result<Request<Buffer>> { let mut url = location.to_string(); - + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += &format!("&{auth}"); } @@ -362,7 +388,9 @@ impl WebhdfsBackend { percent_encode_path(&p), percent_encode_path(&sources), ); - + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += &format!("&{auth}"); } @@ -379,6 +407,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += &format!("&{auth}"); } @@ -404,6 +435,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } @@ -429,6 +463,9 @@ impl WebhdfsBackend { if !start_after.is_empty() { url += format!("&startAfter={}", start_after).as_str(); } + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } @@ -455,7 +492,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); - + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } @@ -474,6 +513,9 @@ impl WebhdfsBackend { self.endpoint, percent_encode_path(&p), ); + if let Some(user) = &self.user_name { + url += format!("&user.name={user}").as_str(); + } if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } diff --git a/core/src/services/webhdfs/config.rs b/core/src/services/webhdfs/config.rs index 168ea7d0f..03dadfcaf 100644 --- a/core/src/services/webhdfs/config.rs +++ b/core/src/services/webhdfs/config.rs @@ -30,6 +30,8 @@ pub struct WebhdfsConfig { pub root: Option<String>, /// Endpoint for webhdfs. pub endpoint: Option<String>, + /// Name of the user for webhdfs. + pub user_name: Option<String>, /// Delegation token for webhdfs. pub delegation: Option<String>, /// Disable batch listing @@ -43,6 +45,7 @@ impl Debug for WebhdfsConfig { f.debug_struct("WebhdfsConfig") .field("root", &self.root) .field("endpoint", &self.endpoint) + .field("user_name", &self.user_name) .field("atomic_write_dir", &self.atomic_write_dir) .finish_non_exhaustive() }
