This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e369d2f7 feat(services): add upyun support (#3790)
5e369d2f7 is described below

commit 5e369d2f72f9f89e40a864cbca243f762fc9c3cf
Author: hoslo <[email protected]>
AuthorDate: Fri Dec 22 10:00:37 2023 +0800

    feat(services): add upyun support (#3790)
---
 .env.example                       |   7 +-
 Cargo.lock                         |   2 +
 bindings/java/Cargo.toml           |   2 +
 bindings/nodejs/Cargo.toml         |   2 +
 bindings/python/Cargo.toml         |   2 +
 core/Cargo.toml                    |   5 +-
 core/src/services/mod.rs           |   7 +
 core/src/services/upyun/backend.rs | 378 ++++++++++++++++++++++++
 core/src/services/upyun/core.rs    | 586 +++++++++++++++++++++++++++++++++++++
 core/src/services/upyun/docs.md    |  53 ++++
 core/src/services/upyun/error.rs   | 104 +++++++
 core/src/services/upyun/lister.rs  | 101 +++++++
 core/src/services/upyun/mod.rs     |  25 ++
 core/src/services/upyun/writer.rs  | 145 +++++++++
 core/src/types/operator/builder.rs |   2 +
 core/src/types/scheme.rs           |   6 +
 16 files changed, 1425 insertions(+), 2 deletions(-)

diff --git a/.env.example b/.env.example
index f95a10298..f8fa06e88 100644
--- a/.env.example
+++ b/.env.example
@@ -185,4 +185,9 @@ OPENDAL_SEAFILE_ROOT=/path/to/dir
 OPENDAL_SEAFILE_ENDPOINT=<endpoint>
 OPENDAL_SEAFILE_USERNAME=<usernmae>
 OPENDAL_SEAFILE_PASSWORD=<password>
-OPENDAL_SEAFILE_REPO_NAME=<repo_name>
\ No newline at end of file
+OPENDAL_SEAFILE_REPO_NAME=<repo_name>
+# upyun
+OPENDAL_UPYUN_ROOT=/path/to/dir
+OPENDAL_UPYUN_BUCKET=<bucket>
+OPENDAL_UPYUN_OPERATOR=<operator>
+OPENDAL_UPYUN_PASSWORD=<password>
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 864b6d346..0060321d2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4640,6 +4640,7 @@ dependencies = [
  "getrandom 0.2.11",
  "governor",
  "hdrs",
+ "hmac",
  "hrana-client-proto",
  "http",
  "libtest-mimic",
@@ -4675,6 +4676,7 @@ dependencies = [
  "rusqlite",
  "serde",
  "serde_json",
+ "sha1",
  "sha2",
  "size",
  "sled",
diff --git a/bindings/java/Cargo.toml b/bindings/java/Cargo.toml
index de42caf06..0f1a3121e 100644
--- a/bindings/java/Cargo.toml
+++ b/bindings/java/Cargo.toml
@@ -91,6 +91,7 @@ services-all = [
   "services-alluxio",
   "services-b2",
   "services-seafile",
+  "services-upyun",
 ]
 
 # Default services provided by opendal.
@@ -113,6 +114,7 @@ services-webhdfs = ["opendal/services-webhdfs"]
 services-alluxio = ["opendal/services-alluxio"]
 services-azfile = ["opendal/services-azfile"]
 services-b2 = ["opendal/services-b2"]
+services-upyun = ["opendal/services-upyun"]
 services-cacache = ["opendal/services-cacache"]
 services-dashmap = ["opendal/services-dashmap"]
 services-dropbox = ["opendal/services-dropbox"]
diff --git a/bindings/nodejs/Cargo.toml b/bindings/nodejs/Cargo.toml
index a812d69ec..c0f9cd752 100644
--- a/bindings/nodejs/Cargo.toml
+++ b/bindings/nodejs/Cargo.toml
@@ -86,6 +86,7 @@ services-all = [
   "services-alluxio",
   "services-b2",
   "services-seafile",
+  "services-upyun",
 ]
 
 # Default services provided by opendal.
@@ -107,6 +108,7 @@ services-webhdfs = ["opendal/services-webhdfs"]
 # Optional services provided by opendal.
 services-alluxio = ["opendal/services-alluxio"]
 services-azfile = ["opendal/services-azfile"]
+services-upyun = ["opendal/services-upyun"]
 services-b2 = ["opendal/services-b2"]
 services-cacache = ["opendal/services-cacache"]
 services-dashmap = ["opendal/services-dashmap"]
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 5de87a081..38fe2597c 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -85,6 +85,7 @@ services-all = [
   "services-alluxio",
   "services-b2",
   "services-seafile",
+  "services-upyun",
 ]
 
 # Default services provided by opendal.
@@ -107,6 +108,7 @@ services-webhdfs = ["opendal/services-webhdfs"]
 services-alluxio = ["opendal/services-alluxio"]
 services-azfile = ["opendal/services-azfile"]
 services-b2 = ["opendal/services-b2"]
+services-upyun = ["opendal/services-upyun"]
 services-cacache = ["opendal/services-cacache"]
 services-dashmap = ["opendal/services-dashmap"]
 services-dropbox = ["opendal/services-dropbox"]
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 76084a8c3..3dd0438ea 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -127,6 +127,7 @@ services-azdls = [
 services-azfile = []
 services-b2 = []
 services-seafile = []
+services-upyun = ["dep:hmac", "dep:sha1"]
 services-cacache = ["dep:cacache"]
 services-cloudflare-kv = []
 services-cos = [
@@ -293,6 +294,8 @@ rusqlite = { version = "0.29.0", optional = true, features 
= ["bundled"] }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 sha2 = { version = "0.10", optional = true }
+hmac = { version = "0.12.1", optional = true }
+sha1 = { version = "0.10.6", optional = true }
 sled = { version = "0.34.7", optional = true }
 suppaftp = { version = "5.2", default-features = false, features = [
   "async-secure",
@@ -326,4 +329,4 @@ tracing-subscriber = { version = "0.3", features = [
   "env-filter",
   "tracing-log",
 ] }
-wiremock = "0.5"
+wiremock = "0.5"
\ No newline at end of file
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 086db17cd..a25f8d7bf 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -312,3 +312,10 @@ mod seafile;
 pub use seafile::Seafile;
 #[cfg(feature = "services-seafile")]
 pub use seafile::SeafileConfig;
+
+#[cfg(feature = "services-upyun")]
+mod upyun;
+#[cfg(feature = "services-upyun")]
+pub use upyun::Upyun;
+#[cfg(feature = "services-upyun")]
+pub use upyun::UpyunConfig;
diff --git a/core/src/services/upyun/backend.rs 
b/core/src/services/upyun/backend.rs
new file mode 100644
index 000000000..1a69119d4
--- /dev/null
+++ b/core/src/services/upyun/backend.rs
@@ -0,0 +1,378 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use http::StatusCode;
+use log::debug;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use super::core::parse_info;
+use super::core::UpyunCore;
+use super::error::parse_error;
+use super::lister::UpyunLister;
+use super::writer::UpyunWriter;
+use super::writer::UpyunWriters;
+use crate::raw::*;
+use crate::services::upyun::core::UpyunSigner;
+use crate::*;
+
+/// Config for backblaze upyun services support.
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct UpyunConfig {
+    /// root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub root: Option<String>,
+    /// bucket address of this backend.
+    pub bucket: String,
+    /// username of this backend.
+    pub operator: Option<String>,
+    /// password of this backend.
+    pub password: Option<String>,
+}
+
+impl Debug for UpyunConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("Config");
+
+        ds.field("root", &self.root);
+        ds.field("bucket", &self.bucket);
+        ds.field("operator", &self.operator);
+
+        ds.finish()
+    }
+}
+
+/// [upyun](https://www.upyun.com/products/file-storage) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct UpyunBuilder {
+    config: UpyunConfig,
+
+    http_client: Option<HttpClient>,
+}
+
+impl Debug for UpyunBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("UpyunBuilder");
+
+        d.field("config", &self.config);
+        d.finish_non_exhaustive()
+    }
+}
+
+impl UpyunBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.config.root = if root.is_empty() {
+            None
+        } else {
+            Some(root.to_string())
+        };
+
+        self
+    }
+
+    /// bucket of this backend.
+    ///
+    /// It is required. e.g. `test`
+    pub fn bucket(&mut self, bucket: &str) -> &mut Self {
+        self.config.bucket = bucket.to_string();
+
+        self
+    }
+
+    /// operator of this backend.
+    ///
+    /// It is required. e.g. `test`
+    pub fn operator(&mut self, operator: &str) -> &mut Self {
+        self.config.operator = if operator.is_empty() {
+            None
+        } else {
+            Some(operator.to_string())
+        };
+
+        self
+    }
+
+    /// password of this backend.
+    ///
+    /// It is required. e.g. `asecret`
+    pub fn password(&mut self, password: &str) -> &mut Self {
+        self.config.password = if password.is_empty() {
+            None
+        } else {
+            Some(password.to_string())
+        };
+
+        self
+    }
+
+    /// Specify the http client that used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+        self.http_client = Some(client);
+        self
+    }
+}
+
+impl Builder for UpyunBuilder {
+    const SCHEME: Scheme = Scheme::Upyun;
+    type Accessor = UpyunBackend;
+
+    /// Converts a HashMap into an UpyunBuilder instance.
+    ///
+    /// # Arguments
+    ///
+    /// * `map` - A HashMap containing the configuration values.
+    ///
+    /// # Returns
+    ///
+    /// Returns an instance of UpyunBuilder.
+    fn from_map(map: HashMap<String, String>) -> Self {
+        // Deserialize the configuration from the HashMap.
+        let config = UpyunConfig::deserialize(ConfigDeserializer::new(map))
+            .expect("config deserialize must succeed");
+
+        // Create an UpyunBuilder instance with the deserialized config.
+        UpyunBuilder {
+            config,
+            http_client: None,
+        }
+    }
+
+    /// Builds the backend and returns the result of UpyunBackend.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        debug!("backend build started: {:?}", &self);
+
+        let root = 
normalize_root(&self.config.root.clone().unwrap_or_default());
+        debug!("backend use root {}", &root);
+
+        // Handle bucket.
+        if self.config.bucket.is_empty() {
+            return Err(Error::new(ErrorKind::ConfigInvalid, "bucket is empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::Upyun));
+        }
+
+        debug!("backend use bucket {}", &self.config.bucket);
+
+        let operator = match &self.config.operator {
+            Some(operator) => Ok(operator.clone()),
+            None => Err(Error::new(ErrorKind::ConfigInvalid, "operator is 
empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::Upyun)),
+        }?;
+
+        let password = match &self.config.password {
+            Some(password) => Ok(password.clone()),
+            None => Err(Error::new(ErrorKind::ConfigInvalid, "password is 
empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::Upyun)),
+        }?;
+
+        let client = if let Some(client) = self.http_client.take() {
+            client
+        } else {
+            HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::Upyun)
+            })?
+        };
+
+        let signer = UpyunSigner {
+            operator: operator.clone(),
+            password: password.clone(),
+        };
+
+        Ok(UpyunBackend {
+            core: Arc::new(UpyunCore {
+                root,
+                operator,
+                password,
+                bucket: self.config.bucket.clone(),
+                signer,
+                client,
+            }),
+        })
+    }
+}
+
+/// Backend for upyun services.
+#[derive(Debug, Clone)]
+pub struct UpyunBackend {
+    core: Arc<UpyunCore>,
+}
+
+#[async_trait]
+impl Accessor for UpyunBackend {
+    type Reader = IncomingAsyncBody;
+
+    type BlockingReader = ();
+
+    type Writer = UpyunWriters;
+
+    type BlockingWriter = ();
+
+    type Lister = oio::PageLister<UpyunLister>;
+
+    type BlockingLister = ();
+
+    fn info(&self) -> AccessorInfo {
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::Upyun)
+            .set_root(&self.core.root)
+            .set_native_capability(Capability {
+                stat: true,
+
+                create_dir: true,
+
+                read: true,
+                read_can_next: true,
+
+                write: true,
+                write_can_empty: true,
+                write_can_multi: true,
+                write_with_cache_control: true,
+                write_with_content_type: true,
+
+                // 
https://help.upyun.com/knowledge-base/rest_api/#e5b9b6e8a18ce5bc8fe696ade782b9e7bbade4bca0
+                write_multi_min_size: Some(1024 * 1024),
+                write_multi_max_size: Some(50 * 1024 * 1024),
+
+                delete: true,
+                rename: true,
+                copy: true,
+
+                list: true,
+                list_with_limit: true,
+                list_with_recursive: true,
+
+                ..Default::default()
+            });
+
+        am
+    }
+
+    async fn create_dir(&self, path: &str, _: OpCreateDir) -> 
Result<RpCreateDir> {
+        let resp = self.core.create_dir(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => Ok(RpCreateDir::default()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> 
Result<RpRename> {
+        let resp = self.core.move_object(from, to).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+
+                Ok(RpRename::default())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> 
Result<RpCopy> {
+        let resp = self.core.copy(from, to).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+
+                Ok(RpCopy::default())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let resp = self.core.download_file(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let size = parse_content_length(resp.headers())?;
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
+        let resp = self.core.info(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => parse_info(resp.headers()).map(RpStat::new),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        let writer = UpyunWriter::new(self.core.clone(), args, 
path.to_string());
+
+        let w = oio::MultipartUploadWriter::new(writer);
+
+        Ok((RpWrite::default(), w))
+    }
+
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        let resp = self.core.delete(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => Ok(RpDelete::default()),
+            // Allow 404 when deleting a non-existing object
+            StatusCode::NOT_FOUND => Ok(RpDelete::default()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
+        let l = UpyunLister::new(self.core.clone(), path, args.limit());
+        Ok((RpList::default(), oio::PageLister::new(l)))
+    }
+}
diff --git a/core/src/services/upyun/core.rs b/core/src/services/upyun/core.rs
new file mode 100644
index 000000000..45d890a2c
--- /dev/null
+++ b/core/src/services/upyun/core.rs
@@ -0,0 +1,586 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::{Debug, Formatter};
+
+use base64::Engine;
+use hmac::{Hmac, Mac};
+use http::{header, HeaderMap, Request, Response};
+use md5::Digest;
+use serde::Deserialize;
+use sha1::Sha1;
+
+use crate::raw::*;
+use crate::*;
+
+use self::constants::*;
+
+mod constants {
+    pub const X_UPYUN_FILE_TYPE: &str = "x-upyun-file-type";
+    pub const X_UPYUN_FILE_SIZE: &str = "x-upyun-file-size";
+    pub const X_UPYUN_CACHE_CONTROL: &str = "x-upyun-meta-cache-control";
+    pub const X_UPYUN_CONTENT_DISPOSITION: &str = 
"x-upyun-meta-content-disposition";
+    pub const X_UPYUN_MULTI_STAGE: &str = "X-Upyun-Multi-Stage";
+    pub const X_UPYUN_MULTI_TYPE: &str = "X-Upyun-Multi-Type";
+    pub const X_UPYUN_MULTI_DISORDER: &str = "X-Upyun-Multi-Disorder";
+    pub const X_UPYUN_MULTI_UUID: &str = "X-Upyun-Multi-Uuid";
+    pub const X_UPYUN_PART_ID: &str = "X-Upyun-Part-Id";
+    pub const X_UPYUN_FOLDER: &str = "x-upyun-folder";
+    pub const X_UPYUN_MOVE_SOURCE: &str = "X-Upyun-Move-Source";
+    pub const X_UPYUN_COPY_SOURCE: &str = "X-Upyun-Copy-Source";
+    pub const X_UPYUN_METADATA_DIRECTIVE: &str = "X-Upyun-Metadata-Directive";
+    pub const X_UPYUN_LIST_ITER: &str = "x-list-iter";
+    pub const X_UPYUN_LIST_LIMIT: &str = "X-List-Limit";
+    pub const X_UPYUN_LIST_MAX_LIMIT: usize = 4096;
+    pub const X_UPYUN_LIST_DEFAULT_LIMIT: usize = 256;
+}
+
+#[derive(Clone)]
+pub struct UpyunCore {
+    /// The root of this core.
+    pub root: String,
+    /// The endpoint of this backend.
+    pub operator: String,
+    /// The password id of this backend.
+    pub password: String,
+    /// The bucket of this backend.
+    pub bucket: String,
+
+    /// signer of this backend.
+    pub signer: UpyunSigner,
+
+    pub client: HttpClient,
+}
+
+impl Debug for UpyunCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Backend")
+            .field("root", &self.root)
+            .field("bucket", &self.bucket)
+            .field("operator", &self.operator)
+            .finish_non_exhaustive()
+    }
+}
+
+impl UpyunCore {
+    #[inline]
+    pub async fn send(&self, req: Request<AsyncBody>) -> 
Result<Response<IncomingAsyncBody>> {
+        self.client.send(req).await
+    }
+
+    pub async fn sign(&self, req: &mut Request<AsyncBody>) -> Result<()> {
+        // get rfc1123 date
+        let date = chrono::Utc::now()
+            .format("%a, %d %b %Y %H:%M:%S GMT")
+            .to_string();
+        let authorization =
+            self.signer
+                .authorization(&date, req.method().as_str(), req.uri().path());
+
+        req.headers_mut()
+            .insert("Authorization", authorization.parse().unwrap());
+        req.headers_mut().insert("Date", date.parse().unwrap());
+
+        Ok(())
+    }
+}
+
+impl UpyunCore {
+    pub async fn download_file(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&path)
+        );
+
+        let req = Request::get(url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn info(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&path)
+        );
+
+        let req = Request::head(url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn upload(
+        &self,
+        path: &str,
+        size: Option<u64>,
+        args: &OpWrite,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::put(&url);
+
+        if let Some(size) = size {
+            req = req.header(header::CONTENT_LENGTH, size.to_string())
+        }
+
+        if let Some(mime) = args.content_type() {
+            req = req.header(header::CONTENT_TYPE, mime)
+        }
+
+        if let Some(pos) = args.content_disposition() {
+            req = req.header(X_UPYUN_CONTENT_DISPOSITION, pos)
+        }
+
+        if let Some(cache_control) = args.cache_control() {
+            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
+        }
+
+        // Set body
+        let mut req = req.body(body).map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        Ok(req)
+    }
+
+    pub async fn delete(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&path)
+        );
+
+        let req = Request::delete(url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn copy(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, 
from));
+        let to = build_abs_path(&self.root, to);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&to)
+        );
+
+        let mut req = Request::put(url);
+
+        req = req.header(header::CONTENT_LENGTH, "0");
+
+        req = req.header(X_UPYUN_COPY_SOURCE, from);
+
+        req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy");
+
+        // Set body
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn move_object(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, 
from));
+        let to = build_abs_path(&self.root, to);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&to)
+        );
+
+        let mut req = Request::put(url);
+
+        req = req.header(header::CONTENT_LENGTH, "0");
+
+        req = req.header(X_UPYUN_MOVE_SOURCE, from);
+
+        req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy");
+
+        // Set body
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn create_dir(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+        let path = path[..path.len() - 1].to_string();
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&path)
+        );
+
+        let mut req = Request::post(url);
+
+        req = req.header("folder", "true");
+
+        req = req.header(X_UPYUN_FOLDER, "true");
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn initiate_multipart_upload(
+        &self,
+        path: &str,
+        args: &OpWrite,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&path)
+        );
+
+        let mut req = Request::put(url);
+
+        req = req.header(X_UPYUN_MULTI_STAGE, "initiate");
+
+        req = req.header(X_UPYUN_MULTI_DISORDER, "true");
+
+        if let Some(content_type) = args.content_type() {
+            req = req.header(X_UPYUN_MULTI_TYPE, content_type);
+        }
+
+        if let Some(content_disposition) = args.content_disposition() {
+            req = req.header(X_UPYUN_CONTENT_DISPOSITION, content_disposition)
+        }
+
+        if let Some(cache_control) = args.cache_control() {
+            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
+        }
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn upload_part(
+        &self,
+        path: &str,
+        upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&p),
+        );
+
+        let mut req = Request::put(&url);
+
+        req = req.header(header::CONTENT_LENGTH, size);
+
+        req = req.header(X_UPYUN_MULTI_STAGE, "upload");
+
+        req = req.header(X_UPYUN_MULTI_UUID, upload_id);
+
+        req = req.header(X_UPYUN_PART_ID, part_number);
+
+        // Set body
+        let mut req = req.body(body).map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        Ok(req)
+    }
+
+    pub async fn complete_multipart_upload(
+        &self,
+        path: &str,
+        upload_id: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&p),
+        );
+
+        let mut req = Request::put(url);
+
+        req = req.header(X_UPYUN_MULTI_STAGE, "complete");
+
+        req = req.header(X_UPYUN_MULTI_UUID, upload_id);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn list_objects(
+        &self,
+        path: &str,
+        iter: &str,
+        limit: Option<usize>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&path),
+        );
+
+        let mut req = Request::get(url.clone());
+
+        req = req.header(header::ACCEPT, "application/json");
+
+        if !iter.is_empty() {
+            req = req.header(X_UPYUN_LIST_ITER, iter);
+        }
+
+        if let Some(mut limit) = limit {
+            if limit > X_UPYUN_LIST_MAX_LIMIT {
+                limit = X_UPYUN_LIST_DEFAULT_LIMIT;
+            }
+            req = req.header(X_UPYUN_LIST_LIMIT, limit);
+        }
+
+        // Set body
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+}
+
+#[derive(Clone, Default)]
+pub struct UpyunSigner {
+    pub operator: String,
+    pub password: String,
+}
+
+type HmacSha1 = Hmac<Sha1>;
+
+impl UpyunSigner {
+    pub fn authorization(&self, date: &str, method: &str, uri: &str) -> String 
{
+        let sign = vec![method, uri, date];
+
+        let sign = sign
+            .into_iter()
+            .filter(|s| !s.is_empty())
+            .collect::<Vec<&str>>()
+            .join("&");
+
+        let mut mac = 
HmacSha1::new_from_slice(format_md5(self.password.as_bytes()).as_bytes())
+            .expect("HMAC can take key of any size");
+        mac.update(sign.as_bytes());
+        let sign_str = mac.finalize().into_bytes();
+
+        let sign = 
base64::engine::general_purpose::STANDARD.encode(sign_str.as_slice());
+        format!("UPYUN {}:{}", self.operator, sign)
+    }
+}
+
+pub(super) fn parse_initiate_part(headers: &HeaderMap) -> Result<&str> {
+    match headers.get(X_UPYUN_MULTI_UUID) {
+        None => Err(Error::new(ErrorKind::Unexpected, "missing uuid")),
+        Some(v) => Ok(v.to_str().map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "header value has to be valid utf-8 string",
+            )
+            .with_operation("parse_initiate_part")
+            .set_source(e)
+        })?),
+    }
+}
+
+pub(super) fn parse_info(headers: &HeaderMap) -> Result<Metadata> {
+    let mode = if parse_file_type(headers)? == "file" {
+        EntryMode::FILE
+    } else {
+        EntryMode::DIR
+    };
+
+    let mut m = Metadata::new(mode);
+
+    if let Some(v) = parse_file_size(headers)? {
+        m.set_content_length(v);
+    }
+
+    if let Some(v) = parse_content_type(headers)? {
+        m.set_content_type(v);
+    }
+
+    if let Some(v) = parse_content_md5(headers)? {
+        m.set_content_md5(v);
+    }
+
+    if let Some(v) = parse_cache_control(headers)? {
+        m.set_cache_control(v);
+    }
+
+    if let Some(v) = parse_content_disposition(headers)? {
+        m.set_content_disposition(v);
+    }
+
+    Ok(m)
+}
+
+fn parse_file_type(headers: &HeaderMap) -> Result<&str> {
+    match headers.get(X_UPYUN_FILE_TYPE) {
+        None => Err(Error::new(ErrorKind::Unexpected, "missing file type")),
+        Some(v) => Ok(v.to_str().map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "header value has to be valid utf-8 string",
+            )
+            .with_operation("parse_file_type")
+            .set_source(e)
+        })?),
+    }
+}
+
+fn parse_file_size(headers: &HeaderMap) -> Result<Option<u64>> {
+    match headers.get(X_UPYUN_FILE_SIZE) {
+        None => Ok(None),
+        Some(v) => Ok(Some(
+            v.to_str()
+                .map_err(|e| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "header value is not valid utf-8 string",
+                    )
+                    .with_operation("http_util::parse_content_length")
+                    .set_source(e)
+                })?
+                .parse::<u64>()
+                .map_err(|e| {
+                    Error::new(ErrorKind::Unexpected, "header value is not 
valid integer")
+                        .with_operation("http_util::parse_content_length")
+                        .set_source(e)
+                })?,
+        )),
+    }
+}
+
+fn parse_cache_control(headers: &HeaderMap) -> Result<Option<&str>> {
+    match headers.get(X_UPYUN_CACHE_CONTROL) {
+        None => Ok(None),
+        Some(v) => Ok(Some(v.to_str().map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "header value has to be valid utf-8 string",
+            )
+            .with_operation("parse_cache_control")
+            .set_source(e)
+        })?)),
+    }
+}
+
+fn parse_content_disposition(headers: &HeaderMap) -> Result<Option<&str>> {
+    match headers.get(X_UPYUN_CONTENT_DISPOSITION) {
+        None => Ok(None),
+        Some(v) => Ok(Some(v.to_str().map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "header value has to be valid utf-8 string",
+            )
+            .with_operation("parse_content_disposition")
+            .set_source(e)
+        })?)),
+    }
+}
+
+pub fn format_md5(bs: &[u8]) -> String {
+    let mut hasher = md5::Md5::new();
+    hasher.update(bs);
+
+    format!("{:x}", hasher.finalize())
+}
+
+#[derive(Debug, Deserialize)]
+pub(super) struct File {
+    #[serde(rename = "type")]
+    pub type_field: String,
+    pub name: String,
+    pub length: u64,
+    pub last_modified: i64,
+}
+
+#[derive(Debug, Deserialize)]
+pub(super) struct ListObjectsResponse {
+    pub iter: String,
+    pub files: Vec<File>,
+}
diff --git a/core/src/services/upyun/docs.md b/core/src/services/upyun/docs.md
new file mode 100644
index 000000000..72dd8478e
--- /dev/null
+++ b/core/src/services/upyun/docs.md
@@ -0,0 +1,53 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [x] copy
+- [x] rename
+- [x] list
+- [x] scan
+- [ ] presign
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the work directory for backend
+- `bucket`: Upyun bucket name
+- `operator` Upyun operator
+- `password` Upyun password
+
+You can refer to [`UpyunBuilder`]'s docs for more information
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::Upyun;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // create backend builder
+    let mut builder = Upyun::default();
+
+    // set the storage bucket for OpenDAL
+    builder.root("/");
+    // set the bucket for OpenDAL
+    builder.bucket("test");
+    // set the operator for OpenDAL
+    builder.operator("xxxxxxxxxx");
+    // set the password name for OpenDAL
+    builder.password("opendal");
+
+    let op: Operator = Operator::new(builder)?.finish();
+
+    Ok(())
+}
+```
diff --git a/core/src/services/upyun/error.rs b/core/src/services/upyun/error.rs
new file mode 100644
index 000000000..51f4934b1
--- /dev/null
+++ b/core/src/services/upyun/error.rs
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use quick_xml::de;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// UpyunError is the error returned by upyun service.
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+struct UpyunError {
+    code: i64,
+    msg: String,
+    id: String,
+}
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+
+    let (kind, retryable) = match parts.status.as_u16() {
+        403 => (ErrorKind::PermissionDenied, false),
+        404 => (ErrorKind::NotFound, false),
+        304 | 412 => (ErrorKind::ConditionNotMatch, false),
+        // Service like Upyun could return 499 error with a message like:
+        // Client Disconnect, we should retry it.
+        499 => (ErrorKind::Unexpected, true),
+        500 | 502 | 503 | 504 => (ErrorKind::Unexpected, true),
+        _ => (ErrorKind::Unexpected, false),
+    };
+
+    let (message, _upyun_err) = de::from_reader::<_, 
UpyunError>(bs.clone().reader())
+        .map(|upyun_err| (format!("{upyun_err:?}"), Some(upyun_err)))
+        .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
+
+    let mut err = Error::new(kind, &message);
+
+    err = with_error_response_context(err, parts);
+
+    if retryable {
+        err = err.set_temporary();
+    }
+
+    Ok(err)
+}
+
+#[cfg(test)]
+mod test {
+    use futures::stream;
+    use http::StatusCode;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_parse_error() {
+        let err_res = vec![
+            (
+                r#"{"code": 40100016, "msg": "invalid date value in header", 
"id": "f5b30c720ddcecc70abd2f5c1c64bde8"}"#,
+                ErrorKind::Unexpected,
+                StatusCode::UNAUTHORIZED,
+            ),
+            (
+                r#"{"code": 40300010, "msg": "file type error", "id": 
"f5b30c720ddcecc70abd2f5c1c64bde7"}"#,
+                ErrorKind::PermissionDenied,
+                StatusCode::FORBIDDEN,
+            ),
+        ];
+
+        for res in err_res {
+            let bs = bytes::Bytes::from(res.0);
+            let body = IncomingAsyncBody::new(
+                Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))),
+                None,
+            );
+            let resp = Response::builder().status(res.2).body(body).unwrap();
+
+            let err = parse_error(resp).await;
+
+            assert!(err.is_ok());
+            assert_eq!(err.unwrap().kind(), res.1);
+        }
+    }
+}
diff --git a/core/src/services/upyun/lister.rs 
b/core/src/services/upyun/lister.rs
new file mode 100644
index 000000000..e08933bbd
--- /dev/null
+++ b/core/src/services/upyun/lister.rs
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use super::core::{ListObjectsResponse, UpyunCore};
+use super::error::parse_error;
+use crate::raw::oio::Entry;
+use crate::raw::*;
+use crate::EntryMode;
+use crate::Metadata;
+use crate::Result;
+
+pub struct UpyunLister {
+    core: Arc<UpyunCore>,
+
+    path: String,
+    limit: Option<usize>,
+}
+
+impl UpyunLister {
+    pub(super) fn new(core: Arc<UpyunCore>, path: &str, limit: Option<usize>) 
-> Self {
+        UpyunLister {
+            core,
+            path: path.to_string(),
+            limit,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::PageList for UpyunLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        let resp = self
+            .core
+            .list_objects(&self.path, &ctx.token, self.limit)
+            .await?;
+
+        if resp.status() == http::StatusCode::NOT_FOUND {
+            ctx.done = true;
+            return Ok(());
+        }
+
+        match resp.status() {
+            http::StatusCode::OK => {}
+            http::StatusCode::NOT_FOUND => {
+                ctx.done = true;
+                return Ok(());
+            }
+            _ => {
+                return Err(parse_error(resp).await?);
+            }
+        }
+
+        let bs = resp.into_body().bytes().await?;
+
+        let response = serde_json::from_slice::<ListObjectsResponse>(&bs)
+            .map_err(new_json_deserialize_error)?;
+
+        // ref 
https://help.upyun.com/knowledge-base/rest_api/#e88eb7e58f96e79baee5bd95e69687e4bbb6e58897e8a1a8
+        // when iter is "g2gCZAAEbmV4dGQAA2VvZg", it means the list is done.
+        ctx.done = response.iter == "g2gCZAAEbmV4dGQAA2VvZg";
+
+        ctx.token = response.iter;
+
+        for file in response.files {
+            let path = build_abs_path(&format!("/{}", &self.path), &file.name);
+
+            let entry = if file.type_field == "folder" {
+                let path = format!("{}/", path);
+                Entry::new(&path, Metadata::new(EntryMode::DIR))
+            } else {
+                let m = Metadata::new(EntryMode::FILE)
+                    .with_content_length(file.length)
+                    .with_content_type(file.type_field)
+                    
.with_last_modified(parse_datetime_from_from_timestamp(file.last_modified)?);
+                Entry::new(&path, m)
+            };
+
+            ctx.entries.push_back(entry);
+        }
+
+        Ok(())
+    }
+}
diff --git a/core/src/services/upyun/mod.rs b/core/src/services/upyun/mod.rs
new file mode 100644
index 000000000..039f2aa22
--- /dev/null
+++ b/core/src/services/upyun/mod.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::UpyunBuilder as Upyun;
+pub use backend::UpyunConfig;
+
+mod core;
+mod error;
+mod lister;
+mod writer;
diff --git a/core/src/services/upyun/writer.rs 
b/core/src/services/upyun/writer.rs
new file mode 100644
index 000000000..c0b42e44f
--- /dev/null
+++ b/core/src/services/upyun/writer.rs
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+
+use crate::raw::*;
+use crate::*;
+
+use super::core::{parse_initiate_part, UpyunCore};
+use super::error::parse_error;
+
+pub type UpyunWriters = oio::MultipartUploadWriter<UpyunWriter>;
+
+pub struct UpyunWriter {
+    core: Arc<UpyunCore>,
+    op: OpWrite,
+    path: String,
+}
+
+impl UpyunWriter {
+    pub fn new(core: Arc<UpyunCore>, op: OpWrite, path: String) -> Self {
+        UpyunWriter { core, op, path }
+    }
+}
+
+#[async_trait]
+impl oio::MultipartUploadWrite for UpyunWriter {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+        let req = self
+            .core
+            .upload(&self.path, Some(size), &self.op, body)
+            .await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn initiate_part(&self) -> Result<String> {
+        let resp = self
+            .core
+            .initiate_multipart_upload(&self.path, &self.op)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::NO_CONTENT => {
+                let id = parse_initiate_part(resp.headers())?;
+
+                Ok(id.to_string())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write_part(
+        &self,
+        upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: AsyncBody,
+    ) -> Result<oio::MultipartUploadPart> {
+        let req = self
+            .core
+            .upload_part(&self.path, upload_id, part_number, size, body)
+            .await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::NO_CONTENT => {
+                let etag = parse_etag(resp.headers())?
+                    .ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::Unexpected,
+                            "ETag not present in returning response",
+                        )
+                    })?
+                    .to_string();
+
+                resp.into_body().consume().await?;
+
+                Ok(oio::MultipartUploadPart { part_number, etag })
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn complete_part(
+        &self,
+        upload_id: &str,
+        _parts: &[oio::MultipartUploadPart],
+    ) -> Result<()> {
+        let resp = self
+            .core
+            .complete_multipart_upload(&self.path, upload_id)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::NO_CONTENT => {
+                resp.into_body().consume().await?;
+
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn abort_part(&self, _upload_id: &str) -> Result<()> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "Upyun does not support abort multipart upload",
+        ))
+    }
+}
diff --git a/core/src/types/operator/builder.rs 
b/core/src/types/operator/builder.rs
index c5f1f27b2..2e4df2968 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -157,6 +157,8 @@ impl Operator {
             Scheme::Atomicserver => 
Self::from_map::<services::Atomicserver>(map)?.finish(),
             #[cfg(feature = "services-alluxio")]
             Scheme::Alluxio => 
Self::from_map::<services::Alluxio>(map)?.finish(),
+            #[cfg(feature = "services-upyun")]
+            Scheme::Upyun => Self::from_map::<services::Upyun>(map)?.finish(),
             #[cfg(feature = "services-azblob")]
             Scheme::Azblob => 
Self::from_map::<services::Azblob>(map)?.finish(),
             #[cfg(feature = "services-azdls")]
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 295de6a96..6c5151321 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -42,6 +42,8 @@ pub enum Scheme {
     B2,
     /// [Seafile][crate::services::Seafile]: Seafile Services.
     Seafile,
+    /// [Upyun][crate::services::Upyun]: Upyun Services.
+    Upyun,
     /// [cacache][crate::services::Cacache]: cacache backend support.
     Cacache,
     /// [cloudflare-kv][crate::services::CloudflareKv]: Cloudflare KV services.
@@ -243,6 +245,8 @@ impl Scheme {
             Scheme::S3,
             #[cfg(feature = "services-seafile")]
             Scheme::Seafile,
+            #[cfg(feature = "services-upyun")]
+            Scheme::Upyun,
             #[cfg(feature = "services-sftp")]
             Scheme::Sftp,
             #[cfg(feature = "services-sled")]
@@ -331,6 +335,7 @@ impl FromStr for Scheme {
             "rocksdb" => Ok(Scheme::Rocksdb),
             "s3" => Ok(Scheme::S3),
             "seafile" => Ok(Scheme::Seafile),
+            "upyun" => Ok(Scheme::Upyun),
             "sftp" => Ok(Scheme::Sftp),
             "sled" => Ok(Scheme::Sled),
             "supabase" => Ok(Scheme::Supabase),
@@ -402,6 +407,7 @@ impl From<Scheme> for &'static str {
             Scheme::Sqlite => "sqlite",
             Scheme::Mongodb => "mongodb",
             Scheme::Alluxio => "alluxio",
+            Scheme::Upyun => "upyun",
             Scheme::Custom(v) => v,
         }
     }

Reply via email to