This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new e8293428 refactor: Polish the implementaion of webhdfs (#1935)
e8293428 is described below

commit e82934283f43f229acf76d151d0eedd6379d6078
Author: Xuanwo <[email protected]>
AuthorDate: Thu Apr 13 19:41:48 2023 +0800

    refactor: Polish the implementaion of webhdfs (#1935)
    
    * Fix file could have trailing /
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * refactor: Polish the implementaion of webhdfs
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Make clippy happy
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/webhdfs/backend.rs | 333 +++++++++++++----------------------
 core/src/services/webhdfs/message.rs |  21 +--
 core/src/services/webhdfs/pager.rs   |  15 +-
 core/src/services/webhdfs/writer.rs  |   2 +-
 4 files changed, 136 insertions(+), 235 deletions(-)

diff --git a/core/src/services/webhdfs/backend.rs 
b/core/src/services/webhdfs/backend.rs
index 3bd32631..295ef755 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -19,15 +19,12 @@ use core::fmt::Debug;
 use std::collections::HashMap;
 
 use async_trait::async_trait;
-use bytes::Buf;
 use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
-use http::response::Parts;
 use http::Request;
 use http::Response;
 use http::StatusCode;
 use log::debug;
-use log::error;
 use tokio::sync::OnceCell;
 
 use super::error::parse_error;
@@ -76,6 +73,7 @@ const WEBHDFS_DEFAULT_ENDPOINT: &str = 
"http://127.0.0.1:9870";;
 /// # Examples
 ///
 /// ## Via Builder
+///
 /// ```no_run
 /// use std::sync::Arc;
 ///
@@ -114,13 +112,10 @@ pub struct WebhdfsBuilder {
 
 impl Debug for WebhdfsBuilder {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let mut ds = f.debug_struct("Builder");
-        ds.field("root", &self.root)
-            .field("endpoint", &self.endpoint);
-        if self.delegation.is_some() {
-            ds.field("delegation", &"<redacted>");
-        }
-        ds.finish()
+        f.debug_struct("Builder")
+            .field("root", &self.root)
+            .field("endpoint", &self.endpoint)
+            .finish_non_exhaustive()
     }
 }
 
@@ -128,15 +123,14 @@ impl WebhdfsBuilder {
     /// Set the working directory of this backend
     ///
     /// All operations will happen under this root
+    ///
     /// # Note
+    ///
     /// The root will be automatically created if not exists.
-    /// If the root is occupied by a file, building of directory will fail
     pub fn root(&mut self, root: &str) -> &mut Self {
-        self.root = if root.is_empty() {
-            None
-        } else {
-            Some(root.to_string())
-        };
+        if !root.is_empty() {
+            self.root = Some(root.to_string())
+        }
 
         self
     }
@@ -161,6 +155,7 @@ impl WebhdfsBuilder {
 
     /// Set the delegation token of this backend,
     /// used for authentication
+    ///
     /// # Note
     /// The builder prefers using delegation token over username.
     /// If both are set, delegation token will be used.
@@ -172,15 +167,6 @@ impl WebhdfsBuilder {
     }
 }
 
-impl WebhdfsBuilder {
-    fn auth_str(&mut self) -> Option<String> {
-        if let Some(dt) = self.delegation.take() {
-            return Some(format!("delegation_token={dt}"));
-        }
-        None
-    }
-}
-
 impl Builder for WebhdfsBuilder {
     const SCHEME: Scheme = Scheme::Webhdfs;
     type Accessor = WebhdfsBackend;
@@ -188,28 +174,22 @@ impl Builder for WebhdfsBuilder {
     fn from_map(map: HashMap<String, String>) -> Self {
         let mut builder = WebhdfsBuilder::default();
 
-        for (k, v) in map.iter() {
-            let v = v.as_str();
-            match k.as_str() {
-                "root" => builder.root(v),
-                "endpoint" => builder.endpoint(v),
-                "delegation" => builder.delegation(v),
-                _ => continue,
-            };
-        }
+        map.get("root").map(|v| builder.root(v));
+        map.get("endpoint").map(|v| builder.endpoint(v));
+        map.get("delegation").map(|v| builder.delegation(v));
 
         builder
     }
 
     /// build the backend
     ///
-    /// # Note:
+    /// # Note
+    ///
     /// when building backend, the built backend will check if the root 
directory
     /// exits.
     /// if the directory does not exits, the directory will be automatically 
created
-    /// if the root path is occupied by a file, a failure will be returned
     fn build(&mut self) -> Result<Self::Accessor> {
-        debug!("building backend: {:?}", self);
+        debug!("start building backend: {:?}", self);
 
         let root = normalize_root(&self.root.take().unwrap_or_default());
         debug!("backend use root {root}");
@@ -227,19 +207,21 @@ impl Builder for WebhdfsBuilder {
         };
         debug!("backend use endpoint {}", endpoint);
 
-        let auth = self.auth_str();
+        let auth = self
+            .delegation
+            .take()
+            .map(|dt| format!("delegation_token={dt}"));
+
         let client = HttpClient::new()?;
 
         let backend = WebhdfsBackend {
-            root: root.clone(),
+            root,
             endpoint,
             auth,
             client,
             root_checker: OnceCell::new(),
         };
 
-        debug!("checking working directory: {}", root);
-
         Ok(backend)
     }
 }
@@ -249,14 +231,17 @@ impl Builder for WebhdfsBuilder {
 pub struct WebhdfsBackend {
     root: String,
     endpoint: String,
-    pub client: HttpClient,
     auth: Option<String>,
     root_checker: OnceCell<()>,
+
+    pub client: HttpClient,
 }
 
 impl WebhdfsBackend {
-    // create object or make a directory
-    pub async fn webhdfs_create_object_req(
+    /// create object or make a directory
+    ///
+    /// TODO: we should split it into mkdir and create
+    pub async fn webhdfs_create_object_request(
         &self,
         path: &str,
         size: Option<usize>,
@@ -307,7 +292,11 @@ impl WebhdfsBackend {
         re_builder.body(body).map_err(new_request_build_error)
     }
 
-    async fn webhdfs_open_req(&self, path: &str, range: &BytesRange) -> 
Result<Request<AsyncBody>> {
+    async fn webhdfs_open_request(
+        &self,
+        path: &str,
+        range: &BytesRange,
+    ) -> Result<Request<AsyncBody>> {
         let p = build_abs_path(&self.root, path);
         let mut url = format!(
             "{}/webhdfs/v1/{}?op=OPEN",
@@ -315,40 +304,23 @@ impl WebhdfsBackend {
             percent_encode_path(&p),
         );
         if let Some(auth) = &self.auth {
-            url += format!("&{auth}").as_str();
+            url += &format!("&{auth}");
         }
 
-        // make a Webhdfs compatible bytes range
-        //
-        // Webhdfs does not support read from end
-        // have to solve manually
-        let range = match (range.offset(), range.size()) {
-            // avoiding reading the whole file
-            (None, Some(size)) => {
-                debug!("converting bytes range to webhdfs compatible");
-                let status = self.stat(path, OpStat::default()).await?;
-                let total_size = status.into_metadata().content_length();
-                let offset = total_size - size;
-                BytesRange::new(Some(offset), Some(size))
-            }
-            _ => *range,
-        };
-
-        let (offset, size) = (range.offset(), range.size());
+        if !range.is_full() {
+            // Webhdfs does not support read from end
+            if range.offset().is_none() && range.size().is_some() {
+                return Err(Error::new(
+                    ErrorKind::Unsupported,
+                    "webhdfs doesn't support read with suffix range",
+                ));
+            };
 
-        match (offset, size) {
-            (Some(offset), Some(size)) => {
-                url += format!("&offset={offset}&length={size}").as_str();
+            if let Some(offset) = range.offset() {
+                url += &format!("&offset={offset}");
             }
-            (Some(offset), None) => {
-                url += format!("&offset={offset}").as_str();
-            }
-            (None, None) => {
-                // read all, do nothing
-            }
-            (None, Some(_)) => {
-                // already handled
-                unreachable!()
+            if let Some(size) = range.size() {
+                url += &format!("&length={size}")
             }
         }
 
@@ -359,7 +331,7 @@ impl WebhdfsBackend {
         Ok(req)
     }
 
-    fn webhdfs_list_status_req(&self, path: &str) -> 
Result<Request<AsyncBody>> {
+    fn webhdfs_list_status_request(&self, path: &str) -> 
Result<Request<AsyncBody>> {
         let p = build_abs_path(&self.root, path);
         let mut url = format!(
             "{}/webhdfs/v1/{}?op=LISTSTATUS",
@@ -370,62 +342,52 @@ impl WebhdfsBackend {
             url += format!("&{auth}").as_str();
         }
 
-        let req = Request::get(&url);
-        let req = req
+        let req = Request::get(&url)
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
         Ok(req)
     }
-}
 
-impl WebhdfsBackend {
-    /// get object from webhdfs
-    ///
-    /// # Notes
-    ///
-    /// looks like webhdfs doesn't support range request from file end.
-    /// so if we want to read the tail of object, the whole object should be 
transferred.
-    async fn webhdfs_get_object(
+    async fn webhdfs_read_file(
         &self,
         path: &str,
         range: BytesRange,
     ) -> Result<Response<IncomingAsyncBody>> {
-        let req = self.webhdfs_open_req(path, &range).await?;
+        let req = self.webhdfs_open_request(path, &range).await?;
         let resp = self.client.send(req).await?;
 
-        // this should be a 307 redirect
+        // webhdfs namenode will redirect us to datanode for data transfer.
         if resp.status() != StatusCode::TEMPORARY_REDIRECT {
             return Err(parse_error(resp).await?);
         }
 
-        let re_url = self.follow_redirect(resp)?;
-        let re_req = Request::get(&re_url)
+        let location = self.follow_redirect(resp)?;
+        let req = Request::get(&location)
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
-        self.client.send(re_req).await
+        self.client.send(req).await
     }
 
-    async fn webhdfs_status_object(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+    async fn webhdfs_get_file_status(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
         let mut url = format!(
             "{}/webhdfs/v1/{}?op=GETFILESTATUS",
             self.endpoint,
             percent_encode_path(&p),
         );
-        debug!("webhdfs status url: {}", url);
+
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
 
-        let req = Request::get(&url);
-        let req = req
+        let req = Request::get(&url)
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
         self.client.send(req).await
     }
 
-    async fn webhdfs_delete_object(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+    async fn webhdfs_delete(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
         let mut url = format!(
             "{}/webhdfs/v1/{}?op=DELETE&recursive=false",
@@ -436,94 +398,56 @@ impl WebhdfsBackend {
             url += format!("&{auth}").as_str();
         }
 
-        let req = Request::delete(&url);
-        let req = req
+        let req = Request::delete(&url)
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
         self.client.send(req).await
     }
-}
 
-impl WebhdfsBackend {
     /// get redirect destination from 307 TEMPORARY_REDIRECT http response
     fn follow_redirect(&self, resp: Response<IncomingAsyncBody>) -> 
Result<String> {
-        let loc = match parse_location(resp.headers())? {
-            Some(p) => {
-                if !p.starts_with('/') {
-                    // is not relative path
-                    p.to_string()
-                } else {
-                    // is relative path
-                    // prefix with endpoint url
-                    let url = self.endpoint.clone();
-                    format!("{url}/{p}")
-                }
-            }
-            None => {
-                let err = Error::new(
-                    ErrorKind::Unexpected,
-                    "redirection fail: no location header",
-                );
-                return Err(err);
-            }
-        };
-        Ok(loc)
-    }
-
-    fn consume_success_mkdir(&self, path: &str, parts: Parts, body: &str) -> 
Result<RpCreate> {
-        let mkdir_rsp = serde_json::from_str::<BooleanResp>(body).map_err(|e| {
-            Error::new(ErrorKind::Unexpected, "cannot parse mkdir response")
-                .set_temporary()
-                .with_context("service", Scheme::Webhdfs)
-                .with_context("response", format!("{parts:?}"))
-                .set_source(e)
+        let location = parse_location(resp.headers())?.ok_or_else(|| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "webhdfs expect to have redirect location but got none",
+            )
         })?;
 
-        if mkdir_rsp.boolean {
-            Ok(RpCreate::default())
+        let location = if location.starts_with('/') {
+            // location starts with `/` means it's a relative path to current
+            // endpoint. We should prepend the endpoint to it so that we can
+            // send request to the correct location.
+            format!("{}/{location}", self.endpoint)
         } else {
-            Err(Error::new(
-                ErrorKind::Unexpected,
-                &format!("mkdir failed: {path}"),
-            ))
-        }
+            location.to_string()
+        };
+
+        Ok(location)
     }
 
     async fn check_root(&self) -> Result<()> {
-        let resp = self.webhdfs_status_object("/").await?;
+        let resp = self.webhdfs_get_file_status("/").await?;
         match resp.status() {
             StatusCode::OK => {
-                let body_bs = resp.into_body().bytes().await?;
-
-                let file_status = serde_json::from_reader::<_, 
FileStatusWrapper>(body_bs.reader())
-                    .map_err(|e| {
-                        Error::new(ErrorKind::Unexpected, "cannot parse 
returned json")
-                            .with_context("service", Scheme::Webhdfs)
-                            .set_source(e)
-                    })?
+                let bs = resp.into_body().bytes().await?;
+
+                let file_status = 
serde_json::from_slice::<FileStatusWrapper>(&bs)
+                    .map_err(new_json_deserialize_error)?
                     .file_status;
 
-                match file_status.ty {
-                    FileStatusType::File => {
-                        error!("working directory is occupied!");
-                        return Err(Error::new(ErrorKind::ConfigInvalid, "root 
is occupied!")
-                            .with_context("service", Scheme::Webhdfs));
-                    }
-                    FileStatusType::Directory => {
-                        debug!("working directory exists, do nothing");
-                    }
+                if file_status.ty == FileStatusType::File {
+                    return Err(Error::new(
+                        ErrorKind::ConfigInvalid,
+                        "root path must be dir",
+                    ));
                 }
             }
-
             StatusCode::NOT_FOUND => {
-                debug!("working directory does not exists, creating...");
                 self.create("/", OpCreate::new(EntryMode::DIR)).await?;
             }
-
             _ => return Err(parse_error(resp).await?),
         }
-        debug!("working directory is ready!");
         Ok(())
     }
 }
@@ -549,19 +473,9 @@ impl Accessor for WebhdfsBackend {
     }
 
     /// Create a file or directory
-    async fn create(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
-        // if the path ends with '/', it will be treated as a directory
-        // otherwise, it will be treated as a file
-        let path = if args.mode().is_file() && path.ends_with('/') {
-            path.trim_end_matches('/').to_owned()
-        } else if args.mode().is_dir() && !path.ends_with('/') {
-            path.to_owned() + "/"
-        } else {
-            path.to_owned()
-        };
-
+    async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
         let req = self
-            .webhdfs_create_object_req(&path, Some(0), None, AsyncBody::Empty)
+            .webhdfs_create_object_request(path, Some(0), None, 
AsyncBody::Empty)
             .await?;
 
         let resp = self.client.send(req).await?;
@@ -574,15 +488,19 @@ impl Accessor for WebhdfsBackend {
         // the redirection should be done automatically.
         match status {
             StatusCode::CREATED | StatusCode::OK => {
-                if !path.ends_with('/') {
-                    // create file's http resp could be ignored
-                    resp.into_body().consume().await?;
-                    return Ok(RpCreate::default());
+                let bs = resp.into_body().bytes().await?;
+
+                let resp = serde_json::from_slice::<BooleanResp>(&bs)
+                    .map_err(new_json_deserialize_error)?;
+
+                if resp.boolean {
+                    Ok(RpCreate::default())
+                } else {
+                    Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "webhdfs create dir failed",
+                    ))
                 }
-                let (parts, body) = resp.into_parts();
-                let bs = body.bytes().await?;
-                let s = String::from_utf8_lossy(&bs);
-                self.consume_success_mkdir(&path, parts, &s)
             }
             _ => Err(parse_error(resp).await?),
         }
@@ -590,14 +508,12 @@ impl Accessor for WebhdfsBackend {
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
         let range = args.range();
-        let resp = self.webhdfs_get_object(path, range).await?;
+        let resp = self.webhdfs_read_file(path, range).await?;
         match resp.status() {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let meta = parse_into_metadata(path, resp.headers())?;
                 Ok((RpRead::with_metadata(meta), resp.into_body()))
             }
-            StatusCode::NOT_FOUND => Err(Error::new(ErrorKind::NotFound, 
"object not found")
-                .with_context("service", Scheme::Webhdfs)),
             _ => Err(parse_error(resp).await?),
         }
     }
@@ -622,28 +538,25 @@ impl Accessor for WebhdfsBackend {
             .get_or_try_init(|| async { self.check_root().await })
             .await?;
 
-        let resp = self.webhdfs_status_object(path).await?;
+        let resp = self.webhdfs_get_file_status(path).await?;
         let status = resp.status();
         match status {
             StatusCode::OK => {
-                debug!("stat object: {} ok", path);
-                let mut meta = parse_into_metadata(path, resp.headers())?;
-                let body_bs = resp.into_body().bytes().await?;
-
-                let file_status = serde_json::from_reader::<_, 
FileStatusWrapper>(body_bs.reader())
-                    .map_err(|e| {
-                        Error::new(ErrorKind::Unexpected, "cannot parse 
returned json")
-                            .with_context("service", Scheme::Webhdfs)
-                            .set_source(e)
-                    })?
+                let bs = resp.into_body().bytes().await?;
+
+                let file_status = 
serde_json::from_slice::<FileStatusWrapper>(&bs)
+                    .map_err(new_json_deserialize_error)?
                     .file_status;
-                debug!("file status: {:?}", file_status);
-                let status_meta: Metadata = file_status.try_into()?;
 
-                // is ok to unwrap here
-                // all metadata field of status meta is present and checked by 
`TryFrom`
-                meta.set_last_modified(status_meta.last_modified().unwrap())
-                    .set_content_length(status_meta.content_length());
+                let meta = match file_status.ty {
+                    FileStatusType::Directory => Metadata::new(EntryMode::DIR),
+                    FileStatusType::File => Metadata::new(EntryMode::FILE)
+                        .with_content_length(file_status.length)
+                        
.with_last_modified(parse_datetime_from_from_timestamp_millis(
+                            file_status.modification_time,
+                        )?),
+                };
+
                 Ok(RpStat::new(meta))
             }
 
@@ -652,7 +565,8 @@ impl Accessor for WebhdfsBackend {
     }
 
     async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
-        let resp = self.webhdfs_delete_object(path).await?;
+        let resp = self.webhdfs_delete(path).await?;
+
         match resp.status() {
             StatusCode::OK => {
                 resp.into_body().consume().await?;
@@ -664,21 +578,16 @@ impl Accessor for WebhdfsBackend {
 
     async fn list(&self, path: &str, _: OpList) -> Result<(RpList, 
Self::Pager)> {
         let path = path.trim_end_matches('/');
-        let req = self.webhdfs_list_status_req(path)?;
+        let req = self.webhdfs_list_status_request(path)?;
 
         let resp = self.client.send(req).await?;
         match resp.status() {
             StatusCode::OK => {
-                let body_bs = resp.into_body().bytes().await?;
-                let file_statuses =
-                    serde_json::from_reader::<_, 
FileStatusesWrapper>(body_bs.reader())
-                        .map_err(|e| {
-                            Error::new(ErrorKind::Unexpected, "cannot parse 
returned json")
-                                .with_context("service", Scheme::Webhdfs)
-                                .set_source(e)
-                        })?
-                        .file_statuses
-                        .file_status;
+                let bs = resp.into_body().bytes().await?;
+                let file_statuses = 
serde_json::from_slice::<FileStatusesWrapper>(&bs)
+                    .map_err(new_json_deserialize_error)?
+                    .file_statuses
+                    .file_status;
 
                 let objects = WebhdfsPager::new(path, file_statuses);
                 Ok((RpList::default(), objects))
diff --git a/core/src/services/webhdfs/message.rs 
b/core/src/services/webhdfs/message.rs
index 9abc4385..e5008830 100644
--- a/core/src/services/webhdfs/message.rs
+++ b/core/src/services/webhdfs/message.rs
@@ -15,13 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! HTTP response messages
+//! WebHDFS response messages
 
 use serde::Deserialize;
 
-use crate::raw::*;
-use crate::*;
-
 #[derive(Debug, Deserialize)]
 pub(super) struct BooleanResp {
     pub boolean: bool,
@@ -56,22 +53,6 @@ pub struct FileStatus {
     pub ty: FileStatusType,
 }
 
-impl TryFrom<FileStatus> for Metadata {
-    type Error = Error;
-    fn try_from(value: FileStatus) -> Result<Self> {
-        let mut meta = match value.ty {
-            FileStatusType::Directory => Metadata::new(EntryMode::DIR),
-            FileStatusType::File => Metadata::new(EntryMode::FILE),
-        };
-
-        meta.set_last_modified(parse_datetime_from_from_timestamp_millis(
-            value.modification_time,
-        )?)
-        .set_content_length(value.length);
-        Ok(meta)
-    }
-}
-
 #[derive(Debug, Deserialize, PartialEq, Eq)]
 #[serde(rename_all = "UPPERCASE")]
 pub enum FileStatusType {
diff --git a/core/src/services/webhdfs/pager.rs 
b/core/src/services/webhdfs/pager.rs
index 17fc626a..e58330cd 100644
--- a/core/src/services/webhdfs/pager.rs
+++ b/core/src/services/webhdfs/pager.rs
@@ -17,7 +17,7 @@
 
 use async_trait::async_trait;
 
-use super::message::FileStatus;
+use super::message::{FileStatus, FileStatusType};
 use crate::raw::*;
 use crate::*;
 
@@ -51,7 +51,18 @@ impl oio::Page for WebhdfsPager {
                 format!("{}/{}", self.path, status.path_suffix)
             };
 
-            let meta: Metadata = status.try_into()?;
+            let meta = match status.ty {
+                FileStatusType::Directory => Metadata::new(EntryMode::DIR),
+                FileStatusType::File => Metadata::new(EntryMode::FILE)
+                    .with_content_length(status.length)
+                    
.with_last_modified(parse_datetime_from_from_timestamp_millis(
+                        status.modification_time,
+                    )?),
+            };
+
+            if meta.mode().is_file() {
+                path = path.trim_end_matches('/').to_string();
+            }
             if meta.mode().is_dir() {
                 path += "/"
             }
diff --git a/core/src/services/webhdfs/writer.rs 
b/core/src/services/webhdfs/writer.rs
index 8fea27a6..1bdf7bd9 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -43,7 +43,7 @@ impl oio::Write for WebhdfsWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let req = self
             .backend
-            .webhdfs_create_object_req(
+            .webhdfs_create_object_request(
                 &self.path,
                 Some(bs.len()),
                 self.op.content_type(),

Reply via email to