This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit c870d073c6bcef823de85867d87917981f5c08b5
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 18:51:25 2023 +0800

    Migrate webhdfs
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/webhdfs/backend.rs |  2 +-
 core/src/services/webhdfs/writer.rs  | 20 +++++++-------------
 2 files changed, 8 insertions(+), 14 deletions(-)

diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs
index db8e0490a..89693b851 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -200,7 +200,7 @@ impl WebhdfsBackend {
     pub async fn webhdfs_create_object_request(
         &self,
         path: &str,
-        size: Option<usize>,
+        size: Option<u64>,
         content_type: Option<&str>,
         body: AsyncBody,
     ) -> Result<Request<AsyncBody>> {
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index d27a84480..3097395ae 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -21,6 +21,7 @@ use http::StatusCode;
 
 use super::backend::WebhdfsBackend;
 use super::error::parse_error;
+use crate::raw::oio::Stream;
 use crate::raw::*;
 use crate::*;
 
@@ -35,15 +36,18 @@ impl WebhdfsWriter {
     pub fn new(backend: WebhdfsBackend, op: OpWrite, path: String) -> Self {
         WebhdfsWriter { backend, op, path }
     }
+}
 
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+#[async_trait]
+impl oio::Write for WebhdfsWriter {
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
         let req = self
             .backend
             .webhdfs_create_object_request(
                 &self.path,
-                Some(bs.len()),
+                Some(s.size()),
                 self.op.content_type(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::Stream(s),
             )
             .await?;
 
@@ -58,16 +62,6 @@ impl WebhdfsWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
-}
-
-#[async_trait]
-impl oio::Write for WebhdfsWriter {
-    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
 
     async fn abort(&mut self) -> Result<()> {
         Ok(())

Reply via email to