This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new b8a3b7aa0 feat(services/webdfs): Add user.name support for webhdfs (#5567)
b8a3b7aa0 is described below

commit b8a3b7aa093d8695d89d579e146907850bb6565c
Author: Xuanwo <[email protected]>
AuthorDate: Wed Jan 22 12:09:22 2025 +0800

    feat(services/webdfs): Add user.name support for webhdfs (#5567)
---
 .../webhdfs/webhdfs_with_user_name/action.yml      | 37 ++++++++++++++++
 core/src/services/webhdfs/backend.rs               | 50 ++++++++++++++++++++--
 core/src/services/webhdfs/config.rs                |  3 ++
 3 files changed, 86 insertions(+), 4 deletions(-)

diff --git a/.github/services/webhdfs/webhdfs_with_user_name/action.yml b/.github/services/webhdfs/webhdfs_with_user_name/action.yml
new file mode 100644
index 000000000..842ec48cb
--- /dev/null
+++ b/.github/services/webhdfs/webhdfs_with_user_name/action.yml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: webhdfs
+description: "Behavior test for webhdfs with user name specified"
+
+runs:
+  using: "composite"
+  steps:
+    - name: Setup webhdfs
+      shell: bash
+      working-directory: fixtures/webhdfs
+      run: |
+        docker compose -f docker-compose-webhdfs.yml up -d --wait
+    - name: Setup
+      shell: bash
+      run: |
+        cat << EOF >> $GITHUB_ENV
+        OPENDAL_WEBHDFS_ROOT=/
+        OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870
+        OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/
+        OPENDAL_WEBHDFS_USER_NAME=root
+        EOF
diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs
index 75d2240b1..afd165afd 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -101,6 +101,16 @@ impl WebhdfsBuilder {
         self
     }
 
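+    /// Set the user name of this backend; it is appended to request
+    /// URLs as the `user.name` query parameter for authentication.
+    /// An empty `user_name` is ignored and leaves the config unchanged.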
+    pub fn user_name(mut self, user_name: &str) -> Self {
+        if !user_name.is_empty() {
+            self.config.user_name = Some(user_name.to_string());
+        }
+        self
+    }
+
     /// Set the delegation token of this backend,
     /// used for authentication
     ///
@@ -179,6 +189,7 @@ impl Builder for WebhdfsBuilder {
         let backend = WebhdfsBackend {
             root,
             endpoint,
+            user_name: self.config.user_name,
             auth,
             client,
             root_checker: OnceCell::new(),
@@ -195,6 +206,7 @@ impl Builder for WebhdfsBuilder {
 pub struct WebhdfsBackend {
     root: String,
     endpoint: String,
+    user_name: Option<String>,
     auth: Option<String>,
     root_checker: OnceCell<()>,
 
@@ -212,6 +224,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -220,6 +235,7 @@ impl WebhdfsBackend {
 
         req.body(Buffer::new()).map_err(new_request_build_error)
     }
+
     /// create object
     pub async fn webhdfs_create_object_request(
         &self,
@@ -235,6 +251,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -277,6 +296,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -311,7 +333,9 @@ impl WebhdfsBackend {
             percent_encode_path(&from),
             percent_encode_path(&to)
         );
-
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -330,7 +354,9 @@ impl WebhdfsBackend {
         body: Buffer,
     ) -> Result<Request<Buffer>> {
         let mut url = location.to_string();
-
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -362,7 +388,9 @@ impl WebhdfsBackend {
             percent_encode_path(&p),
             percent_encode_path(&sources),
         );
-
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -379,6 +407,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -404,6 +435,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -429,6 +463,9 @@ impl WebhdfsBackend {
         if !start_after.is_empty() {
             url += format!("&startAfter={}", start_after).as_str();
         }
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -455,7 +492,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
-
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -474,6 +513,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
diff --git a/core/src/services/webhdfs/config.rs b/core/src/services/webhdfs/config.rs
index 168ea7d0f..03dadfcaf 100644
--- a/core/src/services/webhdfs/config.rs
+++ b/core/src/services/webhdfs/config.rs
@@ -30,6 +30,8 @@ pub struct WebhdfsConfig {
     pub root: Option<String>,
     /// Endpoint for webhdfs.
     pub endpoint: Option<String>,
+    /// Name of the user for webhdfs.
+    pub user_name: Option<String>,
     /// Delegation token for webhdfs.
     pub delegation: Option<String>,
     /// Disable batch listing
@@ -43,6 +45,7 @@ impl Debug for WebhdfsConfig {
         f.debug_struct("WebhdfsConfig")
             .field("root", &self.root)
             .field("endpoint", &self.endpoint)
+            .field("user_name", &self.user_name)
             .field("atomic_write_dir", &self.atomic_write_dir)
             .finish_non_exhaustive()
     }

Reply via email to