This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new d444c48cb5 feat(services): add pcloud support (#3892)
d444c48cb5 is described below

commit d444c48cb5d0bc3015ff73939cedf7545e61800f
Author: hoslo <[email protected]>
AuthorDate: Tue Jan 2 18:55:40 2024 +0800

    feat(services): add pcloud support (#3892)
---
 .env.example                        |   7 +-
 core/Cargo.toml                     |   1 +
 core/src/services/mod.rs            |   7 +
 core/src/services/pcloud/backend.rs | 421 +++++++++++++++++++++++++++++++++
 core/src/services/pcloud/core.rs    | 459 ++++++++++++++++++++++++++++++++++++
 core/src/services/pcloud/docs.md    |  53 +++++
 core/src/services/pcloud/error.rs   |  95 ++++++++
 core/src/services/pcloud/lister.rs  |  96 ++++++++
 core/src/services/pcloud/mod.rs     |  25 ++
 core/src/services/pcloud/writer.rs  |  69 ++++++
 core/src/types/operator/builder.rs  |   2 +
 core/src/types/scheme.rs            |   6 +
 12 files changed, 1240 insertions(+), 1 deletion(-)

diff --git a/.env.example b/.env.example
index 8c724bddd7..746346e04f 100644
--- a/.env.example
+++ b/.env.example
@@ -194,4 +194,9 @@ OPENDAL_UPYUN_PASSWORD=<password>
 # chainsafe
 OPENDAL_CHAINSAFE_ROOT=/path/to/dir
 OPENDAL_CHAINSAFE_BUCKET_ID=<bucket_id>
-OPENDAL_CHAINSAFE_API_KEY=<api_key>
\ No newline at end of file
+OPENDAL_CHAINSAFE_API_KEY=<api_key>
+# pcloud
+OPENDAL_PCLOUD_ROOT=/path/to/dir
+OPENDAL_PCLOUD_ENDPOINT=<endpoint>
+OPENDAL_PCLOUD_USERNAME=<username>
+OPENDAL_PCLOUD_PASSWORD=<password>
\ No newline at end of file
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 70d111d4cf..0128db3724 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -124,6 +124,7 @@ services-b2 = []
 services-seafile = []
 services-upyun = ["dep:hmac", "dep:sha1"]
 services-chainsafe = []
+services-pcloud = []
 services-cacache = ["dep:cacache"]
 services-cloudflare-kv = []
 services-cos = [
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index f0d086d6e7..c470e4e7f4 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -338,3 +338,10 @@ mod chainsafe;
 pub use chainsafe::Chainsafe;
 #[cfg(feature = "services-chainsafe")]
 pub use chainsafe::ChainsafeConfig;
+
+#[cfg(feature = "services-pcloud")]
+mod pcloud;
+#[cfg(feature = "services-pcloud")]
+pub use pcloud::Pcloud;
+#[cfg(feature = "services-pcloud")]
+pub use pcloud::PcloudConfig;
diff --git a/core/src/services/pcloud/backend.rs 
b/core/src/services/pcloud/backend.rs
new file mode 100644
index 0000000000..9338fb4893
--- /dev/null
+++ b/core/src/services/pcloud/backend.rs
@@ -0,0 +1,421 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use http::StatusCode;
+use log::debug;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use super::core::*;
+use super::error::parse_error;
+use super::error::PcloudError;
+use super::lister::PcloudLister;
+use super::writer::PcloudWriter;
+use super::writer::PcloudWriters;
+use crate::raw::*;
+use crate::*;
+
+/// Config for backblaze Pcloud services support.
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct PcloudConfig {
+    /// root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub root: Option<String>,
+    ///pCloud  endpoint address.
+    pub endpoint: String,
+    /// pCloud username.
+    pub username: Option<String>,
+    /// pCloud password.
+    pub password: Option<String>,
+}
+
+impl Debug for PcloudConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("Config");
+
+        ds.field("root", &self.root);
+        ds.field("endpoint", &self.endpoint);
+        ds.field("username", &self.username);
+
+        ds.finish()
+    }
+}
+
+/// [pCloud](https://www.pcloud.com/) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct PcloudBuilder {
+    config: PcloudConfig,
+
+    http_client: Option<HttpClient>,
+}
+
+impl Debug for PcloudBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("PcloudBuilder");
+
+        d.field("config", &self.config);
+        d.finish_non_exhaustive()
+    }
+}
+
+impl PcloudBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.config.root = if root.is_empty() {
+            None
+        } else {
+            Some(root.to_string())
+        };
+
+        self
+    }
+
+    /// Pcloud endpoint.
+    /// https://api.pcloud.com for United States and https://eapi.pcloud.com 
for Europe
+    /// ref to [doc.pcloud.com](https://docs.pcloud.com/)
+    ///
+    /// It is required. e.g. `https://api.pcloud.com`
+    pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+        self.config.endpoint = endpoint.to_string();
+
+        self
+    }
+
+    /// Pcloud username.
+    ///
+    /// It is required. your pCloud login email, e.g. `[email protected]`
+    pub fn username(&mut self, username: &str) -> &mut Self {
+        self.config.username = if username.is_empty() {
+            None
+        } else {
+            Some(username.to_string())
+        };
+
+        self
+    }
+
+    /// Pcloud password.
+    ///
+    /// It is required. your pCloud login password, e.g. `password`
+    pub fn password(&mut self, password: &str) -> &mut Self {
+        self.config.password = if password.is_empty() {
+            None
+        } else {
+            Some(password.to_string())
+        };
+
+        self
+    }
+
+    /// Specify the http client that used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+        self.http_client = Some(client);
+        self
+    }
+}
+
+impl Builder for PcloudBuilder {
+    const SCHEME: Scheme = Scheme::Pcloud;
+    type Accessor = PcloudBackend;
+
+    /// Converts a HashMap into an PcloudBuilder instance.
+    ///
+    /// # Arguments
+    ///
+    /// * `map` - A HashMap containing the configuration values.
+    ///
+    /// # Returns
+    ///
+    /// Returns an instance of PcloudBuilder.
+    fn from_map(map: HashMap<String, String>) -> Self {
+        // Deserialize the configuration from the HashMap.
+        let config = PcloudConfig::deserialize(ConfigDeserializer::new(map))
+            .expect("config deserialize must succeed");
+
+        // Create an PcloudBuilder instance with the deserialized config.
+        PcloudBuilder {
+            config,
+            http_client: None,
+        }
+    }
+
+    /// Builds the backend and returns the result of PcloudBackend.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        debug!("backend build started: {:?}", &self);
+
+        let root = 
normalize_root(&self.config.root.clone().unwrap_or_default());
+        debug!("backend use root {}", &root);
+
+        // Handle endpoint.
+        if self.config.endpoint.is_empty() {
+            return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint is 
empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::Pcloud));
+        }
+
+        debug!("backend use endpoint {}", &self.config.endpoint);
+
+        let username = match &self.config.username {
+            Some(username) => Ok(username.clone()),
+            None => Err(Error::new(ErrorKind::ConfigInvalid, "username is 
empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::Pcloud)),
+        }?;
+
+        let password = match &self.config.password {
+            Some(password) => Ok(password.clone()),
+            None => Err(Error::new(ErrorKind::ConfigInvalid, "password is 
empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::Pcloud)),
+        }?;
+
+        let client = if let Some(client) = self.http_client.take() {
+            client
+        } else {
+            HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::Pcloud)
+            })?
+        };
+
+        Ok(PcloudBackend {
+            core: Arc::new(PcloudCore {
+                root,
+                endpoint: self.config.endpoint.clone(),
+                username,
+                password,
+                client,
+            }),
+        })
+    }
+}
+
+/// Backend for Pcloud services.
+#[derive(Debug, Clone)]
+pub struct PcloudBackend {
+    core: Arc<PcloudCore>,
+}
+
+#[async_trait]
+impl Accessor for PcloudBackend {
+    type Reader = IncomingAsyncBody;
+
+    type BlockingReader = ();
+
+    type Writer = PcloudWriters;
+
+    type BlockingWriter = ();
+
+    type Lister = oio::PageLister<PcloudLister>;
+
+    type BlockingLister = ();
+
+    fn info(&self) -> AccessorInfo {
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::Pcloud)
+            .set_root(&self.core.root)
+            .set_native_capability(Capability {
+                stat: true,
+
+                create_dir: true,
+
+                read: true,
+                read_can_next: true,
+
+                write: true,
+
+                delete: true,
+                rename: true,
+                copy: true,
+
+                list: true,
+
+                ..Default::default()
+            });
+
+        am
+    }
+
+    async fn create_dir(&self, path: &str, _: OpCreateDir) -> 
Result<RpCreateDir> {
+        self.core.ensure_dir_exists(path).await?;
+        Ok(RpCreateDir::default())
+    }
+
+    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> 
Result<RpRename> {
+        self.core.ensure_dir_exists(to).await?;
+
+        let resp = if from.ends_with('/') {
+            self.core.rename_folder(from, to).await?
+        } else {
+            self.core.rename_file(from, to).await?
+        };
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+                let resp: PcloudError =
+                    
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+                let result = resp.result;
+                if result == 2009 || result == 2010 || result == 2055 || 
result == 2002 {
+                    return Err(Error::new(ErrorKind::NotFound, 
&format!("{resp:?}")));
+                }
+                if result != 0 {
+                    return Err(Error::new(ErrorKind::Unexpected, 
&format!("{resp:?}")));
+                }
+
+                Ok(RpRename::default())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> 
Result<RpCopy> {
+        self.core.ensure_dir_exists(to).await?;
+
+        let resp = if from.ends_with('/') {
+            self.core.copy_folder(from, to).await?
+        } else {
+            self.core.copy_file(from, to).await?
+        };
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+                let resp: PcloudError =
+                    
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+                let result = resp.result;
+                if result == 2009 || result == 2010 || result == 2055 || 
result == 2002 {
+                    return Err(Error::new(ErrorKind::NotFound, 
&format!("{resp:?}")));
+                }
+                if result != 0 {
+                    return Err(Error::new(ErrorKind::Unexpected, 
&format!("{resp:?}")));
+                }
+
+                Ok(RpCopy::default())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let link = self.core.get_file_link(path).await?;
+
+        let resp = self.core.download(&link).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let size = parse_content_length(resp.headers())?;
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
+        let resp = self.core.stat(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+                let resp: StatResponse =
+                    
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+                let result = resp.result;
+                if result == 2010 || result == 2055 || result == 2002 {
+                    return Err(Error::new(ErrorKind::NotFound, 
&format!("{resp:?}")));
+                }
+                if result != 0 {
+                    return Err(Error::new(ErrorKind::Unexpected, 
&format!("{resp:?}")));
+                }
+
+                if let Some(md) = resp.metadata {
+                    let md = parse_stat_metadata(md);
+                    return md.map(RpStat::new);
+                }
+
+                Err(Error::new(ErrorKind::Unexpected, &format!("{resp:?}")))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        let writer = PcloudWriter::new(self.core.clone(), path.to_string());
+
+        let w = oio::OneShotWriter::new(writer);
+
+        Ok((RpWrite::default(), w))
+    }
+
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        let resp = if path.ends_with('/') {
+            self.core.delete_folder(path).await?
+        } else {
+            self.core.delete_file(path).await?
+        };
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+                let resp: PcloudError =
+                    
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+                let result = resp.result;
+
+                // pCloud returns 2005 or 2009 if the file or folder is not 
found
+                if result != 0 && result != 2005 && result != 2009 {
+                    return Err(Error::new(ErrorKind::Unexpected, 
&format!("{resp:?}")));
+                }
+
+                Ok(RpDelete::default())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, 
Self::Lister)> {
+        let l = PcloudLister::new(self.core.clone(), path);
+        Ok((RpList::default(), oio::PageLister::new(l)))
+    }
+}
diff --git a/core/src/services/pcloud/core.rs b/core/src/services/pcloud/core.rs
new file mode 100644
index 0000000000..cdbf46c36c
--- /dev/null
+++ b/core/src/services/pcloud/core.rs
@@ -0,0 +1,459 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::{Debug, Formatter};
+
+use crate::raw::*;
+use crate::*;
+use bytes::Bytes;
+use http::{Request, Response, StatusCode};
+use serde::Deserialize;
+
+use super::error::{parse_error, PcloudError};
+
+#[derive(Clone)]
+pub struct PcloudCore {
+    /// The root of this core.
+    pub root: String,
+    /// The endpoint of this backend.
+    pub endpoint: String,
+    /// The username id of this backend.
+    pub username: String,
+    /// The password of this backend.
+    pub password: String,
+
+    pub client: HttpClient,
+}
+
+impl Debug for PcloudCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Backend")
+            .field("root", &self.root)
+            .field("endpoint", &self.endpoint)
+            .field("username", &self.username)
+            .finish_non_exhaustive()
+    }
+}
+
+impl PcloudCore {
+    #[inline]
+    pub async fn send(&self, req: Request<AsyncBody>) -> 
Result<Response<IncomingAsyncBody>> {
+        self.client.send(req).await
+    }
+}
+
+impl PcloudCore {
+    pub async fn get_file_link(&self, path: &str) -> Result<String> {
+        let path = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/getfilelink?path=/{}&username={}&password={}",
+            self.endpoint,
+            percent_encode_path(&path),
+            self.username,
+            self.password
+        );
+
+        let req = Request::get(url);
+
+        // set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        let resp = self.send(req).await?;
+
+        let status = resp.status();
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+                let resp: GetFileLinkResponse =
+                    
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+                let result = resp.result;
+                if result == 2010 || result == 2055 || result == 2002 {
+                    return Err(Error::new(ErrorKind::NotFound, 
&format!("{resp:?}")));
+                }
+                if result != 0 {
+                    return Err(Error::new(ErrorKind::Unexpected, 
&format!("{resp:?}")));
+                }
+
+                if let Some(hosts) = resp.hosts {
+                    if let Some(path) = resp.path {
+                        if !hosts.is_empty() {
+                            return Ok(format!("https://{}{}";, hosts[0], path));
+                        }
+                    }
+                }
+                Err(Error::new(ErrorKind::Unexpected, "hosts is empty"))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    pub async fn download(&self, url: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let req = Request::get(url);
+
+        // set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn ensure_dir_exists(&self, path: &str) -> Result<()> {
+        let path = build_abs_path(&self.root, path);
+
+        let paths = path.split('/').collect::<Vec<&str>>();
+
+        for i in 0..paths.len() - 1 {
+            let path = paths[..i + 1].join("/");
+            let resp = self.create_folder_if_not_exists(&path).await?;
+
+            let status = resp.status();
+
+            match status {
+                StatusCode::OK => {
+                    let bs = resp.into_body().bytes().await?;
+                    let resp: PcloudError =
+                        
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+                    let result = resp.result;
+                    if result == 2010 || result == 2055 || result == 2002 {
+                        return Err(Error::new(ErrorKind::NotFound, 
&format!("{resp:?}")));
+                    }
+                    if result != 0 {
+                        return Err(Error::new(ErrorKind::Unexpected, 
&format!("{resp:?}")));
+                    }
+
+                    if result != 0 {
+                        return Err(Error::new(ErrorKind::Unexpected, 
&format!("{resp:?}")));
+                    }
+                }
+                _ => return Err(parse_error(resp).await?),
+            }
+        }
+        Ok(())
+    }
+
+    pub async fn create_folder_if_not_exists(
+        &self,
+        path: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let url = format!(
+            "{}/createfolderifnotexists?path=/{}&username={}&password={}",
+            self.endpoint,
+            percent_encode_path(path),
+            self.username,
+            self.password
+        );
+
+        let req = Request::post(url);
+
+        // set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn rename_file(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let from = build_abs_path(&self.root, from);
+        let to = build_abs_path(&self.root, to);
+
+        let url = format!(
+            "{}/renamefile?path=/{}&topath=/{}&username={}&password={}",
+            self.endpoint,
+            percent_encode_path(&from),
+            percent_encode_path(&to),
+            self.username,
+            self.password
+        );
+
+        let req = Request::post(url);
+
+        // set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn rename_folder(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let from = build_abs_path(&self.root, from);
+        let to = build_abs_path(&self.root, to);
+        let url = format!(
+            "{}/renamefolder?path=/{}&topath=/{}&username={}&password={}",
+            self.endpoint,
+            percent_encode_path(&from),
+            percent_encode_path(&to),
+            self.username,
+            self.password
+        );
+
+        let req = Request::post(url);
+
+        // set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn delete_folder(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/deletefolder?path=/{}&username={}&password={}",
+            self.endpoint,
+            percent_encode_path(&path),
+            self.username,
+            self.password
+        );
+
+        let req = Request::post(url);
+
+        // set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn delete_file(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/deletefile?path=/{}&username={}&password={}",
+            self.endpoint,
+            percent_encode_path(&path),
+            self.username,
+            self.password
+        );
+
+        let req = Request::post(url);
+
+        // set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn copy_file(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let from = build_abs_path(&self.root, from);
+        let to = build_abs_path(&self.root, to);
+
+        let url = format!(
+            "{}/copyfile?path=/{}&topath=/{}&username={}&password={}",
+            self.endpoint,
+            percent_encode_path(&from),
+            percent_encode_path(&to),
+            self.username,
+            self.password
+        );
+
+        let req = Request::post(url);
+
+        // set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn copy_folder(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let from = build_abs_path(&self.root, from);
+        let to = build_abs_path(&self.root, to);
+
+        let url = format!(
+            "{}/copyfolder?path=/{}&topath=/{}&username={}&password={}",
+            self.endpoint,
+            percent_encode_path(&from),
+            percent_encode_path(&to),
+            self.username,
+            self.password
+        );
+
+        let req = Request::post(url);
+
+        // set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn stat(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let path = path.trim_end_matches('/');
+
+        let url = format!(
+            "{}/stat?path=/{}&username={}&password={}",
+            self.endpoint,
+            percent_encode_path(path),
+            self.username,
+            self.password
+        );
+
+        let req = Request::post(url);
+
+        // set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn upload_file(&self, path: &str, bs: Bytes) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let paths: Vec<&str> = path.split('/').collect();
+        let name = paths[paths.len() - 1];
+        let path = path.replace(&format!("/{}", name), "");
+
+        let url = format!(
+            "{}/uploadfile?path=/{}&filename={}&username={}&password={}",
+            self.endpoint,
+            percent_encode_path(&path),
+            percent_encode_path(name),
+            self.username,
+            self.password
+        );
+
+        let req = Request::put(url);
+
+        // set body
+        let req = req
+            .body(AsyncBody::Bytes(bs))
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn list_folder(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let path = normalize_root(&path);
+
+        let path = path.trim_end_matches('/');
+
+        let url = format!(
+            "{}/listfolder?path={}&username={}&password={}",
+            self.endpoint,
+            percent_encode_path(path),
+            self.username,
+            self.password
+        );
+
+        let req = Request::get(url);
+
+        // set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+}
+
+pub(super) fn parse_stat_metadata(content: StatMetadata) -> Result<Metadata> {
+    let mut md = if content.isfolder {
+        Metadata::new(EntryMode::DIR)
+    } else {
+        Metadata::new(EntryMode::FILE)
+    };
+
+    if let Some(size) = content.size {
+        md.set_content_length(size);
+    }
+
+    if let Some(size) = content.size {
+        md.set_content_length(size);
+    }
+
+    md.set_last_modified(parse_datetime_from_rfc2822(&content.modified)?);
+
+    Ok(md)
+}
+
+pub(super) fn parse_list_metadata(content: ListMetadata) -> Result<Metadata> {
+    let mut md = if content.isfolder {
+        Metadata::new(EntryMode::DIR)
+    } else {
+        Metadata::new(EntryMode::FILE)
+    };
+
+    if let Some(size) = content.size {
+        md.set_content_length(size);
+    }
+
+    if let Some(size) = content.size {
+        md.set_content_length(size);
+    }
+
+    md.set_last_modified(parse_datetime_from_rfc2822(&content.modified)?);
+
+    Ok(md)
+}
+
+#[derive(Debug, Deserialize)]
+pub struct GetFileLinkResponse {
+    pub result: u64,
+    pub path: Option<String>,
+    pub hosts: Option<Vec<String>>,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct StatResponse {
+    pub result: u64,
+    pub metadata: Option<StatMetadata>,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct StatMetadata {
+    pub name: String,
+    pub modified: String,
+    pub isfolder: bool,
+    pub size: Option<u64>,
+    pub contenttype: Option<String>,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct ListFolderResponse {
+    pub result: u64,
+    pub metadata: Option<ListMetadata>,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct ListMetadata {
+    pub name: String,
+    pub path: String,
+    pub modified: String,
+    pub isfolder: bool,
+    pub size: Option<u64>,
+    pub contenttype: Option<String>,
+    pub contents: Option<Vec<ListMetadata>>,
+}
diff --git a/core/src/services/pcloud/docs.md b/core/src/services/pcloud/docs.md
new file mode 100644
index 0000000000..d2876f4c5b
--- /dev/null
+++ b/core/src/services/pcloud/docs.md
@@ -0,0 +1,53 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [x] copy
+- [x] rename
+- [x] list
+- [x] scan
+- [ ] presign
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the work directory for backend
+- `endpoint`: Pcloud bucket name
+- `username` Pcloud username
+- `password` Pcloud password
+
+You can refer to [`PcloudBuilder`]'s docs for more information
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::Pcloud;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // create backend builder
+    let mut builder = Pcloud::default();
+
+    // set the storage bucket for OpenDAL
+    builder.root("/");
+    // set the bucket for OpenDAL
+    builder.endpoint("[https](https://api.pcloud.com)");
+    // set the username for OpenDAL
+    builder.username("[email protected]");
+    // set the password name for OpenDAL
+    builder.password("opendal");
+
+    let op: Operator = Operator::new(builder)?.finish();
+
+    Ok(())
+}
+```
diff --git a/core/src/services/pcloud/error.rs 
b/core/src/services/pcloud/error.rs
new file mode 100644
index 0000000000..e59bcb672c
--- /dev/null
+++ b/core/src/services/pcloud/error.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use http::Response;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// PcloudError is the error returned by Pcloud service.
+#[derive(Default, Deserialize)]
+pub(super) struct PcloudError {
+    pub result: u32,
+    pub error: Option<String>,
+}
+
+impl Debug for PcloudError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PcloudError")
+            .field("result", &self.result)
+            .field("error", &self.error)
+            .finish_non_exhaustive()
+    }
+}
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+    let message = String::from_utf8_lossy(&bs).into_owned();
+
+    let mut err = Error::new(ErrorKind::Unexpected, &message);
+
+    err = with_error_response_context(err, parts);
+
+    Ok(err)
+}
+
+#[cfg(test)]
+mod test {
+    use futures::stream;
+    use http::StatusCode;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_parse_error() {
+        let err_res = vec![(
+            r#"<html>
+
+                <head>
+                    <title>Invalid link</title>
+                </head>
+                
+                <body>This link was generated for another IP address. Try 
previous step again.</body>
+                
+                </html> "#,
+            ErrorKind::Unexpected,
+            StatusCode::GONE,
+        )];
+
+        for res in err_res {
+            let bs = bytes::Bytes::from(res.0);
+            let body = IncomingAsyncBody::new(
+                Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))),
+                None,
+            );
+            let resp = Response::builder().status(res.2).body(body).unwrap();
+
+            let err = parse_error(resp).await;
+
+            assert!(err.is_ok());
+            assert_eq!(err.unwrap().kind(), res.1);
+        }
+    }
+}
diff --git a/core/src/services/pcloud/lister.rs 
b/core/src/services/pcloud/lister.rs
new file mode 100644
index 0000000000..e4dd65c381
--- /dev/null
+++ b/core/src/services/pcloud/lister.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+
+use super::core::*;
+use super::error::parse_error;
+use crate::raw::oio::Entry;
+use crate::raw::*;
+use crate::*;
+
+pub struct PcloudLister {
+    core: Arc<PcloudCore>,
+
+    path: String,
+}
+
+impl PcloudLister {
+    pub(super) fn new(core: Arc<PcloudCore>, path: &str) -> Self {
+        PcloudLister {
+            core,
+            path: path.to_string(),
+        }
+    }
+}
+
+#[async_trait]
+impl oio::PageList for PcloudLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        let resp = self.core.list_folder(&self.path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+
+                let resp: ListFolderResponse =
+                    
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+                let result = resp.result;
+
+                if result == 2005 {
+                    ctx.done = true;
+                    return Ok(());
+                }
+
+                if result != 0 {
+                    return Err(Error::new(ErrorKind::Unexpected, 
&format!("{resp:?}")));
+                }
+
+                if let Some(metadata) = resp.metadata {
+                    if let Some(contents) = metadata.contents {
+                        for content in contents {
+                            let path = if content.isfolder {
+                                format!("{}/", content.path.clone())
+                            } else {
+                                content.path.clone()
+                            };
+
+                            let md = parse_list_metadata(content)?;
+                            let path = build_rel_path(&self.core.root, &path);
+
+                            ctx.entries.push_back(Entry::new(&path, md))
+                        }
+                    }
+
+                    ctx.done = true;
+                    return Ok(());
+                }
+
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    &String::from_utf8_lossy(&bs),
+                ));
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
diff --git a/core/src/services/pcloud/mod.rs b/core/src/services/pcloud/mod.rs
new file mode 100644
index 0000000000..50fb5e5f53
--- /dev/null
+++ b/core/src/services/pcloud/mod.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::PcloudBuilder as Pcloud;
+pub use backend::PcloudConfig;
+
+mod core;
+mod error;
+mod lister;
+mod writer;
diff --git a/core/src/services/pcloud/writer.rs 
b/core/src/services/pcloud/writer.rs
new file mode 100644
index 0000000000..9bd47bb023
--- /dev/null
+++ b/core/src/services/pcloud/writer.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+
+use crate::raw::*;
+use crate::*;
+
+use super::core::PcloudCore;
+use super::error::{parse_error, PcloudError};
+
+pub type PcloudWriters = oio::OneShotWriter<PcloudWriter>;
+
+pub struct PcloudWriter {
+    core: Arc<PcloudCore>,
+    path: String,
+}
+
+impl PcloudWriter {
+    pub fn new(core: Arc<PcloudCore>, path: String) -> Self {
+        PcloudWriter { core, path }
+    }
+}
+
+#[async_trait]
+impl oio::OneShotWrite for PcloudWriter {
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+        let bs = bs.bytes(bs.remaining());
+
+        self.core.ensure_dir_exists(&self.path).await?;
+
+        let resp = self.core.upload_file(&self.path, bs).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+                let resp: PcloudError =
+                    
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+                let result = resp.result;
+
+                if result != 0 {
+                    return Err(Error::new(ErrorKind::Unexpected, 
&format!("{resp:?}")));
+                }
+
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
diff --git a/core/src/types/operator/builder.rs 
b/core/src/types/operator/builder.rs
index ccffc55dc5..506d1cefe4 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -159,6 +159,8 @@ impl Operator {
             Scheme::Alluxio => 
Self::from_map::<services::Alluxio>(map)?.finish(),
             #[cfg(feature = "services-upyun")]
             Scheme::Upyun => Self::from_map::<services::Upyun>(map)?.finish(),
+            #[cfg(feature = "services-pcloud")]
+            Scheme::Pcloud => 
Self::from_map::<services::Pcloud>(map)?.finish(),
             #[cfg(feature = "services-chainsafe")]
             Scheme::Chainsafe => 
Self::from_map::<services::Chainsafe>(map)?.finish(),
             #[cfg(feature = "services-azblob")]
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index cbf44565a0..f74e992672 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -44,6 +44,8 @@ pub enum Scheme {
     Seafile,
     /// [Upyun][crate::services::Upyun]: Upyun Services.
     Upyun,
+    /// [Pcloud][crate::services::Pcloud]: Pcloud Services.
+    Pcloud,
     /// [Chainsafe][crate::services::Chainsafe]: Chainsafe Services.
     Chainsafe,
     /// [cacache][crate::services::Cacache]: cacache backend support.
@@ -249,6 +251,8 @@ impl Scheme {
             Scheme::Seafile,
             #[cfg(feature = "services-upyun")]
             Scheme::Upyun,
+            #[cfg(feature = "services-pcloud")]
+            Scheme::Pcloud,
             #[cfg(feature = "services-sftp")]
             Scheme::Sftp,
             #[cfg(feature = "services-sled")]
@@ -339,6 +343,7 @@ impl FromStr for Scheme {
             "s3" => Ok(Scheme::S3),
             "seafile" => Ok(Scheme::Seafile),
             "upyun" => Ok(Scheme::Upyun),
+            "pcloud" => Ok(Scheme::Pcloud),
             "sftp" => Ok(Scheme::Sftp),
             "sled" => Ok(Scheme::Sled),
             "supabase" => Ok(Scheme::Supabase),
@@ -412,6 +417,7 @@ impl From<Scheme> for &'static str {
             Scheme::Mongodb => "mongodb",
             Scheme::Alluxio => "alluxio",
             Scheme::Upyun => "upyun",
+            Scheme::Pcloud => "pcloud",
             Scheme::Custom(v) => v,
         }
     }


Reply via email to