Xuanwo commented on code in PR #3790: URL: https://github.com/apache/incubator-opendal/pull/3790#discussion_r1433520081
########## core/src/services/mod.rs: ########## @@ -312,3 +312,10 @@ mod seafile; pub use seafile::Seafile; #[cfg(feature = "services-seafile")] pub use seafile::SeafileConfig; + +#[cfg(feature = "services-upyun")] +mod upyun; +#[cfg(feature = "services-upyun")] +pub use upyun::UpYun; Review Comment: Please call it `Upyun`, the naming should be exactly the same mapping. - `abc` -> `abc` -> `Abc` - `abc-def` -> `abc_def` -> `AbcDef` ########## core/src/services/upyun/backend.rs: ########## @@ -0,0 +1,377 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use http::StatusCode; +use log::debug; +use serde::Deserialize; +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use super::core::parse_info; +use super::core::UpYunCore; +use super::error::parse_error; +use super::lister::UpYunLister; +use super::writer::UpYunWriter; +use super::writer::UpYunWriters; +use crate::raw::*; +use crate::services::upyun::core::UpYunSigner; +use crate::*; + +/// Config for backblaze upyun services support. +#[derive(Default, Deserialize)] +#[serde(default)] +#[non_exhaustive] +pub struct UpYunConfig { + /// root of this backend. 
+ /// + /// All operations will happen under this root. + pub root: Option<String>, + /// bucket address of this backend. + pub bucket: String, + /// username of this backend. + pub operator: Option<String>, + /// password of this backend. + pub password: Option<String>, +} + +impl Debug for UpYunConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("Config"); + + ds.field("root", &self.root); + ds.field("bucket", &self.bucket); + ds.field("operator", &self.operator); + + ds.finish() + } +} + +/// [upyun](https://www.upyun.com/products/file-storage) services support. +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct UpYunBuilder { + config: UpYunConfig, + + http_client: Option<HttpClient>, +} + +impl Debug for UpYunBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut d = f.debug_struct("UpYunBuilder"); + + d.field("config", &self.config); + d.finish_non_exhaustive() + } +} + +impl UpYunBuilder { + /// Set root of this backend. + /// + /// All operations will happen under this root. + pub fn root(&mut self, root: &str) -> &mut Self { + self.config.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + + self + } + + /// bucket of this backend. + /// + /// It is required. e.g. `test` + pub fn bucket(&mut self, bucket: &str) -> &mut Self { + self.config.bucket = bucket.to_string(); + + self + } + + /// operator of this backend. + /// + /// It is required. e.g. `test` + pub fn operator(&mut self, operator: &str) -> &mut Self { + self.config.operator = if operator.is_empty() { + None + } else { + Some(operator.to_string()) + }; + + self + } + + /// password of this backend. + /// + /// It is required. e.g. `asecret` + pub fn password(&mut self, password: &str) -> &mut Self { + self.config.password = if password.is_empty() { + None + } else { + Some(password.to_string()) + }; + + self + } + + /// Specify the http client that used by this service. 
+ /// + /// # Notes + /// + /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed + /// during minor updates. + pub fn http_client(&mut self, client: HttpClient) -> &mut Self { + self.http_client = Some(client); + self + } +} + +impl Builder for UpYunBuilder { + const SCHEME: Scheme = Scheme::UpYun; + type Accessor = UpYunBackend; + + /// Converts a HashMap into an UpYunBuilder instance. + /// + /// # Arguments + /// + /// * `map` - A HashMap containing the configuration values. + /// + /// # Returns + /// + /// Returns an instance of UpYunBuilder. + fn from_map(map: HashMap<String, String>) -> Self { + // Deserialize the configuration from the HashMap. + let config = UpYunConfig::deserialize(ConfigDeserializer::new(map)) + .expect("config deserialize must succeed"); + + // Create an UpYunBuilder instance with the deserialized config. + UpYunBuilder { + config, + http_client: None, + } + } + + /// Builds the backend and returns the result of UpYunBackend. + fn build(&mut self) -> Result<Self::Accessor> { + debug!("backend build started: {:?}", &self); + + let root = normalize_root(&self.config.root.clone().unwrap_or_default()); + debug!("backend use root {}", &root); + + // Handle bucket. 
+ if self.config.bucket.is_empty() { + return Err(Error::new(ErrorKind::ConfigInvalid, "bucket is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::UpYun)); + } + + debug!("backend use bucket {}", &self.config.bucket); + + let operator = match &self.config.operator { + Some(operator) => Ok(operator.clone()), + None => Err(Error::new(ErrorKind::ConfigInvalid, "operator is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::UpYun)), + }?; + + let password = match &self.config.password { + Some(password) => Ok(password.clone()), + None => Err(Error::new(ErrorKind::ConfigInvalid, "password is empty") + .with_operation("Builder::build") + .with_context("service", Scheme::UpYun)), + }?; + + let client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::UpYun) + })? + }; + + let signer = UpYunSigner { + operator: operator.clone(), + password: password.clone(), + }; + + Ok(UpYunBackend { + core: Arc::new(UpYunCore { + root, + operator, + password, + bucket: self.config.bucket.clone(), + signer, + client, + }), + }) + } +} + +/// Backend for upyun services. 
+#[derive(Debug, Clone)] +pub struct UpYunBackend { + core: Arc<UpYunCore>, +} + +#[async_trait] +impl Accessor for UpYunBackend { + type Reader = IncomingAsyncBody; + + type BlockingReader = (); + + type Writer = UpYunWriters; + + type BlockingWriter = (); + + type Lister = oio::PageLister<UpYunLister>; + + type BlockingLister = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::UpYun) + .set_root(&self.core.root) + .set_native_capability(Capability { + stat: true, + + create_dir: true, + + read: true, + read_can_next: true, + + write: true, + write_can_empty: true, + write_can_multi: true, + write_with_cache_control: true, + write_with_content_type: true, + + write_multi_min_size: Some(1024 * 1024), + write_multi_max_size: Some(50 * 1024 * 1024), Review Comment: Seems the max size should also be `1MiB`? > 文件分块:直接切分二进制文件成小块。分块大小固定为 1M。最后一个分块除外。 ########## core/src/services/upyun/lister.rs: ########## @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use async_trait::async_trait; + +use super::core::{ListObjectsResponse, UpYunCore}; +use super::error::parse_error; +use crate::raw::oio::Entry; +use crate::raw::*; +use crate::EntryMode; +use crate::Metadata; +use crate::Result; + +pub struct UpYunLister { + core: Arc<UpYunCore>, + + path: String, + limit: Option<usize>, +} + +impl UpYunLister { + pub(super) fn new(core: Arc<UpYunCore>, path: &str, limit: Option<usize>) -> Self { + UpYunLister { + core, + path: path.to_string(), + limit, + } + } +} + +#[async_trait] +impl oio::PageList for UpYunLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let resp = self + .core + .list_objects(&self.path, &ctx.token, self.limit) + .await?; + + if resp.status() == http::StatusCode::NOT_FOUND { + ctx.done = true; + return Ok(()); + } + + match resp.status() { + http::StatusCode::OK => {} + http::StatusCode::NOT_FOUND => { + ctx.done = true; + return Ok(()); + } + _ => { + return Err(parse_error(resp).await?); + } + } + + let bs = resp.into_body().bytes().await?; + + let response = serde_json::from_slice::<ListObjectsResponse>(&bs) + .map_err(new_json_deserialize_error)?; + + // ref https://github.com/upyun/go-sdk/blob/master/upyun/rest.go#L772 Review Comment: It's best to provide a link to the API documentation along with some explanation. ########## core/src/services/upyun/core.rs: ########## @@ -0,0 +1,557 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{Debug, Formatter}; + +use base64::Engine; +use hmac::{Hmac, Mac}; +use http::{header, HeaderMap, Request, Response}; +use md5::Digest; +use serde::Deserialize; +use sha1::Sha1; + +use crate::raw::*; +use crate::*; + +use self::constants::*; + +mod constants { + pub const X_UPYUN_FILE_TYPE: &str = "x-upyun-file-type"; + pub const X_UPYUN_FILE_SIZE: &str = "x-upyun-file-size"; + pub const X_UPYUN_CACHE_CONTROL: &str = "x-upyun-meta-cache-control"; + pub const X_UPYUN_CONTENT_DISPOSITION: &str = "x-upyun-meta-content-disposition"; + pub const X_UPYUN_MULTI_STAGE: &str = "X-Upyun-Multi-Stage"; + pub const X_UPYUN_MULTI_TYPE: &str = "X-Upyun-Multi-Type"; + pub const X_UPYUN_MULTI_DISORDER: &str = "X-Upyun-Multi-Disorder"; + pub const X_UPYUN_MULTI_UUID: &str = "X-Upyun-Multi-Uuid"; + pub const X_UPYUN_PART_ID: &str = "X-Upyun-Part-Id"; + pub const X_UPYUN_FOLDER: &str = "x-upyun-folder"; + pub const X_UPYUN_MOVE_SOURCE: &str = "X-Upyun-Move-Source"; + pub const X_UPYUN_METADATA_DIRECTIVE: &str = "X-Upyun-Metadata-Directive"; + pub const X_UPYUN_LIST_ITER: &str = "x-list-iter"; + pub const X_UPYUN_LIST_LIMIT: &str = "X-List-Limit"; + pub const X_UPYUN_LIST_MAX_LIMIT: usize = 4096; + pub const X_UPYUN_LIST_DEFAULT_LIMIT: usize = 256; +} + +#[derive(Clone)] +pub struct UpYunCore { + /// The root of this core. + pub root: String, + /// The endpoint of this backend. + pub operator: String, + /// The password id of this backend. + pub password: String, + /// The bucket of this backend. + pub bucket: String, + + /// signer of this backend. 
+ pub signer: UpYunSigner, + + pub client: HttpClient, +} + +impl Debug for UpYunCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backend") + .field("root", &self.root) + .field("bucket", &self.bucket) + .field("operator", &self.operator) + .finish_non_exhaustive() + } +} + +impl UpYunCore { + #[inline] + pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> { + self.client.send(req).await + } + + pub async fn sign(&self, req: &mut Request<AsyncBody>) -> Result<()> { + // get rfc1123 date + let date = chrono::Utc::now() + .format("%a, %d %b %Y %H:%M:%S GMT") + .to_string(); + let authorization = + self.signer + .authorization(&date, req.method().as_str(), req.uri().path()); + + req.headers_mut() + .insert("Authorization", authorization.parse().unwrap()); + req.headers_mut().insert("Date", date.parse().unwrap()); + + Ok(()) + } +} + +impl UpYunCore { + pub async fn download_file(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path) + ); + + let req = Request::get(url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn info(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path) + ); + + let req = Request::head(url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn upload( + &self, + path: &str, + size: Option<u64>, + args: &OpWrite, + body: AsyncBody, + ) -> Result<Request<AsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + 
"https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&p) + ); + + let mut req = Request::put(&url); + + if let Some(size) = size { + req = req.header(header::CONTENT_LENGTH, size.to_string()) + } + + if let Some(mime) = args.content_type() { + req = req.header(header::CONTENT_TYPE, mime) + } + + if let Some(pos) = args.content_disposition() { + req = req.header(X_UPYUN_CONTENT_DISPOSITION, pos) + } + + if let Some(cache_control) = args.cache_control() { + req = req.header(X_UPYUN_CACHE_CONTROL, cache_control) + } + + // Set body + let mut req = req.body(body).map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + Ok(req) + } + + pub async fn delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path) + ); + + let req = Request::delete(url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn copy(&self, from: &str, to: &str) -> Result<Response<IncomingAsyncBody>> { + let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from)); + let to = build_abs_path(&self.root, to); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&to) + ); + + let mut req = Request::put(url); + + req = req.header(X_UPYUN_MOVE_SOURCE, from); + + req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy"); + + // Set body + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn create_dir(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { + let path = build_abs_path(&self.root, path); + let path = path[..path.len() - 1].to_string(); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + 
percent_encode_path(&path) + ); + + let mut req = Request::post(url); + + req = req.header("folder", "true"); + + req = req.header(X_UPYUN_FOLDER, "true"); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn initiate_multipart_upload( + &self, + path: &str, + args: &OpWrite, + ) -> Result<Response<IncomingAsyncBody>> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path) + ); + + let mut req = Request::put(url); + + req = req.header(X_UPYUN_MULTI_STAGE, "initiate"); + + req = req.header(X_UPYUN_MULTI_DISORDER, "true"); + + if let Some(content_type) = args.content_type() { + req = req.header(X_UPYUN_MULTI_TYPE, content_type); + } + + if let Some(content_disposition) = args.content_disposition() { + req = req.header(X_UPYUN_CONTENT_DISPOSITION, content_disposition) + } + + if let Some(cache_control) = args.cache_control() { + req = req.header(X_UPYUN_CACHE_CONTROL, cache_control) + } + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn upload_part( + &self, + path: &str, + upload_id: &str, + part_number: usize, + size: u64, + body: AsyncBody, + ) -> Result<Request<AsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&p), + ); + + let mut req = Request::put(&url); + + req = req.header(header::CONTENT_LENGTH, size); + + req = req.header(X_UPYUN_MULTI_STAGE, "upload"); + + req = req.header(X_UPYUN_MULTI_UUID, upload_id); + + req = req.header(X_UPYUN_PART_ID, part_number); + + // Set body + let mut req = req.body(body).map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + Ok(req) + } + + pub async fn complete_multipart_upload( + &self, + 
path: &str, + upload_id: &str, + ) -> Result<Response<IncomingAsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&p), + ); + + let mut req = Request::put(url); + + req = req.header(X_UPYUN_MULTI_STAGE, "complete"); + + req = req.header(X_UPYUN_MULTI_UUID, upload_id); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn list_objects( + &self, + path: &str, + iter: &str, + limit: Option<usize>, + ) -> Result<Response<IncomingAsyncBody>> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path), + ); + + let mut req = Request::get(url.clone()); + + req = req.header(header::ACCEPT, "application/json"); + + if !iter.is_empty() { + req = req.header(X_UPYUN_LIST_ITER, iter); + } + + if let Some(mut limit) = limit { + if limit > X_UPYUN_LIST_MAX_LIMIT { + limit = X_UPYUN_LIST_DEFAULT_LIMIT; + } + req = req.header(X_UPYUN_LIST_LIMIT, limit); + } else { + req = req.header(X_UPYUN_LIST_LIMIT, X_UPYUN_LIST_DEFAULT_LIMIT); Review Comment: Can we ignore this header if not specified? Why we need to set it to 256 here? ########## core/src/services/upyun/core.rs: ########## @@ -0,0 +1,557 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{Debug, Formatter}; + +use base64::Engine; +use hmac::{Hmac, Mac}; +use http::{header, HeaderMap, Request, Response}; +use md5::Digest; +use serde::Deserialize; +use sha1::Sha1; + +use crate::raw::*; +use crate::*; + +use self::constants::*; + +mod constants { + pub const X_UPYUN_FILE_TYPE: &str = "x-upyun-file-type"; + pub const X_UPYUN_FILE_SIZE: &str = "x-upyun-file-size"; + pub const X_UPYUN_CACHE_CONTROL: &str = "x-upyun-meta-cache-control"; + pub const X_UPYUN_CONTENT_DISPOSITION: &str = "x-upyun-meta-content-disposition"; + pub const X_UPYUN_MULTI_STAGE: &str = "X-Upyun-Multi-Stage"; + pub const X_UPYUN_MULTI_TYPE: &str = "X-Upyun-Multi-Type"; + pub const X_UPYUN_MULTI_DISORDER: &str = "X-Upyun-Multi-Disorder"; + pub const X_UPYUN_MULTI_UUID: &str = "X-Upyun-Multi-Uuid"; + pub const X_UPYUN_PART_ID: &str = "X-Upyun-Part-Id"; + pub const X_UPYUN_FOLDER: &str = "x-upyun-folder"; + pub const X_UPYUN_MOVE_SOURCE: &str = "X-Upyun-Move-Source"; + pub const X_UPYUN_METADATA_DIRECTIVE: &str = "X-Upyun-Metadata-Directive"; + pub const X_UPYUN_LIST_ITER: &str = "x-list-iter"; + pub const X_UPYUN_LIST_LIMIT: &str = "X-List-Limit"; + pub const X_UPYUN_LIST_MAX_LIMIT: usize = 4096; + pub const X_UPYUN_LIST_DEFAULT_LIMIT: usize = 256; +} + +#[derive(Clone)] +pub struct UpYunCore { + /// The root of this core. + pub root: String, + /// The endpoint of this backend. + pub operator: String, + /// The password id of this backend. + pub password: String, + /// The bucket of this backend. + pub bucket: String, + + /// signer of this backend. 
+ pub signer: UpYunSigner, + + pub client: HttpClient, +} + +impl Debug for UpYunCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backend") + .field("root", &self.root) + .field("bucket", &self.bucket) + .field("operator", &self.operator) + .finish_non_exhaustive() + } +} + +impl UpYunCore { + #[inline] + pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> { + self.client.send(req).await + } + + pub async fn sign(&self, req: &mut Request<AsyncBody>) -> Result<()> { + // get rfc1123 date + let date = chrono::Utc::now() + .format("%a, %d %b %Y %H:%M:%S GMT") + .to_string(); + let authorization = + self.signer + .authorization(&date, req.method().as_str(), req.uri().path()); + + req.headers_mut() + .insert("Authorization", authorization.parse().unwrap()); + req.headers_mut().insert("Date", date.parse().unwrap()); + + Ok(()) + } +} + +impl UpYunCore { + pub async fn download_file(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path) + ); + + let req = Request::get(url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn info(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path) + ); + + let req = Request::head(url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn upload( + &self, + path: &str, + size: Option<u64>, + args: &OpWrite, + body: AsyncBody, + ) -> Result<Request<AsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + 
"https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&p) + ); + + let mut req = Request::put(&url); + + if let Some(size) = size { + req = req.header(header::CONTENT_LENGTH, size.to_string()) + } + + if let Some(mime) = args.content_type() { + req = req.header(header::CONTENT_TYPE, mime) + } + + if let Some(pos) = args.content_disposition() { + req = req.header(X_UPYUN_CONTENT_DISPOSITION, pos) + } + + if let Some(cache_control) = args.cache_control() { + req = req.header(X_UPYUN_CACHE_CONTROL, cache_control) + } + + // Set body + let mut req = req.body(body).map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + Ok(req) + } + + pub async fn delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path) + ); + + let req = Request::delete(url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn copy(&self, from: &str, to: &str) -> Result<Response<IncomingAsyncBody>> { + let from = format!("/{}/{}", self.bucket, build_abs_path(&self.root, from)); + let to = build_abs_path(&self.root, to); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&to) + ); + + let mut req = Request::put(url); + + req = req.header(X_UPYUN_MOVE_SOURCE, from); + + req = req.header(X_UPYUN_METADATA_DIRECTIVE, "copy"); + + // Set body + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn create_dir(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { + let path = build_abs_path(&self.root, path); + let path = path[..path.len() - 1].to_string(); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + 
percent_encode_path(&path) + ); + + let mut req = Request::post(url); + + req = req.header("folder", "true"); + + req = req.header(X_UPYUN_FOLDER, "true"); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn initiate_multipart_upload( + &self, + path: &str, + args: &OpWrite, + ) -> Result<Response<IncomingAsyncBody>> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path) + ); + + let mut req = Request::put(url); + + req = req.header(X_UPYUN_MULTI_STAGE, "initiate"); + + req = req.header(X_UPYUN_MULTI_DISORDER, "true"); + + if let Some(content_type) = args.content_type() { + req = req.header(X_UPYUN_MULTI_TYPE, content_type); + } + + if let Some(content_disposition) = args.content_disposition() { + req = req.header(X_UPYUN_CONTENT_DISPOSITION, content_disposition) + } + + if let Some(cache_control) = args.cache_control() { + req = req.header(X_UPYUN_CACHE_CONTROL, cache_control) + } + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn upload_part( + &self, + path: &str, + upload_id: &str, + part_number: usize, + size: u64, + body: AsyncBody, + ) -> Result<Request<AsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&p), + ); + + let mut req = Request::put(&url); + + req = req.header(header::CONTENT_LENGTH, size); + + req = req.header(X_UPYUN_MULTI_STAGE, "upload"); + + req = req.header(X_UPYUN_MULTI_UUID, upload_id); + + req = req.header(X_UPYUN_PART_ID, part_number); + + // Set body + let mut req = req.body(body).map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + Ok(req) + } + + pub async fn complete_multipart_upload( + &self, + 
path: &str, + upload_id: &str, + ) -> Result<Response<IncomingAsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&p), + ); + + let mut req = Request::put(url); + + req = req.header(X_UPYUN_MULTI_STAGE, "complete"); + + req = req.header(X_UPYUN_MULTI_UUID, upload_id); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn list_objects( + &self, + path: &str, + iter: &str, + limit: Option<usize>, + ) -> Result<Response<IncomingAsyncBody>> { + let path = build_abs_path(&self.root, path); + + let url = format!( + "https://v0.api.upyun.com/{}/{}", + self.bucket, + percent_encode_path(&path), + ); + + let mut req = Request::get(url.clone()); + + req = req.header(header::ACCEPT, "application/json"); + + if !iter.is_empty() { + req = req.header(X_UPYUN_LIST_ITER, iter); + } + + if let Some(mut limit) = limit { + if limit > X_UPYUN_LIST_MAX_LIMIT { + limit = X_UPYUN_LIST_DEFAULT_LIMIT; + } + req = req.header(X_UPYUN_LIST_LIMIT, limit); + } else { + req = req.header(X_UPYUN_LIST_LIMIT, X_UPYUN_LIST_DEFAULT_LIMIT); + } + + // Set body + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } +} + +#[derive(Clone, Default)] +pub struct UpYunSigner { + pub operator: String, + pub password: String, +} + +type HmacSha1 = Hmac<Sha1>; + +impl UpYunSigner { + pub fn authorization(&self, date: &str, method: &str, uri: &str) -> String { + let sign = vec![method, uri, date]; + + let sign = sign + .into_iter() + .filter(|s| !s.is_empty()) + .collect::<Vec<&str>>() + .join("&"); + + let mut mac = HmacSha1::new_from_slice(format_md5(self.password.as_bytes()).as_bytes()) + .expect("HMAC can take key of any size"); + mac.update(sign.as_bytes()); + let sign_str = mac.finalize().into_bytes(); + + 
let sign = base64::engine::general_purpose::STANDARD.encode(sign_str.as_slice()); + format!("UPYUN {}:{}", self.operator, sign) + } +} + +pub(super) fn parse_initiate_part(headers: &HeaderMap) -> Result<&str> { + match headers.get(X_UPYUN_MULTI_UUID) { + None => Err(Error::new(ErrorKind::Unexpected, "missing uuid")), + Some(v) => Ok(v.to_str().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "header value has to be valid utf-8 string", + ) + .with_operation("parse_initiate_part") + .set_source(e) + })?), + } +} + +pub(super) fn parse_info(headers: &HeaderMap) -> Result<Metadata> { + let mode = if parse_file_type(headers)? == "file" { + EntryMode::FILE + } else { + EntryMode::DIR + }; + + let mut m = Metadata::new(mode); + + if let Some(v) = parse_file_size(headers)? { + m.set_content_length(v); + } + + if let Some(v) = parse_content_type(headers)? { + m.set_content_type(v); + } + + if let Some(v) = parse_content_md5(headers)? { + m.set_content_md5(v); + } + + if let Some(v) = parse_cache_control(headers)? { + m.set_cache_control(v); + } + + if let Some(v) = parse_content_disposition(headers)? { + m.set_content_disposition(v); + } + + Ok(m) +} + +fn parse_file_type(headers: &HeaderMap) -> Result<&str> { + match headers.get(X_UPYUN_FILE_TYPE) { + None => Err(Error::new(ErrorKind::Unexpected, "missing file type")), + Some(v) => Ok(v.to_str().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "header value has to be valid utf-8 string", + ) + .with_operation("parse_file_type") + .set_source(e) + })?), + } +} + +fn parse_file_size(headers: &HeaderMap) -> Result<Option<u64>> { + match headers.get(X_UPYUN_FILE_SIZE) { + None => Ok(None), + Some(v) => Ok(Some( + v.to_str() + .map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "header value is not valid utf-8 string", + ) + .with_operation("http_util::parse_content_length") + .set_source(e) + })? 
+            .parse::<u64>()
+            .map_err(|e| {
+                Error::new(ErrorKind::Unexpected, "header value is not valid integer")
+                    .with_operation("http_util::parse_content_length")
+                    .set_source(e)
+            })?,
+        )),
+    }
+}
+
+fn parse_cache_control(headers: &HeaderMap) -> Result<Option<&str>> {
+    match headers.get(X_UPYUN_CACHE_CONTROL) {
+        None => Ok(None),
+        Some(v) => Ok(Some(v.to_str().map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "header value has to be valid utf-8 string",
+            )
+            .with_operation("parse_cache_control")
+            .set_source(e)
+        })?)),
+    }
+}
+
+fn parse_content_disposition(headers: &HeaderMap) -> Result<Option<&str>> { Review Comment: How about adding an `opendal::raw::parse_header_to_str` so we don't need to repeat error handling logic here? We can submit a new PR for this refactor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
