This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new a9858cd4ec feat(services): add yandex disk support (#3918)
a9858cd4ec is described below
commit a9858cd4ec9a07eacfcfc56e43d002e32f19ed83
Author: hoslo <[email protected]>
AuthorDate: Fri Jan 5 20:04:02 2024 +0800
feat(services): add yandex disk support (#3918)
---
.env.example | 5 +-
core/Cargo.toml | 1 +
core/src/services/mod.rs | 7 +
core/src/services/yandex_disk/backend.rs | 325 +++++++++++++++++++++++++++++++
core/src/services/yandex_disk/core.rs | 317 ++++++++++++++++++++++++++++++
core/src/services/yandex_disk/docs.md | 47 +++++
core/src/services/yandex_disk/error.rs | 110 +++++++++++
core/src/services/yandex_disk/lister.rs | 114 +++++++++++
core/src/services/yandex_disk/mod.rs | 25 +++
core/src/services/yandex_disk/writer.rs | 67 +++++++
core/src/types/operator/builder.rs | 2 +
core/src/types/scheme.rs | 6 +
12 files changed, 1025 insertions(+), 1 deletion(-)
diff --git a/.env.example b/.env.example
index 746346e04f..849e09786b 100644
--- a/.env.example
+++ b/.env.example
@@ -199,4 +199,7 @@ OPENDAL_CHAINSAFE_API_KEY=<api_key>
OPENDAL_PCLOUD_ROOT=/path/to/dir
OPENDAL_PCLOUD_ENDPOINT=<endpoint>
OPENDAL_PCLOUD_USERNAME=<username>
-OPENDAL_PCLOUD_PASSWORD=<password>
\ No newline at end of file
+OPENDAL_PCLOUD_PASSWORD=<password>
+# yandex-disk
+OPENDAL_YANDEX_DISK_ROOT=/path/to/dir
+OPENDAL_YANDEX_DISK_ACCESS_TOKEN=<access_token>
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 11c92212a0..0134477352 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -195,6 +195,7 @@ services-vercel-artifacts = []
services-wasabi = []
services-webdav = []
services-webhdfs = []
+services-yandex-disk = []
[lib]
bench = false
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index ad9d2368ac..f67d2a2334 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -347,3 +347,10 @@ mod pcloud;
pub use pcloud::Pcloud;
#[cfg(feature = "services-pcloud")]
pub use pcloud::PcloudConfig;
+
+#[cfg(feature = "services-yandex-disk")]
+mod yandex_disk;
+#[cfg(feature = "services-yandex-disk")]
+pub use yandex_disk::YandexDisk;
+#[cfg(feature = "services-yandex-disk")]
+pub use yandex_disk::YandexDiskConfig;
diff --git a/core/src/services/yandex_disk/backend.rs
b/core/src/services/yandex_disk/backend.rs
new file mode 100644
index 0000000000..d22032bc91
--- /dev/null
+++ b/core/src/services/yandex_disk/backend.rs
@@ -0,0 +1,325 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use http::Request;
+use http::StatusCode;
+use log::debug;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use super::core::*;
+use super::error::parse_error;
+use super::lister::YandexDiskLister;
+use super::writer::YandexDiskWriter;
+use super::writer::YandexDiskWriters;
+use crate::raw::*;
+use crate::*;
+
+/// Config for backblaze YandexDisk services support.
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct YandexDiskConfig {
+ /// root of this backend.
+ ///
+ /// All operations will happen under this root.
+ pub root: Option<String>,
+ /// yandex disk oauth access_token.
+ pub access_token: String,
+}
+
+impl Debug for YandexDiskConfig {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut ds = f.debug_struct("Config");
+
+ ds.field("root", &self.root);
+
+ ds.finish()
+ }
+}
+
+/// [YandexDisk](https://360.yandex.com/disk/) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct YandexDiskBuilder {
+ config: YandexDiskConfig,
+
+ http_client: Option<HttpClient>,
+}
+
+impl Debug for YandexDiskBuilder {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut d = f.debug_struct("YandexDiskBuilder");
+
+ d.field("config", &self.config);
+ d.finish_non_exhaustive()
+ }
+}
+
+impl YandexDiskBuilder {
+ /// Set root of this backend.
+ ///
+ /// All operations will happen under this root.
+ pub fn root(&mut self, root: &str) -> &mut Self {
+ self.config.root = if root.is_empty() {
+ None
+ } else {
+ Some(root.to_string())
+ };
+
+ self
+ }
+
+ /// yandex disk oauth access_token.
+ /// The valid token will looks like
`y0_XXXXXXqihqIWAADLWwAAAAD3IXXXXXX0gtVeSPeIKM0oITMGhXXXXXX`.
+ /// We can fetch the debug token from
<https://yandex.com/dev/disk/poligon>.
+ /// To use it in production, please register an app at
<https://oauth.yandex.com> instead.
+ pub fn access_token(&mut self, access_token: &str) -> &mut Self {
+ self.config.access_token = access_token.to_string();
+
+ self
+ }
+
+ /// Specify the http client that used by this service.
+ ///
+ /// # Notes
+ ///
+ /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+ /// during minor updates.
+ pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+ self.http_client = Some(client);
+ self
+ }
+}
+
+impl Builder for YandexDiskBuilder {
+ const SCHEME: Scheme = Scheme::YandexDisk;
+ type Accessor = YandexDiskBackend;
+
+ /// Converts a HashMap into an YandexDiskBuilder instance.
+ ///
+ /// # Arguments
+ ///
+ /// * `map` - A HashMap containing the configuration values.
+ ///
+ /// # Returns
+ ///
+ /// Returns an instance of YandexDiskBuilder.
+ fn from_map(map: HashMap<String, String>) -> Self {
+ // Deserialize the configuration from the HashMap.
+ let config =
YandexDiskConfig::deserialize(ConfigDeserializer::new(map))
+ .expect("config deserialize must succeed");
+
+ // Create an YandexDiskBuilder instance with the deserialized config.
+ YandexDiskBuilder {
+ config,
+ http_client: None,
+ }
+ }
+
+ /// Builds the backend and returns the result of YandexDiskBackend.
+ fn build(&mut self) -> Result<Self::Accessor> {
+ debug!("backend build started: {:?}", &self);
+
+ let root =
normalize_root(&self.config.root.clone().unwrap_or_default());
+ debug!("backend use root {}", &root);
+
+ // Handle oauth access_token.
+ if self.config.access_token.is_empty() {
+ return Err(
+ Error::new(ErrorKind::ConfigInvalid, "access_token is empty")
+ .with_operation("Builder::build")
+ .with_context("service", Scheme::YandexDisk),
+ );
+ }
+
+ let client = if let Some(client) = self.http_client.take() {
+ client
+ } else {
+ HttpClient::new().map_err(|err| {
+ err.with_operation("Builder::build")
+ .with_context("service", Scheme::YandexDisk)
+ })?
+ };
+
+ Ok(YandexDiskBackend {
+ core: Arc::new(YandexDiskCore {
+ root,
+ access_token: self.config.access_token.clone(),
+ client,
+ }),
+ })
+ }
+}
+
+/// Backend for YandexDisk services.
+#[derive(Debug, Clone)]
+pub struct YandexDiskBackend {
+ core: Arc<YandexDiskCore>,
+}
+
+#[async_trait]
+impl Accessor for YandexDiskBackend {
+ type Reader = IncomingAsyncBody;
+ type Writer = YandexDiskWriters;
+ type Lister = oio::PageLister<YandexDiskLister>;
+ type BlockingReader = ();
+ type BlockingWriter = ();
+ type BlockingLister = ();
+
+ fn info(&self) -> AccessorInfo {
+ let mut am = AccessorInfo::default();
+ am.set_scheme(Scheme::YandexDisk)
+ .set_root(&self.core.root)
+ .set_native_capability(Capability {
+ stat: true,
+
+ create_dir: true,
+
+ read: true,
+
+ write: true,
+ write_can_empty: true,
+
+ delete: true,
+ rename: true,
+ copy: true,
+
+ list: true,
+ list_with_limit: true,
+
+ ..Default::default()
+ });
+
+ am
+ }
+
+ async fn create_dir(&self, path: &str, _: OpCreateDir) ->
Result<RpCreateDir> {
+ self.core.ensure_dir_exists(path).await?;
+
+ Ok(RpCreateDir::default())
+ }
+
+ async fn rename(&self, from: &str, to: &str, _args: OpRename) ->
Result<RpRename> {
+ self.core.ensure_dir_exists(to).await?;
+
+ let resp = self.core.move_object(from, to).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK | StatusCode::CREATED => {
+ resp.into_body().consume().await?;
+
+ Ok(RpRename::default())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn copy(&self, from: &str, to: &str, _args: OpCopy) ->
Result<RpCopy> {
+ self.core.ensure_dir_exists(to).await?;
+
+ let resp = self.core.copy(from, to).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK | StatusCode::CREATED => {
+ resp.into_body().consume().await?;
+
+ Ok(RpCopy::default())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ let download_url = self.core.get_download_url(path).await?;
+
+ let req = Request::get(download_url)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+ let resp = self.core.send(req).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let size = parse_content_length(resp.headers())?;
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
+ let resp = self.core.metainformation(path, None, None).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let bs = resp.into_body().bytes().await?;
+
+ let mf: MetainformationResponse =
+
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+
+ parse_info(mf).map(RpStat::new)
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ let writer = YandexDiskWriter::new(self.core.clone(),
path.to_string());
+
+ let w = oio::OneShotWriter::new(writer);
+
+ Ok((RpWrite::default(), w))
+ }
+
+ async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+ let resp = self.core.delete(path).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => Ok(RpDelete::default()),
+ StatusCode::NO_CONTENT => Ok(RpDelete::default()),
+ // Yandex Disk deleting a non-empty folder can take an unknown
amount of time,
+ // So the API responds with the code 202 Accepted (the deletion
process has started).
+ StatusCode::ACCEPTED => Ok(RpDelete::default()),
+ // Allow 404 when deleting a non-existing object
+ StatusCode::NOT_FOUND => Ok(RpDelete::default()),
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Lister)> {
+ let l = YandexDiskLister::new(self.core.clone(), path, args.limit());
+ Ok((RpList::default(), oio::PageLister::new(l)))
+ }
+}
diff --git a/core/src/services/yandex_disk/core.rs
b/core/src/services/yandex_disk/core.rs
new file mode 100644
index 0000000000..d3852ea79e
--- /dev/null
+++ b/core/src/services/yandex_disk/core.rs
@@ -0,0 +1,317 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::{Debug, Formatter};
+
+use http::{header, request, Request, Response, StatusCode};
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::*;
+
+use super::error::parse_error;
+
+#[derive(Clone)]
+pub struct YandexDiskCore {
+ /// The root of this core.
+ pub root: String,
+ /// Yandex Disk oauth access_token.
+ pub access_token: String,
+
+ pub client: HttpClient,
+}
+
+impl Debug for YandexDiskCore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Backend")
+ .field("root", &self.root)
+ .finish_non_exhaustive()
+ }
+}
+
+impl YandexDiskCore {
+ #[inline]
+ pub async fn send(&self, req: Request<AsyncBody>) ->
Result<Response<IncomingAsyncBody>> {
+ self.client.send(req).await
+ }
+
+ #[inline]
+ pub fn sign(&self, req: request::Builder) -> request::Builder {
+ req.header(
+ header::AUTHORIZATION,
+ format!("OAuth {}", self.access_token),
+ )
+ }
+}
+
+impl YandexDiskCore {
+ /// Get upload url.
+ pub async fn get_upload_url(&self, path: &str) -> Result<String> {
+ let path = build_rooted_abs_path(&self.root, path);
+
+ let url = format!(
+
"https://cloud-api.yandex.net/v1/disk/resources/upload?path={}&overwrite=true",
+ percent_encode_path(&path)
+ );
+
+ let req = Request::get(url);
+
+ let req = self.sign(req);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ let resp = self.send(req).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let bytes = resp.into_body().bytes().await?;
+
+ let resp: GetUploadUrlResponse =
+
serde_json::from_slice(&bytes).map_err(new_json_deserialize_error)?;
+
+ Ok(resp.href)
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ pub async fn get_download_url(&self, path: &str) -> Result<String> {
+ let path = build_rooted_abs_path(&self.root, path);
+
+ let url = format!(
+
"https://cloud-api.yandex.net/v1/disk/resources/download?path={}&overwrite=true",
+ percent_encode_path(&path)
+ );
+
+ let req = Request::get(url);
+
+ let req = self.sign(req);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ let resp = self.send(req).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let bytes = resp.into_body().bytes().await?;
+
+ let resp: GetUploadUrlResponse =
+
serde_json::from_slice(&bytes).map_err(new_json_deserialize_error)?;
+
+ Ok(resp.href)
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ pub async fn ensure_dir_exists(&self, path: &str) -> Result<()> {
+ let path = build_abs_path(&self.root, path);
+
+ let paths = path.split('/').collect::<Vec<&str>>();
+
+ for i in 0..paths.len() - 1 {
+ let path = paths[..i + 1].join("/");
+ let resp = self.create_dir(&path).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::CREATED | StatusCode::CONFLICT => {
+ resp.into_body().consume().await?;
+ }
+ _ => return Err(parse_error(resp).await?),
+ }
+ }
+ Ok(())
+ }
+
+ pub async fn create_dir(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let url = format!(
+ "https://cloud-api.yandex.net/v1/disk/resources?path=/{}",
+ percent_encode_path(path),
+ );
+
+ let req = Request::put(url);
+
+ let req = self.sign(req);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+
+ pub async fn copy(&self, from: &str, to: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let from = build_rooted_abs_path(&self.root, from);
+ let to = build_rooted_abs_path(&self.root, to);
+
+ let url = format!(
+
"https://cloud-api.yandex.net/v1/disk/resources/copy?from={}&path={}&overwrite=true",
+ percent_encode_path(&from),
+ percent_encode_path(&to)
+ );
+
+ let req = Request::post(url);
+
+ let req = self.sign(req);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+
+ pub async fn move_object(&self, from: &str, to: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let from = build_rooted_abs_path(&self.root, from);
+ let to = build_rooted_abs_path(&self.root, to);
+
+ let url = format!(
+
"https://cloud-api.yandex.net/v1/disk/resources/move?from={}&path={}&overwrite=true",
+ percent_encode_path(&from),
+ percent_encode_path(&to)
+ );
+
+ let req = Request::post(url);
+
+ let req = self.sign(req);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+
+ pub async fn delete(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let path = build_rooted_abs_path(&self.root, path);
+
+ let url = format!(
+
"https://cloud-api.yandex.net/v1/disk/resources?path={}&permanently=true",
+ percent_encode_path(&path),
+ );
+
+ let req = Request::delete(url);
+
+ let req = self.sign(req);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+
+ pub async fn metainformation(
+ &self,
+ path: &str,
+ limit: Option<usize>,
+ offset: Option<String>,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let path = build_rooted_abs_path(&self.root, path);
+
+ let mut url = format!(
+ "https://cloud-api.yandex.net/v1/disk/resources?path={}",
+ percent_encode_path(&path),
+ );
+
+ if let Some(limit) = limit {
+ url = format!("{}&limit={}", url, limit);
+ }
+
+ if let Some(offset) = offset {
+ url = format!("{}&offset={}", url, offset);
+ }
+
+ let req = Request::get(url);
+
+ let req = self.sign(req);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+}
+
+pub(super) fn parse_info(mf: MetainformationResponse) -> Result<Metadata> {
+ let mode = if mf.ty == "file" {
+ EntryMode::FILE
+ } else {
+ EntryMode::DIR
+ };
+
+ let mut m = Metadata::new(mode);
+
+ m.set_last_modified(parse_datetime_from_rfc3339(&mf.modified)?);
+
+ if let Some(md5) = mf.md5 {
+ m.set_content_md5(&md5);
+ }
+
+ if let Some(mime_type) = mf.mime_type {
+ m.set_content_type(&mime_type);
+ }
+
+ if let Some(size) = mf.size {
+ m.set_content_length(size);
+ }
+
+ Ok(m)
+}
+
+#[derive(Debug, Deserialize)]
+pub struct GetUploadUrlResponse {
+ pub href: String,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct MetainformationResponse {
+ #[serde(rename = "type")]
+ pub ty: String,
+ pub name: String,
+ pub path: String,
+ pub modified: String,
+ pub md5: Option<String>,
+ pub mime_type: Option<String>,
+ pub size: Option<u64>,
+ #[serde(rename = "_embedded")]
+ pub embedded: Option<Embedded>,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct Embedded {
+ pub total: usize,
+ pub items: Vec<MetainformationResponse>,
+}
diff --git a/core/src/services/yandex_disk/docs.md
b/core/src/services/yandex_disk/docs.md
new file mode 100644
index 0000000000..5a55119714
--- /dev/null
+++ b/core/src/services/yandex_disk/docs.md
@@ -0,0 +1,47 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [x] copy
+- [x] rename
+- [x] list
+- [x] scan
+- [ ] presign
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the work directory for backend
+- `access_token` YandexDisk oauth access_token
+
+You can refer to [`YandexDiskBuilder`]'s docs for more information
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::YandexDisk;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ // create backend builder
+ let mut builder = YandexDisk::default();
+
+ // set the storage bucket for OpenDAL
+ builder.root("/");
+ // set the access_token for OpenDAL
+ builder.access_token("test");
+
+ let op: Operator = Operator::new(builder)?.finish();
+
+ Ok(())
+}
+```
diff --git a/core/src/services/yandex_disk/error.rs
b/core/src/services/yandex_disk/error.rs
new file mode 100644
index 0000000000..c56c40e84f
--- /dev/null
+++ b/core/src/services/yandex_disk/error.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use quick_xml::de;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// YandexDiskError is the error returned by YandexDisk service.
+#[derive(Default, Debug, Deserialize)]
+#[allow(unused)]
+struct YandexDiskError {
+ message: String,
+ description: String,
+ error: String,
+}
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+ let (parts, body) = resp.into_parts();
+ let bs = body.bytes().await?;
+
+ let (kind, retryable) = match parts.status.as_u16() {
+ 400 => (ErrorKind::InvalidInput, false),
+ 410 | 403 => (ErrorKind::PermissionDenied, false),
+ 404 => (ErrorKind::NotFound, false),
+ 499 => (ErrorKind::Unexpected, true),
+ 503 | 507 => (ErrorKind::Unexpected, true),
+ _ => (ErrorKind::Unexpected, false),
+ };
+
+ let (message, _yandex_disk_err) = de::from_reader::<_,
YandexDiskError>(bs.clone().reader())
+ .map(|yandex_disk_err| (format!("{yandex_disk_err:?}"),
Some(yandex_disk_err)))
+ .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
+
+ let mut err = Error::new(kind, &message);
+
+ err = with_error_response_context(err, parts);
+
+ if retryable {
+ err = err.set_temporary();
+ }
+
+ Ok(err)
+}
+
+#[cfg(test)]
+mod test {
+ use futures::stream;
+ use http::StatusCode;
+
+ use super::*;
+
+ #[tokio::test]
+ async fn test_parse_error() {
+ let err_res = vec![
+ (
+ r#"{
+ "message": "Не удалось найти запрошенный ресурс.",
+ "description": "Resource not found.",
+ "error": "DiskNotFoundError"
+ }"#,
+ ErrorKind::NotFound,
+ StatusCode::NOT_FOUND,
+ ),
+ (
+ r#"{
+ "message": "Не авторизован.",
+ "description": "Unauthorized",
+ "error": "UnauthorizedError"
+ }"#,
+ ErrorKind::PermissionDenied,
+ StatusCode::FORBIDDEN,
+ ),
+ ];
+
+ for res in err_res {
+ let bs = bytes::Bytes::from(res.0);
+ let body = IncomingAsyncBody::new(
+ Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))),
+ None,
+ );
+ let resp = Response::builder().status(res.2).body(body).unwrap();
+
+ let err = parse_error(resp).await;
+
+ assert!(err.is_ok());
+ assert_eq!(err.unwrap().kind(), res.1);
+ }
+ }
+}
diff --git a/core/src/services/yandex_disk/lister.rs
b/core/src/services/yandex_disk/lister.rs
new file mode 100644
index 0000000000..a116ce53a0
--- /dev/null
+++ b/core/src/services/yandex_disk/lister.rs
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use super::core::parse_info;
+use super::core::MetainformationResponse;
+use super::core::YandexDiskCore;
+use super::error::parse_error;
+use crate::raw::oio::Entry;
+use crate::raw::*;
+use crate::Result;
+
+pub struct YandexDiskLister {
+ core: Arc<YandexDiskCore>,
+
+ path: String,
+ limit: Option<usize>,
+}
+
+impl YandexDiskLister {
+ pub(super) fn new(core: Arc<YandexDiskCore>, path: &str, limit:
Option<usize>) -> Self {
+ YandexDiskLister {
+ core,
+ path: path.to_string(),
+ limit,
+ }
+ }
+}
+
+#[async_trait]
+impl oio::PageList for YandexDiskLister {
+ async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+ let offset = if ctx.token.is_empty() {
+ None
+ } else {
+ Some(ctx.token.clone())
+ };
+
+ let resp = self
+ .core
+ .metainformation(&self.path, self.limit, offset)
+ .await?;
+
+ if resp.status() == http::StatusCode::NOT_FOUND {
+ ctx.done = true;
+ return Ok(());
+ }
+
+ match resp.status() {
+ http::StatusCode::OK => {
+ let body = resp.into_body().bytes().await?;
+
+ let resp: MetainformationResponse =
+
serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+
+ if let Some(embedded) = resp.embedded {
+ let n = embedded.items.len();
+
+ for mf in embedded.items {
+ let path = mf.path.strip_prefix("disk:");
+
+ if let Some(path) = path {
+ let mut path = build_rel_path(&self.core.root,
path);
+
+ let md = parse_info(mf)?;
+
+ if md.mode().is_dir() {
+ path = format!("{}/", path);
+ }
+
+ ctx.entries.push_back(Entry::new(&path, md));
+ };
+ }
+
+ let current_len = ctx.token.parse::<usize>().unwrap_or(0)
+ n;
+
+ if current_len >= embedded.total {
+ ctx.done = true;
+ }
+
+ ctx.token = current_len.to_string();
+
+ return Ok(());
+ }
+ }
+ http::StatusCode::NOT_FOUND => {
+ ctx.done = true;
+ return Ok(());
+ }
+ _ => {
+ return Err(parse_error(resp).await?);
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/core/src/services/yandex_disk/mod.rs
b/core/src/services/yandex_disk/mod.rs
new file mode 100644
index 0000000000..e2f2aff448
--- /dev/null
+++ b/core/src/services/yandex_disk/mod.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::YandexDiskBuilder as YandexDisk;
+pub use backend::YandexDiskConfig;
+
+mod core;
+mod error;
+mod lister;
+mod writer;
diff --git a/core/src/services/yandex_disk/writer.rs
b/core/src/services/yandex_disk/writer.rs
new file mode 100644
index 0000000000..b25d770098
--- /dev/null
+++ b/core/src/services/yandex_disk/writer.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::{Request, StatusCode};
+
+use crate::raw::*;
+use crate::*;
+
+use super::core::YandexDiskCore;
+use super::error::parse_error;
+
+pub type YandexDiskWriters = oio::OneShotWriter<YandexDiskWriter>;
+
+pub struct YandexDiskWriter {
+ core: Arc<YandexDiskCore>,
+ path: String,
+}
+
+impl YandexDiskWriter {
+ pub fn new(core: Arc<YandexDiskCore>, path: String) -> Self {
+ YandexDiskWriter { core, path }
+ }
+}
+
+#[async_trait]
+impl oio::OneShotWrite for YandexDiskWriter {
+ async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+ self.core.ensure_dir_exists(&self.path).await?;
+
+ let upload_url = self.core.get_upload_url(&self.path).await?;
+
+ let bs = bs.bytes(bs.remaining());
+
+ let req = Request::put(upload_url)
+ .body(AsyncBody::Bytes(bs))
+ .map_err(new_request_build_error)?;
+
+ let resp = self.core.send(req).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::CREATED => {
+ resp.into_body().consume().await?;
+ Ok(())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+}
diff --git a/core/src/types/operator/builder.rs
b/core/src/types/operator/builder.rs
index 506d1cefe4..30dcd251ed 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -159,6 +159,8 @@ impl Operator {
Scheme::Alluxio =>
Self::from_map::<services::Alluxio>(map)?.finish(),
#[cfg(feature = "services-upyun")]
Scheme::Upyun => Self::from_map::<services::Upyun>(map)?.finish(),
+ #[cfg(feature = "services-yandex-disk")]
+ Scheme::YandexDisk =>
Self::from_map::<services::YandexDisk>(map)?.finish(),
#[cfg(feature = "services-pcloud")]
Scheme::Pcloud =>
Self::from_map::<services::Pcloud>(map)?.finish(),
#[cfg(feature = "services-chainsafe")]
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index f74e992672..cdb4cf2f67 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -44,6 +44,8 @@ pub enum Scheme {
Seafile,
/// [Upyun][crate::services::Upyun]: Upyun Services.
Upyun,
+ /// [YandexDisk][crate::services::YandexDisk]: YandexDisk Services.
+ YandexDisk,
/// [Pcloud][crate::services::Pcloud]: Pcloud Services.
Pcloud,
/// [Chainsafe][crate::services::Chainsafe]: Chainsafe Services.
@@ -251,6 +253,8 @@ impl Scheme {
Scheme::Seafile,
#[cfg(feature = "services-upyun")]
Scheme::Upyun,
+ #[cfg(feature = "services-yandex-disk")]
+ Scheme::YandexDisk,
#[cfg(feature = "services-pcloud")]
Scheme::Pcloud,
#[cfg(feature = "services-sftp")]
@@ -343,6 +347,7 @@ impl FromStr for Scheme {
"s3" => Ok(Scheme::S3),
"seafile" => Ok(Scheme::Seafile),
"upyun" => Ok(Scheme::Upyun),
+ "yandex_disk" => Ok(Scheme::YandexDisk),
"pcloud" => Ok(Scheme::Pcloud),
"sftp" => Ok(Scheme::Sftp),
"sled" => Ok(Scheme::Sled),
@@ -417,6 +422,7 @@ impl From<Scheme> for &'static str {
Scheme::Mongodb => "mongodb",
Scheme::Alluxio => "alluxio",
Scheme::Upyun => "upyun",
+ Scheme::YandexDisk => "yandex_disk",
Scheme::Pcloud => "pcloud",
Scheme::Custom(v) => v,
}