hoslo commented on code in PR #3790:
URL: 
https://github.com/apache/incubator-opendal/pull/3790#discussion_r1433430241


##########
core/src/services/upyun/writer.rs:
##########
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+
+use crate::raw::*;
+use crate::*;
+
+use super::core::{parse_initiate_part, UpYunCore};
+use super::error::parse_error;
+
+pub type UpYunWriters = oio::MultipartUploadWriter<UpYunWriter>;
+
+pub struct UpYunWriter {
+    core: Arc<UpYunCore>,
+    op: OpWrite,
+    path: String,
+}
+
+impl UpYunWriter {
+    pub fn new(core: Arc<UpYunCore>, op: OpWrite, path: String) -> Self {
+        UpYunWriter { core, op, path }
+    }
+}
+
+#[async_trait]
+impl oio::MultipartUploadWrite for UpYunWriter {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+        let mut req = self
+            .core
+            .upload(&self.path, Some(size), &self.op, body)
+            .await?;
+
+        self.core.sign(&mut req).await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn initiate_part(&self) -> Result<String> {
+        let resp = self
+            .core
+            .initiate_multipart_upload(&self.path, &self.op)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::NO_CONTENT => {
+                let id = parse_initiate_part(resp.headers())?;
+
+                Ok(id.to_string())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write_part(
+        &self,
+        upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: AsyncBody,
+    ) -> Result<oio::MultipartUploadPart> {
+        let mut req = self
+            .core
+            .upload_part(&self.path, upload_id, part_number, size, body)?;
+
+        self.core.sign(&mut req).await?;

Review Comment:
   fix



##########
core/src/services/upyun/writer.rs:
##########
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+
+use crate::raw::*;
+use crate::*;
+
+use super::core::{parse_initiate_part, UpYunCore};
+use super::error::parse_error;
+
+pub type UpYunWriters = oio::MultipartUploadWriter<UpYunWriter>;
+
+pub struct UpYunWriter {
+    core: Arc<UpYunCore>,
+    op: OpWrite,
+    path: String,
+}
+
+impl UpYunWriter {
+    pub fn new(core: Arc<UpYunCore>, op: OpWrite, path: String) -> Self {
+        UpYunWriter { core, op, path }
+    }
+}
+
+#[async_trait]
+impl oio::MultipartUploadWrite for UpYunWriter {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+        let mut req = self
+            .core
+            .upload(&self.path, Some(size), &self.op, body)
+            .await?;
+
+        self.core.sign(&mut req).await?;

Review Comment:
   fix



##########
core/src/services/upyun/error.rs:
##########
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use quick_xml::de;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// UpYunError is the error returned by upyun service.
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+struct UpYunError {
+    code: i64,
+    msg: String,
+    id: String,
+}
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+
+    let (kind, retryable) = match parts.status.as_u16() {
+        403 => (ErrorKind::PermissionDenied, false),
+        404 => (ErrorKind::NotFound, false),
+        304 | 412 => (ErrorKind::ConditionNotMatch, false),
+        // Service like R2 could return 499 error with a message like:

Review Comment:
   fix



##########
core/src/services/upyun/core.rs:
##########
@@ -0,0 +1,553 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::{Debug, Formatter};
+
+use base64::Engine;
+use hmac::{Hmac, Mac};
+use http::{header, HeaderMap, Request, Response};
+use md5::Digest;
+use serde::Deserialize;
+use sha1::Sha1;
+
+use crate::raw::*;
+use crate::*;
+
+use self::constants::*;
+
+mod constants {
+    pub const X_UPYUN_FILE_TYPE: &str = "x-upyun-file-type";
+    pub const X_UPYUN_FILE_SIZE: &str = "x-upyun-file-size";
+    pub const X_UPYUN_CACHE_CONTROL: &str = "x-upyun-meta-cache-control";
+    pub const X_UPYUN_CONTENT_DISPOSITION: &str = 
"x-upyun-meta-content-disposition";
+    pub const X_UPYUN_MULTI_STAGE: &str = "X-Upyun-Multi-Stage";
+    pub const X_UPYUN_MULTI_TYPE: &str = "X-Upyun-Multi-Type";
+    pub const X_UPYUN_MULTI_DISORDER: &str = "X-Upyun-Multi-Disorder";
+    pub const X_UPYUN_MULTI_UUID: &str = "X-Upyun-Multi-Uuid";
+    pub const X_UPYUN_PART_ID: &str = "X-Upyun-Part-Id";
+    pub const X_UPYUN_FOLDER: &str = "x-upyun-folder";
+    pub const X_UPYUN_MOVE_SOURCE: &str = "X-Upyun-Move-Source";
+    pub const X_UPYUN_METADATA_DIRECTIVE: &str = "X-Upyun-Metadata-Directive";
+    pub const X_UPYUN_LIST_ITER: &str = "x-list-iter";
+    pub const X_UPYUN_LIST_LIMIT: &str = "X-List-Limit";
+    pub const X_UPYUN_LIST_MAX_LIMIT: usize = 4096;
+    pub const X_UPYUN_LIST_DEFAULT_LIMIT: usize = 256;
+}
+
+#[derive(Clone)]
+pub struct UpYunCore {
+    /// The root of this core.
+    pub root: String,
+    /// The endpoint of this backend.
+    pub operator: String,
+    /// The password id of this backend.
+    pub password: String,
+    /// The bucket of this backend.
+    pub bucket: String,
+
+    /// signer of this backend.
+    pub signer: UpYunSigner,
+
+    pub client: HttpClient,
+}
+
+impl Debug for UpYunCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Backend")
+            .field("root", &self.root)
+            .field("bucket", &self.bucket)
+            .field("operator", &self.operator)
+            .finish_non_exhaustive()
+    }
+}
+
+impl UpYunCore {
+    #[inline]
+    pub async fn send(&self, req: Request<AsyncBody>) -> 
Result<Response<IncomingAsyncBody>> {
+        self.client.send(req).await
+    }
+
+    pub async fn sign(&self, req: &mut Request<AsyncBody>) -> Result<()> {
+        // get rfc1123 date
+        let date = chrono::Utc::now()
+            .format("%a, %d %b %Y %H:%M:%S GMT")
+            .to_string();
+        let authorization =
+            self.signer
+                .authorization(&date, req.method().as_str(), req.uri().path());
+
+        req.headers_mut()
+            .insert("Authorization", authorization.parse().unwrap());
+        req.headers_mut().insert("Date", date.parse().unwrap());
+
+        Ok(())
+    }
+}
+
+impl UpYunCore {
+    pub async fn download_file(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&path)
+        );
+
+        let req = Request::get(url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn info(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&path)
+        );
+
+        let req = Request::head(url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn upload(
+        &self,
+        path: &str,
+        size: Option<u64>,
+        args: &OpWrite,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "https://v0.api.upyun.com/{}/{}";,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::put(&url);
+
+        if let Some(size) = size {
+            req = req.header(header::CONTENT_LENGTH, size.to_string())
+        }
+
+        if let Some(mime) = args.content_type() {
+            req = req.header(header::CONTENT_TYPE, mime)
+        }
+
+        if let Some(pos) = args.content_disposition() {
+            req = req.header(X_UPYUN_CONTENT_DISPOSITION, pos)
+        }
+
+        if let Some(cache_control) = args.cache_control() {
+            req = req.header(X_UPYUN_CACHE_CONTROL, cache_control)
+        }
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)

Review Comment:
   fix



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to