This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 9b4b76251 refactor(services/webhdfs): Rewrite `webhdfs` methods
signature by using `OpXxxx` (#3109)
9b4b76251 is described below
commit 9b4b76251956508a79fa227b5b0b85d92c1bf5e1
Author: Yi-Sheng Lien <[email protected]>
AuthorDate: Mon Sep 18 17:59:16 2023 +0800
refactor(services/webhdfs): Rewrite `webhdfs` methods signature by using
`OpXxxx` (#3109)
* refactor(service/webHDFS): Passing OpWrite instead of content_type
* refactor(service/webHDFS): Passing OpList instead of start_after
refactor(service/webHDFS): Passing OpList instead of start_after
* use existing API of raw::ops
* format the code snippets of modification
---
core/src/services/webhdfs/backend.rs | 12 ++++++------
core/src/services/webhdfs/pager.rs | 6 +++++-
core/src/services/webhdfs/writer.rs | 2 +-
3 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/core/src/services/webhdfs/backend.rs
b/core/src/services/webhdfs/backend.rs
index b1bfed8ae..4adbc6ab9 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -201,7 +201,7 @@ impl WebhdfsBackend {
&self,
path: &str,
size: Option<usize>,
- content_type: Option<&str>,
+ args: &OpWrite,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
@@ -230,7 +230,7 @@ impl WebhdfsBackend {
if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size.to_string());
}
- if let Some(content_type) = content_type {
+ if let Some(content_type) = args.content_type() {
req = req.header(CONTENT_TYPE, content_type);
}
@@ -296,12 +296,12 @@ impl WebhdfsBackend {
pub(super) fn webhdfs_list_status_batch_request(
&self,
path: &str,
- start_after: &Option<String>,
+ args: &OpList,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
// if it's not the first time to call LISTSTATUS_BATCH, we will add
&startAfter=<CHILD>
- let start_after_param = match start_after {
+ let start_after_param = match args.start_after() {
Some(sa) if sa.is_empty() => String::new(),
Some(sa) => format!("&startAfter={}", sa),
None => String::new(),
@@ -430,7 +430,7 @@ impl Accessor for WebhdfsBackend {
/// Create a file or directory
async fn create_dir(&self, path: &str, _: OpCreateDir) ->
Result<RpCreateDir> {
let req = self
- .webhdfs_create_object_request(path, Some(0), None,
AsyncBody::Empty)
+ .webhdfs_create_object_request(path, Some(0), &OpWrite::default(),
AsyncBody::Empty)
.await?;
let resp = self.client.send(req).await?;
@@ -535,7 +535,7 @@ impl Accessor for WebhdfsBackend {
let path = path.trim_end_matches('/');
if !self.disable_list_batch {
- let req = self.webhdfs_list_status_batch_request(path, &None)?;
+ let req = self.webhdfs_list_status_batch_request(path,
&OpList::default())?;
let resp = self.client.send(req).await?;
match resp.status() {
StatusCode::OK => {
diff --git a/core/src/services/webhdfs/pager.rs
b/core/src/services/webhdfs/pager.rs
index e9f45df3f..c55252c3d 100644
--- a/core/src/services/webhdfs/pager.rs
+++ b/core/src/services/webhdfs/pager.rs
@@ -63,9 +63,13 @@ impl oio::Page for WebhdfsPager {
return match self.backend.disable_list_batch {
true => self.webhdfs_get_next_list_statuses(),
false => {
+ let args = OpList::with_start_after(
+ OpList::default(),
+ &self.batch_start_after.clone().unwrap(),
+ );
let req = self
.backend
- .webhdfs_list_status_batch_request(&self.path,
&self.batch_start_after)?;
+ .webhdfs_list_status_batch_request(&self.path, &args)?;
let resp = self.backend.client.send(req).await?;
match resp.status() {
diff --git a/core/src/services/webhdfs/writer.rs
b/core/src/services/webhdfs/writer.rs
index ddc8dd328..495fa7fc2 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -48,7 +48,7 @@ impl oio::OneShotWrite for WebhdfsWriter {
.webhdfs_create_object_request(
&self.path,
Some(bs.len()),
- self.op.content_type(),
+ &self.op,
AsyncBody::Bytes(bs),
)
.await?;