suyanhanx commented on code in PR #3790:
URL: 
https://github.com/apache/incubator-opendal/pull/3790#discussion_r1433373052


##########
core/src/services/upyun/writer.rs:
##########
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+
+use crate::raw::*;
+use crate::*;
+
+use super::core::{parse_initiate_part, UpYunCore};
+use super::error::parse_error;
+
+pub type UpYunWriters = oio::MultipartUploadWriter<UpYunWriter>;
+
+pub struct UpYunWriter {
+    core: Arc<UpYunCore>,
+    op: OpWrite,
+    path: String,
+}
+
+impl UpYunWriter {
+    pub fn new(core: Arc<UpYunCore>, op: OpWrite, path: String) -> Self {
+        UpYunWriter { core, op, path }
+    }
+}
+
+#[async_trait]
+impl oio::MultipartUploadWrite for UpYunWriter {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+        let mut req = self
+            .core
+            .upload(&self.path, Some(size), &self.op, body)
+            .await?;
+
+        self.core.sign(&mut req).await?;

Review Comment:
   Could this line be merged into upload fn?



##########
core/src/services/upyun/backend.rs:
##########
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use http::StatusCode;
+use log::debug;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use super::core::parse_info;
+use super::core::UpYunCore;
+use super::error::parse_error;
+use super::lister::UpYunLister;
+use super::writer::UpYunWriter;
+use super::writer::UpYunWriters;
+use crate::raw::*;
+use crate::services::upyun::core::UpYunSigner;
+use crate::*;
+
+/// Config for backblaze upyun services support.
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct UpYunConfig {
+    /// root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub root: Option<String>,
+    /// bucket address of this backend.
+    pub bucket: String,
+    /// username of this backend.
+    pub operator: Option<String>,
+    /// password of this backend.
+    pub password: Option<String>,
+}
+
+impl Debug for UpYunConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("Config");
+
+        ds.field("root", &self.root);
+        ds.field("bucket", &self.bucket);
+        ds.field("operator", &self.operator);
+
+        ds.finish()
+    }
+}
+
+/// [upyun](https://www.upyun.com/products/file-storage) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct UpYunBuilder {
+    config: UpYunConfig,
+
+    http_client: Option<HttpClient>,
+}
+
+impl Debug for UpYunBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("UpYunBuilder");
+
+        d.field("config", &self.config);
+        d.finish_non_exhaustive()
+    }
+}
+
+impl UpYunBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.config.root = if root.is_empty() {
+            None
+        } else {
+            Some(root.to_string())
+        };
+
+        self
+    }
+
+    /// bucket of this backend.
+    ///
+    /// It is required. e.g. `test`
+    pub fn bucket(&mut self, bucket: &str) -> &mut Self {
+        self.config.bucket = bucket.to_string();
+
+        self
+    }
+
+    /// operator of this backend.
+    ///
+    /// It is required. e.g. `test`
+    pub fn operator(&mut self, operator: &str) -> &mut Self {
+        self.config.operator = if operator.is_empty() {
+            None
+        } else {
+            Some(operator.to_string())
+        };
+
+        self
+    }
+
+    /// password of this backend.
+    ///
+    /// It is required. e.g. `asecret`
+    pub fn password(&mut self, password: &str) -> &mut Self {
+        self.config.password = if password.is_empty() {
+            None
+        } else {
+            Some(password.to_string())
+        };
+
+        self
+    }
+
+    /// Specify the http client that used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+        self.http_client = Some(client);
+        self
+    }
+}
+
+impl Builder for UpYunBuilder {
+    const SCHEME: Scheme = Scheme::UpYun;
+    type Accessor = UpYunBackend;
+
+    /// Converts a HashMap into an UpYunBuilder instance.
+    ///
+    /// # Arguments
+    ///
+    /// * `map` - A HashMap containing the configuration values.
+    ///
+    /// # Returns
+    ///
+    /// Returns an instance of UpYunBuilder.
+    fn from_map(map: HashMap<String, String>) -> Self {
+        // Deserialize the configuration from the HashMap.
+        let config = UpYunConfig::deserialize(ConfigDeserializer::new(map))
+            .expect("config deserialize must succeed");
+
+        // Create an UpYunBuilder instance with the deserialized config.
+        UpYunBuilder {
+            config,
+            http_client: None,
+        }
+    }
+
+    /// Builds the backend and returns the result of UpYunBackend.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        debug!("backend build started: {:?}", &self);
+
+        let root = 
normalize_root(&self.config.root.clone().unwrap_or_default());
+        debug!("backend use root {}", &root);
+
+        // Handle bucket.
+        if self.config.bucket.is_empty() {
+            return Err(Error::new(ErrorKind::ConfigInvalid, "bucket is empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::UpYun));
+        }
+
+        debug!("backend use bucket {}", &self.config.bucket);
+
+        let operator = match &self.config.operator {
+            Some(operator) => Ok(operator.clone()),
+            None => Err(Error::new(ErrorKind::ConfigInvalid, "operator is 
empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::UpYun)),
+        }?;
+
+        let password = match &self.config.password {
+            Some(password) => Ok(password.clone()),
+            None => Err(Error::new(ErrorKind::ConfigInvalid, "password is 
empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::UpYun)),
+        }?;
+
+        let client = if let Some(client) = self.http_client.take() {
+            client
+        } else {
+            HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::UpYun)
+            })?
+        };
+
+        let signer = UpYunSigner {
+            operator: operator.clone(),
+            password: password.clone(),
+        };
+
+        Ok(UpYunBackend {
+            core: Arc::new(UpYunCore {
+                root,
+                operator,
+                password,
+                bucket: self.config.bucket.clone(),
+                signer,
+                client,
+            }),
+        })
+    }
+}
+
+/// Backend for upyun services.
+#[derive(Debug, Clone)]
+pub struct UpYunBackend {
+    core: Arc<UpYunCore>,
+}
+
+#[async_trait]
+impl Accessor for UpYunBackend {
+    type Reader = IncomingAsyncBody;
+
+    type BlockingReader = ();
+
+    type Writer = UpYunWriters;
+
+    type BlockingWriter = ();
+
+    type Lister = oio::PageLister<UpYunLister>;
+
+    type BlockingLister = ();
+
+    fn info(&self) -> AccessorInfo {
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::UpYun)
+            .set_root(&self.core.root)
+            .set_native_capability(Capability {
+                stat: true,
+
+                create_dir: true,
+
+                read: true,
+                read_can_next: true,
+
+                write: true,
+                write_can_empty: true,
+                write_can_multi: true,
+                write_with_cache_control: true,
+                write_with_content_type: true,
+
+                write_multi_min_size: Some(1024 * 1024),
+                write_multi_max_size: Some(50 * 1024 * 1024),
+
+                delete: true,
+                rename: true,
+                copy: true,
+
+                list: true,
+                list_with_limit: true,
+                list_with_recursive: true,
+
+                ..Default::default()
+            });
+
+        am
+    }
+
+    async fn create_dir(&self, path: &str, _: OpCreateDir) -> 
Result<RpCreateDir> {
+        let resp = self.core.create_dir(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => Ok(RpCreateDir::default()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> 
Result<RpRename> {
+        let resp = self.core.copy(from, to).await?;

Review Comment:
   Wrong fn call?



##########
core/src/services/upyun/error.rs:
##########
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use quick_xml::de;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// UpYunError is the error returned by upyun service.
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+struct UpYunError {
+    code: i64,
+    msg: String,
+    id: String,
+}
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+
+    let (kind, retryable) = match parts.status.as_u16() {
+        403 => (ErrorKind::PermissionDenied, false),
+        404 => (ErrorKind::NotFound, false),
+        304 | 412 => (ErrorKind::ConditionNotMatch, false),
+        // Service like R2 could return 499 error with a message like:

Review Comment:
   Wrong comment



##########
core/src/services/upyun/core.rs:
##########
@@ -0,0 +1,553 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::{Debug, Formatter};
+
+use base64::Engine;
+use hmac::{Hmac, Mac};
+use http::{header, HeaderMap, Request, Response};
+use md5::Digest;
+use serde::Deserialize;
+use sha1::Sha1;
+
+use crate::raw::*;
+use crate::*;
+
+use self::constants::*;
+
+mod constants {
+    pub const X_UPYUN_FILE_TYPE: &str = "x-upyun-file-type";
+    pub const X_UPYUN_FILE_SIZE: &str = "x-upyun-file-size";
+    pub const X_UPYUN_CACHE_CONTROL: &str = "x-upyun-meta-cache-control";
+    pub const X_UPYUN_CONTENT_DISPOSITION: &str = 
"x-upyun-meta-content-disposition";
+    pub const X_UPYUN_MULTI_STAGE: &str = "X-Upyun-Multi-Stage";
+    pub const X_UPYUN_MULTI_TYPE: &str = "X-Upyun-Multi-Type";
+    pub const X_UPYUN_MULTI_DISORDER: &str = "X-Upyun-Multi-Disorder";
+    pub const X_UPYUN_MULTI_UUID: &str = "X-Upyun-Multi-Uuid";
+    pub const X_UPYUN_PART_ID: &str = "X-Upyun-Part-Id";
+    pub const X_UPYUN_FOLDER: &str = "x-upyun-folder";
+    pub const X_UPYUN_MOVE_SOURCE: &str = "X-Upyun-Move-Source";
+    pub const X_UPYUN_METADATA_DIRECTIVE: &str = "X-Upyun-Metadata-Directive";
+    pub const X_UPYUN_LIST_ITER: &str = "x-list-iter";
+    pub const X_UPYUN_LIST_LIMIT: &str = "X-List-Limit";
+    pub const X_UPYUN_LIST_MAX_LIMIT: usize = 4096;
+    pub const X_UPYUN_LIST_DEFAULT_LIMIT: usize = 256;
+}
+
+#[derive(Clone)]
+pub struct UpYunCore {
+    /// The root of this core.
+    pub root: String,
+    /// The endpoint of this backend.
+    pub operator: String,
+    /// The password id of this backend.
+    pub password: String,
+    /// The bucket of this backend.
+    pub bucket: String,
+
+    /// signer of this backend.
+    pub signer: UpYunSigner,
+
+    pub client: HttpClient,
+}
+
+impl Debug for UpYunCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Backend")
+            .field("root", &self.root)
+            .field("bucket", &self.bucket)
+            .field("operator", &self.operator)
+            .finish_non_exhaustive()
+    }
+}
+
+impl UpYunCore {
+    #[inline]
+    pub async fn send(&self, req: Request<AsyncBody>) -> 
Result<Response<IncomingAsyncBody>> {
+        self.client.send(req).await
+    }
+
+    pub async fn sign(&self, req: &mut Request<AsyncBody>) -> Result<()> {
+        // get rfc1123 date
+        let date = chrono::Utc::now()
+            .format("%a, %d %b %Y %H:%M:%S GMT")
+            .to_string();
+        let authorization =
+            self.signer
+                .authorization(&date, req.method().as_str(), req.uri().path());
+
+        req.headers_mut()
+            .insert("Authorization", authorization.parse().unwrap());
+        req.headers_mut().insert("Date", date.parse().unwrap());
+
+        Ok(())
+    }
+}
+
+impl UpYunCore {
+    pub async fn download_file(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&path)
+        );
+
+        let req = Request::get(url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn info(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&path)
+        );
+
+        let req = Request::head(url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn upload(
+        &self,
+        path: &str,
+        size: Option<u64>,
+        args: &OpWrite,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::put(&url);
+
+        if let Some(size) = size {
+            req = req.header(header::CONTENT_LENGTH, size.to_string())
+        }
+
+        if let Some(mime) = args.content_type() {
+            req = req.header(header::CONTENT_TYPE, mime)
+        }
+
+        if let Some(pos) = args.content_disposition() {
+            req = req.header(X_UPYUN_CONTENT_DISPOSITION, pos)
+        }
+
+        if let Some(cache_control) = args.cache_control() {
+            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
+        }
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)

Review Comment:
   Missing signing?



##########
core/src/services/upyun/writer.rs:
##########
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+
+use crate::raw::*;
+use crate::*;
+
+use super::core::{parse_initiate_part, UpYunCore};
+use super::error::parse_error;
+
+pub type UpYunWriters = oio::MultipartUploadWriter<UpYunWriter>;
+
+pub struct UpYunWriter {
+    core: Arc<UpYunCore>,
+    op: OpWrite,
+    path: String,
+}
+
+impl UpYunWriter {
+    pub fn new(core: Arc<UpYunCore>, op: OpWrite, path: String) -> Self {
+        UpYunWriter { core, op, path }
+    }
+}
+
+#[async_trait]
+impl oio::MultipartUploadWrite for UpYunWriter {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+        let mut req = self
+            .core
+            .upload(&self.path, Some(size), &self.op, body)
+            .await?;
+
+        self.core.sign(&mut req).await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn initiate_part(&self) -> Result<String> {
+        let resp = self
+            .core
+            .initiate_multipart_upload(&self.path, &self.op)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::NO_CONTENT => {
+                let id = parse_initiate_part(resp.headers())?;
+
+                Ok(id.to_string())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write_part(
+        &self,
+        upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: AsyncBody,
+    ) -> Result<oio::MultipartUploadPart> {
+        let mut req = self
+            .core
+            .upload_part(&self.path, upload_id, part_number, size, body)?;
+
+        self.core.sign(&mut req).await?;

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to