This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit b7dd8cf96e76f7e9a5dab7376df32e4339166a58
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 18:40:47 2023 +0800

    Migrate hdfs

    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/hdfs/writer.rs | 26 ++++++++------------------
 1 file changed, 8 insertions(+), 18 deletions(-)

diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index a3928ed13..ceb575265 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -22,6 +22,7 @@ use bytes::Bytes;
 use futures::AsyncWriteExt;
 
 use super::error::parse_io_error;
+use crate::raw::oio::StreamExt;
 use crate::raw::*;
 use crate::*;
 
@@ -39,14 +40,13 @@ impl<F> HdfsWriter<F> {
     }
 }
 
-impl HdfsWriter<hdrs::AsyncFile> {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        while self.pos < bs.len() {
-            let n = self
-                .f
-                .write(&bs[self.pos..])
-                .await
-                .map_err(parse_io_error)?;
+#[async_trait]
+impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
+    async fn write(&mut self, mut s: oio::Streamer) -> Result<()> {
+        while let Some(bs) = s.next().await.transpose()? {
+            let n = bs.len();
+
+            self.f.write_all(&bs).await.map_err(parse_io_error)?;
             self.pos += n;
         }
         // Reset pos to 0 for next write.
@@ -54,16 +54,6 @@ impl HdfsWriter<hdrs::AsyncFile> {
 
         Ok(())
     }
-}
-
-#[async_trait]
-impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
-    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
 
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
