This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit b7dd8cf96e76f7e9a5dab7376df32e4339166a58
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 18:40:47 2023 +0800

    Migrate hdfs
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/hdfs/writer.rs | 26 ++++++++------------------
 1 file changed, 8 insertions(+), 18 deletions(-)

diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index a3928ed13..ceb575265 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -22,6 +22,7 @@ use bytes::Bytes;
 use futures::AsyncWriteExt;
 
 use super::error::parse_io_error;
+use crate::raw::oio::StreamExt;
 use crate::raw::*;
 use crate::*;
 
@@ -39,14 +40,13 @@ impl<F> HdfsWriter<F> {
     }
 }
 
-impl HdfsWriter<hdrs::AsyncFile> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        while self.pos < bs.len() {
-            let n = self
-                .f
-                .write(&bs[self.pos..])
-                .await
-                .map_err(parse_io_error)?;
+#[async_trait]
+impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
+    async fn write(&mut self, mut s: oio::Streamer) -> Result<()> {
+        while let Some(bs) = s.next().await.transpose()? {
+            let n = bs.len();
+
+            self.f.write_all(&bs).await.map_err(parse_io_error)?;
             self.pos += n;
         }
         // Reset pos to 0 for next write.
@@ -54,16 +54,6 @@ impl HdfsWriter<hdrs::AsyncFile> {
 
         Ok(())
     }
-}
-
-#[async_trait]
-impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
-    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
 
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(

Reply via email to