This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new f12b05c1 feat(services/sftp): Add read/write/stat support for sftp
(#2186)
f12b05c1 is described below
commit f12b05c1a4f808ca10f986057a9f430489cdabfb
Author: silver-ymz <[email protected]>
AuthorDate: Mon May 1 18:35:00 2023 +0800
feat(services/sftp): Add read/write/stat support for sftp (#2186)
* add sftp basic skeleton, exist life cycle problem
* complete sftp basic skeleton
* fix some buf && pass almost all tests
* typo
* add some comments
* fix format
* fix clippy warning
---
.env.example | 6 +
Cargo.lock | 269 +++++++++++++++++++-
core/Cargo.toml | 9 +
core/README.md | 1 +
core/src/services/ftp/backend.rs | 6 +-
core/src/services/mod.rs | 5 +
core/src/services/sftp/backend.rs | 507 ++++++++++++++++++++++++++++++++++++++
core/src/services/sftp/error.rs | 91 +++++++
core/src/services/sftp/mod.rs | 24 ++
core/src/services/sftp/pager.rs | 101 ++++++++
core/src/services/sftp/utils.rs | 108 ++++++++
core/src/services/sftp/writer.rs | 57 +++++
core/src/types/scheme.rs | 3 +
core/tests/behavior/main.rs | 1 +
14 files changed, 1179 insertions(+), 9 deletions(-)
diff --git a/.env.example b/.env.example
index 2bc8a6b7..2b83073a 100644
--- a/.env.example
+++ b/.env.example
@@ -61,6 +61,12 @@ OPENDAL_REDIS_DB=0
OPENDAL_ROCKSDB_TEST=false
OPENDAL_ROCKSDB_DATADIR=/path/to/database
OPENDAL_ROCKSDB_ROOT=/path/to/root
+# sftp
+OPENDAL_SFTP_TEST=false
+OPENDAL_SFTP_ENDPOINT=ssh://<endpoint>
+OPENDAL_SFTP_ROOT=/path/to/dir
+OPENDAL_SFTP_USER=<user>
+OPENDAL_SFTP_KEY=<key_path>
# sled
OPENDAL_SLED_TEST=false
OPENDAL_SLED_DATADIR=/path/to/database
diff --git a/Cargo.lock b/Cargo.lock
index 15b2aa90..ae8f0cc8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -104,6 +104,12 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
+[[package]]
+name = "array-init"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc"
+
[[package]]
name = "assert-json-diff"
version = "2.0.2"
@@ -312,6 +318,22 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
+[[package]]
+name = "awaitable"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "70af449c9a763cb655c6a1e5338b42d99c67190824ff90658c1e30be844c0775"
+dependencies = [
+ "awaitable-error",
+ "cfg-if",
+]
+
+[[package]]
+name = "awaitable-error"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d5b3469636cdf8543cceab175efca534471f36eee12fb8374aba00eb5e7e7f8a"
+
[[package]]
name = "backon"
version = "0.4.0"
@@ -716,12 +738,31 @@ dependencies = [
"crossbeam-utils",
]
+[[package]]
+name = "concurrent_arena"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "24bfeb060a299f86521bb3940344800fc861cc506356e44a273a42cb552afde5"
+dependencies = [
+ "arc-swap",
+ "array-init",
+ "const_fn_assert",
+ "parking_lot 0.12.1",
+ "triomphe",
+]
+
[[package]]
name = "const-oid"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "520fbf3c07483f94e3e3ca9d0cfd913d7718ef2483d2cfd91c0d9e91474ab913"
+[[package]]
+name = "const_fn_assert"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "27d614f23f34f7b5165a77dc1591f497e2518f9cec4b4f4b92bfc4dc6cf7a190"
+
[[package]]
name = "convert_case"
version = "0.6.0"
@@ -994,6 +1035,17 @@ dependencies = [
"zeroize",
]
+[[package]]
+name = "derive_destructure2"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "35cb7e5875e1028a73e551747d6d0118f25c3d6dbba2dadf97cc0f4d0c53f2f5"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
[[package]]
name = "diff"
version = "0.1.13"
@@ -1018,13 +1070,33 @@ dependencies = [
"subtle",
]
+[[package]]
+name = "dirs"
+version = "4.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
+dependencies = [
+ "dirs-sys 0.3.7",
+]
+
[[package]]
name = "dirs"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dece029acd3353e3a58ac2e3eb3c8d6c35827a892edc6cc4138ef9c33df46ecd"
dependencies = [
- "dirs-sys",
+ "dirs-sys 0.4.0",
+]
+
+[[package]]
+name = "dirs-sys"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6"
+dependencies = [
+ "libc",
+ "redox_users",
+ "winapi",
]
[[package]]
@@ -2301,6 +2373,17 @@ dependencies = [
"zeroize",
]
+[[package]]
+name = "num-derive"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
[[package]]
name = "num-integer"
version = "0.1.45"
@@ -2348,7 +2431,7 @@ version = "0.33.2"
dependencies = [
"anyhow",
"clap 4.2.5",
- "dirs",
+ "dirs 5.0.0",
"env_logger",
"futures",
"log",
@@ -2398,7 +2481,7 @@ dependencies = [
"anyhow",
"assert_cmd",
"clap 4.2.5",
- "dirs",
+ "dirs 5.0.0",
"env_logger",
"futures",
"log",
@@ -2453,8 +2536,11 @@ dependencies = [
"minitrace",
"moka",
"once_cell",
+ "openssh",
+ "openssh-sftp-client",
"opentelemetry 0.19.0",
"opentelemetry-jaeger",
+ "owning_ref",
"parking_lot 0.12.1",
"paste",
"percent-encoding",
@@ -2535,6 +2621,95 @@ dependencies = [
"rb-sys-env",
]
+[[package]]
+name = "openssh"
+version = "0.9.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ca6c277973fb549b36dd8980941b5ea3ecebea026f5b1f0060acde74d893c22"
+dependencies = [
+ "dirs 4.0.0",
+ "libc",
+ "once_cell",
+ "shell-escape",
+ "tempfile",
+ "thiserror",
+ "tokio",
+ "tokio-pipe",
+]
+
+[[package]]
+name = "openssh-sftp-client"
+version = "0.12.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f4fa8e5f26e549bd266d9bcd9e5b4fd344729985ef1a7f5ac3e51f3f96a4a620"
+dependencies = [
+ "bytes",
+ "derive_destructure2",
+ "once_cell",
+ "openssh-sftp-client-lowlevel",
+ "openssh-sftp-error",
+ "scopeguard",
+ "tokio",
+ "tokio-io-utility",
+ "tokio-util",
+]
+
+[[package]]
+name = "openssh-sftp-client-lowlevel"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "406bf41d8372365497d5645e802a8dfe22008b8183edbe6c79e4b75614431daa"
+dependencies = [
+ "awaitable",
+ "bytes",
+ "concurrent_arena",
+ "derive_destructure2",
+ "openssh-sftp-error",
+ "openssh-sftp-protocol",
+ "pin-project",
+ "tokio",
+ "tokio-io-utility",
+]
+
+[[package]]
+name = "openssh-sftp-error"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3d836b428ead150165d1178ed0aa672791c13b3ae9616ea1e34d13730a2cb486"
+dependencies = [
+ "awaitable-error",
+ "openssh-sftp-protocol-error",
+ "ssh_format_error",
+ "thiserror",
+ "tokio",
+]
+
+[[package]]
+name = "openssh-sftp-protocol"
+version = "0.24.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bf38532d784978966f95d241226223823f351d5bb2a4bebcf6b20b9cb1e393e0"
+dependencies = [
+ "bitflags 2.0.2",
+ "num-derive",
+ "num-traits",
+ "openssh-sftp-protocol-error",
+ "serde",
+ "ssh_format",
+ "vec-strings",
+]
+
+[[package]]
+name = "openssh-sftp-protocol-error"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0719269eb3f037866ae07ec89cb44ed2c1d63b72b2390cef8e1aa3016a956ff8"
+dependencies = [
+ "serde",
+ "thiserror",
+ "vec-strings",
+]
+
[[package]]
name = "openssl"
version = "0.10.47"
@@ -2717,6 +2892,15 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
+[[package]]
+name = "owning_ref"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6ff55baddef9e4ad00f88b6c743a2a8062d4c6ade126c2a528644b8e444d52ce"
+dependencies = [
+ "stable_deref_trait",
+]
+
[[package]]
name = "parking"
version = "2.0.0"
@@ -3425,7 +3609,7 @@ dependencies = [
"base64 0.21.0",
"bytes",
"chrono",
- "dirs",
+ "dirs 5.0.0",
"form_urlencoded",
"hex",
"hmac",
@@ -3841,6 +4025,12 @@ dependencies = [
"lazy_static",
]
+[[package]]
+name = "shell-escape"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f"
+
[[package]]
name = "shell-words"
version = "1.1.0"
@@ -3993,6 +4183,32 @@ dependencies = [
"der",
]
+[[package]]
+name = "ssh_format"
+version = "0.14.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "24ab31081d1c9097c327ec23550858cb5ffb4af6b866c1ef4d728455f01f3304"
+dependencies = [
+ "bytes",
+ "serde",
+ "ssh_format_error",
+]
+
+[[package]]
+name = "ssh_format_error"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "be3c6519de7ca611f71ef7e8a56eb57aa1c818fecb5242d0a0f39c83776c210c"
+dependencies = [
+ "serde",
+]
+
+[[package]]
+name = "stable_deref_trait"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
+
[[package]]
name = "strsim"
version = "0.10.0"
@@ -4088,6 +4304,12 @@ version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
+[[package]]
+name = "thin-vec"
+version = "0.2.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aac81b6fd6beb5884b0cf3321b8117e6e5d47ecb6fc89f414cfdcca8b2fe2dd8"
+
[[package]]
name = "thiserror"
version = "1.0.40"
@@ -4222,6 +4444,16 @@ dependencies = [
"windows-sys 0.45.0",
]
+[[package]]
+name = "tokio-io-utility"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8d672654d175710e52c7c41f6aec77c62b3c0954e2a7ebce9049d1e94ed7c263"
+dependencies = [
+ "bytes",
+ "tokio",
+]
+
[[package]]
name = "tokio-macros"
version = "2.0.0"
@@ -4243,6 +4475,16 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "tokio-pipe"
+version = "0.2.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784"
+dependencies = [
+ "libc",
+ "tokio",
+]
+
[[package]]
name = "tokio-rustls"
version = "0.23.4"
@@ -4256,9 +4498,9 @@ dependencies = [
[[package]]
name = "tokio-util"
-version = "0.7.7"
+version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2"
+checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
dependencies = [
"bytes",
"futures-core",
@@ -4398,6 +4640,11 @@ name = "triomphe"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db"
+dependencies = [
+ "arc-swap",
+ "serde",
+ "stable_deref_trait",
+]
[[package]]
name = "trust-dns-proto"
@@ -4566,6 +4813,16 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
+[[package]]
+name = "vec-strings"
+version = "0.4.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c8509489e2a7ee219522238ad45fd370bec6808811ac15ac6b07453804e77659"
+dependencies = [
+ "serde",
+ "thin-vec",
+]
+
[[package]]
name = "version_check"
version = "0.9.4"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index b88de207..5dc6170c 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -135,6 +135,12 @@ services-s3 = [
"reqsign?/services-aws",
"reqsign?/reqwest_request",
]
+services-sftp = [
+ "dep:openssh",
+ "dep:openssh-sftp-client",
+ "dep:bb8",
+ "dep:owning_ref",
+]
services-sled = ["dep:sled"]
services-supabase = []
services-wasabi = [
@@ -176,7 +182,10 @@ metrics = { version = "0.20", optional = true }
minitrace = { version = "0.4.0", optional = true }
moka = { version = "0.10", optional = true, features = ["future"] }
once_cell = "1"
+openssh = { version = "0.9.9", optional = true }
+openssh-sftp-client = { version = "0.12.2", optional = true }
opentelemetry = { version = "0.19.0", optional = true }
+owning_ref = { version = "0.4.1", optional = true }
parking_lot = "0.12"
percent-encoding = "2"
pin-project = "1"
diff --git a/core/README.md b/core/README.md
index 8bbce48e..ca758fa8 100644
--- a/core/README.md
+++ b/core/README.md
@@ -36,6 +36,7 @@
- [redis](https://docs.rs/opendal/latest/opendal/services/struct.Redis.html):
[Redis](https://redis.io/) services support.
-
[rocksdb](https://docs.rs/opendal/latest/opendal/services/struct.Rocksdb.html):
[RocksDB](http://rocksdb.org/) services support.
- [s3](https://docs.rs/opendal/latest/opendal/services/struct.S3.html): [AWS
S3](https://aws.amazon.com/s3/) alike services.
+- [sftp](https://docs.rs/opendal/latest/opendal/services/struct.Sftp.html):
[SFTP](https://datatracker.ietf.org/doc/html/draft-ietf-secsh-filexfer-02)
services support.
-
[sled](https://docs.rs/opendal/latest/opendal/services/sled/struct.Sled.html):
[sled](https://crates.io/crates/sled) services support.
-
[webdav](https://docs.rs/opendal/latest/opendal/services/struct.Webdav.html):
[WebDAV](https://datatracker.ietf.org/doc/html/rfc4918) Service Support.
-
[webhdfs](https://docs.rs/opendal/latest/opendal/services/struct.Webhdfs.html):
[WebHDFS](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html)
Service Support.
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index 32338980..33ad34a6 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -60,10 +60,10 @@ use crate::*;
///
/// # Configuration
///
-/// - `endpoint`: set the endpoint for connection
+/// - `endpoint`: Set the endpoint for connection
/// - `root`: Set the work directory for backend
-/// - `credential`: login credentials
-/// - `tls`: tls mode
+/// - `user`: Set the login user
+/// - `password`: Set the login password
///
/// You can refer to [`FtpBuilder`]'s docs for more information
///
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 02621986..d08135b0 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -114,6 +114,11 @@ mod s3;
#[cfg(feature = "services-s3")]
pub use s3::S3;
+#[cfg(feature = "services-sftp")]
+mod sftp;
+#[cfg(feature = "services-sftp")]
+pub use sftp::Sftp;
+
#[cfg(feature = "services-sled")]
mod sled;
#[cfg(feature = "services-sled")]
diff --git a/core/src/services/sftp/backend.rs
b/core/src/services/sftp/backend.rs
new file mode 100644
index 00000000..4ae80d7f
--- /dev/null
+++ b/core/src/services/sftp/backend.rs
@@ -0,0 +1,507 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp::min;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use async_trait::async_trait;
+use bb8::PooledConnection;
+use futures::executor::block_on;
+use log::debug;
+use openssh::RemoteChild;
+use openssh::Session;
+use openssh::SessionBuilder;
+use openssh::Stdio;
+use openssh_sftp_client::Sftp;
+use owning_ref::OwningHandle;
+use tokio::sync::OnceCell;
+
+use super::error::is_not_found;
+use super::error::is_sftp_protocol_error;
+use super::error::SftpError;
+use super::pager::SftpPager;
+use super::utils::SftpReader;
+use super::writer::SftpWriter;
+use crate::ops::*;
+use crate::raw::*;
+use crate::*;
+
+/// SFTP services support. (only works on unix)
+///
+/// # Capabilities
+///
+/// This service can be used to:
+///
+/// - [x] read
+/// - [x] write
+/// - [x] list
+/// - [ ] ~~scan~~
+/// - [ ] ~~presign~~
+/// - [ ] blocking
+///
+/// # Configuration
+///
+/// - `endpoint`: Set the endpoint for connection
+/// - `root`: Set the work directory for backend, default to `/home/$USER/`
+/// - `user`: Set the login user
+/// - `key`: Set the path of the key file used for login
+///
+/// Password login is not supported; use key-based login instead.
+///
+/// You can refer to [`SftpBuilder`]'s docs for more information
+///
+/// # Example
+///
+/// ## Via Builder
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use opendal::services::Sftp;
+/// use opendal::Object;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+/// // create backend builder
+/// let mut builder = Sftp::default();
+///
+/// builder.endpoint("127.0.0.1").user("test").key("test_key");
+///
+/// let op: Operator = Operator::new(builder)?.finish();
+/// let _obj: Object = op.object("test_file");
+/// Ok(())
+/// }
+/// ```
+
+#[derive(Default)]
+pub struct SftpBuilder {
+ endpoint: Option<String>,
+ root: Option<String>,
+ user: Option<String>,
+ key: Option<String>,
+}
+
+impl Debug for SftpBuilder {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Builder")
+ .field("endpoint", &self.endpoint)
+ .field("root", &self.root)
+ .finish()
+ }
+}
+
+impl SftpBuilder {
+ /// set endpoint for sftp backend.
+ pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+ self.endpoint = if endpoint.is_empty() {
+ None
+ } else {
+ Some(endpoint.to_string())
+ };
+
+ self
+ }
+
+ /// set root path for sftp backend.
+ pub fn root(&mut self, root: &str) -> &mut Self {
+ self.root = if root.is_empty() {
+ None
+ } else {
+ Some(root.to_string())
+ };
+
+ self
+ }
+
+ /// set user for sftp backend.
+ pub fn user(&mut self, user: &str) -> &mut Self {
+ self.user = if user.is_empty() {
+ None
+ } else {
+ Some(user.to_string())
+ };
+
+ self
+ }
+
+ /// set key path for sftp backend.
+ pub fn key(&mut self, key: &str) -> &mut Self {
+ self.key = if key.is_empty() {
+ None
+ } else {
+ Some(key.to_string())
+ };
+
+ self
+ }
+}
+
+impl Builder for SftpBuilder {
+ const SCHEME: Scheme = Scheme::Sftp;
+ type Accessor = SftpBackend;
+
+ fn build(&mut self) -> Result<Self::Accessor> {
+ debug!("sftp backend build started: {:?}", &self);
+ let endpoint = match self.endpoint.clone() {
+ Some(v) => v,
+ None => return Err(Error::new(ErrorKind::ConfigInvalid, "endpoint
is empty")),
+ };
+
+ let user = match self.user.clone() {
+ Some(v) => v,
+ None => return Err(Error::new(ErrorKind::ConfigInvalid, "user is
empty")),
+ };
+
+ let root = self
+ .root
+ .clone()
+ .map(|r| normalize_root(r.as_str()))
+ .unwrap_or(format!("/home/{}/", user));
+
+ debug!("sftp backend finished: {:?}", &self);
+
+ Ok(SftpBackend {
+ endpoint,
+ root,
+ user,
+ key: self.key.clone(),
+ sftp: OnceCell::new(),
+ })
+ }
+
+ fn from_map(map: HashMap<String, String>) -> Self {
+ let mut builder = SftpBuilder::default();
+
+ map.get("root").map(|v| builder.root(v));
+ map.get("endpoint").map(|v| builder.endpoint(v));
+ map.get("user").map(|v| builder.user(v));
+ map.get("key").map(|v| builder.key(v));
+
+ builder
+ }
+}
+
+#[derive(Clone)]
+pub struct Manager {
+ endpoint: String,
+ user: String,
+ key: Option<String>,
+}
+
+pub struct Connection {
+ // the remote child owns the ref to session, so we need to use owning handle
+ // The session will only create one child, so we can make sure the child can live
+ // as long as the session. (the session will be dropped when the connection is dropped)
+ // Related: https://stackoverflow.com/a/47260399
+ child: OwningHandle<Box<Session>, Box<RemoteChild<'static>>>,
+ pub sftp: Sftp,
+}
+
+#[async_trait]
+impl bb8::ManageConnection for Manager {
+ type Connection = Connection;
+ type Error = SftpError;
+
+ async fn connect(&self) -> std::result::Result<Self::Connection,
Self::Error> {
+ let mut session = SessionBuilder::default();
+
+ session.user(self.user.clone());
+
+ if let Some(key) = &self.key {
+ session.keyfile(key);
+ }
+
+ let session = session.connect(self.endpoint.clone()).await?;
+
+ let sess = Box::new(session);
+ let mut oref = OwningHandle::new_with_fn(sess, unsafe {
+ |x| {
+ Box::new(
+ block_on(
+ (*x).subsystem("sftp")
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .spawn(),
+ )
+ .unwrap(),
+ )
+ }
+ });
+
+ let sftp = Sftp::new(
+ oref.stdin().take().unwrap(),
+ oref.stdout().take().unwrap(),
+ Default::default(),
+ )
+ .await?;
+
+ Ok(Connection { child: oref, sftp })
+ }
+
+ async fn is_valid(&self, conn: &mut Self::Connection) ->
std::result::Result<(), Self::Error> {
+ conn.child.session().check().await?;
+ Ok(())
+ }
+
+ /// Always allow the connection to be reused (never report it as broken).
+ fn has_broken(&self, _: &mut Self::Connection) -> bool {
+ false
+ }
+}
+
+/// Backend is used to serve `Accessor` support for sftp.
+pub struct SftpBackend {
+ endpoint: String,
+ root: String,
+ user: String,
+ key: Option<String>,
+ sftp: OnceCell<bb8::Pool<Manager>>,
+}
+
+impl Debug for SftpBackend {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Backend").finish()
+ }
+}
+
+#[async_trait]
+impl Accessor for SftpBackend {
+ type Reader = SftpReader;
+ type BlockingReader = ();
+ type Writer = SftpWriter;
+ type BlockingWriter = ();
+ type Pager = SftpPager;
+ type BlockingPager = ();
+
+ fn info(&self) -> AccessorInfo {
+ let mut am = AccessorInfo::default();
+ am.set_root(self.root.as_str())
+ .set_scheme(Scheme::Sftp)
+ .set_capability(Capability {
+ stat: true,
+ read: true,
+ write: true,
+ list: true,
+ list_with_limit: true,
+
+ ..Default::default()
+ });
+
+ am
+ }
+
+ async fn create_dir(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
+ let client = self.sftp_connect().await?;
+ let mut fs = client.sftp.fs();
+ fs.set_cwd(self.root.clone());
+
+ let paths: Vec<&str> = path.split_inclusive('/').collect();
+ let mut current = self.root.clone();
+ for p in paths {
+ if p.is_empty() {
+ continue;
+ }
+
+ current.push_str(p);
+ let res = fs.create_dir(p).await;
+
+ if let Err(e) = res {
+ // ignore any sftp protocol error here, assuming it means the dir already exists
+ if !is_sftp_protocol_error(&e) {
+ return Err(e.into());
+ }
+ }
+ fs.set_cwd(current.clone());
+ }
+
+ return Ok(RpCreate::default());
+ }
+
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ let client = self.sftp_connect().await?;
+
+ let mut fs = client.sftp.fs();
+ fs.set_cwd(self.root.clone());
+ let path = fs.canonicalize(path).await?;
+
+ let mut file = client.sftp.open(path.as_path()).await?;
+
+ let total_length = file.metadata().await?.len().ok_or(Error::new(
+ ErrorKind::NotFound,
+ format!("file not found: {}", path.to_str().unwrap()).as_str(),
+ ))?;
+
+ let br = args.range();
+ let (start, end) = match (br.offset(), br.size()) {
+ // Read a specific range.
+ (Some(offset), Some(size)) => (offset, min(offset + size,
total_length)),
+ // Read from offset.
+ (Some(offset), None) => (offset, total_length),
+ // Read the last size bytes.
+ (None, Some(size)) => (
+ if total_length > size {
+ total_length - size
+ } else {
+ 0
+ },
+ total_length,
+ ),
+ // Read the whole file.
+ (None, None) => (0, total_length),
+ };
+
+ let r = SftpReader::new(self.sftp_connect_owned().await?, path, start,
end).await?;
+
+ Ok((RpRead::new(end - start), r))
+ }
+
+ async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ if args.content_length().is_none() {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ "write without content length is not supported",
+ ));
+ }
+
+ if let Some((dir, _)) = path.rsplit_once('/') {
+ self.create_dir(dir, OpCreate::default()).await?;
+ }
+
+ let path = format!("{}{}", self.root, path);
+
+ Ok((
+ RpWrite::new(),
+ SftpWriter::new(self.sftp_connect_owned().await?, path),
+ ))
+ }
+
+ async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+ let client = self.sftp_connect().await?;
+ let mut fs = client.sftp.fs();
+ fs.set_cwd(self.root.clone());
+
+ let meta = fs.metadata(path).await?;
+
+ Ok(RpStat::new(meta.into()))
+ }
+
+ async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+ let client = self.sftp_connect().await?;
+
+ let mut fs = client.sftp.fs();
+ fs.set_cwd(self.root.clone());
+
+ if path.ends_with('/') {
+ let file_path = format!("./{}", path);
+ let dir = match fs.open_dir(file_path.clone()).await {
+ Ok(dir) => dir,
+ Err(e) => {
+ if is_not_found(&e) {
+ return Ok(RpDelete::default());
+ } else {
+ return Err(e.into());
+ }
+ }
+ }
+ .read_dir()
+ .await?;
+
+ for file in &dir {
+ let file_name = file.filename().to_str().unwrap();
+ if file_name == "." || file_name == ".." {
+ continue;
+ }
+ let file_path = format!("{}{}", path, file_name);
+ self.delete(file_path.as_str(), OpDelete::default()).await?;
+ }
+
+ match fs.remove_dir(path).await {
+ Err(e) if !is_not_found(&e) => {
+ return Err(e.into());
+ }
+ _ => {}
+ }
+ } else {
+ match fs.remove_file(path).await {
+ Err(e) if !is_not_found(&e) => {
+ return Err(e.into());
+ }
+ _ => {}
+ }
+ };
+
+ Ok(RpDelete::default())
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
+ let client = self.sftp_connect().await?;
+ let mut fs = client.sftp.fs();
+ fs.set_cwd(self.root.clone());
+
+ let file_path = format!("./{}", path);
+
+ let mut dir = match fs.open_dir(file_path.clone()).await {
+ Ok(dir) => dir,
+ Err(e) => {
+ if is_not_found(&e) {
+ return Ok((RpList::default(), SftpPager::empty()));
+ } else {
+ return Err(e.into());
+ }
+ }
+ };
+ let dir = dir.read_dir().await?;
+
+ Ok((
+ RpList::default(),
+ SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+ ))
+ }
+}
+
+impl SftpBackend {
+ async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
+ let pool = self
+ .sftp
+ .get_or_try_init(|| async {
+ let manager = Manager {
+ endpoint: self.endpoint.clone(),
+ user: self.user.clone(),
+ key: self.key.clone(),
+ };
+
+ bb8::Pool::builder().max_size(10).build(manager).await
+ })
+ .await?;
+
+ Ok(pool)
+ }
+
+ pub async fn sftp_connect(&self) -> Result<PooledConnection<'_, Manager>> {
+ let conn = self.pool().await?.get().await?;
+
+ Ok(conn)
+ }
+
+ pub async fn sftp_connect_owned(&self) -> Result<PooledConnection<'static,
Manager>> {
+ let conn = self.pool().await?.get_owned().await?;
+
+ Ok(conn)
+ }
+}
diff --git a/core/src/services/sftp/error.rs b/core/src/services/sftp/error.rs
new file mode 100644
index 00000000..0e7dc2ef
--- /dev/null
+++ b/core/src/services/sftp/error.rs
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bb8::RunError;
+use openssh::Error as SshError;
+use openssh_sftp_client::{error::SftpErrorKind, Error as SftpClientError};
+
+use crate::{Error, ErrorKind};
+
+#[derive(Debug)]
+pub enum SftpError {
+ SftpClientError(SftpClientError),
+ SshError(SshError),
+}
+
+impl From<SftpClientError> for Error {
+ fn from(e: SftpClientError) -> Self {
+ let kind = match &e {
+ SftpClientError::UnsupportedSftpProtocol { version: _ } =>
ErrorKind::Unsupported,
+ SftpClientError::SftpError(kind, _msg) => match kind {
+ SftpErrorKind::NoSuchFile => ErrorKind::NotFound,
+ SftpErrorKind::PermDenied => ErrorKind::PermissionDenied,
+ SftpErrorKind::OpUnsupported => ErrorKind::Unsupported,
+ _ => ErrorKind::Unexpected,
+ },
+ _ => ErrorKind::Unexpected,
+ };
+
+ Error::new(kind, "sftp error").set_source(e)
+ }
+}
+
+impl From<SshError> for Error {
+ fn from(e: SshError) -> Self {
+ Error::new(ErrorKind::Unexpected, "ssh error").set_source(e)
+ }
+}
+
+impl From<SftpClientError> for SftpError {
+ fn from(e: SftpClientError) -> Self {
+ SftpError::SftpClientError(e)
+ }
+}
+
+impl From<SshError> for SftpError {
+ fn from(e: SshError) -> Self {
+ SftpError::SshError(e)
+ }
+}
+
+impl From<SftpError> for Error {
+ fn from(e: SftpError) -> Self {
+ match e {
+ SftpError::SftpClientError(e) => e.into(),
+ SftpError::SshError(e) => e.into(),
+ }
+ }
+}
+
+impl From<RunError<SftpError>> for Error {
+ fn from(e: RunError<SftpError>) -> Self {
+ match e {
+ RunError::User(err) => err.into(),
+ RunError::TimedOut => {
+ Error::new(ErrorKind::Unexpected, "connection request:
timeout").set_temporary()
+ }
+ }
+ }
+}
+
+pub(super) fn is_not_found(e: &SftpClientError) -> bool {
+ matches!(e, SftpClientError::SftpError(SftpErrorKind::NoSuchFile, _))
+}
+
+pub(super) fn is_sftp_protocol_error(e: &SftpClientError) -> bool {
+ matches!(e, SftpClientError::SftpError(_, _))
+}
diff --git a/core/src/services/sftp/mod.rs b/core/src/services/sftp/mod.rs
new file mode 100644
index 00000000..d854429b
--- /dev/null
+++ b/core/src/services/sftp/mod.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub use backend::SftpBuilder as Sftp;
+
+mod backend;
+mod error;
+mod pager;
+mod utils;
+mod writer;
diff --git a/core/src/services/sftp/pager.rs b/core/src/services/sftp/pager.rs
new file mode 100644
index 00000000..e6b557c2
--- /dev/null
+++ b/core/src/services/sftp/pager.rs
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use openssh_sftp_client::fs::DirEntry;
+
+use crate::raw::oio;
+use crate::Result;
+
+/// Pager over a directory listing whose entries are already held in memory.
+pub struct SftpPager {
+    /// Entries of the listed directory.
+    dir: Box<[DirEntry]>,
+    /// Prefix prepended to each entry name when its path is built.
+    path: String,
+    /// Optional cap on the number of entries handed out.
+    limit: Option<usize>,
+    /// `true` once the pager has nothing more to return.
+    complete: bool,
+}
+
+impl SftpPager {
+    /// Creates a pager over `dir` that has not yet returned anything.
+    pub fn new(dir: Box<[DirEntry]>, path: String, limit: Option<usize>) -> Self {
+        let complete = false;
+        Self {
+            dir,
+            path,
+            limit,
+            complete,
+        }
+    }
+
+    /// Creates a pager with no entries that is already marked complete.
+    pub fn empty() -> Self {
+        Self {
+            complete: true,
+            ..Self::new(Box::default(), String::new(), None)
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Page for SftpPager {
+    /// Returns the directory entries as one page, then `None` on every later call.
+    ///
+    /// The "." and ".." entries are skipped. When `limit` is set, at most that
+    /// many entries are returned. An empty result is reported as `None`.
+    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+        if self.complete {
+            return Ok(None);
+        }
+
+        // when listing the root directory, the prefix should be empty
+        if self.path == "/" {
+            self.path.clear();
+        }
+
+        let iter = self
+            .dir
+            .iter()
+            // Filter out "." and "..". A name that is not valid UTF-8 can be
+            // neither of them, so it is kept instead of panicking on `unwrap`.
+            .filter(|e| !matches!(e.filename().to_str(), Some(".") | Some("..")))
+            .map(|e| map_entry(self.path.clone(), e.clone()));
+
+        // NOTE(review): entries beyond `limit` are never returned because the pager
+        // is marked complete right after this page — confirm this is intended.
+        let v: Vec<oio::Entry> = match self.limit {
+            Some(limit) => iter.take(limit).collect(),
+            None => iter.collect(),
+        };
+
+        self.complete = true;
+
+        if v.is_empty() {
+            Ok(None)
+        } else {
+            Ok(Some(v))
+        }
+    }
+}
+
+/// Builds an `oio::Entry` for `value`, whose path is `prefix` followed by the
+/// entry's file name, with a trailing "/" appended for directories.
+///
+/// This function does not panic on unusual entries:
+/// - a file name that is not valid UTF-8 is converted lossily;
+/// - an entry whose file type is not available is treated as a non-directory.
+fn map_entry(prefix: String, value: DirEntry) -> oio::Entry {
+    let name = value.filename().to_string_lossy();
+    let is_dir = value.file_type().map_or(false, |t| t.is_dir());
+    let suffix = if is_dir { "/" } else { "" };
+
+    let path = format!("{}{}{}", prefix, name, suffix);
+
+    oio::Entry::new(path.as_str(), value.metadata().into())
+}
diff --git a/core/src/services/sftp/utils.rs b/core/src/services/sftp/utils.rs
new file mode 100644
index 00000000..91387aad
--- /dev/null
+++ b/core/src/services/sftp/utils.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::io::SeekFrom;
+use std::path::PathBuf;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+use async_compat::Compat;
+use bb8::PooledConnection;
+use futures::executor::block_on;
+use openssh_sftp_client::file::TokioCompatFile;
+use openssh_sftp_client::metadata::MetaData as SftpMeta;
+use owning_ref::OwningHandle;
+
+use super::backend::Manager;
+use crate::raw::oio;
+use crate::raw::oio::into_reader::FdReader;
+use crate::raw::oio::ReadExt;
+use crate::EntryMode;
+use crate::Metadata;
+use crate::Result;
+
+/// Reader over a remote file, stored together with the pooled connection it
+/// was opened on.
+pub struct SftpReader {
+    // similar situation to connection struct
+    // We can make sure the file can live as long as the connection.
+    file: OwningHandle<
+        Box<PooledConnection<'static, Manager>>,
+        Box<FdReader<Compat<TokioCompatFile<'static>>>>,
+    >,
+}
+
+impl SftpReader {
+    /// Opens `path` on `conn` and wraps it in a reader built with
+    /// `from_fd(_, start, end)`, then seeks the reader to position 0.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when the remote file cannot be opened or when the
+    /// initial seek fails. A failed open no longer panics.
+    pub async fn new(
+        conn: PooledConnection<'static, Manager>,
+        path: PathBuf,
+        start: u64,
+        end: u64,
+    ) -> Result<Self> {
+        let mut file = OwningHandle::try_new(Box::new(conn), |conn| {
+            // SAFETY: `conn` points at the connection inside the `Box` passed to
+            // `try_new` just above. The handle built here is stored next to that
+            // box in the same `OwningHandle`, which is what the field comment on
+            // `SftpReader` relies on to keep the connection alive for the file.
+            let conn = unsafe { &*conn };
+
+            // NOTE(review): `block_on` blocks the current thread until the open
+            // completes, even though this runs inside an async fn.
+            let file = block_on(conn.sftp.open(path))?;
+            let f = Compat::new(TokioCompatFile::from(file));
+            Ok::<_, crate::Error>(Box::new(oio::into_reader::from_fd(f, start, end)))
+        })?;
+
+        file.seek(SeekFrom::Start(0)).await?;
+
+        Ok(SftpReader { file })
+    }
+}
+
+/// Every method forwards to the reader held inside `self.file`.
+impl oio::Read for SftpReader {
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
+        let inner = &mut *self.file;
+        Pin::new(inner).poll_read(cx, buf)
+    }
+
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
+        let inner = &mut *self.file;
+        Pin::new(inner).poll_seek(cx, pos)
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<bytes::Bytes>>> {
+        let inner = &mut *self.file;
+        Pin::new(inner).poll_next(cx)
+    }
+}
+
+/// Builds crate-level [`Metadata`] from the attributes carried by an `SftpMeta`.
+///
+/// The entry mode comes from the file type; content length and last-modified
+/// time are only set when the source provides them.
+impl From<SftpMeta> for Metadata {
+    fn from(meta: SftpMeta) -> Self {
+        // A missing file type, or one that is neither file nor directory,
+        // maps to `EntryMode::Unknown`.
+        let mode = meta.file_type().map_or(EntryMode::Unknown, |kind| {
+            if kind.is_file() {
+                EntryMode::FILE
+            } else if kind.is_dir() {
+                EntryMode::DIR
+            } else {
+                EntryMode::Unknown
+            }
+        });
+
+        let mut result = Metadata::new(mode);
+
+        if let Some(len) = meta.len() {
+            result.set_content_length(len);
+        }
+
+        if let Some(mtime) = meta.modified() {
+            result.set_last_modified(mtime.as_system_time().into());
+        }
+
+        result
+    }
+}
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
new file mode 100644
index 00000000..2fa5215d
--- /dev/null
+++ b/core/src/services/sftp/writer.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use bb8::PooledConnection;
+use bytes::Bytes;
+
+use super::backend::Manager;
+use crate::raw::oio;
+use crate::{Error, ErrorKind, Result};
+
+/// Writer that targets one remote path over a pooled connection.
+pub struct SftpWriter {
+    /// Connection used to create and write the remote file.
+    conn: PooledConnection<'static, Manager>,
+    /// Remote path of the file to write.
+    path: String,
+}
+
+impl SftpWriter {
+    /// Creates a writer for `path` that will use `conn`; no I/O happens here.
+    pub fn new(conn: PooledConnection<'static, Manager>, path: String) -> Self {
+        Self { conn, path }
+    }
+}
+
+#[async_trait]
+impl oio::Write for SftpWriter {
+    /// Calls `create` for `self.path` on the connection and writes all of `bs`
+    /// to the returned file.
+    ///
+    /// NOTE(review): every call goes through `create`, so a second `write`
+    /// presumably replaces earlier data rather than appending — confirm.
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let mut remote = self.conn.sftp.create(&self.path).await?;
+        remote.write_all(&bs).await?;
+        Ok(())
+    }
+
+    /// Always fails with `ErrorKind::Unsupported`.
+    async fn abort(&mut self) -> Result<()> {
+        let unsupported = Error::new(
+            ErrorKind::Unsupported,
+            "SFTP does not support aborting writes",
+        );
+        Err(unsupported)
+    }
+
+    /// Does nothing and reports success.
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 5019d22e..7e52ab84 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -71,6 +71,8 @@ pub enum Scheme {
Rocksdb,
/// [s3][crate::services::S3]: AWS S3 alike services.
S3,
+ /// [sftp][crate::services::Sftp]: SFTP services
+ Sftp,
/// [sled][crate::services::Sled]: Sled services
Sled,
/// [Supabase][crate::services::Supabase]: Supabase storage service
@@ -166,6 +168,7 @@ impl From<Scheme> for &'static str {
Scheme::Redis => "redis",
Scheme::Rocksdb => "rocksdb",
Scheme::S3 => "s3",
+ Scheme::Sftp => "sftp",
Scheme::Sled => "sled",
Scheme::Supabase => "supabase",
Scheme::Oss => "oss",
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index c9038d05..991b0a6c 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -110,6 +110,7 @@ cfg_if::cfg_if! { if #[cfg(feature = "services-redis")] {
behavior_tests!(Redis)
cfg_if::cfg_if! { if #[cfg(feature = "services-rocksdb")] {
behavior_tests!(Rocksdb); }}
behavior_tests!(Oss);
behavior_tests!(S3);
+cfg_if::cfg_if! { if #[cfg(feature = "services-sftp")] {
behavior_tests!(Sftp); }}
cfg_if::cfg_if! { if #[cfg(feature = "services-sled")] {
behavior_tests!(Sled); }}
behavior_tests!(Webdav);
behavior_tests!(Webhdfs);