This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 6c175925840d2c96859ec516c9b7ae169f2962d3
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 18:42:05 2023 +0800

    Migrate ipmfs

    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/ipmfs/backend.rs |  4 ++--
 core/src/services/ipmfs/writer.rs  | 17 +++++------------
 2 files changed, 7 insertions(+), 14 deletions(-)

diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs
index aebab57e5..c5fa4c7ba 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -290,7 +290,7 @@ impl IpmfsBackend {
     pub async fn ipmfs_write(
         &self,
         path: &str,
-        body: Bytes,
+        stream: oio::Streamer,
     ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_rooted_abs_path(&self.root, path);
 
@@ -300,7 +300,7 @@ impl IpmfsBackend {
             percent_encode_path(&p)
         );
 
-        let multipart = Multipart::new().part(FormDataPart::new("data").content(body));
+        let multipart = Multipart::new().part(FormDataPart::new("data").stream(stream));
 
         let req: http::request::Builder = Request::post(url);
         let req = multipart.apply(req)?;
diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs
index 2e2d4c18e..5e5359b37 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -34,9 +34,12 @@ impl IpmfsWriter {
     pub fn new(backend: IpmfsBackend, path: String) -> Self {
         IpmfsWriter { backend, path }
     }
+}
 
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let resp = self.backend.ipmfs_write(&self.path, bs).await?;
+#[async_trait]
+impl oio::Write for IpmfsWriter {
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        let resp = self.backend.ipmfs_write(&self.path, s).await?;
 
         let status = resp.status();
 
@@ -48,16 +51,6 @@ impl IpmfsWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
-}
-
-#[async_trait]
-impl oio::Write for IpmfsWriter {
-    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
 
     async fn abort(&mut self) -> Result<()> {
         Ok(())
