This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new b8a3b7aa0 feat(services/webdfs): Add user.name support for webhdfs
(#5567)
b8a3b7aa0 is described below
commit b8a3b7aa093d8695d89d579e146907850bb6565c
Author: Xuanwo <[email protected]>
AuthorDate: Wed Jan 22 12:09:22 2025 +0800
feat(services/webdfs): Add user.name support for webhdfs (#5567)
---
.../webhdfs/webhdfs_with_user_name/action.yml | 37 ++++++++++++++++
core/src/services/webhdfs/backend.rs | 50 ++++++++++++++++++++--
core/src/services/webhdfs/config.rs | 3 ++
3 files changed, 86 insertions(+), 4 deletions(-)
diff --git a/.github/services/webhdfs/webhdfs_with_user_name/action.yml
b/.github/services/webhdfs/webhdfs_with_user_name/action.yml
new file mode 100644
index 000000000..842ec48cb
--- /dev/null
+++ b/.github/services/webhdfs/webhdfs_with_user_name/action.yml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: webhdfs
+description: "Behavior test for webhdfs with user name specified"
+
+runs:
+ using: "composite"
+ steps:
+ - name: Setup webhdfs
+ shell: bash
+ working-directory: fixtures/webhdfs
+ run: |
+ docker compose -f docker-compose-webhdfs.yml up -d --wait
+ - name: Setup
+ shell: bash
+ run: |
+ cat << EOF >> $GITHUB_ENV
+ OPENDAL_WEBHDFS_ROOT=/
+ OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870
+ OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/
+ OPENDAL_WEBHDFS_USER_NAME=root
+ EOF
diff --git a/core/src/services/webhdfs/backend.rs
b/core/src/services/webhdfs/backend.rs
index 75d2240b1..afd165afd 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -101,6 +101,16 @@ impl WebhdfsBuilder {
self
}
+ /// Set the user name of this backend, used for authentication.
+ /// An empty `user_name` is ignored and leaves the config unchanged; when set,
+ /// request URLs built in this file get `&user.name=<name>` appended verbatim.
+ pub fn user_name(mut self, user_name: &str) -> Self {
+ if !user_name.is_empty() {
+ self.config.user_name = Some(user_name.to_string());
+ }
+ self
+ }
+
/// Set the delegation token of this backend,
/// used for authentication
///
@@ -179,6 +189,7 @@ impl Builder for WebhdfsBuilder {
let backend = WebhdfsBackend {
root,
endpoint,
+ user_name: self.config.user_name,
auth,
client,
root_checker: OnceCell::new(),
@@ -195,6 +206,7 @@ impl Builder for WebhdfsBuilder {
pub struct WebhdfsBackend {
root: String,
endpoint: String,
+ user_name: Option<String>,
auth: Option<String>,
root_checker: OnceCell<()>,
@@ -212,6 +224,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
+ if let Some(user) = &self.user_name {
+ url += format!("&user.name={user}").as_str();
+ }
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
@@ -220,6 +235,7 @@ impl WebhdfsBackend {
req.body(Buffer::new()).map_err(new_request_build_error)
}
+
/// create object
pub async fn webhdfs_create_object_request(
&self,
@@ -235,6 +251,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
+ if let Some(user) = &self.user_name {
+ url += format!("&user.name={user}").as_str();
+ }
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
@@ -277,6 +296,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
+ if let Some(user) = &self.user_name {
+ url += format!("&user.name={user}").as_str();
+ }
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
@@ -311,7 +333,9 @@ impl WebhdfsBackend {
percent_encode_path(&from),
percent_encode_path(&to)
);
-
+ if let Some(user) = &self.user_name {
+ url += format!("&user.name={user}").as_str();
+ }
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
@@ -330,7 +354,9 @@ impl WebhdfsBackend {
body: Buffer,
) -> Result<Request<Buffer>> {
let mut url = location.to_string();
-
+ if let Some(user) = &self.user_name {
+ url += format!("&user.name={user}").as_str();
+ }
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
@@ -362,7 +388,9 @@ impl WebhdfsBackend {
percent_encode_path(&p),
percent_encode_path(&sources),
);
-
+ if let Some(user) = &self.user_name {
+ url += format!("&user.name={user}").as_str();
+ }
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
@@ -379,6 +407,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
+ if let Some(user) = &self.user_name {
+ url += format!("&user.name={user}").as_str();
+ }
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
@@ -404,6 +435,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
+ if let Some(user) = &self.user_name {
+ url += format!("&user.name={user}").as_str();
+ }
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
@@ -429,6 +463,9 @@ impl WebhdfsBackend {
if !start_after.is_empty() {
url += format!("&startAfter={}", start_after).as_str();
}
+ if let Some(user) = &self.user_name {
+ url += format!("&user.name={user}").as_str();
+ }
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
@@ -455,7 +492,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
-
+ if let Some(user) = &self.user_name {
+ url += format!("&user.name={user}").as_str();
+ }
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
@@ -474,6 +513,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
+ if let Some(user) = &self.user_name {
+ url += format!("&user.name={user}").as_str();
+ }
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
diff --git a/core/src/services/webhdfs/config.rs
b/core/src/services/webhdfs/config.rs
index 168ea7d0f..03dadfcaf 100644
--- a/core/src/services/webhdfs/config.rs
+++ b/core/src/services/webhdfs/config.rs
@@ -30,6 +30,8 @@ pub struct WebhdfsConfig {
pub root: Option<String>,
/// Endpoint for webhdfs.
pub endpoint: Option<String>,
+ /// Name of the user for webhdfs.
+ pub user_name: Option<String>,
/// Delegation token for webhdfs.
pub delegation: Option<String>,
/// Disable batch listing
@@ -43,6 +45,7 @@ impl Debug for WebhdfsConfig {
f.debug_struct("WebhdfsConfig")
.field("root", &self.root)
.field("endpoint", &self.endpoint)
+ .field("user_name", &self.user_name)
.field("atomic_write_dir", &self.atomic_write_dir)
.finish_non_exhaustive()
}