This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new dd290ab7 feat(services/supabase): Add read/write/stat support for
supabase (#2119)
dd290ab7 is described below
commit dd290ab72e9940fefca36407a373a9f5aacefa06
Author: xyJi <[email protected]>
AuthorDate: Thu Apr 27 20:07:12 2023 +0800
feat(services/supabase): Add read/write/stat support for supabase (#2119)
* add supabase most basic skeleton
Signed-off-by: Ji-Xinyou <[email protected]>
* minimal buildable skeleton core for supabase
Signed-off-by: Ji-Xinyou <[email protected]>
* minimal writer
Signed-off-by: Ji-Xinyou <[email protected]>
* remove pager
Signed-off-by: Ji-Xinyou <[email protected]>
* minimal read/write functioning supabase
Signed-off-by: Ji-Xinyou <[email protected]>
* mux the code and implement stat
Signed-off-by: Ji-Xinyou <[email protected]>
* polish key loading
Signed-off-by: Ji-Xinyou <[email protected]>
* more docs
Signed-off-by: Ji-Xinyou <[email protected]>
* uncomment the conditional compilation
Signed-off-by: Ji-Xinyou <[email protected]>
* fix rebase
Signed-off-by: Ji-Xinyou <[email protected]>
* comments
Signed-off-by: Ji-Xinyou <[email protected]>
* service key only
Signed-off-by: Ji-Xinyou <[email protected]>
* check content length
Signed-off-by: Ji-Xinyou <[email protected]>
* simplify logics and use key only
Signed-off-by: Ji-Xinyou <[email protected]>
* minor refactor
Signed-off-by: Ji-Xinyou <[email protected]>
---------
Signed-off-by: Ji-Xinyou <[email protected]>
---
core/Cargo.toml | 1 +
core/src/services/mod.rs | 5 +
core/src/services/s3/core.rs | 2 -
core/src/services/supabase/backend.rs | 274 ++++++++++++++++++++++++++++++++++
core/src/services/supabase/core.rs | 200 +++++++++++++++++++++++++
core/src/services/supabase/error.rs | 64 ++++++++
core/src/services/supabase/mod.rs | 22 +++
core/src/services/supabase/writer.rs | 89 +++++++++++
core/src/services/wasabi/backend.rs | 2 +-
core/src/types/scheme.rs | 4 +
10 files changed, 660 insertions(+), 3 deletions(-)
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 52330e0b..b8bbd0e9 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -134,6 +134,7 @@ services-s3 = [
"reqsign?/reqwest_request",
]
services-sled = ["dep:sled"]
+services-supabase = []
services-wasabi = [
"dep:reqsign",
"reqsign?/services-aws",
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 4c1b7397..ab0aec0a 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -119,6 +119,11 @@ mod sled;
#[cfg(feature = "services-sled")]
pub use self::sled::Sled;
+// Only compile and export the Supabase service when its feature is enabled.
+#[cfg(feature = "services-supabase")]
+mod supabase;
+#[cfg(feature = "services-supabase")]
+pub use self::supabase::Supabase;
+
#[cfg(feature = "services-wasabi")]
mod wasabi;
#[cfg(feature = "services-wasabi")]
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 0081656f..eba880c0 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -446,8 +446,6 @@ impl S3Core {
self.send(req).await
}
- /// Make this functions as `pub(suber)` because `DirStream` depends
- /// on this.
pub async fn s3_list_objects(
&self,
path: &str,
diff --git a/core/src/services/supabase/backend.rs
b/core/src/services/supabase/backend.rs
new file mode 100644
index 00000000..ae252d08
--- /dev/null
+++ b/core/src/services/supabase/backend.rs
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+use log::debug;
+
+use super::core::*;
+use super::error::parse_error;
+use super::writer::*;
+use crate::ops::*;
+use crate::raw::*;
+use crate::*;
+
+/// Supabase service
+///
+/// # Capabilities
+///
+/// - [x] stat
+/// - [x] read
+/// - [x] write
+/// - [ ] copy
+/// - [ ] list
+/// - [ ] scan
+/// - [ ] presign
+/// - [ ] blocking
+///
+/// # Configuration
+///
+/// - `root`: Set the work dir for backend.
+/// - `bucket`: Set the container name for backend.
+/// - `endpoint`: Set the endpoint for backend.
+/// - `key`: Set the authorization key for the backend. Do not set it if you
+///   want to read a public bucket.
+///
+/// ## Authorization keys
+///
+/// There are two types of key in Supabase: the anon_key (client key) and the
+/// service_role_key (secret key). The former can only write public resources,
+/// while the latter can access all resources. Note that if you want to read
+/// public resources, do not set the key.
+///
+/// # Example
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use opendal::services::Supabase;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     let mut builder = Supabase::default();
+///     builder.root("/");
+///     builder.bucket("test_bucket");
+///     builder.endpoint("http://127.0.0.1:54321");
+///     // This sets up the anon_key, which means this operator can only
+///     // write public resources.
+///     builder.key("some_anon_key");
+///
+///     let op: Operator = Operator::new(builder)?.finish();
+///
+///     Ok(())
+/// }
+/// ```
+#[derive(Default)]
+pub struct SupabaseBuilder {
+    /// Work dir under which all operations happen; `None` until set.
+    root: Option<String>,
+
+    /// Name of the bucket this backend talks to.
+    bucket: String,
+    /// Base URI of the Supabase instance, stored without trailing slashes.
+    endpoint: Option<String>,
+
+    /// Authorization key; when absent, the public endpoints are used.
+    key: Option<String>,
+
+    // todo: optional public, currently true always
+    // todo: optional file_size_limit, currently 0
+    // todo: optional allowed_mime_types, currently only string
+    /// Caller-supplied HTTP client; a fresh one is created at build time if unset.
+    http_client: Option<HttpClient>,
+}
+
+impl Debug for SupabaseBuilder {
+    /// Prints `root`, `bucket` and `endpoint` only; the remaining fields
+    /// (including the authorization key) are elided from the output.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("SupabaseBuilder");
+        out.field("root", &self.root);
+        out.field("bucket", &self.bucket);
+        out.field("endpoint", &self.endpoint);
+        out.finish_non_exhaustive()
+    }
+}
+
+impl SupabaseBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root. An empty string clears
+    /// any previously configured root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.root = (!root.is_empty()).then(|| root.to_string());
+        self
+    }
+
+    /// Set bucket name of this backend.
+    pub fn bucket(&mut self, bucket: &str) -> &mut Self {
+        self.bucket = bucket.to_string();
+        self
+    }
+
+    /// Set endpoint of this backend.
+    ///
+    /// Endpoint must be a full URI. Trailing slashes are stripped, and an
+    /// empty string clears any previously configured endpoint.
+    pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+        self.endpoint =
+            (!endpoint.is_empty()).then(|| endpoint.trim_end_matches('/').to_string());
+        self
+    }
+
+    /// Set the authorization key for this backend.
+    ///
+    /// Do not set this key if you want to read a public bucket. An empty
+    /// string is treated as "no key", matching how `root` and `endpoint`
+    /// handle empty input; otherwise an empty value would select the
+    /// authenticated endpoints with an empty bearer token.
+    pub fn key(&mut self, key: &str) -> &mut Self {
+        self.key = (!key.is_empty()).then(|| key.to_string());
+        self
+    }
+
+    /// Specify the http client that used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+        self.http_client = Some(client);
+        self
+    }
+}
+
+impl Builder for SupabaseBuilder {
+    const SCHEME: Scheme = Scheme::Supabase;
+    type Accessor = SupabaseBackend;
+
+    /// Create a builder from string options; recognised keys are `root`,
+    /// `bucket`, `endpoint` and `key`. Other entries are ignored.
+    fn from_map(map: std::collections::HashMap<String, String>) -> Self {
+        let mut builder = SupabaseBuilder::default();
+
+        if let Some(v) = map.get("root") {
+            builder.root(v);
+        }
+        if let Some(v) = map.get("bucket") {
+            builder.bucket(v);
+        }
+        if let Some(v) = map.get("endpoint") {
+            builder.endpoint(v);
+        }
+        if let Some(v) = map.get("key") {
+            builder.key(v);
+        }
+
+        builder
+    }
+
+    /// Consume the configured values and construct the backend.
+    ///
+    /// `root`, `endpoint` and `http_client` are taken out of the builder;
+    /// `bucket` and `key` are copied.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        let root = normalize_root(&self.root.take().unwrap_or_default());
+        debug!("backend use root {}", &root);
+
+        let endpoint = self.endpoint.take().unwrap_or_default();
+
+        // Fall back to a freshly created client when the caller supplied none.
+        let http_client = match self.http_client.take() {
+            Some(client) => client,
+            None => HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::Supabase)
+            })?,
+        };
+
+        let key = self.key.clone();
+
+        let core = Arc::new(SupabaseCore::new(
+            &root,
+            &self.bucket,
+            &endpoint,
+            key,
+            http_client,
+        ));
+
+        Ok(SupabaseBackend { core })
+    }
+}
+
+/// Accessor implementation backed by a shared [`SupabaseCore`].
+#[derive(Debug)]
+pub struct SupabaseBackend {
+    core: Arc<SupabaseCore>,
+}
+
+#[async_trait]
+impl Accessor for SupabaseBackend {
+    type Reader = IncomingAsyncBody;
+    type BlockingReader = ();
+    type Writer = SupabaseWriter;
+    type BlockingWriter = ();
+    // todo: implement Pager to support list and scan
+    type Pager = ();
+    type BlockingPager = ();
+
+    /// Describe this backend: scheme, root, bucket name, and the supported
+    /// operations (`stat`, `read`, `write`).
+    fn info(&self) -> AccessorInfo {
+        let capability = Capability {
+            stat: true,
+            read: true,
+            write: true,
+
+            ..Default::default()
+        };
+
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::Supabase)
+            .set_root(&self.core.root)
+            .set_name(&self.core.bucket)
+            .set_capability(capability);
+
+        am
+    }
+
+    /// Fetch the object at `path` and return its metadata with the body.
+    ///
+    /// The read arguments are currently not forwarded to the request.
+    async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let resp = self.core.supabase_get_object(path).await?;
+
+        // Anything other than 200/206 is converted into an OpenDAL error.
+        if !matches!(
+            resp.status(),
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT
+        ) {
+            return Err(parse_error(resp).await?);
+        }
+
+        let meta = parse_into_metadata(path, resp.headers())?;
+        Ok((RpRead::with_metadata(meta), resp.into_body()))
+    }
+
+    /// Create a writer for `path`; a content length must be provided.
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        match args.content_length() {
+            None => Err(Error::new(
+                ErrorKind::Unsupported,
+                "write without content length is not supported",
+            )),
+            Some(_) => Ok((
+                RpWrite::default(),
+                SupabaseWriter::new(self.core.clone(), path, args),
+            )),
+        }
+    }
+
+    /// Return metadata for `path`.
+    ///
+    /// The root, and any missing path that ends with `/`, is reported as a
+    /// directory.
+    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
+        // Stat root always returns a DIR.
+        if path == "/" {
+            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+        }
+
+        let resp = self.core.supabase_get_object_info(path).await?;
+        let status = resp.status();
+
+        if status == StatusCode::OK {
+            return parse_into_metadata(path, resp.headers()).map(RpStat::new);
+        }
+
+        if status == StatusCode::NOT_FOUND && path.ends_with('/') {
+            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+        }
+
+        Err(parse_error(resp).await?)
+    }
+}
diff --git a/core/src/services/supabase/core.rs
b/core/src/services/supabase/core.rs
new file mode 100644
index 00000000..38391427
--- /dev/null
+++ b/core/src/services/supabase/core.rs
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+
+use http::header::CONTENT_LENGTH;
+use http::header::CONTENT_TYPE;
+use http::HeaderValue;
+use http::Request;
+use http::Response;
+
+use crate::raw::*;
+use crate::*;
+
+/// Shared state used to build, sign and send Supabase storage requests.
+pub struct SupabaseCore {
+    pub root: String,
+    pub bucket: String,
+    pub endpoint: String,
+
+    /// The key used for authorization.
+    ///
+    /// If loaded, the read operation will always access the nonpublic
+    /// resources. If you want to read the public resources, please do not
+    /// set the key.
+    pub key: Option<String>,
+
+    pub http_client: HttpClient,
+}
+
+impl Debug for SupabaseCore {
+    /// Prints `root`, `bucket` and `endpoint` only; `key` and `http_client`
+    /// are elided from the output.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("SupabaseCore");
+        out.field("root", &self.root);
+        out.field("bucket", &self.bucket);
+        out.field("endpoint", &self.endpoint);
+        out.finish_non_exhaustive()
+    }
+}
+
+impl SupabaseCore {
+    /// Build a core from its parts; string arguments are copied.
+    pub fn new(
+        root: &str,
+        bucket: &str,
+        endpoint: &str,
+        key: Option<String>,
+        client: HttpClient,
+    ) -> Self {
+        Self {
+            root: root.to_string(),
+            bucket: bucket.to_string(),
+            endpoint: endpoint.to_string(),
+            key,
+            http_client: client,
+        }
+    }
+
+    /// Add authorization header to the request if the key is set. Otherwise
+    /// leave the request as-is.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error, instead of panicking, when the configured key
+    /// contains bytes that are not allowed in an HTTP header value. The key
+    /// itself is never included in the error.
+    pub fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
+        if let Some(k) = &self.key {
+            let mut v = HeaderValue::from_str(&format!("Bearer {k}")).map_err(|err| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "the authorization key is not a valid HTTP header value",
+                )
+                .with_operation("SupabaseCore::sign")
+                .with_context("source", err.to_string())
+            })?;
+            // Keep the credential out of debug output of the header map.
+            v.set_sensitive(true);
+            req.headers_mut().insert(http::header::AUTHORIZATION, v);
+        }
+        Ok(())
+    }
+}
+
+// requests
+impl SupabaseCore {
+    /// Build the URL `{endpoint}/storage/v1/{api}/{bucket}/{encoded path}`,
+    /// where the path is resolved against the configured root.
+    fn object_url(&self, api: &str, path: &str) -> String {
+        let p = build_abs_path(&self.root, path);
+        format!(
+            "{}/storage/v1/{}/{}/{}",
+            self.endpoint,
+            api,
+            self.bucket,
+            percent_encode_path(&p)
+        )
+    }
+
+    /// Build a body-less GET request for `url`.
+    fn empty_get_request(url: &str) -> Result<Request<AsyncBody>> {
+        Request::get(url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)
+    }
+
+    /// Build the POST request that uploads `body` to `path`.
+    ///
+    /// `Content-Length` and `Content-Type` headers are only added when the
+    /// corresponding argument is provided. The request is not signed here.
+    pub fn supabase_upload_object_request(
+        &self,
+        path: &str,
+        size: Option<usize>,
+        content_type: Option<&str>,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let url = self.object_url("object", path);
+
+        let mut req = Request::post(&url);
+
+        if let Some(size) = size {
+            req = req.header(CONTENT_LENGTH, size)
+        }
+
+        if let Some(mime) = content_type {
+            req = req.header(CONTENT_TYPE, mime)
+        }
+
+        req.body(body).map_err(new_request_build_error)
+    }
+
+    /// Build the GET request for an object via the public endpoint.
+    pub fn supabase_get_object_public_request(&self, path: &str) -> Result<Request<AsyncBody>> {
+        Self::empty_get_request(&self.object_url("object/public", path))
+    }
+
+    /// Build the GET request for an object via the authenticated endpoint.
+    pub fn supabase_get_object_auth_request(&self, path: &str) -> Result<Request<AsyncBody>> {
+        Self::empty_get_request(&self.object_url("object/authenticated", path))
+    }
+
+    /// Build the GET request for object info via the public endpoint.
+    pub fn supabase_get_object_info_public_request(
+        &self,
+        path: &str,
+    ) -> Result<Request<AsyncBody>> {
+        Self::empty_get_request(&self.object_url("object/info/public", path))
+    }
+
+    /// Build the GET request for object info via the authenticated endpoint.
+    pub fn supabase_get_object_info_auth_request(&self, path: &str) -> Result<Request<AsyncBody>> {
+        Self::empty_get_request(&self.object_url("object/info/authenticated", path))
+    }
+}
+
+// core utils
+impl SupabaseCore {
+    /// Send `req` through the configured HTTP client.
+    pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> {
+        self.http_client.send(req).await
+    }
+
+    /// Fetch an object, using the authenticated endpoint when a key is
+    /// configured and the public endpoint otherwise.
+    pub async fn supabase_get_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = match &self.key {
+            Some(_) => self.supabase_get_object_auth_request(path)?,
+            None => self.supabase_get_object_public_request(path)?,
+        };
+        self.sign(&mut req)?;
+        self.send(req).await
+    }
+
+    /// Fetch object info, using the authenticated endpoint when a key is
+    /// configured and the public endpoint otherwise.
+    pub async fn supabase_get_object_info(
+        &self,
+        path: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = match &self.key {
+            Some(_) => self.supabase_get_object_info_auth_request(path)?,
+            None => self.supabase_get_object_info_public_request(path)?,
+        };
+        self.sign(&mut req)?;
+        self.send(req).await
+    }
+}
diff --git a/core/src/services/supabase/error.rs
b/core/src/services/supabase/error.rs
new file mode 100644
index 00000000..15017871
--- /dev/null
+++ b/core/src/services/supabase/error.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use http::Response;
+use http::StatusCode;
+use serde::Deserialize;
+use serde_json::from_slice;
+
+use crate::{raw::*, Error, ErrorKind, Result};
+
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "camelCase")]
+/// The error returned by Supabase
+struct SupabaseError {
+ status_code: String,
+ error: String,
+ message: String,
+}
+
+/// Parse the supabase error type to the OpenDAL error type
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+ let (parts, body) = resp.into_parts();
+ let bs = body.bytes().await?;
+
+ // todo: the supabase error has status code 4XX, handle all that
+ let (kind, retryable) = match parts.status {
+ StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
+ StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
+ StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
+ (ErrorKind::ConditionNotMatch, false)
+ }
+ StatusCode::INTERNAL_SERVER_ERROR
+ | StatusCode::BAD_GATEWAY
+ | StatusCode::SERVICE_UNAVAILABLE
+ | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+ _ => (ErrorKind::Unexpected, false),
+ };
+
+ let (message, _) = from_slice::<SupabaseError>(&bs)
+ .map(|sb_err| (format!("{sb_err:?}"), Some(sb_err)))
+ .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
+
+ let mut err = Error::new(kind, &message).with_context("response",
format!("{parts:?}"));
+
+ if retryable {
+ err = err.set_temporary();
+ }
+
+ Ok(err)
+}
diff --git a/core/src/services/supabase/mod.rs
b/core/src/services/supabase/mod.rs
new file mode 100644
index 00000000..89aeada8
--- /dev/null
+++ b/core/src/services/supabase/mod.rs
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::SupabaseBuilder as Supabase;
+mod core;
+mod error;
+mod writer;
diff --git a/core/src/services/supabase/writer.rs
b/core/src/services/supabase/writer.rs
new file mode 100644
index 00000000..5d32a412
--- /dev/null
+++ b/core/src/services/supabase/writer.rs
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use http::StatusCode;
+
+use super::core::*;
+use super::error::parse_error;
+use crate::ops::OpWrite;
+use crate::raw::*;
+use crate::*;
+
+/// Writer that uploads data to a single object path through [`SupabaseCore`].
+pub struct SupabaseWriter {
+    core: Arc<SupabaseCore>,
+
+    op: OpWrite,
+    path: String,
+}
+
+impl SupabaseWriter {
+    /// Create a writer for `path`; the path is copied.
+    pub fn new(core: Arc<SupabaseCore>, path: &str, op: OpWrite) -> Self {
+        let path = path.to_string();
+        SupabaseWriter { core, op, path }
+    }
+
+    /// Upload `bytes` as the object body in one signed request.
+    ///
+    /// Succeeds only on `200 OK`; any other status is converted into an
+    /// error via `parse_error`.
+    pub async fn upload(&self, bytes: Bytes) -> Result<()> {
+        let len = bytes.len();
+        let content_type = self.op.content_type();
+
+        let mut req = self.core.supabase_upload_object_request(
+            &self.path,
+            Some(len),
+            content_type,
+            AsyncBody::Bytes(bytes),
+        )?;
+        self.core.sign(&mut req)?;
+
+        let resp = self.core.send(req).await?;
+        if resp.status() != StatusCode::OK {
+            return Err(parse_error(resp).await?);
+        }
+
+        // Drain the response body before reporting success.
+        resp.into_body().consume().await?;
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl oio::Write for SupabaseWriter {
+    /// Upload `bs`; an empty buffer is a no-op and sends no request.
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        match bs.is_empty() {
+            true => Ok(()),
+            false => self.upload(bs).await,
+        }
+    }
+
+    /// Always fails: aborting is not supported by this backend.
+    async fn abort(&mut self) -> Result<()> {
+        let err = Error::new(
+            ErrorKind::Unsupported,
+            "The abort operation is not yet supported for Supabase backend",
+        );
+        Err(err)
+    }
+
+    /// Nothing to finalize; always succeeds.
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
diff --git a/core/src/services/wasabi/backend.rs
b/core/src/services/wasabi/backend.rs
index 5190bb21..5560fe9f 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -1062,7 +1062,7 @@ impl Accessor for WasabiBackend {
if ops.len() > 1000 {
return Err(Error::new(
ErrorKind::Unsupported,
- "s3 services only allow delete up to 1000 keys at once",
+ "wasabi services only allow delete up to 1000 keys at once",
)
.with_context("length", ops.len().to_string()));
}
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index aff72868..3673b172 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -71,6 +71,8 @@ pub enum Scheme {
S3,
/// [sled][crate::services::Sled]: Sled services
Sled,
+ /// [Supabase][crate::services::Supabase]: Supabase storage service
+ Supabase,
/// [wasabi][crate::services::Wasabi]: Wasabi service
Wasabi,
/// [webdav][crate::services::Webdav]: WebDAV support.
@@ -130,6 +132,7 @@ impl FromStr for Scheme {
"rocksdb" => Ok(Scheme::Rocksdb),
"s3" => Ok(Scheme::S3),
"sled" => Ok(Scheme::Sled),
+ "supabase" => Ok(Scheme::Supabase),
"oss" => Ok(Scheme::Oss),
"wasabi" => Ok(Scheme::Wasabi),
"webdav" => Ok(Scheme::Webdav),
@@ -161,6 +164,7 @@ impl From<Scheme> for &'static str {
Scheme::Rocksdb => "rocksdb",
Scheme::S3 => "s3",
Scheme::Sled => "sled",
+ Scheme::Supabase => "supabase",
Scheme::Oss => "oss",
Scheme::Wasabi => "wasabi",
Scheme::Webdav => "webdav",