This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new dd290ab7 feat(services/supabase): Add read/write/stat support for 
supabase (#2119)
dd290ab7 is described below

commit dd290ab72e9940fefca36407a373a9f5aacefa06
Author: xyJi <[email protected]>
AuthorDate: Thu Apr 27 20:07:12 2023 +0800

    feat(services/supabase): Add read/write/stat support for supabase (#2119)
    
    * add supabase most basic skeleton
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * minimal buildable skeleton core for supabase
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * minimal writer
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * remove pager
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * minimal read/write functioning supabase
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * mux the code and implement stat
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * polish key loading
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * more docs
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * uncomment the conditional compilation
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * fix rebase
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * comments
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * service key only
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * check content length
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * simplify logics and use key only
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    * minor refactor
    
    Signed-off-by: Ji-Xinyou <[email protected]>
    
    ---------
    
    Signed-off-by: Ji-Xinyou <[email protected]>
---
 core/Cargo.toml                       |   1 +
 core/src/services/mod.rs              |   5 +
 core/src/services/s3/core.rs          |   2 -
 core/src/services/supabase/backend.rs | 274 ++++++++++++++++++++++++++++++++++
 core/src/services/supabase/core.rs    | 200 +++++++++++++++++++++++++
 core/src/services/supabase/error.rs   |  64 ++++++++
 core/src/services/supabase/mod.rs     |  22 +++
 core/src/services/supabase/writer.rs  |  89 +++++++++++
 core/src/services/wasabi/backend.rs   |   2 +-
 core/src/types/scheme.rs              |   4 +
 10 files changed, 660 insertions(+), 3 deletions(-)

diff --git a/core/Cargo.toml b/core/Cargo.toml
index 52330e0b..b8bbd0e9 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -134,6 +134,7 @@ services-s3 = [
   "reqsign?/reqwest_request",
 ]
 services-sled = ["dep:sled"]
+services-supabase = []
 services-wasabi = [
   "dep:reqsign",
   "reqsign?/services-aws",
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 4c1b7397..ab0aec0a 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -119,6 +119,11 @@ mod sled;
 #[cfg(feature = "services-sled")]
 pub use self::sled::Sled;
 
+// #[cfg(feature = "services-supabase")]
+mod supabase;
+// #[cfg(feature = "services-supabase")]
+pub use supabase::Supabase;
+
 #[cfg(feature = "services-wasabi")]
 mod wasabi;
 #[cfg(feature = "services-wasabi")]
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 0081656f..eba880c0 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -446,8 +446,6 @@ impl S3Core {
         self.send(req).await
     }
 
-    /// Make this functions as `pub(suber)` because `DirStream` depends
-    /// on this.
     pub async fn s3_list_objects(
         &self,
         path: &str,
diff --git a/core/src/services/supabase/backend.rs 
b/core/src/services/supabase/backend.rs
new file mode 100644
index 00000000..ae252d08
--- /dev/null
+++ b/core/src/services/supabase/backend.rs
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+use log::debug;
+
+use super::core::*;
+use super::error::parse_error;
+use super::writer::*;
+use crate::ops::*;
+use crate::raw::*;
+use crate::*;
+
+/// Supabase service
+///
+/// # Capabilities
+///
+/// - [x] read
+/// - [x] write
+/// - [ ] copy
+/// - [ ] list
+/// - [ ] scan
+/// - [ ] presign
+/// - [ ] blocking
+///
+/// # Configuration
+///
+/// - `root`: Set the work dir for backend.
+/// - `bucket`: Set the container name for backend.
+/// - `endpoint`: Set the endpoint for backend.
+/// - `key`: Set the authorization key for the backend, do not set if you want 
to read public bucket
+///
+/// ## Authorization keys
+///
+/// There are two types of key in the Supabase, one is anon_key(Client key), 
another one is
+/// service_role_key(Secret key). The former one can only write public 
resources while the latter one
+/// can access all resources. Note that if you want to read public resources, 
do not set the key.
+///
+/// # Example
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use opendal::services::Supabase;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     let mut builder = Supabase::default();
+///     builder.root("/");
+///     builder.bucket("test_bucket");
+///     builder.endpoint("http://127.0.0.1:54321");
+///     // this sets up the anon_key, which means this operator can only write 
public resource
+///     builder.key("some_anon_key");
+///
+///     let op: Operator = Operator::new(builder)?.finish();
+///    
+///     Ok(())
+/// }
+/// ```
+#[derive(Default)]
+pub struct SupabaseBuilder {
+    root: Option<String>,
+
+    bucket: String,
+    endpoint: Option<String>,
+
+    key: Option<String>,
+
+    // todo: optional public, currently true always
+    // todo: optional file_size_limit, currently 0
+    // todo: optional allowed_mime_types, currently only string
+    http_client: Option<HttpClient>,
+}
+
+impl Debug for SupabaseBuilder {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SupabaseBuilder")
+            .field("root", &self.root)
+            .field("bucket", &self.bucket)
+            .field("endpoint", &self.endpoint)
+            .finish_non_exhaustive()
+    }
+}
+
+impl SupabaseBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.root = if root.is_empty() {
+            None
+        } else {
+            Some(root.to_string())
+        };
+
+        self
+    }
+
+    /// Set bucket name of this backend.
+    pub fn bucket(&mut self, bucket: &str) -> &mut Self {
+        self.bucket = bucket.to_string();
+        self
+    }
+
+    /// Set endpoint of this backend.
+    ///
+    /// Endpoint must be full uri
+    pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+        self.endpoint = if endpoint.is_empty() {
+            None
+        } else {
+            Some(endpoint.trim_end_matches('/').to_string())
+        };
+
+        self
+    }
+
+    /// Set the authorization key for this backend
+    /// Do not set this key if you want to read public bucket
+    pub fn key(&mut self, key: &str) -> &mut Self {
+        self.key = Some(key.to_string());
+        self
+    }
+
+    /// Specify the http client that used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+        self.http_client = Some(client);
+        self
+    }
+}
+
+impl Builder for SupabaseBuilder {
+    const SCHEME: Scheme = Scheme::Supabase;
+    type Accessor = SupabaseBackend;
+
+    fn from_map(map: std::collections::HashMap<String, String>) -> Self {
+        let mut builder = SupabaseBuilder::default();
+
+        map.get("root").map(|v| builder.root(v));
+        map.get("bucket").map(|v| builder.bucket(v));
+        map.get("endpoint").map(|v| builder.endpoint(v));
+        map.get("key").map(|v| builder.key(v));
+
+        builder
+    }
+
+    fn build(&mut self) -> Result<Self::Accessor> {
+        let root = normalize_root(&self.root.take().unwrap_or_default());
+        debug!("backend use root {}", &root);
+
+        let bucket = &self.bucket;
+
+        let endpoint = self.endpoint.take().unwrap_or_default();
+
+        let http_client = if let Some(client) = self.http_client.take() {
+            client
+        } else {
+            HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::Supabase)
+            })?
+        };
+
+        let key = self.key.as_ref().map(|k| k.to_owned());
+
+        let core = SupabaseCore::new(&root, bucket, &endpoint, key, 
http_client);
+
+        let core = Arc::new(core);
+
+        Ok(SupabaseBackend { core })
+    }
+}
+
+#[derive(Debug)]
+pub struct SupabaseBackend {
+    core: Arc<SupabaseCore>,
+}
+
+#[async_trait]
+impl Accessor for SupabaseBackend {
+    type Reader = IncomingAsyncBody;
+    type BlockingReader = ();
+    type Writer = SupabaseWriter;
+    type BlockingWriter = ();
+    // todo: implement Pager to support list and scan
+    type Pager = ();
+    type BlockingPager = ();
+
+    fn info(&self) -> AccessorInfo {
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::Supabase)
+            .set_root(&self.core.root)
+            .set_name(&self.core.bucket)
+            .set_capability(Capability {
+                stat: true,
+                read: true,
+                write: true,
+
+                ..Default::default()
+            });
+
+        am
+    }
+
+    async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let resp = self.core.supabase_get_object(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
+                let meta = parse_into_metadata(path, resp.headers())?;
+                Ok((RpRead::with_metadata(meta), resp.into_body()))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        if args.content_length().is_none() {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "write without content length is not supported",
+            ));
+        }
+
+        Ok((
+            RpWrite::default(),
+            SupabaseWriter::new(self.core.clone(), path, args),
+        ))
+    }
+
+    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
+        // Stat root always returns a DIR.
+        if path == "/" {
+            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+        }
+
+        let resp = self.core.supabase_get_object_info(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => parse_into_metadata(path, 
resp.headers()).map(RpStat::new),
+            StatusCode::NOT_FOUND if path.ends_with('/') => {
+                Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
diff --git a/core/src/services/supabase/core.rs 
b/core/src/services/supabase/core.rs
new file mode 100644
index 00000000..38391427
--- /dev/null
+++ b/core/src/services/supabase/core.rs
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+
+use http::header::CONTENT_LENGTH;
+use http::header::CONTENT_TYPE;
+use http::HeaderValue;
+use http::Request;
+use http::Response;
+
+use crate::raw::*;
+use crate::*;
+
+pub struct SupabaseCore {
+    pub root: String,
+    pub bucket: String,
+    pub endpoint: String,
+
+    /// The key used for authorization
+    /// If loaded, the read operation will always access the nonpublic 
resources.
+    /// If you want to read the public resources, please do not set the key.
+    pub key: Option<String>,
+
+    pub http_client: HttpClient,
+}
+
+impl Debug for SupabaseCore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SupabaseCore")
+            .field("root", &self.root)
+            .field("bucket", &self.bucket)
+            .field("endpoint", &self.endpoint)
+            .finish_non_exhaustive()
+    }
+}
+
+impl SupabaseCore {
+    pub fn new(
+        root: &str,
+        bucket: &str,
+        endpoint: &str,
+        key: Option<String>,
+        client: HttpClient,
+    ) -> Self {
+        Self {
+            root: root.to_string(),
+            bucket: bucket.to_string(),
+            endpoint: endpoint.to_string(),
+            key,
+            http_client: client,
+        }
+    }
+
+    /// Add authorization header to the request if the key is set. Otherwise 
leave
+    /// the request as-is.
+    pub fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
+        if let Some(k) = &self.key {
+            let v = HeaderValue::from_str(&format!("Bearer {}", k)).unwrap();
+            req.headers_mut().insert(http::header::AUTHORIZATION, v);
+        }
+        Ok(())
+    }
+}
+
+// requests
+impl SupabaseCore {
+    pub fn supabase_upload_object_request(
+        &self,
+        path: &str,
+        size: Option<usize>,
+        content_type: Option<&str>,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+        let url = format!(
+            "{}/storage/v1/object/{}/{}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::post(&url);
+
+        if let Some(size) = size {
+            req = req.header(CONTENT_LENGTH, size)
+        }
+
+        if let Some(mime) = content_type {
+            req = req.header(CONTENT_TYPE, mime)
+        }
+
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    pub fn supabase_get_object_public_request(&self, path: &str) -> 
Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+        let url = format!(
+            "{}/storage/v1/object/public/{}/{}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+
+        Request::get(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)
+    }
+
+    pub fn supabase_get_object_auth_request(&self, path: &str) -> 
Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+        let url = format!(
+            "{}/storage/v1/object/authenticated/{}/{}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+
+        Request::get(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)
+    }
+
+    pub fn supabase_get_object_info_public_request(
+        &self,
+        path: &str,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+        let url = format!(
+            "{}/storage/v1/object/info/public/{}/{}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+
+        Request::get(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)
+    }
+
+    pub fn supabase_get_object_info_auth_request(&self, path: &str) -> 
Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+        let url = format!(
+            "{}/storage/v1/object/info/authenticated/{}/{}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+
+        Request::get(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)
+    }
+}
+
+// core utils
+impl SupabaseCore {
+    pub async fn send(&self, req: Request<AsyncBody>) -> 
Result<Response<IncomingAsyncBody>> {
+        self.http_client.send(req).await
+    }
+
+    pub async fn supabase_get_object(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let mut req = if self.key.is_some() {
+            self.supabase_get_object_auth_request(path)?
+        } else {
+            self.supabase_get_object_public_request(path)?
+        };
+        self.sign(&mut req)?;
+        self.send(req).await
+    }
+
+    pub async fn supabase_get_object_info(
+        &self,
+        path: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = if self.key.is_some() {
+            self.supabase_get_object_info_auth_request(path)?
+        } else {
+            self.supabase_get_object_info_public_request(path)?
+        };
+        self.sign(&mut req)?;
+        self.send(req).await
+    }
+}
diff --git a/core/src/services/supabase/error.rs 
b/core/src/services/supabase/error.rs
new file mode 100644
index 00000000..15017871
--- /dev/null
+++ b/core/src/services/supabase/error.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use http::Response;
+use http::StatusCode;
+use serde::Deserialize;
+use serde_json::from_slice;
+
+use crate::{raw::*, Error, ErrorKind, Result};
+
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "camelCase")]
+/// The error returned by Supabase
+struct SupabaseError {
+    status_code: String,
+    error: String,
+    message: String,
+}
+
+/// Parse the supabase error type to the OpenDAL error type
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+
+    // todo: the supabase error has status code 4XX, handle all that
+    let (kind, retryable) = match parts.status {
+        StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
+        StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
+        StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
+            (ErrorKind::ConditionNotMatch, false)
+        }
+        StatusCode::INTERNAL_SERVER_ERROR
+        | StatusCode::BAD_GATEWAY
+        | StatusCode::SERVICE_UNAVAILABLE
+        | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+        _ => (ErrorKind::Unexpected, false),
+    };
+
+    let (message, _) = from_slice::<SupabaseError>(&bs)
+        .map(|sb_err| (format!("{sb_err:?}"), Some(sb_err)))
+        .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
+
+    let mut err = Error::new(kind, &message).with_context("response", 
format!("{parts:?}"));
+
+    if retryable {
+        err = err.set_temporary();
+    }
+
+    Ok(err)
+}
diff --git a/core/src/services/supabase/mod.rs 
b/core/src/services/supabase/mod.rs
new file mode 100644
index 00000000..89aeada8
--- /dev/null
+++ b/core/src/services/supabase/mod.rs
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::SupabaseBuilder as Supabase;
+mod core;
+mod error;
+mod writer;
diff --git a/core/src/services/supabase/writer.rs 
b/core/src/services/supabase/writer.rs
new file mode 100644
index 00000000..5d32a412
--- /dev/null
+++ b/core/src/services/supabase/writer.rs
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use http::StatusCode;
+
+use super::core::*;
+use super::error::parse_error;
+use crate::ops::OpWrite;
+use crate::raw::*;
+use crate::*;
+
+pub struct SupabaseWriter {
+    core: Arc<SupabaseCore>,
+
+    op: OpWrite,
+    path: String,
+}
+
+impl SupabaseWriter {
+    pub fn new(core: Arc<SupabaseCore>, path: &str, op: OpWrite) -> Self {
+        SupabaseWriter {
+            core,
+            op,
+            path: path.to_string(),
+        }
+    }
+
+    pub async fn upload(&self, bytes: Bytes) -> Result<()> {
+        let size = bytes.len();
+        let mut req = self.core.supabase_upload_object_request(
+            &self.path,
+            Some(size),
+            self.op.content_type(),
+            AsyncBody::Bytes(bytes),
+        )?;
+
+        self.core.sign(&mut req)?;
+
+        let resp = self.core.send(req).await?;
+
+        match resp.status() {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Write for SupabaseWriter {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        if bs.is_empty() {
+            return Ok(());
+        }
+
+        self.upload(bs).await
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "The abort operation is not yet supported for Supabase backend",
+        ))
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
diff --git a/core/src/services/wasabi/backend.rs 
b/core/src/services/wasabi/backend.rs
index 5190bb21..5560fe9f 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -1062,7 +1062,7 @@ impl Accessor for WasabiBackend {
         if ops.len() > 1000 {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "s3 services only allow delete up to 1000 keys at once",
+                "wasabi services only allow delete up to 1000 keys at once",
             )
             .with_context("length", ops.len().to_string()));
         }
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index aff72868..3673b172 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -71,6 +71,8 @@ pub enum Scheme {
     S3,
     /// [sled][crate::services::Sled]: Sled services
     Sled,
+    /// [Supabase][crate::services::Supabase]: Supabase storage service
+    Supabase,
     /// [wasabi][crate::services::Wasabi]: Wasabi service
     Wasabi,
     /// [webdav][crate::services::Webdav]: WebDAV support.
@@ -130,6 +132,7 @@ impl FromStr for Scheme {
             "rocksdb" => Ok(Scheme::Rocksdb),
             "s3" => Ok(Scheme::S3),
             "sled" => Ok(Scheme::Sled),
+            "supabase" => Ok(Scheme::Supabase),
             "oss" => Ok(Scheme::Oss),
             "wasabi" => Ok(Scheme::Wasabi),
             "webdav" => Ok(Scheme::Webdav),
@@ -161,6 +164,7 @@ impl From<Scheme> for &'static str {
             Scheme::Rocksdb => "rocksdb",
             Scheme::S3 => "s3",
             Scheme::Sled => "sled",
+            Scheme::Supabase => "supabase",
             Scheme::Oss => "oss",
             Scheme::Wasabi => "wasabi",
             Scheme::Webdav => "webdav",

Reply via email to