This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new f12b05c1 feat(services/sftp): Add read/write/stat support for sftp 
(#2186)
f12b05c1 is described below

commit f12b05c1a4f808ca10f986057a9f430489cdabfb
Author: silver-ymz <[email protected]>
AuthorDate: Mon May 1 18:35:00 2023 +0800

    feat(services/sftp): Add read/write/stat support for sftp (#2186)
    
    * add sftp basic skeleton, exist life cycle problem
    
    * complete sftp basic skeleton
    
    * fix some buf && pass alomost all tests
    
    * typo
    
    * add some comments
    
    * fix format
    
    * fix clippy warning
---
 .env.example                      |   6 +
 Cargo.lock                        | 269 +++++++++++++++++++-
 core/Cargo.toml                   |   9 +
 core/README.md                    |   1 +
 core/src/services/ftp/backend.rs  |   6 +-
 core/src/services/mod.rs          |   5 +
 core/src/services/sftp/backend.rs | 507 ++++++++++++++++++++++++++++++++++++++
 core/src/services/sftp/error.rs   |  91 +++++++
 core/src/services/sftp/mod.rs     |  24 ++
 core/src/services/sftp/pager.rs   | 101 ++++++++
 core/src/services/sftp/utils.rs   | 108 ++++++++
 core/src/services/sftp/writer.rs  |  57 +++++
 core/src/types/scheme.rs          |   3 +
 core/tests/behavior/main.rs       |   1 +
 14 files changed, 1179 insertions(+), 9 deletions(-)

diff --git a/.env.example b/.env.example
index 2bc8a6b7..2b83073a 100644
--- a/.env.example
+++ b/.env.example
@@ -61,6 +61,12 @@ OPENDAL_REDIS_DB=0
 OPENDAL_ROCKSDB_TEST=false
 OPENDAL_ROCKSDB_DATADIR=/path/to/database
 OPENDAL_ROCKSDB_ROOT=/path/to/root
+# sftp
+OPENDAL_SFTP_TEST=false
+OPENDAL_SFTP_ENDPOINT=ssh://<endpoint>
+OPENDAL_SFTP_ROOT=/path/to/dir
+OPENDAL_SFTP_USER=<user>
+OPENDAL_SFTP_KEY=<key_path>
 # sled
 OPENDAL_SLED_TEST=false
 OPENDAL_SLED_DATADIR=/path/to/database
diff --git a/Cargo.lock b/Cargo.lock
index 15b2aa90..ae8f0cc8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -104,6 +104,12 @@ version = "1.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
 
+[[package]]
+name = "array-init"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3d62b7694a562cdf5a74227903507c56ab2cc8bdd1f781ed5cb4cf9c9f810bfc"
+
 [[package]]
 name = "assert-json-diff"
 version = "2.0.2"
@@ -312,6 +318,22 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
 
+[[package]]
+name = "awaitable"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "70af449c9a763cb655c6a1e5338b42d99c67190824ff90658c1e30be844c0775"
+dependencies = [
+ "awaitable-error",
+ "cfg-if",
+]
+
+[[package]]
+name = "awaitable-error"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d5b3469636cdf8543cceab175efca534471f36eee12fb8374aba00eb5e7e7f8a"
+
 [[package]]
 name = "backon"
 version = "0.4.0"
@@ -716,12 +738,31 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "concurrent_arena"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "24bfeb060a299f86521bb3940344800fc861cc506356e44a273a42cb552afde5"
+dependencies = [
+ "arc-swap",
+ "array-init",
+ "const_fn_assert",
+ "parking_lot 0.12.1",
+ "triomphe",
+]
+
 [[package]]
 name = "const-oid"
 version = "0.9.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "520fbf3c07483f94e3e3ca9d0cfd913d7718ef2483d2cfd91c0d9e91474ab913"
 
+[[package]]
+name = "const_fn_assert"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "27d614f23f34f7b5165a77dc1591f497e2518f9cec4b4f4b92bfc4dc6cf7a190"
+
 [[package]]
 name = "convert_case"
 version = "0.6.0"
@@ -994,6 +1035,17 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "derive_destructure2"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "35cb7e5875e1028a73e551747d6d0118f25c3d6dbba2dadf97cc0f4d0c53f2f5"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "diff"
 version = "0.1.13"
@@ -1018,13 +1070,33 @@ dependencies = [
  "subtle",
 ]
 
+[[package]]
+name = "dirs"
+version = "4.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
+dependencies = [
+ "dirs-sys 0.3.7",
+]
+
 [[package]]
 name = "dirs"
 version = "5.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "dece029acd3353e3a58ac2e3eb3c8d6c35827a892edc6cc4138ef9c33df46ecd"
 dependencies = [
- "dirs-sys",
+ "dirs-sys 0.4.0",
+]
+
+[[package]]
+name = "dirs-sys"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6"
+dependencies = [
+ "libc",
+ "redox_users",
+ "winapi",
 ]
 
 [[package]]
@@ -2301,6 +2373,17 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "num-derive"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "num-integer"
 version = "0.1.45"
@@ -2348,7 +2431,7 @@ version = "0.33.2"
 dependencies = [
  "anyhow",
  "clap 4.2.5",
- "dirs",
+ "dirs 5.0.0",
  "env_logger",
  "futures",
  "log",
@@ -2398,7 +2481,7 @@ dependencies = [
  "anyhow",
  "assert_cmd",
  "clap 4.2.5",
- "dirs",
+ "dirs 5.0.0",
  "env_logger",
  "futures",
  "log",
@@ -2453,8 +2536,11 @@ dependencies = [
  "minitrace",
  "moka",
  "once_cell",
+ "openssh",
+ "openssh-sftp-client",
  "opentelemetry 0.19.0",
  "opentelemetry-jaeger",
+ "owning_ref",
  "parking_lot 0.12.1",
  "paste",
  "percent-encoding",
@@ -2535,6 +2621,95 @@ dependencies = [
  "rb-sys-env",
 ]
 
+[[package]]
+name = "openssh"
+version = "0.9.9"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8ca6c277973fb549b36dd8980941b5ea3ecebea026f5b1f0060acde74d893c22"
+dependencies = [
+ "dirs 4.0.0",
+ "libc",
+ "once_cell",
+ "shell-escape",
+ "tempfile",
+ "thiserror",
+ "tokio",
+ "tokio-pipe",
+]
+
+[[package]]
+name = "openssh-sftp-client"
+version = "0.12.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f4fa8e5f26e549bd266d9bcd9e5b4fd344729985ef1a7f5ac3e51f3f96a4a620"
+dependencies = [
+ "bytes",
+ "derive_destructure2",
+ "once_cell",
+ "openssh-sftp-client-lowlevel",
+ "openssh-sftp-error",
+ "scopeguard",
+ "tokio",
+ "tokio-io-utility",
+ "tokio-util",
+]
+
+[[package]]
+name = "openssh-sftp-client-lowlevel"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "406bf41d8372365497d5645e802a8dfe22008b8183edbe6c79e4b75614431daa"
+dependencies = [
+ "awaitable",
+ "bytes",
+ "concurrent_arena",
+ "derive_destructure2",
+ "openssh-sftp-error",
+ "openssh-sftp-protocol",
+ "pin-project",
+ "tokio",
+ "tokio-io-utility",
+]
+
+[[package]]
+name = "openssh-sftp-error"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3d836b428ead150165d1178ed0aa672791c13b3ae9616ea1e34d13730a2cb486"
+dependencies = [
+ "awaitable-error",
+ "openssh-sftp-protocol-error",
+ "ssh_format_error",
+ "thiserror",
+ "tokio",
+]
+
+[[package]]
+name = "openssh-sftp-protocol"
+version = "0.24.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "bf38532d784978966f95d241226223823f351d5bb2a4bebcf6b20b9cb1e393e0"
+dependencies = [
+ "bitflags 2.0.2",
+ "num-derive",
+ "num-traits",
+ "openssh-sftp-protocol-error",
+ "serde",
+ "ssh_format",
+ "vec-strings",
+]
+
+[[package]]
+name = "openssh-sftp-protocol-error"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "0719269eb3f037866ae07ec89cb44ed2c1d63b72b2390cef8e1aa3016a956ff8"
+dependencies = [
+ "serde",
+ "thiserror",
+ "vec-strings",
+]
+
 [[package]]
 name = "openssl"
 version = "0.10.47"
@@ -2717,6 +2892,15 @@ version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
 
+[[package]]
+name = "owning_ref"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6ff55baddef9e4ad00f88b6c743a2a8062d4c6ade126c2a528644b8e444d52ce"
+dependencies = [
+ "stable_deref_trait",
+]
+
 [[package]]
 name = "parking"
 version = "2.0.0"
@@ -3425,7 +3609,7 @@ dependencies = [
  "base64 0.21.0",
  "bytes",
  "chrono",
- "dirs",
+ "dirs 5.0.0",
  "form_urlencoded",
  "hex",
  "hmac",
@@ -3841,6 +4025,12 @@ dependencies = [
  "lazy_static",
 ]
 
+[[package]]
+name = "shell-escape"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f"
+
 [[package]]
 name = "shell-words"
 version = "1.1.0"
@@ -3993,6 +4183,32 @@ dependencies = [
  "der",
 ]
 
+[[package]]
+name = "ssh_format"
+version = "0.14.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "24ab31081d1c9097c327ec23550858cb5ffb4af6b866c1ef4d728455f01f3304"
+dependencies = [
+ "bytes",
+ "serde",
+ "ssh_format_error",
+]
+
+[[package]]
+name = "ssh_format_error"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "be3c6519de7ca611f71ef7e8a56eb57aa1c818fecb5242d0a0f39c83776c210c"
+dependencies = [
+ "serde",
+]
+
+[[package]]
+name = "stable_deref_trait"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
+
 [[package]]
 name = "strsim"
 version = "0.10.0"
@@ -4088,6 +4304,12 @@ version = "0.16.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
 
+[[package]]
+name = "thin-vec"
+version = "0.2.12"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "aac81b6fd6beb5884b0cf3321b8117e6e5d47ecb6fc89f414cfdcca8b2fe2dd8"
+
 [[package]]
 name = "thiserror"
 version = "1.0.40"
@@ -4222,6 +4444,16 @@ dependencies = [
  "windows-sys 0.45.0",
 ]
 
+[[package]]
+name = "tokio-io-utility"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8d672654d175710e52c7c41f6aec77c62b3c0954e2a7ebce9049d1e94ed7c263"
+dependencies = [
+ "bytes",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-macros"
 version = "2.0.0"
@@ -4243,6 +4475,16 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "tokio-pipe"
+version = "0.2.12"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784"
+dependencies = [
+ "libc",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-rustls"
 version = "0.23.4"
@@ -4256,9 +4498,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-util"
-version = "0.7.7"
+version = "0.7.8"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2"
+checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
 dependencies = [
  "bytes",
  "futures-core",
@@ -4398,6 +4640,11 @@ name = "triomphe"
 version = "0.1.8"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db"
+dependencies = [
+ "arc-swap",
+ "serde",
+ "stable_deref_trait",
+]
 
 [[package]]
 name = "trust-dns-proto"
@@ -4566,6 +4813,16 @@ version = "0.2.15"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
 
+[[package]]
+name = "vec-strings"
+version = "0.4.8"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c8509489e2a7ee219522238ad45fd370bec6808811ac15ac6b07453804e77659"
+dependencies = [
+ "serde",
+ "thin-vec",
+]
+
 [[package]]
 name = "version_check"
 version = "0.9.4"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index b88de207..5dc6170c 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -135,6 +135,12 @@ services-s3 = [
   "reqsign?/services-aws",
   "reqsign?/reqwest_request",
 ]
+services-sftp = [
+  "dep:openssh",
+  "dep:openssh-sftp-client",
+  "dep:bb8",
+  "dep:owning_ref",
+]
 services-sled = ["dep:sled"]
 services-supabase = []
 services-wasabi = [
@@ -176,7 +182,10 @@ metrics = { version = "0.20", optional = true }
 minitrace = { version = "0.4.0", optional = true }
 moka = { version = "0.10", optional = true, features = ["future"] }
 once_cell = "1"
+openssh = { version = "0.9.9", optional = true }
+openssh-sftp-client = { version = "0.12.2", optional = true }
 opentelemetry = { version = "0.19.0", optional = true }
+owning_ref = { version = "0.4.1", optional = true }
 parking_lot = "0.12"
 percent-encoding = "2"
 pin-project = "1"
diff --git a/core/README.md b/core/README.md
index 8bbce48e..ca758fa8 100644
--- a/core/README.md
+++ b/core/README.md
@@ -36,6 +36,7 @@
 - [redis](https://docs.rs/opendal/latest/opendal/services/struct.Redis.html): 
[Redis](https://redis.io/) services support.
 - 
[rocksdb](https://docs.rs/opendal/latest/opendal/services/struct.Rocksdb.html): 
[RocksDB](http://rocksdb.org/) services support.
 - [s3](https://docs.rs/opendal/latest/opendal/services/struct.S3.html): [AWS 
S3](https://aws.amazon.com/s3/) alike services.
+- [sftp](https://docs.rs/opendal/latest/opendal/services/struct.Sftp.html): 
[SFTP](https://datatracker.ietf.org/doc/html/draft-ietf-secsh-filexfer-02) 
services support.
 - 
[sled](https://docs.rs/opendal/latest/opendal/services/sled/struct.Sled.html): 
[sled](https://crates.io/crates/sled) services support.
 - 
[webdav](https://docs.rs/opendal/latest/opendal/services/struct.Webdav.html): 
[WebDAV](https://datatracker.ietf.org/doc/html/rfc4918) Service Support.
 - 
[webhdfs](https://docs.rs/opendal/latest/opendal/services/struct.Webhdfs.html): 
[WebHDFS](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html)
 Service Support.
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index 32338980..33ad34a6 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -60,10 +60,10 @@ use crate::*;
 ///
 /// # Configuration
 ///
-/// - `endpoint`: set the endpoint for connection
+/// - `endpoint`: Set the endpoint for connection
 /// - `root`: Set the work directory for backend
-/// - `credential`:  login credentials
-/// - `tls`: tls mode
+/// - `user`: Set the login user
+/// - `password`: Set the login password
 ///
 /// You can refer to [`FtpBuilder`]'s docs for more information
 ///
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 02621986..d08135b0 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -114,6 +114,11 @@ mod s3;
 #[cfg(feature = "services-s3")]
 pub use s3::S3;
 
+#[cfg(feature = "services-sftp")]
+mod sftp;
+#[cfg(feature = "services-sftp")]
+pub use sftp::Sftp;
+
 #[cfg(feature = "services-sled")]
 mod sled;
 #[cfg(feature = "services-sled")]
diff --git a/core/src/services/sftp/backend.rs 
b/core/src/services/sftp/backend.rs
new file mode 100644
index 00000000..4ae80d7f
--- /dev/null
+++ b/core/src/services/sftp/backend.rs
@@ -0,0 +1,507 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp::min;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use async_trait::async_trait;
+use bb8::PooledConnection;
+use futures::executor::block_on;
+use log::debug;
+use openssh::RemoteChild;
+use openssh::Session;
+use openssh::SessionBuilder;
+use openssh::Stdio;
+use openssh_sftp_client::Sftp;
+use owning_ref::OwningHandle;
+use tokio::sync::OnceCell;
+
+use super::error::is_not_found;
+use super::error::is_sftp_protocol_error;
+use super::error::SftpError;
+use super::pager::SftpPager;
+use super::utils::SftpReader;
+use super::writer::SftpWriter;
+use crate::ops::*;
+use crate::raw::*;
+use crate::*;
+
+/// SFTP services support. (only works on unix)
+///
+/// # Capabilities
+///
+/// This service can be used to:
+///
+/// - [x] read
+/// - [x] write
+/// - [x] list
+/// - [ ] ~~scan~~
+/// - [ ] ~~presign~~
+/// - [ ] blocking
+///
+/// # Configuration
+///
+/// - `endpoint`: Set the endpoint for connection
+/// - `root`: Set the work directory for backend, default to `/home/$USER/`
+/// - `user`: Set the login user
+/// - `key`: Set the path to the private key file used for login
+///
+/// Password login is not supported; use public key authentication instead.
+///
+/// You can refer to [`SftpBuilder`]'s docs for more information
+///
+/// # Example
+///
+/// ## Via Builder
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use opendal::services::Sftp;
+/// use opendal::Object;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     // create backend builder
+///     let mut builder = Sftp::default();
+///
+///     builder.endpoint("127.0.0.1").user("test").key("/path/to/key");
+///
+///     let op: Operator = Operator::new(builder)?.finish();
+///     let _obj: Object = op.object("test_file");
+///     Ok(())
+/// }
+/// ```
+
+#[derive(Default)]
+pub struct SftpBuilder {
+    endpoint: Option<String>,
+    root: Option<String>,
+    user: Option<String>,
+    key: Option<String>,
+}
+
+impl Debug for SftpBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Builder")
+            .field("endpoint", &self.endpoint)
+            .field("root", &self.root)
+            .finish()
+    }
+}
+
+impl SftpBuilder {
+    /// set endpoint for sftp backend.
+    pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
+        self.endpoint = if endpoint.is_empty() {
+            None
+        } else {
+            Some(endpoint.to_string())
+        };
+
+        self
+    }
+
+    /// set root path for sftp backend.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.root = if root.is_empty() {
+            None
+        } else {
+            Some(root.to_string())
+        };
+
+        self
+    }
+
+    /// set user for sftp backend.
+    pub fn user(&mut self, user: &str) -> &mut Self {
+        self.user = if user.is_empty() {
+            None
+        } else {
+            Some(user.to_string())
+        };
+
+        self
+    }
+
+    /// set key path for sftp backend.
+    pub fn key(&mut self, key: &str) -> &mut Self {
+        self.key = if key.is_empty() {
+            None
+        } else {
+            Some(key.to_string())
+        };
+
+        self
+    }
+}
+
+impl Builder for SftpBuilder {
+    const SCHEME: Scheme = Scheme::Sftp;
+    type Accessor = SftpBackend;
+
+    /// Validate the builder's configuration and construct an [`SftpBackend`].
+    ///
+    /// The root defaults to `/home/<user>/` when none was configured. No
+    /// connection is opened here; the pool cell starts out empty.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::ConfigInvalid` when `endpoint` or `user` is not set.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        debug!("sftp backend build started: {:?}", &self);
+        let endpoint = self
+            .endpoint
+            .clone()
+            .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "endpoint is empty"))?;
+
+        let user = self
+            .user
+            .clone()
+            .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "user is empty"))?;
+
+        // Only format the default root when no root was configured.
+        let root = self
+            .root
+            .as_deref()
+            .map(normalize_root)
+            .unwrap_or_else(|| format!("/home/{}/", user));
+
+        debug!("sftp backend finished: {:?}", &self);
+
+        Ok(SftpBackend {
+            endpoint,
+            root,
+            user,
+            key: self.key.clone(),
+            sftp: OnceCell::new(),
+        })
+    }
+
+    fn from_map(map: HashMap<String, String>) -> Self {
+        let mut builder = SftpBuilder::default();
+
+        map.get("root").map(|v| builder.root(v));
+        map.get("endpoint").map(|v| builder.endpoint(v));
+        map.get("user").map(|v| builder.user(v));
+        map.get("key").map(|v| builder.key(v));
+
+        builder
+    }
+}
+
+#[derive(Clone)]
+pub struct Manager {
+    endpoint: String,
+    user: String,
+    key: Option<String>,
+}
+
+pub struct Connection {
+    // The remote child borrows the session, so an owning handle keeps them together.
+    // The session only ever creates one child, so the child can live as long as
+    // the session (the session is dropped when the connection is dropped).
+    // Related: https://stackoverflow.com/a/47260399
+    child: OwningHandle<Box<Session>, Box<RemoteChild<'static>>>,
+    pub sftp: Sftp,
+}
+
+#[async_trait]
+impl bb8::ManageConnection for Manager {
+    type Connection = Connection;
+    type Error = SftpError;
+
+    /// Open a new SSH session and start an SFTP subsystem on top of it.
+    ///
+    /// The session is configured with the manager's user and, when present,
+    /// its key file, then connected to the manager's endpoint.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when the SSH session cannot be established, when the
+    /// `sftp` subsystem fails to spawn, or when the SFTP client fails to
+    /// initialize.
+    async fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {
+        let mut session = SessionBuilder::default();
+
+        session.user(self.user.clone());
+
+        if let Some(key) = &self.key {
+            session.keyfile(key);
+        }
+
+        let session = session.connect(self.endpoint.clone()).await?;
+
+        let sess = Box::new(session);
+        // Use the fallible constructor so a failed spawn is reported to the
+        // pool as an error instead of panicking inside it.
+        // NOTE(review): `block_on` blocks the calling thread while the
+        // subsystem spawns — confirm this is acceptable for the runtime in use.
+        let mut oref = OwningHandle::try_new(sess, |x| {
+            // SAFETY: `x` points at the boxed session, which the owning handle
+            // stores alongside the child created from it.
+            unsafe {
+                block_on(
+                    (*x).subsystem("sftp")
+                        .stdin(Stdio::piped())
+                        .stdout(Stdio::piped())
+                        .spawn(),
+                )
+                .map(Box::new)
+            }
+        })?;
+
+        let sftp = Sftp::new(
+            oref.stdin().take().expect("stdin was requested as piped"),
+            oref.stdout().take().expect("stdout was requested as piped"),
+            Default::default(),
+        )
+        .await?;
+
+        Ok(Connection { child: oref, sftp })
+    }
+
+    async fn is_valid(&self, conn: &mut Self::Connection) -> 
std::result::Result<(), Self::Error> {
+        conn.child.session().check().await?;
+        Ok(())
+    }
+
+    /// Never report a connection as broken, so the pool always reuses it.
+    fn has_broken(&self, _: &mut Self::Connection) -> bool {
+        false
+    }
+}
+
+/// Backend is used to serve `Accessor` support for sftp.
+pub struct SftpBackend {
+    endpoint: String,
+    root: String,
+    user: String,
+    key: Option<String>,
+    sftp: OnceCell<bb8::Pool<Manager>>,
+}
+
+impl Debug for SftpBackend {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Backend").finish()
+    }
+}
+
+#[async_trait]
+impl Accessor for SftpBackend {
+    type Reader = SftpReader;
+    type BlockingReader = ();
+    type Writer = SftpWriter;
+    type BlockingWriter = ();
+    type Pager = SftpPager;
+    type BlockingPager = ();
+
+    fn info(&self) -> AccessorInfo {
+        let mut am = AccessorInfo::default();
+        am.set_root(self.root.as_str())
+            .set_scheme(Scheme::Sftp)
+            .set_capability(Capability {
+                stat: true,
+                read: true,
+                write: true,
+                list: true,
+                list_with_limit: true,
+
+                ..Default::default()
+            });
+
+        am
+    }
+
+    async fn create_dir(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
+        let client = self.sftp_connect().await?;
+        let mut fs = client.sftp.fs();
+        fs.set_cwd(self.root.clone());
+
+        let paths: Vec<&str> = path.split_inclusive('/').collect();
+        let mut current = self.root.clone();
+        for p in paths {
+            if p.is_empty() {
+                continue;
+            }
+
+            current.push_str(p);
+            let res = fs.create_dir(p).await;
+
+            if let Err(e) = res {
+                // ignore error if dir already exists
+                if !is_sftp_protocol_error(&e) {
+                    return Err(e.into());
+                }
+            }
+            fs.set_cwd(current.clone());
+        }
+
+        return Ok(RpCreate::default());
+    }
+
+    /// Open `path` (resolved against the backend root) and return a reader
+    /// over the byte range requested in `args`.
+    ///
+    /// The requested range is clamped to the file size, so a range that starts
+    /// at or beyond the end of the file yields an empty read instead of an
+    /// arithmetic underflow.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when a connection cannot be obtained, when the path
+    /// cannot be canonicalized or opened, or when the file metadata carries no
+    /// length (reported as `ErrorKind::NotFound`).
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let client = self.sftp_connect().await?;
+
+        let mut fs = client.sftp.fs();
+        fs.set_cwd(self.root.clone());
+        let path = fs.canonicalize(path).await?;
+
+        let mut file = client.sftp.open(path.as_path()).await?;
+
+        // Build the error lazily, and use `display()` so a non UTF-8 path
+        // cannot trigger a panic while formatting the message.
+        let total_length = file.metadata().await?.len().ok_or_else(|| {
+            Error::new(
+                ErrorKind::NotFound,
+                format!("file not found: {}", path.display()).as_str(),
+            )
+        })?;
+
+        let br = args.range();
+        let (start, end) = match (br.offset(), br.size()) {
+            // Read a specific range, clamped to the file size.
+            (Some(offset), Some(size)) => (
+                min(offset, total_length),
+                min(offset.saturating_add(size), total_length),
+            ),
+            // Read from offset to the end of the file.
+            (Some(offset), None) => (min(offset, total_length), total_length),
+            // Read the last `size` bytes.
+            (None, Some(size)) => (total_length.saturating_sub(size), total_length),
+            // Read the whole file.
+            (None, None) => (0, total_length),
+        };
+
+        let r = SftpReader::new(self.sftp_connect_owned().await?, path, start, end).await?;
+
+        // `start <= end` holds for every arm above, so this cannot underflow.
+        Ok((RpRead::new(end - start), r))
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        if args.content_length().is_none() {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "write without content length is not supported",
+            ));
+        }
+
+        if let Some((dir, _)) = path.rsplit_once('/') {
+            self.create_dir(dir, OpCreate::default()).await?;
+        }
+
+        let path = format!("{}{}", self.root, path);
+
+        Ok((
+            RpWrite::new(),
+            SftpWriter::new(self.sftp_connect_owned().await?, path),
+        ))
+    }
+
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        let client = self.sftp_connect().await?;
+        let mut fs = client.sftp.fs();
+        fs.set_cwd(self.root.clone());
+
+        let meta = fs.metadata(path).await?;
+
+        Ok(RpStat::new(meta.into()))
+    }
+
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        let client = self.sftp_connect().await?;
+
+        let mut fs = client.sftp.fs();
+        fs.set_cwd(self.root.clone());
+
+        if path.ends_with('/') {
+            let file_path = format!("./{}", path);
+            let dir = match fs.open_dir(file_path.clone()).await {
+                Ok(dir) => dir,
+                Err(e) => {
+                    if is_not_found(&e) {
+                        return Ok(RpDelete::default());
+                    } else {
+                        return Err(e.into());
+                    }
+                }
+            }
+            .read_dir()
+            .await?;
+
+            for file in &dir {
+                let file_name = file.filename().to_str().unwrap();
+                if file_name == "." || file_name == ".." {
+                    continue;
+                }
+                let file_path = format!("{}{}", path, file_name);
+                self.delete(file_path.as_str(), OpDelete::default()).await?;
+            }
+
+            match fs.remove_dir(path).await {
+                Err(e) if !is_not_found(&e) => {
+                    return Err(e.into());
+                }
+                _ => {}
+            }
+        } else {
+            match fs.remove_file(path).await {
+                Err(e) if !is_not_found(&e) => {
+                    return Err(e.into());
+                }
+                _ => {}
+            }
+        };
+
+        Ok(RpDelete::default())
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Pager)> {
+        let client = self.sftp_connect().await?;
+        let mut fs = client.sftp.fs();
+        fs.set_cwd(self.root.clone());
+
+        let file_path = format!("./{}", path);
+
+        let mut dir = match fs.open_dir(file_path.clone()).await {
+            Ok(dir) => dir,
+            Err(e) => {
+                if is_not_found(&e) {
+                    return Ok((RpList::default(), SftpPager::empty()));
+                } else {
+                    return Err(e.into());
+                }
+            }
+        };
+        let dir = dir.read_dir().await?;
+
+        Ok((
+            RpList::default(),
+            SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+        ))
+    }
+}
+
+impl SftpBackend {
+    async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
+        let pool = self
+            .sftp
+            .get_or_try_init(|| async {
+                let manager = Manager {
+                    endpoint: self.endpoint.clone(),
+                    user: self.user.clone(),
+                    key: self.key.clone(),
+                };
+
+                bb8::Pool::builder().max_size(10).build(manager).await
+            })
+            .await?;
+
+        Ok(pool)
+    }
+
+    pub async fn sftp_connect(&self) -> Result<PooledConnection<'_, Manager>> {
+        let conn = self.pool().await?.get().await?;
+
+        Ok(conn)
+    }
+
+    pub async fn sftp_connect_owned(&self) -> Result<PooledConnection<'static, 
Manager>> {
+        let conn = self.pool().await?.get_owned().await?;
+
+        Ok(conn)
+    }
+}
diff --git a/core/src/services/sftp/error.rs b/core/src/services/sftp/error.rs
new file mode 100644
index 00000000..0e7dc2ef
--- /dev/null
+++ b/core/src/services/sftp/error.rs
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bb8::RunError;
+use openssh::Error as SshError;
+use openssh_sftp_client::{error::SftpErrorKind, Error as SftpClientError};
+
+use crate::{Error, ErrorKind};
+
+/// Error type of the SFTP service, wrapping the two underlying error sources.
+#[derive(Debug)]
+pub enum SftpError {
+    /// An error reported by `openssh_sftp_client`.
+    SftpClientError(SftpClientError),
+    /// An error reported by `openssh`.
+    SshError(SshError),
+}
+
+impl From<SftpClientError> for Error {
+    /// Map an SFTP client error onto an [`ErrorKind`], keeping the original
+    /// error attached as the source.
+    ///
+    /// Unsupported protocol versions and unsupported operations become
+    /// `Unsupported`, a missing file becomes `NotFound`, a denied permission
+    /// becomes `PermissionDenied`, and everything else is `Unexpected`.
+    fn from(e: SftpClientError) -> Self {
+        let kind = match &e {
+            SftpClientError::UnsupportedSftpProtocol { .. } => ErrorKind::Unsupported,
+            SftpClientError::SftpError(SftpErrorKind::NoSuchFile, _) => ErrorKind::NotFound,
+            SftpClientError::SftpError(SftpErrorKind::PermDenied, _) => ErrorKind::PermissionDenied,
+            SftpClientError::SftpError(SftpErrorKind::OpUnsupported, _) => ErrorKind::Unsupported,
+            _ => ErrorKind::Unexpected,
+        };
+
+        let err = Error::new(kind, "sftp error");
+        err.set_source(e)
+    }
+}
+
+impl From<SshError> for Error {
+    /// Wrap an SSH error as an `Unexpected` error, keeping it as the source.
+    fn from(e: SshError) -> Self {
+        let err = Error::new(ErrorKind::Unexpected, "ssh error");
+        err.set_source(e)
+    }
+}
+
+impl From<SftpClientError> for SftpError {
+    /// Wrap an SFTP client error in the matching [`SftpError`] variant.
+    fn from(e: SftpClientError) -> Self {
+        Self::SftpClientError(e)
+    }
+}
+
+impl From<SshError> for SftpError {
+    /// Wrap an SSH error in the matching [`SftpError`] variant.
+    fn from(e: SshError) -> Self {
+        Self::SshError(e)
+    }
+}
+
+impl From<SftpError> for Error {
+    /// Unwrap the variant and delegate to the conversion of the inner error.
+    fn from(e: SftpError) -> Self {
+        match e {
+            SftpError::SftpClientError(inner) => Error::from(inner),
+            SftpError::SshError(inner) => Error::from(inner),
+        }
+    }
+}
+
+impl From<RunError<SftpError>> for Error {
+    /// Convert a pool checkout failure into an [`Error`].
+    ///
+    /// Errors raised by the connection manager go through the [`SftpError`]
+    /// conversion. A pool timeout becomes an `Unexpected` error that is marked
+    /// as temporary.
+    fn from(e: RunError<SftpError>) -> Self {
+        match e {
+            RunError::User(err) => err.into(),
+            RunError::TimedOut => {
+                // Keep the message on a single line: a literal split across
+                // lines would embed a newline in the error text.
+                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
+            }
+        }
+    }
+}
+
+pub(super) fn is_not_found(e: &SftpClientError) -> bool {
+    matches!(e, SftpClientError::SftpError(SftpErrorKind::NoSuchFile, _))
+}
+
+pub(super) fn is_sftp_protocol_error(e: &SftpClientError) -> bool {
+    matches!(e, SftpClientError::SftpError(_, _))
+}
diff --git a/core/src/services/sftp/mod.rs b/core/src/services/sftp/mod.rs
new file mode 100644
index 00000000..d854429b
--- /dev/null
+++ b/core/src/services/sftp/mod.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub use backend::SftpBuilder as Sftp;
+
+mod backend;
+mod error;
+mod pager;
+mod utils;
+mod writer;
diff --git a/core/src/services/sftp/pager.rs b/core/src/services/sftp/pager.rs
new file mode 100644
index 00000000..e6b557c2
--- /dev/null
+++ b/core/src/services/sftp/pager.rs
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use openssh_sftp_client::fs::DirEntry;
+
+use crate::raw::oio;
+use crate::Result;
+
+/// Pager over a directory listing that has already been read into memory.
+pub struct SftpPager {
+    /// Directory entries to hand out.
+    dir: Box<[DirEntry]>,
+    /// Prefix prepended to every entry name when building its path.
+    path: String,
+    /// Optional cap on the number of entries returned by `next`.
+    limit: Option<usize>,
+    /// Set once the pager has nothing more to return.
+    complete: bool,
+}
+
+impl SftpPager {
+    /// Build a pager over `dir`, whose entries are reported relative to `path`.
+    pub fn new(dir: Box<[DirEntry]>, path: String, limit: Option<usize>) -> Self {
+        let complete = false;
+
+        Self {
+            dir,
+            path,
+            limit,
+            complete,
+        }
+    }
+
+    /// Build a pager that is already complete and yields no entries.
+    pub fn empty() -> Self {
+        Self {
+            complete: true,
+            ..Self::new(Box::new([]), String::new(), None)
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Page for SftpPager {
+    /// Return the next page of entries, or `None` once everything was returned.
+    ///
+    /// `limit` is treated as a page size: at most `limit` entries are returned
+    /// per call and the remaining ones are kept for the following calls, so no
+    /// entry is dropped. Without a limit, everything is returned in one page.
+    /// The `.` and `..` entries are never reported.
+    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+        if self.complete {
+            return Ok(None);
+        }
+
+        // when listing the root directory, the prefix should be empty
+        if self.path == "/" {
+            self.path.clear();
+        }
+
+        let page_size = self.limit.unwrap_or(usize::MAX);
+        let mut page = Vec::new();
+        let mut rest = Vec::new();
+
+        // Move the pending entries out so that each one is handed out exactly once.
+        for entry in std::mem::take(&mut self.dir).into_vec() {
+            // A name that is not valid UTF-8 cannot be "." or "..", so it is
+            // kept rather than causing a panic.
+            let is_dot = matches!(entry.filename().to_str(), Some(".") | Some(".."));
+            if is_dot {
+                continue;
+            }
+
+            if page.len() < page_size {
+                page.push(map_entry(self.path.clone(), entry));
+            } else {
+                rest.push(entry);
+            }
+        }
+
+        // An empty page (nothing left, or a limit of zero) ends the listing.
+        self.complete = page.is_empty() || rest.is_empty();
+        self.dir = rest.into_boxed_slice();
+
+        if page.is_empty() {
+            Ok(None)
+        } else {
+            Ok(Some(page))
+        }
+    }
+}
+
+/// Convert an SFTP directory entry into an [`oio::Entry`].
+///
+/// The entry path is `prefix` followed by the file name, with a trailing `/`
+/// for directories. A file name that is not valid UTF-8 is converted lossily,
+/// and an entry without a file type is treated as a non-directory, so neither
+/// case panics.
+fn map_entry(prefix: String, value: DirEntry) -> oio::Entry {
+    let name = value.filename().to_string_lossy();
+    let is_dir = value.file_type().map_or(false, |t| t.is_dir());
+    let suffix = if is_dir { "/" } else { "" };
+
+    let path = format!("{}{}{}", prefix, name, suffix);
+
+    oio::Entry::new(path.as_str(), value.metadata().into())
+}
diff --git a/core/src/services/sftp/utils.rs b/core/src/services/sftp/utils.rs
new file mode 100644
index 00000000..91387aad
--- /dev/null
+++ b/core/src/services/sftp/utils.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::io::SeekFrom;
+use std::path::PathBuf;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+use async_compat::Compat;
+use bb8::PooledConnection;
+use futures::executor::block_on;
+use openssh_sftp_client::file::TokioCompatFile;
+use openssh_sftp_client::metadata::MetaData as SftpMeta;
+use owning_ref::OwningHandle;
+
+use super::backend::Manager;
+use crate::raw::oio;
+use crate::raw::oio::into_reader::FdReader;
+use crate::raw::oio::ReadExt;
+use crate::EntryMode;
+use crate::Metadata;
+use crate::Result;
+
+/// Reader over a remote file, bundled with the pooled connection it was
+/// opened on.
+pub struct SftpReader {
+    // similar situation to connection struct
+    // We can make sure the file can live as long as the connection.
+    //
+    // The handle pairs the boxed connection (the owner) with the boxed
+    // reader built from a file opened on that connection.
+    file: OwningHandle<
+        Box<PooledConnection<'static, Manager>>,
+        Box<FdReader<Compat<TokioCompatFile<'static>>>>,
+    >,
+}
+
+impl SftpReader {
+    /// Open `path` on `conn` and wrap the file in a reader built from `start`
+    /// and `end`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when the remote file cannot be opened or when the
+    /// initial seek fails.
+    pub async fn new(
+        conn: PooledConnection<'static, Manager>,
+        path: PathBuf,
+        start: u64,
+        end: u64,
+    ) -> Result<Self> {
+        // `try_new` lets an open failure (for example a missing file) surface
+        // as an error instead of panicking inside the constructor closure.
+        let mut file = OwningHandle::try_new(Box::new(conn), |conn| -> Result<_> {
+            // SAFETY: presumably sound because the handle owns the boxed
+            // connection and the file derived from it is stored in the same
+            // handle -- TODO confirm against the owning_ref contract.
+            let conn = unsafe { &*conn };
+
+            // NOTE(review): `block_on` blocks the current thread inside an
+            // async fn; consider opening the file asynchronously.
+            let file = block_on(conn.sftp.open(path))?;
+            let f = Compat::new(TokioCompatFile::from(file));
+
+            Ok(Box::new(oio::into_reader::from_fd(f, start, end)))
+        })?;
+
+        file.seek(SeekFrom::Start(0)).await?;
+
+        Ok(SftpReader { file })
+    }
+}
+
+/// Every method forwards to the reader stored inside the owning handle.
+impl oio::Read for SftpReader {
+    fn poll_read(&mut self, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
+        let inner = &mut *self.file;
+        Pin::new(inner).poll_read(cx, buf)
+    }
+
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
+        let inner = &mut *self.file;
+        Pin::new(inner).poll_seek(cx, pos)
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<bytes::Bytes>>> {
+        let inner = &mut *self.file;
+        Pin::new(inner).poll_next(cx)
+    }
+}
+
+impl From<SftpMeta> for Metadata {
+    /// Build [`Metadata`] from SFTP file attributes.
+    ///
+    /// The entry mode is `FILE` or `DIR` when the file type says so and
+    /// `Unknown` otherwise, including when no file type is present. Content
+    /// length and last-modified time are only set when the attributes carry
+    /// them.
+    fn from(meta: SftpMeta) -> Self {
+        let mode = match meta.file_type() {
+            Some(kind) if kind.is_file() => EntryMode::FILE,
+            Some(kind) if kind.is_dir() => EntryMode::DIR,
+            _ => EntryMode::Unknown,
+        };
+
+        let mut result = Metadata::new(mode);
+
+        if let Some(length) = meta.len() {
+            result.set_content_length(length);
+        }
+
+        if let Some(mtime) = meta.modified() {
+            result.set_last_modified(mtime.as_system_time().into());
+        }
+
+        result
+    }
+}
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
new file mode 100644
index 00000000..2fa5215d
--- /dev/null
+++ b/core/src/services/sftp/writer.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use bb8::PooledConnection;
+use bytes::Bytes;
+
+use super::backend::Manager;
+use crate::raw::oio;
+use crate::{Error, ErrorKind, Result};
+
+/// Writer that sends data to a remote path over a pooled connection.
+pub struct SftpWriter {
+    /// Connection used to create the remote file.
+    conn: PooledConnection<'static, Manager>,
+    /// Path handed to `create` on every write.
+    path: String,
+}
+
+impl SftpWriter {
+    /// Build a writer that writes to `path` over `conn`.
+    pub fn new(conn: PooledConnection<'static, Manager>, path: String) -> Self {
+        Self { conn, path }
+    }
+}
+
+#[async_trait]
+impl oio::Write for SftpWriter {
+    /// Create the remote file at `self.path` and write all of `bs` into it.
+    ///
+    /// NOTE(review): `create` is called on every invocation. If it truncates
+    /// an existing file, a second `write` on the same writer would replace the
+    /// data written earlier -- confirm against the openssh-sftp-client
+    /// documentation and the caller's contract.
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let mut file = self.conn.sftp.create(&self.path).await?;
+
+        file.write_all(&bs).await?;
+
+        Ok(())
+    }
+
+    /// Always fails with `Unsupported`: this writer cannot abort a write.
+    async fn abort(&mut self) -> Result<()> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "SFTP does not support aborting writes",
+        ))
+    }
+
+    /// Nothing to do on close; always succeeds.
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 5019d22e..7e52ab84 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -71,6 +71,8 @@ pub enum Scheme {
     Rocksdb,
     /// [s3][crate::services::S3]: AWS S3 alike services.
     S3,
+    /// [sftp][crate::services::Sftp]: SFTP services
+    Sftp,
     /// [sled][crate::services::Sled]: Sled services
     Sled,
     /// [Supabase][crate::services::Supabase]: Supabase storage service
@@ -166,6 +168,7 @@ impl From<Scheme> for &'static str {
             Scheme::Redis => "redis",
             Scheme::Rocksdb => "rocksdb",
             Scheme::S3 => "s3",
+            Scheme::Sftp => "sftp",
             Scheme::Sled => "sled",
             Scheme::Supabase => "supabase",
             Scheme::Oss => "oss",
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index c9038d05..991b0a6c 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -110,6 +110,7 @@ cfg_if::cfg_if! { if #[cfg(feature = "services-redis")] { 
behavior_tests!(Redis)
 cfg_if::cfg_if! { if #[cfg(feature = "services-rocksdb")] { 
behavior_tests!(Rocksdb); }}
 behavior_tests!(Oss);
 behavior_tests!(S3);
+cfg_if::cfg_if! { if #[cfg(feature = "services-sftp")] { 
behavior_tests!(Sftp); }}
 cfg_if::cfg_if! { if #[cfg(feature = "services-sled")] { 
behavior_tests!(Sled); }}
 behavior_tests!(Webdav);
 behavior_tests!(Webhdfs);


Reply via email to