This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new a15b9dfd feat(services/sftp): setup integration tests (#2192)
a15b9dfd is described below
commit a15b9dfdb0391997bb61fb0ff0a8af02f8804cfa
Author: silver-ymz <[email protected]>
AuthorDate: Sun May 14 15:03:50 2023 +0800
feat(services/sftp): setup integration tests (#2192)
* feat(services/sftp): setup integration tests
* add checkout
* make sshd run in background
* add public key
* change keygen
* change keygen
* fix wrong cwd
* fix wrong command
* fix wrong permission
* fix
* fix
* fix
* fix
* fix
* try using docker
* fix wrong container args
* fix wrong container args
* fix wrong container
* fix wrong container
* fix wrong known hosts
* try print ssh connect info
* fix wrong permission
* Make sftp running
Signed-off-by: Xuanwo <[email protected]>
* Ignore known host
Signed-off-by: Xuanwo <[email protected]>
* fix hostkey auth
* Set timeout for test job
Signed-off-by: Xuanwo <[email protected]>
* remove bb8
* support self defined known hosts strategy
* change key path to absolute path
* fix syntax error
* fix hostkey auth
* wait sshd start
* Make sftp working first
Signed-off-by: Xuanwo <[email protected]>
* No need to wait
Signed-off-by: Xuanwo <[email protected]>
* Use absolute path
Signed-off-by: Xuanwo <[email protected]>
* No need to check
Signed-off-by: Xuanwo <[email protected]>
* Try debug
Signed-off-by: Xuanwo <[email protected]>
* give dir write permission
* pass hostkey auth
* wait sshd open
* fix wrong env variable
* remove change cwd
* try previous impl; if it doesn't succeed, I will revert it
* fix format
* simplify ci file
* adapt to new api
* remove owning_ref
* change to single connection, but exist error
* Boxing connect_sftp
Co-authored-by: Jiahao XU <[email protected]>
* add docs for max file descriptors
* change Pager to Option<SftpPager>
* remove timeout
* typo
* add log for sftp-client
* add env_filter for log
* fix merge typo
* test for PR #77 in openssh-sftp-client
Signed-off-by: silver-ymz <[email protected]>
* update branch
Signed-off-by: silver-ymz <[email protected]>
* Bump openssh-sftp-client to v0.13.4
Signed-off-by: silver-ymz <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
Signed-off-by: silver-ymz <[email protected]>
Co-authored-by: Xuanwo <[email protected]>
Co-authored-by: Jiahao XU <[email protected]>
---
.env.example | 1 +
.github/workflows/service_test_sftp.yml | 70 ++++++++
Cargo.lock | 51 +++---
core/Cargo.toml | 15 +-
core/src/services/sftp/backend.rs | 305 +++++++++++++++++---------------
core/src/services/sftp/error.rs | 12 --
core/src/services/sftp/pager.rs | 74 +++-----
core/src/services/sftp/utils.rs | 81 +++++----
core/src/services/sftp/writer.rs | 14 +-
core/tests/behavior/utils.rs | 1 +
10 files changed, 356 insertions(+), 268 deletions(-)
diff --git a/.env.example b/.env.example
index f4835d52..d6e64ed2 100644
--- a/.env.example
+++ b/.env.example
@@ -67,6 +67,7 @@ OPENDAL_SFTP_ENDPOINT=ssh://<endpoint>
OPENDAL_SFTP_ROOT=/path/to/dir
OPENDAL_SFTP_USER=<user>
OPENDAL_SFTP_KEY=<key_path>
+OPENDAL_SFTP_KNOWN_HOSTS_STRATEGY=<accept|add|strict>
# sled
OPENDAL_SLED_TEST=false
OPENDAL_SLED_DATADIR=/path/to/database
diff --git a/.github/workflows/service_test_sftp.yml b/.github/workflows/service_test_sftp.yml
new file mode 100644
index 00000000..374a90be
--- /dev/null
+++ b/.github/workflows/service_test_sftp.yml
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Service Test Sftp
+
+on:
+ push:
+ branches:
+ - main
+ pull_request:
+ branches:
+ - main
+ paths:
+ - "core/src/**"
+ - "core/tests/**"
+ - "!core/src/docs/**"
+ - "!core/src/services/**"
+ - "core/src/services/sftp/**"
+ - ".github/workflows/service_test_sftp.yml"
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+ cancel-in-progress: true
+
+jobs:
+ sftp-test:
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v3
+ - name: Setup sftp
+ run: |
+ mkdir -p target/ssh
+ ssh-keygen -t rsa -b 4096 -f target/ssh/id_rsa -q -N "" < /dev/null
+ docker run \
+ -v `pwd`/target/ssh/id_rsa.pub:/home/foo/.ssh/keys/id_rsa.pub:ro \
+ --ulimit nofile=65536:65536 \
+ -p 2222:22 -d atmoz/sftp \
+ foo::::upload
+
+ - name: Setup Rust toolchain
+ uses: ./.github/actions/setup
+ - name: Test
+ shell: bash
+ timeout-minutes: 10
+ working-directory: core
+ run: cargo test sftp --features services-sftp -- --show-output
+ env:
+ RUST_BACKTRACE: full
+ RUST_LOG: debug
+ OPENDAL_SFTP_TEST: on
+ OPENDAL_SFTP_ENDPOINT: ssh://127.0.0.1:2222
+ OPENDAL_SFTP_ROOT: /upload/sftp_test/
+ OPENDAL_SFTP_USER: foo
+ OPENDAL_SFTP_KEY: ${{ github.workspace }}/target/ssh/id_rsa
+ OPENDAL_SFTP_KNOWN_HOSTS_STRATEGY: accept
diff --git a/Cargo.lock b/Cargo.lock
index 8a986e38..ba053170 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1158,11 +1158,11 @@ dependencies = [
[[package]]
name = "dirs"
-version = "5.0.0"
+version = "5.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dece029acd3353e3a58ac2e3eb3c8d6c35827a892edc6cc4138ef9c33df46ecd"
+checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225"
dependencies = [
- "dirs-sys 0.4.0",
+ "dirs-sys 0.4.1",
]
[[package]]
@@ -1178,13 +1178,14 @@ dependencies = [
[[package]]
name = "dirs-sys"
-version = "0.4.0"
+version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "04414300db88f70d74c5ff54e50f9e1d1737d9a5b90f53fcf2e95ca2a9ab554b"
+checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c"
dependencies = [
"libc",
+ "option-ext",
"redox_users",
- "windows-sys 0.45.0",
+ "windows-sys 0.48.0",
]
[[package]]
@@ -2537,7 +2538,7 @@ dependencies = [
"axum",
"chrono",
"clap 4.2.5",
- "dirs 5.0.0",
+ "dirs 5.0.1",
"futures",
"opendal",
"quick-xml 0.27.1",
@@ -2591,7 +2592,7 @@ dependencies = [
"anyhow",
"assert_cmd",
"clap 4.2.5",
- "dirs 5.0.0",
+ "dirs 5.0.1",
"env_logger",
"futures",
"log",
@@ -2630,6 +2631,7 @@ dependencies = [
"chrono",
"criterion",
"dashmap",
+ "dirs 5.0.1",
"dotenvy",
"flagset",
"futures",
@@ -2648,7 +2650,6 @@ dependencies = [
"openssh-sftp-client",
"opentelemetry 0.19.0",
"opentelemetry-jaeger",
- "owning_ref",
"parking_lot 0.12.1",
"paste",
"percent-encoding",
@@ -2746,26 +2747,30 @@ dependencies = [
[[package]]
name = "openssh-sftp-client"
-version = "0.12.2"
+version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f4fa8e5f26e549bd266d9bcd9e5b4fd344729985ef1a7f5ac3e51f3f96a4a620"
+checksum = "cbf7ee42fa3533261da10b179b27ce4fd7b6954b9919836d26d9a96f39c62d26"
dependencies = [
"bytes",
"derive_destructure2",
+ "futures-core",
"once_cell",
+ "openssh",
"openssh-sftp-client-lowlevel",
"openssh-sftp-error",
+ "pin-project",
"scopeguard",
"tokio",
"tokio-io-utility",
"tokio-util",
+ "tracing",
]
[[package]]
name = "openssh-sftp-client-lowlevel"
-version = "0.4.1"
+version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "406bf41d8372365497d5645e802a8dfe22008b8183edbe6c79e4b75614431daa"
+checksum = "8efd4c88a55c2baa1162e97f201a26a3d0b65773e6f942ada35ef455b1194ede"
dependencies = [
"awaitable",
"bytes",
@@ -2780,11 +2785,12 @@ dependencies = [
[[package]]
name = "openssh-sftp-error"
-version = "0.3.0"
+version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3d836b428ead150165d1178ed0aa672791c13b3ae9616ea1e34d13730a2cb486"
+checksum = "f4c3356e914b8006417188efd534105d5bcb230b4a9fd67782a6b4a4e15fa006"
dependencies = [
"awaitable-error",
+ "openssh",
"openssh-sftp-protocol-error",
"ssh_format_error",
"thiserror",
@@ -2959,6 +2965,12 @@ dependencies = [
"thiserror",
]
+[[package]]
+name = "option-ext"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
+
[[package]]
name = "ordered-float"
version = "2.10.0"
@@ -2999,15 +3011,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
-[[package]]
-name = "owning_ref"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6ff55baddef9e4ad00f88b6c743a2a8062d4c6ade126c2a528644b8e444d52ce"
-dependencies = [
- "stable_deref_trait",
-]
-
[[package]]
name = "parking"
version = "2.0.0"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 28be62b8..3817a64b 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -138,8 +138,7 @@ services-s3 = [
services-sftp = [
"dep:openssh",
"dep:openssh-sftp-client",
- "dep:bb8",
- "dep:owning_ref",
+ "dep:dirs",
"futures/executor",
]
services-sled = ["dep:sled"]
@@ -171,6 +170,7 @@ bb8 = { version = "0.8", optional = true }
bytes = "1.2"
chrono = "0.4.24"
dashmap = { version = "5.4", optional = true }
+dirs = { version = "5.0.1", optional = true }
flagset = "0.4"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hdrs = { version = "0.2", optional = true, features = ["async_file"] }
@@ -185,9 +185,11 @@ minitrace = { version = "0.4.0", optional = true }
moka = { version = "0.10", optional = true, features = ["future"] }
once_cell = "1"
openssh = { version = "0.9.9", optional = true }
-openssh-sftp-client = { version = "0.12.2", optional = true }
+openssh-sftp-client = { version = "0.13.4", optional = true, features = [
+ "openssh",
+ "tracing",
+] }
opentelemetry = { version = "0.19.0", optional = true }
-owning_ref = { version = "0.4.1", optional = true }
parking_lot = "0.12"
percent-encoding = "2"
pin-project = "1"
@@ -229,5 +231,8 @@ sha2 = "0.10"
size = "0.4"
tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] }
tracing-opentelemetry = "0.17"
-tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+tracing-subscriber = { version = "0.3", features = [
+ "env-filter",
+ "tracing-log",
+] }
wiremock = "0.5"
diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs
index 6769e62b..bfdd9821 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -19,22 +19,20 @@ use std::cmp::min;
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
+use std::path::Path;
+use std::path::PathBuf;
+use std::time::Duration;
use async_trait::async_trait;
-use bb8::PooledConnection;
-use futures::executor::block_on;
+use futures::StreamExt;
use log::debug;
-use openssh::RemoteChild;
-use openssh::Session;
+use openssh::KnownHosts;
use openssh::SessionBuilder;
-use openssh::Stdio;
use openssh_sftp_client::Sftp;
-use owning_ref::OwningHandle;
-use tokio::sync::OnceCell;
+use openssh_sftp_client::SftpOptions;
use super::error::is_not_found;
use super::error::is_sftp_protocol_error;
-use super::error::SftpError;
use super::pager::SftpPager;
use super::utils::SftpReader;
use super::writer::SftpWriter;
@@ -44,6 +42,10 @@ use crate::*;
/// SFTP services support. (only works on unix)
///
+/// Warning: Maximum number of file holdings is depend on the remote system
configuration.
+/// For example, the default value is 255 in macos, and 1024 in linux. If you
want to open
+/// lots of files, you should pay attention to close the file after using it.
+///
/// # Capabilities
///
/// This service can be used to:
@@ -66,6 +68,7 @@ use crate::*;
/// - `root`: Set the work directory for backend, default to `/home/$USER/`
/// - `user`: Set the login user
/// - `key`: Set the public key for login
+/// - `known_hosts_strategy`: Set the strategy for known hosts, default to
`Strict`
///
/// It doesn't support password login, you can use public key instead.
///
@@ -86,7 +89,7 @@ use crate::*;
/// // create backend builder
/// let mut builder = Sftp::default();
///
-/// builder.endpoint("127.0.0.1").user("test").password("test");
+/// builder.endpoint("127.0.0.1").user("test").key("test_key");
///
/// let op: Operator = Operator::new(builder)?.finish();
/// let _obj: Object = op.object("test_file");
@@ -100,6 +103,7 @@ pub struct SftpBuilder {
root: Option<String>,
user: Option<String>,
key: Option<String>,
+ known_hosts_strategy: Option<String>,
}
impl Debug for SftpBuilder {
@@ -155,6 +159,21 @@ impl SftpBuilder {
self
}
+
+ /// set known_hosts strategy for sftp backend.
+ /// available values:
+ /// - Strict (default)
+ /// - Accept
+ /// - Add
+ pub fn known_hosts_strategy(&mut self, strategy: &str) -> &mut Self {
+ self.known_hosts_strategy = if strategy.is_empty() {
+ None
+ } else {
+ Some(strategy.to_string())
+ };
+
+ self
+ }
}
impl Builder for SftpBuilder {
@@ -179,6 +198,25 @@ impl Builder for SftpBuilder {
.map(|r| normalize_root(r.as_str()))
.unwrap_or(format!("/home/{}/", user));
+ let known_hosts_strategy = match &self.known_hosts_strategy {
+ Some(v) => {
+ let v = v.to_lowercase();
+ if v == "strict" {
+ KnownHosts::Strict
+ } else if v == "accept" {
+ KnownHosts::Accept
+ } else if v == "add" {
+ KnownHosts::Add
+ } else {
+ return Err(Error::new(
+ ErrorKind::ConfigInvalid,
+ format!("unknown known_hosts strategy: {}",
v).as_str(),
+ ));
+ }
+ }
+ None => KnownHosts::Strict,
+ };
+
debug!("sftp backend finished: {:?}", &self);
Ok(SftpBackend {
@@ -186,7 +224,8 @@ impl Builder for SftpBuilder {
root,
user,
key: self.key.clone(),
- sftp: OnceCell::new(),
+ known_hosts_strategy,
+ client: tokio::sync::OnceCell::new(),
})
}
@@ -197,86 +236,21 @@ impl Builder for SftpBuilder {
map.get("endpoint").map(|v| builder.endpoint(v));
map.get("user").map(|v| builder.user(v));
map.get("key").map(|v| builder.key(v));
+ map.get("known_hosts_strategy")
+ .map(|v| builder.known_hosts_strategy(v));
builder
}
}
-#[derive(Clone)]
-pub struct Manager {
- endpoint: String,
- user: String,
- key: Option<String>,
-}
-
-pub struct Connection {
- // the remote child owns the ref to session, so we need to use owning
handle
- // The session will only create one child, so we can make sure the child
can live
- // as long as the session. (the session will be dropped when the
connection is dropped)
- // Related: https://stackoverflow.com/a/47260399
- child: OwningHandle<Box<Session>, Box<RemoteChild<'static>>>,
- pub sftp: Sftp,
-}
-
-#[async_trait]
-impl bb8::ManageConnection for Manager {
- type Connection = Connection;
- type Error = SftpError;
-
- async fn connect(&self) -> std::result::Result<Self::Connection,
Self::Error> {
- let mut session = SessionBuilder::default();
-
- session.user(self.user.clone());
-
- if let Some(key) = &self.key {
- session.keyfile(key);
- }
-
- let session = session.connect(self.endpoint.clone()).await?;
-
- let sess = Box::new(session);
- let mut oref = OwningHandle::new_with_fn(sess, unsafe {
- |x| {
- Box::new(
- block_on(
- (*x).subsystem("sftp")
- .stdin(Stdio::piped())
- .stdout(Stdio::piped())
- .spawn(),
- )
- .unwrap(),
- )
- }
- });
-
- let sftp = Sftp::new(
- oref.stdin().take().unwrap(),
- oref.stdout().take().unwrap(),
- Default::default(),
- )
- .await?;
-
- Ok(Connection { child: oref, sftp })
- }
-
- async fn is_valid(&self, conn: &mut Self::Connection) ->
std::result::Result<(), Self::Error> {
- conn.child.session().check().await?;
- Ok(())
- }
-
- /// Always allow reuse conn.
- fn has_broken(&self, _: &mut Self::Connection) -> bool {
- false
- }
-}
-
/// Backend is used to serve `Accessor` support for sftp.
pub struct SftpBackend {
endpoint: String,
root: String,
user: String,
key: Option<String>,
- sftp: OnceCell<bb8::Pool<Manager>>,
+ known_hosts_strategy: KnownHosts,
+ client: tokio::sync::OnceCell<Sftp>,
}
impl Debug for SftpBackend {
@@ -291,7 +265,7 @@ impl Accessor for SftpBackend {
type BlockingReader = ();
type Writer = SftpWriter;
type BlockingWriter = ();
- type Pager = SftpPager;
+ type Pager = Option<SftpPager>;
type BlockingPager = ();
fn info(&self) -> AccessorInfo {
@@ -318,18 +292,14 @@ impl Accessor for SftpBackend {
}
async fn create_dir(&self, path: &str, _: OpCreateDir) ->
Result<RpCreateDir> {
- let client = self.sftp_connect().await?;
- let mut fs = client.sftp.fs();
- fs.set_cwd(self.root.clone());
+ let client = self.connect().await?;
+ let mut fs = client.fs();
+ fs.set_cwd(&self.root);
- let paths: Vec<&str> = path.split_inclusive('/').collect();
- let mut current = self.root.clone();
+ let paths = Path::new(&path).components();
+ let mut current = PathBuf::from(&self.root);
for p in paths {
- if p.is_empty() {
- continue;
- }
-
- current.push_str(p);
+ current = current.join(p);
let res = fs.create_dir(p).await;
if let Err(e) = res {
@@ -338,20 +308,20 @@ impl Accessor for SftpBackend {
return Err(e.into());
}
}
- fs.set_cwd(current.clone());
+ fs.set_cwd(&current);
}
return Ok(RpCreateDir::default());
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
- let client = self.sftp_connect().await?;
+ let client = self.connect().await?;
- let mut fs = client.sftp.fs();
- fs.set_cwd(self.root.clone());
+ let mut fs = client.fs();
+ fs.set_cwd(&self.root);
let path = fs.canonicalize(path).await?;
- let mut file = client.sftp.open(path.as_path()).await?;
+ let mut file = client.open(path.as_path()).await?;
let total_length = file.metadata().await?.len().ok_or(Error::new(
ErrorKind::NotFound,
@@ -377,7 +347,7 @@ impl Accessor for SftpBackend {
(None, None) => (0, total_length),
};
- let r = SftpReader::new(self.sftp_connect_owned().await?, path, start,
end).await?;
+ let r = SftpReader::new(file, start, end).await?;
Ok((RpRead::new(end - start), r))
}
@@ -394,18 +364,21 @@ impl Accessor for SftpBackend {
self.create_dir(dir, OpCreateDir::default()).await?;
}
- let path = format!("{}{}", self.root, path);
+ let client = self.connect().await?;
- Ok((
- RpWrite::new(),
- SftpWriter::new(self.sftp_connect_owned().await?, path),
- ))
+ let mut fs = client.fs();
+ fs.set_cwd(&self.root);
+ let path = fs.canonicalize(path).await?;
+
+ let file = client.create(&path).await?;
+
+ Ok((RpWrite::new(), SftpWriter::new(file)))
}
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
- let client = self.sftp_connect().await?;
- let mut fs = client.sftp.fs();
- fs.set_cwd(self.root.clone());
+ let client = self.connect().await?;
+ let mut fs = client.fs();
+ fs.set_cwd(&self.root);
let meta = fs.metadata(path).await?;
@@ -413,14 +386,14 @@ impl Accessor for SftpBackend {
}
async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
- let client = self.sftp_connect().await?;
+ let client = self.connect().await?;
- let mut fs = client.sftp.fs();
- fs.set_cwd(self.root.clone());
+ let mut fs = client.fs();
+ fs.set_cwd(&self.root);
if path.ends_with('/') {
let file_path = format!("./{}", path);
- let dir = match fs.open_dir(file_path.clone()).await {
+ let mut dir = match fs.open_dir(&file_path).await {
Ok(dir) => dir,
Err(e) => {
if is_not_found(&e) {
@@ -431,15 +404,23 @@ impl Accessor for SftpBackend {
}
}
.read_dir()
- .await?;
+ .boxed();
- for file in &dir {
- let file_name = file.filename().to_str().unwrap();
- if file_name == "." || file_name == ".." {
+ while let Some(file) = dir.next().await {
+ let file = file?;
+ let file_name = file.filename().to_str();
+ if file_name == Some(".") || file_name == Some("..") {
continue;
}
- let file_path = format!("{}{}", path, file_name);
- self.delete(file_path.as_str(), OpDelete::default()).await?;
+ let file_path = Path::new(&self.root).join(file.filename());
+ self.delete(
+ file_path.to_str().ok_or(Error::new(
+ ErrorKind::Unexpected,
+ "unable to convert file path to str",
+ ))?,
+ OpDelete::default(),
+ )
+ .await?;
}
match fs.remove_dir(path).await {
@@ -461,58 +442,102 @@ impl Accessor for SftpBackend {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
- let client = self.sftp_connect().await?;
- let mut fs = client.sftp.fs();
- fs.set_cwd(self.root.clone());
+ let client = self.connect().await?;
+ let mut fs = client.fs();
+ fs.set_cwd(&self.root);
let file_path = format!("./{}", path);
- let mut dir = match fs.open_dir(file_path.clone()).await {
+ let dir = match fs.open_dir(&file_path).await {
Ok(dir) => dir,
Err(e) => {
if is_not_found(&e) {
- return Ok((RpList::default(), SftpPager::empty()));
+ return Ok((RpList::default(), None));
} else {
return Err(e.into());
}
}
- };
- let dir = dir.read_dir().await?;
+ }
+ .read_dir();
Ok((
RpList::default(),
- SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+ Some(SftpPager::new(dir, path.to_owned(), args.limit())),
))
}
}
impl SftpBackend {
- async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
- let pool = self
- .sftp
- .get_or_try_init(|| async {
- let manager = Manager {
- endpoint: self.endpoint.clone(),
- user: self.user.clone(),
- key: self.key.clone(),
- };
-
- bb8::Pool::builder().max_size(10).build(manager).await
+ async fn connect(&self) -> Result<&Sftp> {
+ let sftp = self
+ .client
+ .get_or_try_init(|| {
+ Box::pin(connect_sftp(
+ self.endpoint.as_str(),
+ self.root.clone(),
+ self.user.clone(),
+ self.key.clone(),
+ self.known_hosts_strategy.clone(),
+ ))
})
.await?;
- Ok(pool)
+ Ok(sftp)
+ }
+}
+
+async fn connect_sftp(
+ endpoint: &str,
+ root: String,
+ user: String,
+ key: Option<String>,
+ known_hosts_strategy: KnownHosts,
+) -> Result<Sftp> {
+ let mut session = SessionBuilder::default();
+
+ session.user(user);
+
+ if let Some(key) = &key {
+ session.keyfile(key);
}
- pub async fn sftp_connect(&self) -> Result<PooledConnection<'_, Manager>> {
- let conn = self.pool().await?.get().await?;
+ // set control directory to avoid temp files in root directory when panic
+ if let Some(dir) = dirs::runtime_dir() {
+ session.control_directory(dir);
+ }
- Ok(conn)
+ #[cfg(target_os = "macos")]
+ {
+ let _ = std::fs::create_dir("/private/tmp/.opendal/");
+ session.control_directory("/private/tmp/.opendal/");
}
- pub async fn sftp_connect_owned(&self) -> Result<PooledConnection<'static,
Manager>> {
- let conn = self.pool().await?.get_owned().await?;
+ session.server_alive_interval(Duration::from_secs(5));
+ session.known_hosts_check(known_hosts_strategy);
- Ok(conn)
+ let session = session.connect(&endpoint).await?;
+
+ let sftp = Sftp::from_session(session, SftpOptions::default()).await?;
+
+ let mut fs = sftp.fs();
+ fs.set_cwd("/");
+
+ let paths = Path::new(&root).components();
+ let mut current = PathBuf::from("/");
+ for p in paths {
+ current = current.join(p);
+ let res = fs.create_dir(p).await;
+
+ if let Err(e) = res {
+ // ignore error if dir already exists
+ if !is_sftp_protocol_error(&e) {
+ return Err(e.into());
+ }
+ }
+ fs.set_cwd(&current);
}
+
+ debug!("sftp connection created at {}", root);
+
+ Ok(sftp)
}
diff --git a/core/src/services/sftp/error.rs b/core/src/services/sftp/error.rs
index 0e7dc2ef..45178712 100644
--- a/core/src/services/sftp/error.rs
+++ b/core/src/services/sftp/error.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use bb8::RunError;
use openssh::Error as SshError;
use openssh_sftp_client::{error::SftpErrorKind, Error as SftpClientError};
@@ -71,17 +70,6 @@ impl From<SftpError> for Error {
}
}
-impl From<RunError<SftpError>> for Error {
- fn from(e: RunError<SftpError>) -> Self {
- match e {
- RunError::User(err) => err.into(),
- RunError::TimedOut => {
- Error::new(ErrorKind::Unexpected, "connection request:
timeout").set_temporary()
- }
- }
- }
-}
-
pub(super) fn is_not_found(e: &SftpClientError) -> bool {
matches!(e, SftpClientError::SftpError(SftpErrorKind::NoSuchFile, _))
}
diff --git a/core/src/services/sftp/pager.rs b/core/src/services/sftp/pager.rs
index e6b557c2..81c0e22b 100644
--- a/core/src/services/sftp/pager.rs
+++ b/core/src/services/sftp/pager.rs
@@ -15,35 +15,32 @@
// specific language governing permissions and limitations
// under the License.
+use std::pin::Pin;
+
use async_trait::async_trait;
+use futures::StreamExt;
use openssh_sftp_client::fs::DirEntry;
+use openssh_sftp_client::fs::ReadDir;
use crate::raw::oio;
use crate::Result;
pub struct SftpPager {
- dir: Box<[DirEntry]>,
- path: String,
- limit: Option<usize>,
- complete: bool,
+ dir: Pin<Box<ReadDir>>,
+ prefix: String,
+ limit: usize,
}
impl SftpPager {
- pub fn new(dir: Box<[DirEntry]>, path: String, limit: Option<usize>) ->
Self {
- Self {
- dir,
- path,
- limit,
- complete: false,
- }
- }
+ pub fn new(dir: ReadDir, path: String, limit: Option<usize>) -> Self {
+ let prefix = if path == "/" { "".to_owned() } else { path };
+
+ let limit = limit.unwrap_or(usize::MAX);
- pub fn empty() -> Self {
- Self {
- dir: Box::new([]),
- path: String::new(),
- limit: None,
- complete: true,
+ SftpPager {
+ dir: Box::pin(dir),
+ prefix,
+ limit,
}
}
}
@@ -51,41 +48,28 @@ impl SftpPager {
#[async_trait]
impl oio::Page for SftpPager {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
- if self.complete {
+ if self.limit == 0 {
return Ok(None);
}
- // when listing the root directory, the prefix should be empty
- if self.path == "/" {
- self.path = "".to_owned();
- }
-
- let iter = self
- .dir
- .iter()
- .filter(|e| {
- // filter out "." and ".."
- e.filename().to_str().unwrap() != "." &&
e.filename().to_str().unwrap() != ".."
- })
- .map(|e| map_entry(self.path.clone(), e.clone()));
-
- let v: Vec<oio::Entry> = if let Some(limit) = self.limit {
- iter.take(limit).collect()
- } else {
- iter.collect()
- };
-
- self.complete = true;
+ let item = self.dir.next().await;
- if v.is_empty() {
- Ok(None)
- } else {
- Ok(Some(v))
+ match item {
+ Some(Ok(e)) => {
+ if e.filename().to_str() == Some(".") || e.filename().to_str()
== Some("..") {
+ self.next().await
+ } else {
+ self.limit -= 1;
+ Ok(Some(vec![map_entry(self.prefix.as_str(), e.clone())]))
+ }
+ }
+ Some(Err(e)) => Err(e.into()),
+ None => Ok(None),
}
}
}
-fn map_entry(prefix: String, value: DirEntry) -> oio::Entry {
+fn map_entry(prefix: &str, value: DirEntry) -> oio::Entry {
let path = format!(
"{}{}{}",
prefix,
diff --git a/core/src/services/sftp/utils.rs b/core/src/services/sftp/utils.rs
index 91387aad..303aee70 100644
--- a/core/src/services/sftp/utils.rs
+++ b/core/src/services/sftp/utils.rs
@@ -16,19 +16,18 @@
// under the License.
use std::io::SeekFrom;
-use std::path::PathBuf;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use async_compat::Compat;
-use bb8::PooledConnection;
-use futures::executor::block_on;
+use futures::AsyncBufRead;
+use futures::AsyncRead;
+use futures::AsyncSeek;
+use openssh_sftp_client::file::File;
use openssh_sftp_client::file::TokioCompatFile;
use openssh_sftp_client::metadata::MetaData as SftpMeta;
-use owning_ref::OwningHandle;
-use super::backend::Manager;
use crate::raw::oio;
use crate::raw::oio::into_reader::FdReader;
use crate::raw::oio::ReadExt;
@@ -36,45 +35,61 @@ use crate::EntryMode;
use crate::Metadata;
use crate::Result;
-pub struct SftpReader {
- // similar situation to connection struct
- // We can make sure the file can live as long as the connection.
- file: OwningHandle<
- Box<PooledConnection<'static, Manager>>,
- Box<FdReader<Compat<TokioCompatFile<'static>>>>,
- >,
+pub struct SftpReaderInner {
+ file: Pin<Box<Compat<TokioCompatFile>>>,
}
+pub type SftpReader = FdReader<SftpReaderInner>;
-impl SftpReader {
- pub async fn new(
- conn: PooledConnection<'static, Manager>,
- path: PathBuf,
- start: u64,
- end: u64,
- ) -> Result<Self> {
- let mut file = OwningHandle::new_with_fn(Box::new(conn), |conn| unsafe
{
- let file = block_on((*conn).sftp.open(path)).unwrap();
- let f = Compat::new(TokioCompatFile::from(file));
- Box::new(oio::into_reader::from_fd(f, start, end))
- });
+impl SftpReaderInner {
+ pub async fn new(file: File) -> Self {
+ let file = Compat::new(file.into());
+ Self {
+ file: Box::pin(file),
+ }
+ }
+}
- file.seek(SeekFrom::Start(0)).await?;
+impl SftpReader {
+ /// Create a new reader from a file, starting at the given offset and
ending at the given offset.
+ pub async fn new(file: File, start: u64, end: u64) -> Result<Self> {
+ let file = SftpReaderInner::new(file).await;
+ let mut r = oio::into_reader::from_fd(file, start, end);
+ r.seek(SeekFrom::Start(0)).await?;
+ Ok(r)
+ }
+}
- Ok(SftpReader { file })
+impl AsyncRead for SftpReaderInner {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut [u8],
+ ) -> Poll<std::io::Result<usize>> {
+ let this = self.get_mut();
+ Pin::new(&mut this.file).poll_read(cx, buf)
}
}
-impl oio::Read for SftpReader {
- fn poll_read(&mut self, cx: &mut Context, buf: &mut [u8]) ->
Poll<Result<usize>> {
- Pin::new(&mut *self.file).poll_read(cx, buf)
+impl AsyncBufRead for SftpReaderInner {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) ->
Poll<std::io::Result<&[u8]>> {
+ let this = self.get_mut();
+ Pin::new(&mut this.file).poll_fill_buf(cx)
}
- fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) ->
Poll<Result<u64>> {
- Pin::new(&mut *self.file).poll_seek(cx, pos)
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ let this = self.get_mut();
+ Pin::new(&mut this.file).consume(amt)
}
+}
- fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<bytes::Bytes>>> {
- Pin::new(&mut *self.file).poll_next(cx)
+impl AsyncSeek for SftpReaderInner {
+ fn poll_seek(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ pos: SeekFrom,
+ ) -> Poll<std::io::Result<u64>> {
+ let this = self.get_mut();
+ Pin::new(&mut this.file).poll_seek(cx, pos)
}
}
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 2fa5215d..48126ca8 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -16,30 +16,26 @@
// under the License.
use async_trait::async_trait;
-use bb8::PooledConnection;
use bytes::Bytes;
+use openssh_sftp_client::file::File;
-use super::backend::Manager;
use crate::raw::oio;
use crate::{Error, ErrorKind, Result};
pub struct SftpWriter {
- conn: PooledConnection<'static, Manager>,
- path: String,
+ file: File,
}
impl SftpWriter {
- pub fn new(conn: PooledConnection<'static, Manager>, path: String) -> Self
{
- SftpWriter { conn, path }
+ pub fn new(file: File) -> Self {
+ SftpWriter { file }
}
}
#[async_trait]
impl oio::Write for SftpWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
- let mut file = self.conn.sftp.create(&self.path).await?;
-
- file.write_all(&bs).await?;
+ self.file.write_all(&bs).await?;
Ok(())
}
diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs
index 9a4671db..8f30bbdc 100644
--- a/core/tests/behavior/utils.rs
+++ b/core/tests/behavior/utils.rs
@@ -42,6 +42,7 @@ pub fn init_service<B: Builder>() -> Option<Operator> {
let _ = tracing_subscriber::fmt()
.pretty()
.with_test_writer()
+ .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();
let _ = dotenvy::dotenv();