This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit a2575f825c8394fa2e089b880c14c50ac7bcccdc
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 16:19:01 2023 +0800

    Fix build

    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/write/one_shot_write.rs | 12 ++++++++++--
 core/src/services/supabase/writer.rs     | 29 ++++++-----------------------
 core/src/services/wasabi/writer.rs       |  1 +
 3 files changed, 17 insertions(+), 25 deletions(-)

diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index ee15c438f..a0a743915 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use futures::future::BoxFuture;
 use std::task::{ready, Context, Poll};
 
@@ -33,7 +34,7 @@ pub trait OneShotWrite: Send + Sync + Unpin {
     /// write_once write all data at once.
     ///
     /// Implementations should make sure that the data is written correctly at once.
-    async fn write_once(&self, body: &dyn oio::WriteBuf) -> Result<()>;
+    async fn write_once(&self, bs: Bytes) -> Result<()>;
 }
 
 /// OneShotWrite is used to implement [`Write`] based on one shot.
@@ -62,7 +63,14 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
         match &mut self.state {
             State::Idle(w) => {
                 let w = w.take().expect("writer must be valid");
-                let fut = w.write_once(bs);
+
+                let size = bs.remaining();
+                let bs = bs.copy_to_bytes(size);
+                let fut = async {
+                    let res = w.write_once(bs).await;
+
+                    (w, res.map(|_| size))
+                };
 
                 self.state = State::Write(Box::pin(fut));
             }
diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs
index d8ba4cc16..3b692fd08 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -41,14 +42,16 @@ impl SupabaseWriter {
             path: path.to_string(),
         }
     }
+}
 
-    pub async fn upload(&self, bytes: Bytes) -> Result<()> {
-        let size = bytes.len();
+impl oio::OneShotWrite for SupabaseWriter {
+    async fn write_once(&self, bs: Bytes) -> Result<()> {
+        let size = bs.len();
         let mut req = self.core.supabase_upload_object_request(
             &self.path,
             Some(size),
             self.op.content_type(),
-            AsyncBody::Bytes(bytes),
+            AsyncBody::Bytes(bs),
         )?;
 
         self.core.sign(&mut req)?;
@@ -64,23 +67,3 @@ impl SupabaseWriter {
         }
     }
 }
-
-#[async_trait]
-impl oio::Write for SupabaseWriter {
-    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        let size = bs.remaining();
-        self.upload(bs.copy_to_bytes(size)).await?;
-        Ok(size)
-    }
-
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "The abort operation is not yet supported for Supabase backend",
-        ))
-    }
-
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
-    }
-}
diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs
index 3eeaab12d..1c7b5ed2e 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::sync::Arc;
+use std::task::Context;
 
 use async_trait::async_trait;
 use http::StatusCode;
