This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit c870d073c6bcef823de85867d87917981f5c08b5 Author: Xuanwo <[email protected]> AuthorDate: Wed Aug 30 18:51:25 2023 +0800 Migrate webhdfs Signed-off-by: Xuanwo <[email protected]> --- core/src/services/webhdfs/backend.rs | 2 +- core/src/services/webhdfs/writer.rs | 20 +++++++------------- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index db8e0490a..89693b851 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -200,7 +200,7 @@ impl WebhdfsBackend { pub async fn webhdfs_create_object_request( &self, path: &str, - size: Option<usize>, + size: Option<u64>, content_type: Option<&str>, body: AsyncBody, ) -> Result<Request<AsyncBody>> { diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index d27a84480..3097395ae 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -21,6 +21,7 @@ use http::StatusCode; use super::backend::WebhdfsBackend; use super::error::parse_error; +use crate::raw::oio::Stream; use crate::raw::*; use crate::*; @@ -35,15 +36,18 @@ impl WebhdfsWriter { pub fn new(backend: WebhdfsBackend, op: OpWrite, path: String) -> Self { WebhdfsWriter { backend, op, path } } +} - async fn write(&mut self, bs: Bytes) -> Result<()> { +#[async_trait] +impl oio::Write for WebhdfsWriter { + async fn write(&mut self, s: oio::Streamer) -> Result<()> { let req = self .backend .webhdfs_create_object_request( &self.path, - Some(bs.len()), + Some(s.size()), self.op.content_type(), - AsyncBody::Bytes(bs), + AsyncBody::Stream(s), ) .await?; @@ -58,16 +62,6 @@ impl WebhdfsWriter { _ => Err(parse_error(resp).await?), } } -} - -#[async_trait] -impl oio::Write for WebhdfsWriter { - async fn write(&mut self, _s: oio::Streamer) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "Write::sink is not supported", - )) - } async fn abort(&mut self) -> Result<()> { 
Ok(())
