This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit b3c1e41cd2097b329e9baf032d53b11fe79e7860 Author: Xuanwo <[email protected]> AuthorDate: Wed Aug 30 18:48:21 2023 +0800 Migrate supabase Signed-off-by: Xuanwo <[email protected]> --- core/src/services/supabase/core.rs | 2 +- core/src/services/supabase/writer.rs | 29 +++++++---------------------- 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/core/src/services/supabase/core.rs b/core/src/services/supabase/core.rs index ccdb2e1ab..16616ce5e 100644 --- a/core/src/services/supabase/core.rs +++ b/core/src/services/supabase/core.rs @@ -82,7 +82,7 @@ impl SupabaseCore { pub fn supabase_upload_object_request( &self, path: &str, - size: Option<usize>, + size: Option<u64>, content_type: Option<&str>, body: AsyncBody, ) -> Result<Request<AsyncBody>> { diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index 9eb736c73..bda8dd3e4 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -23,6 +23,7 @@ use http::StatusCode; use super::core::*; use super::error::parse_error; +use crate::raw::oio::Stream; use crate::raw::*; use crate::*; @@ -41,14 +42,16 @@ impl SupabaseWriter { path: path.to_string(), } } +} - pub async fn upload(&self, bytes: Bytes) -> Result<()> { - let size = bytes.len(); +#[async_trait] +impl oio::Write for SupabaseWriter { + async fn write(&mut self, s: oio::Streamer) -> Result<()> { let mut req = self.core.supabase_upload_object_request( &self.path, - Some(size), + Some(s.size()), self.op.content_type(), - AsyncBody::Bytes(bytes), + AsyncBody::Stream(s), )?; self.core.sign(&mut req)?; @@ -64,24 +67,6 @@ impl SupabaseWriter { } } - async fn write(&mut self, bs: Bytes) -> Result<()> { - if bs.is_empty() { - return Ok(()); - } - - self.upload(bs).await - } -} - -#[async_trait] -impl oio::Write for SupabaseWriter { - async fn write(&mut self, _s: oio::Streamer) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "Write::sink is not supported", - )) - } - 
async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported,
