This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 18bdf3af35 feat(services/vercel_blob): support vercel blob (#4103)
18bdf3af35 is described below

commit 18bdf3af359965bb001ebd95b9ec51cb6660b2d0
Author: hoslo <[email protected]>
AuthorDate: Thu Feb 1 01:24:05 2024 +0800

    feat(services/vercel_blob): support vercel blob (#4103)
---
 .env.example                             |   3 +
 core/Cargo.toml                          |   1 +
 core/src/services/mod.rs                 |   7 +
 core/src/services/vercel_blob/backend.rs | 283 ++++++++++++++++++++
 core/src/services/vercel_blob/core.rs    | 433 +++++++++++++++++++++++++++++++
 core/src/services/vercel_blob/docs.md    |  47 ++++
 core/src/services/vercel_blob/error.rs   | 118 +++++++++
 core/src/services/vercel_blob/lister.rs  |  72 +++++
 core/src/services/vercel_blob/mod.rs     |  25 ++
 core/src/services/vercel_blob/writer.rs  | 148 +++++++++++
 core/src/types/operator/builder.rs       |   2 +
 core/src/types/scheme.rs                 |   6 +
 core/tests/behavior/async_write.rs       |   5 +
 13 files changed, 1150 insertions(+)

diff --git a/.env.example b/.env.example
index bcb6952d97..083076a969 100644
--- a/.env.example
+++ b/.env.example
@@ -216,3 +216,6 @@ OPENDAL_ICLOUD_PASSWORD=<password>
 OPENDAL_ICLOUD_TRUST_TOKEN=<trust_token>
 OPENDAL_ICLOUD_DS_WEB_AUTH_TOKEN=<ds_web_auth_token>
 OPENDAL_ICLOUD_IS_CHINA_MAINLAND=true
+# vercel blob
+OPENDAL_VERCEL_BLOB_ROOT=/path/to/dir
+OPENDAL_VERCEL_BLOB_TOKEN=<token>
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 81a1668eee..40ffb52de7 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -199,6 +199,7 @@ services-swift = []
 services-tikv = ["tikv-client"]
 services-upyun = ["dep:hmac", "dep:sha1"]
 services-vercel-artifacts = []
+services-vercel-blob = []
 # Deprecated
 # wasabi services support has been removed.
 # We will remove this feature in the next version.
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index e7ffb00844..16f467329b 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -377,3 +377,10 @@ mod koofr;
 pub use koofr::Koofr;
 #[cfg(feature = "services-koofr")]
 pub use koofr::KoofrConfig;
+
+#[cfg(feature = "services-vercel-blob")]
+mod vercel_blob;
+#[cfg(feature = "services-vercel-blob")]
+pub use vercel_blob::VercelBlob;
+#[cfg(feature = "services-vercel-blob")]
+pub use vercel_blob::VercelBlobConfig;
diff --git a/core/src/services/vercel_blob/backend.rs 
b/core/src/services/vercel_blob/backend.rs
new file mode 100644
index 0000000000..4f63fc0362
--- /dev/null
+++ b/core/src/services/vercel_blob/backend.rs
@@ -0,0 +1,283 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+use log::debug;
+use serde::Deserialize;
+
+use super::core::parse_blob;
+use super::core::Blob;
+use super::core::VercelBlobCore;
+use super::error::parse_error;
+use super::lister::VercelBlobLister;
+use super::writer::VercelBlobWriter;
+use super::writer::VercelBlobWriters;
+use crate::raw::*;
+use crate::*;
+
+/// Config for backblaze VercelBlob services support.
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct VercelBlobConfig {
+    /// root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub root: Option<String>,
+    /// vercel blob token.
+    pub token: String,
+}
+
+impl Debug for VercelBlobConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("Config");
+
+        ds.field("root", &self.root);
+
+        ds.finish()
+    }
+}
+
+/// [VercelBlob](https://vercel.com/docs/storage/vercel-blob) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct VercelBlobBuilder {
+    config: VercelBlobConfig,
+
+    http_client: Option<HttpClient>,
+}
+
+impl Debug for VercelBlobBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("VercelBlobBuilder");
+
+        d.field("config", &self.config);
+        d.finish_non_exhaustive()
+    }
+}
+
+impl VercelBlobBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.config.root = if root.is_empty() {
+            None
+        } else {
+            Some(root.to_string())
+        };
+
+        self
+    }
+
+    /// Vercel Blob token.
+    ///
+    /// Get from Vercel environment variable `BLOB_READ_WRITE_TOKEN`.
+    /// It is required.
+    pub fn token(&mut self, token: &str) -> &mut Self {
+        self.config.token = token.to_string();
+
+        self
+    }
+
+    /// Specify the http client that used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+        self.http_client = Some(client);
+        self
+    }
+}
+
+impl Builder for VercelBlobBuilder {
+    const SCHEME: Scheme = Scheme::VercelBlob;
+    type Accessor = VercelBlobBackend;
+
+    /// Converts a HashMap into an VercelBlobBuilder instance.
+    ///
+    /// # Arguments
+    ///
+    /// * `map` - A HashMap containing the configuration values.
+    ///
+    /// # Returns
+    ///
+    /// Returns an instance of VercelBlobBuilder.
+    fn from_map(map: HashMap<String, String>) -> Self {
+        // Deserialize the configuration from the HashMap.
+        let config = 
VercelBlobConfig::deserialize(ConfigDeserializer::new(map))
+            .expect("config deserialize must succeed");
+
+        // Create an VercelBlobBuilder instance with the deserialized config.
+        VercelBlobBuilder {
+            config,
+            http_client: None,
+        }
+    }
+
+    /// Builds the backend and returns the result of VercelBlobBackend.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        debug!("backend build started: {:?}", &self);
+
+        let root = 
normalize_root(&self.config.root.clone().unwrap_or_default());
+        debug!("backend use root {}", &root);
+
+        // Handle token.
+        if self.config.token.is_empty() {
+            return Err(Error::new(ErrorKind::ConfigInvalid, "token is empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::VercelBlob));
+        }
+
+        let client = if let Some(client) = self.http_client.take() {
+            client
+        } else {
+            HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::VercelBlob)
+            })?
+        };
+
+        Ok(VercelBlobBackend {
+            core: Arc::new(VercelBlobCore {
+                root,
+                token: self.config.token.clone(),
+                client,
+            }),
+        })
+    }
+}
+
+/// Backend for VercelBlob services.
+#[derive(Debug, Clone)]
+pub struct VercelBlobBackend {
+    core: Arc<VercelBlobCore>,
+}
+
+#[async_trait]
+impl Accessor for VercelBlobBackend {
+    type Reader = IncomingAsyncBody;
+    type Writer = VercelBlobWriters;
+    type Lister = oio::PageLister<VercelBlobLister>;
+    type BlockingReader = ();
+    type BlockingWriter = ();
+    type BlockingLister = ();
+
+    fn info(&self) -> AccessorInfo {
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::VercelBlob)
+            .set_root(&self.core.root)
+            .set_native_capability(Capability {
+                stat: true,
+
+                read: true,
+                read_can_next: true,
+                read_with_range: true,
+
+                write: true,
+                write_can_empty: true,
+                write_can_multi: true,
+                write_multi_min_size: Some(5 * 1024 * 1024),
+
+                delete: true,
+                copy: true,
+
+                list: true,
+                list_with_limit: true,
+
+                ..Default::default()
+            });
+
+        am
+    }
+
+    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
+        let resp = self.core.head(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+
+                let resp: Blob = 
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+
+                parse_blob(&resp).map(RpStat::new)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let resp = self.core.download(path, args).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
+                let size = parse_content_length(resp.headers())?;
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        let concurrent = args.concurrent();
+        let writer = VercelBlobWriter::new(self.core.clone(), args, 
path.to_string());
+
+        let w = oio::MultipartWriter::new(writer, concurrent);
+
+        Ok((RpWrite::default(), w))
+    }
+
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        self.core.delete(path).await.map(|_| RpDelete::default())
+    }
+
+    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> 
Result<RpCopy> {
+        let resp = self.core.copy(from, to).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+
+                Ok(RpCopy::default())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
+        let l = VercelBlobLister::new(self.core.clone(), path, args.limit());
+        Ok((RpList::default(), oio::PageLister::new(l)))
+    }
+}
diff --git a/core/src/services/vercel_blob/core.rs 
b/core/src/services/vercel_blob/core.rs
new file mode 100644
index 0000000000..54900dfff8
--- /dev/null
+++ b/core/src/services/vercel_blob/core.rs
@@ -0,0 +1,433 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use bytes::Bytes;
+use http::header;
+use http::request;
+use http::Request;
+use http::Response;
+use http::StatusCode;
+use serde::Deserialize;
+use serde::Serialize;
+use serde_json::json;
+
+use self::constants::*;
+use crate::raw::*;
+use crate::*;
+
+use super::error::parse_error;
+
+pub(super) mod constants {
+    // https://github.com/vercel/storage/blob/main/packages/blob/src/put.ts#L16
+    // x-content-type specifies the MIME type of the file being uploaded.
+    pub const X_VERCEL_BLOB_CONTENT_TYPE: &str = "x-content-type";
+    // x-add-random-suffix specifying whether to  add a random suffix to the 
pathname
+    // Default value is 1, which means to add a random suffix.
+    // Set it to 0 to disable the random suffix.
+    pub const X_VERCEL_BLOB_ADD_RANDOM_SUFFIX: &str = "x-add-random-suffix";
+    // 
https://github.com/vercel/storage/blob/main/packages/blob/src/put-multipart.ts#L84
+    // x-mpu-action specifies the action to perform on the MPU.
+    // Possible values are:
+    // - create: create a new MPU.
+    // - upload: upload a part to an existing MPU.
+    // - complete: complete an existing MPU.
+    pub const X_VERCEL_BLOB_MPU_ACTION: &str = "x-mpu-action";
+    pub const X_VERCEL_BLOB_MPU_KEY: &str = "x-mpu-key";
+    pub const X_VERCEL_BLOB_MPU_PART_NUMBER: &str = "x-mpu-part-number";
+    pub const X_VERCEL_BLOB_MPU_UPLOAD_ID: &str = "x-mpu-upload-id";
+}
+
+#[derive(Clone)]
+pub struct VercelBlobCore {
+    /// The root of this core.
+    pub root: String,
+    /// Vercel Blob token.
+    pub token: String,
+
+    pub client: HttpClient,
+}
+
+impl Debug for VercelBlobCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Backend")
+            .field("root", &self.root)
+            .finish_non_exhaustive()
+    }
+}
+
+impl VercelBlobCore {
+    #[inline]
+    pub async fn send(&self, req: Request<AsyncBody>) -> 
Result<Response<IncomingAsyncBody>> {
+        self.client.send(req).await
+    }
+
+    pub fn sign(&self, req: request::Builder) -> request::Builder {
+        req.header(header::AUTHORIZATION, format!("Bearer {}", self.token))
+    }
+}
+
+impl VercelBlobCore {
+    pub async fn download(&self, path: &str, args: OpRead) -> 
Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+        // Vercel blob use an unguessable random id url to download the file
+        // So we use list to get the url of the file and then use it to 
download the file
+        let resp = self.list(&p, Some(1)).await?;
+
+        // Use the mtach url to download the file
+        let url = resolve_blob(resp.blobs, p);
+
+        if url.is_empty() {
+            return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
+        }
+
+        let mut req = Request::get(url);
+
+        let range = args.range();
+        if !range.is_full() {
+            req = req.header(http::header::RANGE, range.to_header());
+        }
+
+        // Set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn get_put_request(
+        &self,
+        path: &str,
+        size: Option<u64>,
+        args: &OpWrite,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://blob.vercel-storage.com/{}";,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::put(&url);
+
+        req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
+
+        if let Some(size) = size {
+            req = req.header(header::CONTENT_LENGTH, size.to_string())
+        }
+
+        if let Some(mime) = args.content_type() {
+            req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime)
+        }
+
+        let req = self.sign(req);
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    pub async fn delete(&self, path: &str) -> Result<()> {
+        let p = build_abs_path(&self.root, path);
+
+        let resp = self.list(&p, Some(1)).await?;
+
+        let url = resolve_blob(resp.blobs, p);
+
+        if url.is_empty() {
+            return Ok(());
+        }
+
+        let req = Request::post("https://blob.vercel-storage.com/delete";);
+
+        let req = self.sign(req);
+
+        let req_body = &json!({
+            "urls": vec![url]
+        });
+
+        let req = req
+            .header(header::CONTENT_TYPE, "application/json")
+            .body(AsyncBody::Bytes(Bytes::from(req_body.to_string())))
+            .map_err(new_request_build_error)?;
+
+        let resp = self.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => Ok(()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    pub async fn head(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let resp = self.list(&p, Some(1)).await?;
+
+        let url = resolve_blob(resp.blobs, p);
+
+        if url.is_empty() {
+            return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
+        }
+
+        let req = Request::get(format!(
+            "https://blob.vercel-storage.com?url={}";,
+            percent_encode_path(&url)
+        ));
+
+        let req = self.sign(req);
+
+        // Set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn copy(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let from = build_abs_path(&self.root, from);
+
+        let resp = self.list(&from, Some(1)).await?;
+
+        let from_url = resolve_blob(resp.blobs, from);
+
+        if from_url.is_empty() {
+            return Err(Error::new(ErrorKind::NotFound, "Blob not found"));
+        }
+
+        let to = build_abs_path(&self.root, to);
+
+        let to_url = format!(
+            "https://blob.vercel-storage.com/{}?fromUrl={}";,
+            percent_encode_path(&to),
+            percent_encode_path(&from_url),
+        );
+
+        let req = Request::put(&to_url);
+
+        let req = self.sign(req);
+
+        // Set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn list(&self, prefix: &str, limit: Option<usize>) -> 
Result<ListResponse> {
+        let prefix = if prefix == "/" { "" } else { prefix };
+
+        let mut url = format!(
+            "https://blob.vercel-storage.com?prefix={}";,
+            percent_encode_path(prefix)
+        );
+
+        if let Some(limit) = limit {
+            url.push_str(&format!("&limit={}", limit))
+        }
+
+        let req = Request::get(&url);
+
+        let req = self.sign(req);
+
+        // Set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        let resp = self.send(req).await?;
+
+        let status = resp.status();
+
+        if status != StatusCode::OK {
+            return Err(parse_error(resp).await?);
+        }
+
+        let body = resp.into_body().bytes().await?;
+
+        let resp: ListResponse =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+
+        Ok(resp)
+    }
+
+    pub async fn initiate_multipart_upload(
+        &self,
+        path: &str,
+        args: &OpWrite,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://blob.vercel-storage.com/mpu/{}";,
+            percent_encode_path(&p)
+        );
+
+        let req = Request::post(&url);
+
+        let mut req = self.sign(req);
+
+        req = req.header(X_VERCEL_BLOB_MPU_ACTION, "create");
+        req = req.header(X_VERCEL_BLOB_ADD_RANDOM_SUFFIX, "0");
+
+        if let Some(mime) = args.content_type() {
+            req = req.header(X_VERCEL_BLOB_CONTENT_TYPE, mime);
+        };
+
+        // Set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn upload_part(
+        &self,
+        path: &str,
+        upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: AsyncBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://blob.vercel-storage.com/mpu/{}";,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::post(&url);
+
+        req = req.header(header::CONTENT_LENGTH, size);
+        req = req.header(X_VERCEL_BLOB_MPU_ACTION, "upload");
+        req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
+        req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
+        req = req.header(X_VERCEL_BLOB_MPU_PART_NUMBER, part_number);
+
+        let req = self.sign(req);
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn complete_multipart_upload(
+        &self,
+        path: &str,
+        upload_id: &str,
+        parts: Vec<Part>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://blob.vercel-storage.com/mpu/{}";,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::post(&url);
+
+        req = req.header(X_VERCEL_BLOB_MPU_ACTION, "complete");
+        req = req.header(X_VERCEL_BLOB_MPU_KEY, p);
+        req = req.header(X_VERCEL_BLOB_MPU_UPLOAD_ID, upload_id);
+
+        let req = self.sign(req);
+
+        let parts_json = json!(parts);
+
+        let req = req
+            .header(header::CONTENT_TYPE, "application/json")
+            .body(AsyncBody::Bytes(Bytes::from(parts_json.to_string())))
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+}
+
+pub fn parse_blob(blob: &Blob) -> Result<Metadata> {
+    let mode = if blob.pathname.ends_with('/') {
+        EntryMode::DIR
+    } else {
+        EntryMode::FILE
+    };
+    let mut md = Metadata::new(mode);
+    if let Some(content_type) = blob.content_type.clone() {
+        md.set_content_type(&content_type);
+    }
+    md.set_content_length(blob.size);
+    md.set_last_modified(parse_datetime_from_rfc3339(&blob.uploaded_at)?);
+    md.set_content_disposition(&blob.content_disposition);
+    Ok(md)
+}
+
+fn resolve_blob(blobs: Vec<Blob>, path: String) -> String {
+    for blob in blobs {
+        if blob.pathname == path {
+            return blob.url;
+        }
+    }
+    "".to_string()
+}
+
+#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ListResponse {
+    pub cursor: Option<String>,
+    pub has_more: bool,
+    pub blobs: Vec<Blob>,
+}
+
+#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Blob {
+    pub url: String,
+    pub pathname: String,
+    pub size: u64,
+    pub uploaded_at: String,
+    pub content_disposition: String,
+    pub content_type: Option<String>,
+}
+
+#[derive(Default, Debug, Clone, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Part {
+    pub part_number: usize,
+    pub etag: String,
+}
+
+#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct InitiateMultipartUploadResponse {
+    pub upload_id: String,
+    pub key: String,
+}
+
+#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct UploadPartResponse {
+    pub etag: String,
+}
diff --git a/core/src/services/vercel_blob/docs.md 
b/core/src/services/vercel_blob/docs.md
new file mode 100644
index 0000000000..f640e89187
--- /dev/null
+++ b/core/src/services/vercel_blob/docs.md
@@ -0,0 +1,47 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [x] copy
+- [ ] rename
+- [x] list
+- [x] scan
+- [ ] presign
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the work directory for backend
+- `token`: VercelBlob token, environment var `BLOB_READ_WRITE_TOKEN`
+
+You can refer to [`VercelBlobBuilder`]'s docs for more information
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::VercelBlob;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // create backend builder
+    let mut builder = VercelBlob::default();
+
+    // set the storage bucket for OpenDAL
+    builder.root("/");
+    // set the token for OpenDAL
+    builder.token("you_token");
+
+    let op: Operator = Operator::new(builder)?.finish();
+
+    Ok(())
+}
+```
diff --git a/core/src/services/vercel_blob/error.rs 
b/core/src/services/vercel_blob/error.rs
new file mode 100644
index 0000000000..2d689983d5
--- /dev/null
+++ b/core/src/services/vercel_blob/error.rs
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use quick_xml::de;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// VercelBlobError is the error returned by VercelBlob service.
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+struct VercelBlobError {
+    error: VercelBlobErrorDetail,
+}
+
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+struct VercelBlobErrorDetail {
+    code: String,
+    message: Option<String>,
+}
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+
+    let (kind, retryable) = match parts.status.as_u16() {
+        403 => (ErrorKind::PermissionDenied, false),
+        404 => (ErrorKind::NotFound, false),
+        500 | 502 | 503 | 504 => (ErrorKind::Unexpected, true),
+        _ => (ErrorKind::Unexpected, false),
+    };
+
+    let (message, _vercel_blob_err) = de::from_reader::<_, 
VercelBlobError>(bs.clone().reader())
+        .map(|vercel_blob_err| (format!("{vercel_blob_err:?}"), 
Some(vercel_blob_err)))
+        .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
+
+    let mut err = Error::new(kind, &message);
+
+    err = with_error_response_context(err, parts);
+
+    if retryable {
+        err = err.set_temporary();
+    }
+
+    Ok(err)
+}
+
+#[cfg(test)]
+mod test {
+    use futures::stream;
+    use http::StatusCode;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_parse_error() {
+        let err_res = vec![(
+            r#"{
+                    "error": {
+                        "code": "forbidden",
+                        "message": "Invalid token"
+                    }
+                }"#,
+            ErrorKind::PermissionDenied,
+            StatusCode::FORBIDDEN,
+        )];
+
+        for res in err_res {
+            let bs = bytes::Bytes::from(res.0);
+            let body = IncomingAsyncBody::new(
+                Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))),
+                None,
+            );
+            let resp = Response::builder().status(res.2).body(body).unwrap();
+
+            let err = parse_error(resp).await;
+
+            assert!(err.is_ok());
+            assert_eq!(err.unwrap().kind(), res.1);
+        }
+
+        let bs = bytes::Bytes::from(
+            r#"{
+                "error": {
+                    "code": "forbidden",
+                    "message": "Invalid token"
+                }
+            }"#,
+        );
+
+        let out: VercelBlobError = 
serde_json::from_reader(bs.reader()).expect("must success");
+        println!("{out:?}");
+
+        assert_eq!(out.error.code, "forbidden");
+        assert_eq!(out.error.message, Some("Invalid token".to_string()));
+    }
+}
diff --git a/core/src/services/vercel_blob/lister.rs 
b/core/src/services/vercel_blob/lister.rs
new file mode 100644
index 0000000000..e792d8f379
--- /dev/null
+++ b/core/src/services/vercel_blob/lister.rs
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use super::core::parse_blob;
+use super::core::VercelBlobCore;
+use crate::raw::oio::Entry;
+use crate::raw::*;
+use crate::Result;
+
+pub struct VercelBlobLister {
+    core: Arc<VercelBlobCore>,
+
+    path: String,
+    limit: Option<usize>,
+}
+
+impl VercelBlobLister {
+    pub(super) fn new(core: Arc<VercelBlobCore>, path: &str, limit: 
Option<usize>) -> Self {
+        VercelBlobLister {
+            core,
+            path: path.to_string(),
+            limit,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::PageList for VercelBlobLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        let p = build_abs_path(&self.core.root, &self.path);
+
+        let resp = self.core.list(&p, self.limit).await?;
+
+        ctx.done = !resp.has_more;
+
+        if let Some(cursor) = resp.cursor {
+            ctx.token = cursor;
+        }
+
+        for blob in resp.blobs {
+            let path = build_rel_path(&self.core.root, &blob.pathname);
+
+            if path == self.path {
+                continue;
+            }
+
+            let md = parse_blob(&blob)?;
+
+            ctx.entries.push_back(Entry::new(&path, md));
+        }
+
+        Ok(())
+    }
+}
diff --git a/core/src/services/vercel_blob/mod.rs 
b/core/src/services/vercel_blob/mod.rs
new file mode 100644
index 0000000000..2804d3fe73
--- /dev/null
+++ b/core/src/services/vercel_blob/mod.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::VercelBlobBuilder as VercelBlob;
+pub use backend::VercelBlobConfig;
+
+mod core;
+mod error;
+mod lister;
+mod writer;
diff --git a/core/src/services/vercel_blob/writer.rs 
b/core/src/services/vercel_blob/writer.rs
new file mode 100644
index 0000000000..46b8e32b8b
--- /dev/null
+++ b/core/src/services/vercel_blob/writer.rs
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+
+use super::core::{InitiateMultipartUploadResponse, Part, UploadPartResponse, 
VercelBlobCore};
+use super::error::parse_error;
+use crate::raw::*;
+use crate::*;
+
+pub type VercelBlobWriters = oio::MultipartWriter<VercelBlobWriter>;
+
+pub struct VercelBlobWriter {
+    core: Arc<VercelBlobCore>,
+    op: OpWrite,
+    path: String,
+}
+
+impl VercelBlobWriter {
+    pub fn new(core: Arc<VercelBlobCore>, op: OpWrite, path: String) -> Self {
+        VercelBlobWriter { core, op, path }
+    }
+}
+
+#[async_trait]
+impl oio::MultipartWrite for VercelBlobWriter {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+        let req = self
+            .core
+            .get_put_request(&self.path, Some(size), &self.op, body)
+            .await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn initiate_part(&self) -> Result<String> {
+        let resp = self
+            .core
+            .initiate_multipart_upload(&self.path, &self.op)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+
+                let resp = 
serde_json::from_slice::<InitiateMultipartUploadResponse>(&bs)
+                    .map_err(new_json_deserialize_error)?;
+
+                Ok(resp.upload_id)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write_part(
+        &self,
+        upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: AsyncBody,
+    ) -> Result<oio::MultipartPart> {
+        let part_number = part_number + 1;
+
+        let resp = self
+            .core
+            .upload_part(&self.path, upload_id, part_number, size, body)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+
+                let resp = serde_json::from_slice::<UploadPartResponse>(&bs)
+                    .map_err(new_json_deserialize_error)?;
+
+                Ok(oio::MultipartPart {
+                    part_number,
+                    etag: resp.etag,
+                })
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn complete_part(&self, upload_id: &str, parts: 
&[oio::MultipartPart]) -> Result<()> {
+        let parts = parts
+            .iter()
+            .map(|p| Part {
+                part_number: p.part_number,
+                etag: p.etag.clone(),
+            })
+            .collect::<Vec<Part>>();
+
+        let resp = self
+            .core
+            .complete_multipart_upload(&self.path, upload_id, parts)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn abort_part(&self, _upload_id: &str) -> Result<()> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "VercelBlob does not support abort multipart upload",
+        ))
+    }
+}
diff --git a/core/src/types/operator/builder.rs 
b/core/src/types/operator/builder.rs
index d5ac384c03..a5767f59e3 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -257,6 +257,8 @@ impl Operator {
             Scheme::Tikv => Self::from_map::<services::Tikv>(map)?.finish(),
             #[cfg(feature = "services-vercel-artifacts")]
             Scheme::VercelArtifacts => 
Self::from_map::<services::VercelArtifacts>(map)?.finish(),
+            #[cfg(feature = "services-vercel-blob")]
+            Scheme::VercelBlob => 
Self::from_map::<services::VercelBlob>(map)?.finish(),
             #[cfg(feature = "services-webdav")]
             Scheme::Webdav => 
Self::from_map::<services::Webdav>(map)?.finish(),
             #[cfg(feature = "services-webhdfs")]
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 9276b1c323..6620fa8145 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -44,6 +44,8 @@ pub enum Scheme {
     Seafile,
     /// [Upyun][crate::services::Upyun]: Upyun Services.
     Upyun,
+    /// [VercelBlob][crate::services::VercelBlob]: VercelBlob Services.
+    VercelBlob,
     /// [YandexDisk][crate::services::YandexDisk]: YandexDisk Services.
     YandexDisk,
     /// [Pcloud][crate::services::Pcloud]: Pcloud Services.
@@ -279,6 +281,8 @@ impl Scheme {
             Scheme::Tikv,
             #[cfg(feature = "services-vercel-artifacts")]
             Scheme::VercelArtifacts,
+            #[cfg(feature = "services-vercel-blob")]
+            Scheme::VercelBlob,
             #[cfg(feature = "services-webdav")]
             Scheme::Webdav,
             #[cfg(feature = "services-webhdfs")]
@@ -367,6 +371,7 @@ impl FromStr for Scheme {
             "swift" => Ok(Scheme::Swift),
             "oss" => Ok(Scheme::Oss),
             "vercel_artifacts" => Ok(Scheme::VercelArtifacts),
+            "vercel_blob" => Ok(Scheme::VercelBlob),
             "webdav" => Ok(Scheme::Webdav),
             "webhdfs" => Ok(Scheme::Webhdfs),
             "tikv" => Ok(Scheme::Tikv),
@@ -427,6 +432,7 @@ impl From<Scheme> for &'static str {
             Scheme::Supabase => "supabase",
             Scheme::Swift => "swift",
             Scheme::VercelArtifacts => "vercel_artifacts",
+            Scheme::VercelBlob => "vercel_blob",
             Scheme::Oss => "oss",
             Scheme::Webdav => "webdav",
             Scheme::Webhdfs => "webhdfs",
diff --git a/core/tests/behavior/async_write.rs 
b/core/tests/behavior/async_write.rs
index fdef821b28..9b3124c456 100644
--- a/core/tests/behavior/async_write.rs
+++ b/core/tests/behavior/async_write.rs
@@ -113,6 +113,11 @@ pub async fn test_write_with_special_chars(op: Operator) 
-> Result<()> {
         warn!("ignore test for atomicserver until 
https://github.com/atomicdata-dev/atomic-server/issues/663 is resolved");
         return Ok(());
     }
+    // Ignore test for vercel blob https://github.com/apache/opendal/pull/4103.
+    if op.info().scheme() == opendal::Scheme::VercelBlob {
+        warn!("ignore test for vercel blob 
https://github.com/apache/opendal/pull/4103";);
+        return Ok(());
+    }
 
     let path = format!("{} !@#$%^&()_+-=;',.txt", uuid::Uuid::new_v4());
     let (path, content, size) = TEST_FIXTURE.new_file_with_path(op.clone(), 
&path);


Reply via email to