This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new a15b9dfd feat(services/sftp): setup integration tests (#2192)
a15b9dfd is described below

commit a15b9dfdb0391997bb61fb0ff0a8af02f8804cfa
Author: silver-ymz <[email protected]>
AuthorDate: Sun May 14 15:03:50 2023 +0800

    feat(services/sftp): setup integration tests (#2192)
    
    * feat(services/sftp): setup integration tests
    
    * add checkout
    
    * make sshd run in background
    
    * add public key
    
    * change keygen
    
    * change keygen
    
    * fix wrong cwd
    
    * fix wrong command
    
    * fix wrong premission
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * try using docker
    
    * fix wrong container args
    
    * fix wrong container args
    
    * fix wrong container
    
    * fix wrong container
    
    * fix wrong known hosts
    
    * try print ssh connect info
    
    * fix wrong permission
    
    * Make sftp running
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Ignore known host
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * fix hotkey auth
    
    * Set timeout for test job
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * remove bb8
    
    * support self defined known hosts strategy
    
    * change key path to absolute path
    
    * fix syntax error
    
    * fix hotkeys auth
    
    * wait sshd start
    
    * Make sftp working first
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * No need to wait
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Use absolute path
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * No need to check
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Try debug
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * give dict write premission
    
    * pass hotkey auth
    
    * wait sshd open
    
    * fix wrong env variable
    
    * remove change cwd
    
    * try previous impl. If not success, I will revert back then
    
    * fix format
    
    * simplify ci file
    
    * adapt to new api
    
    * remove owning_ref
    
    * change to single connection, but exist error
    
    * Boxing connect_sftp
    
    Co-authored-by: Jiahao XU <[email protected]>
    
    * add docs for max file descriptors
    
    * change Pager to Option<SftpPager>
    
    * remove timeout
    
    * typo
    
    * add log for sftp-client
    
    * add env_filter for log
    
    * fix merge typo
    
    * test for PR #77 in openssh-sftp-client
    
    Signed-off-by: silver-ymz <[email protected]>
    
    * update branch
    
    Signed-off-by: silver-ymz <[email protected]>
    
    * Bump openssh-sftp-client to v0.13.4
    
    Signed-off-by: silver-ymz <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
    Signed-off-by: silver-ymz <[email protected]>
    Co-authored-by: Xuanwo <[email protected]>
    Co-authored-by: Jiahao XU <[email protected]>
---
 .env.example                            |   1 +
 .github/workflows/service_test_sftp.yml |  70 ++++++++
 Cargo.lock                              |  51 +++---
 core/Cargo.toml                         |  15 +-
 core/src/services/sftp/backend.rs       | 305 +++++++++++++++++---------------
 core/src/services/sftp/error.rs         |  12 --
 core/src/services/sftp/pager.rs         |  74 +++-----
 core/src/services/sftp/utils.rs         |  81 +++++----
 core/src/services/sftp/writer.rs        |  14 +-
 core/tests/behavior/utils.rs            |   1 +
 10 files changed, 356 insertions(+), 268 deletions(-)

diff --git a/.env.example b/.env.example
index f4835d52..d6e64ed2 100644
--- a/.env.example
+++ b/.env.example
@@ -67,6 +67,7 @@ OPENDAL_SFTP_ENDPOINT=ssh://<endpoint>
 OPENDAL_SFTP_ROOT=/path/to/dir
 OPENDAL_SFTP_USER=<user>
 OPENDAL_SFTP_KEY=<key_path>
+OPENDAL_SFTP_KNOWN_HOSTS_STRATEGY=<accept|add|strict>
 # sled
 OPENDAL_SLED_TEST=false
 OPENDAL_SLED_DATADIR=/path/to/database
diff --git a/.github/workflows/service_test_sftp.yml b/.github/workflows/service_test_sftp.yml
new file mode 100644
index 00000000..374a90be
--- /dev/null
+++ b/.github/workflows/service_test_sftp.yml
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Service Test Sftp
+
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+    branches:
+      - main
+    paths:
+      - "core/src/**"
+      - "core/tests/**"
+      - "!core/src/docs/**"
+      - "!core/src/services/**"
+      - "core/src/services/sftp/**"
+      - ".github/workflows/service_test_sftp.yml"
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+  cancel-in-progress: true
+
+jobs:
+  sftp-test:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v3
+      - name: Setup sftp
+        run: |
+          mkdir -p target/ssh
+          ssh-keygen -t rsa -b 4096 -f target/ssh/id_rsa -q -N "" < /dev/null
+          docker run \
+              -v `pwd`/target/ssh/id_rsa.pub:/home/foo/.ssh/keys/id_rsa.pub:ro \
+              --ulimit nofile=65536:65536 \
+              -p 2222:22 -d atmoz/sftp \
+              foo::::upload
+
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+      - name: Test
+        shell: bash
+        timeout-minutes: 10
+        working-directory: core
+        run: cargo test sftp --features services-sftp -- --show-output
+        env:
+          RUST_BACKTRACE: full
+          RUST_LOG: debug
+          OPENDAL_SFTP_TEST: on
+          OPENDAL_SFTP_ENDPOINT: ssh://127.0.0.1:2222
+          OPENDAL_SFTP_ROOT: /upload/sftp_test/
+          OPENDAL_SFTP_USER: foo
+          OPENDAL_SFTP_KEY: ${{ github.workspace }}/target/ssh/id_rsa
+          OPENDAL_SFTP_KNOWN_HOSTS_STRATEGY: accept
diff --git a/Cargo.lock b/Cargo.lock
index 8a986e38..ba053170 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1158,11 +1158,11 @@ dependencies = [
 
 [[package]]
 name = "dirs"
-version = "5.0.0"
+version = "5.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dece029acd3353e3a58ac2e3eb3c8d6c35827a892edc6cc4138ef9c33df46ecd"
+checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225"
 dependencies = [
- "dirs-sys 0.4.0",
+ "dirs-sys 0.4.1",
 ]
 
 [[package]]
@@ -1178,13 +1178,14 @@ dependencies = [
 
 [[package]]
 name = "dirs-sys"
-version = "0.4.0"
+version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "04414300db88f70d74c5ff54e50f9e1d1737d9a5b90f53fcf2e95ca2a9ab554b"
+checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c"
 dependencies = [
  "libc",
+ "option-ext",
  "redox_users",
- "windows-sys 0.45.0",
+ "windows-sys 0.48.0",
 ]
 
 [[package]]
@@ -2537,7 +2538,7 @@ dependencies = [
  "axum",
  "chrono",
  "clap 4.2.5",
- "dirs 5.0.0",
+ "dirs 5.0.1",
  "futures",
  "opendal",
  "quick-xml 0.27.1",
@@ -2591,7 +2592,7 @@ dependencies = [
  "anyhow",
  "assert_cmd",
  "clap 4.2.5",
- "dirs 5.0.0",
+ "dirs 5.0.1",
  "env_logger",
  "futures",
  "log",
@@ -2630,6 +2631,7 @@ dependencies = [
  "chrono",
  "criterion",
  "dashmap",
+ "dirs 5.0.1",
  "dotenvy",
  "flagset",
  "futures",
@@ -2648,7 +2650,6 @@ dependencies = [
  "openssh-sftp-client",
  "opentelemetry 0.19.0",
  "opentelemetry-jaeger",
- "owning_ref",
  "parking_lot 0.12.1",
  "paste",
  "percent-encoding",
@@ -2746,26 +2747,30 @@ dependencies = [
 
 [[package]]
 name = "openssh-sftp-client"
-version = "0.12.2"
+version = "0.13.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f4fa8e5f26e549bd266d9bcd9e5b4fd344729985ef1a7f5ac3e51f3f96a4a620"
+checksum = "cbf7ee42fa3533261da10b179b27ce4fd7b6954b9919836d26d9a96f39c62d26"
 dependencies = [
  "bytes",
  "derive_destructure2",
+ "futures-core",
  "once_cell",
+ "openssh",
  "openssh-sftp-client-lowlevel",
  "openssh-sftp-error",
+ "pin-project",
  "scopeguard",
  "tokio",
  "tokio-io-utility",
  "tokio-util",
+ "tracing",
 ]
 
 [[package]]
 name = "openssh-sftp-client-lowlevel"
-version = "0.4.1"
+version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "406bf41d8372365497d5645e802a8dfe22008b8183edbe6c79e4b75614431daa"
+checksum = "8efd4c88a55c2baa1162e97f201a26a3d0b65773e6f942ada35ef455b1194ede"
 dependencies = [
  "awaitable",
  "bytes",
@@ -2780,11 +2785,12 @@ dependencies = [
 
 [[package]]
 name = "openssh-sftp-error"
-version = "0.3.0"
+version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3d836b428ead150165d1178ed0aa672791c13b3ae9616ea1e34d13730a2cb486"
+checksum = "f4c3356e914b8006417188efd534105d5bcb230b4a9fd67782a6b4a4e15fa006"
 dependencies = [
  "awaitable-error",
+ "openssh",
  "openssh-sftp-protocol-error",
  "ssh_format_error",
  "thiserror",
@@ -2959,6 +2965,12 @@ dependencies = [
  "thiserror",
 ]
 
+[[package]]
+name = "option-ext"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
+
 [[package]]
 name = "ordered-float"
 version = "2.10.0"
@@ -2999,15 +3011,6 @@ version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
 
-[[package]]
-name = "owning_ref"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6ff55baddef9e4ad00f88b6c743a2a8062d4c6ade126c2a528644b8e444d52ce"
-dependencies = [
- "stable_deref_trait",
-]
-
 [[package]]
 name = "parking"
 version = "2.0.0"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 28be62b8..3817a64b 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -138,8 +138,7 @@ services-s3 = [
 services-sftp = [
   "dep:openssh",
   "dep:openssh-sftp-client",
-  "dep:bb8",
-  "dep:owning_ref",
+  "dep:dirs",
   "futures/executor",
 ]
 services-sled = ["dep:sled"]
@@ -171,6 +170,7 @@ bb8 = { version = "0.8", optional = true }
 bytes = "1.2"
 chrono = "0.4.24"
 dashmap = { version = "5.4", optional = true }
+dirs = { version = "5.0.1", optional = true }
 flagset = "0.4"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 hdrs = { version = "0.2", optional = true, features = ["async_file"] }
@@ -185,9 +185,11 @@ minitrace = { version = "0.4.0", optional = true }
 moka = { version = "0.10", optional = true, features = ["future"] }
 once_cell = "1"
 openssh = { version = "0.9.9", optional = true }
-openssh-sftp-client = { version = "0.12.2", optional = true }
+openssh-sftp-client = { version = "0.13.4", optional = true, features = [
+  "openssh",
+  "tracing",
+] }
 opentelemetry = { version = "0.19.0", optional = true }
-owning_ref = { version = "0.4.1", optional = true }
 parking_lot = "0.12"
 percent-encoding = "2"
 pin-project = "1"
@@ -229,5 +231,8 @@ sha2 = "0.10"
 size = "0.4"
 tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] }
 tracing-opentelemetry = "0.17"
-tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+tracing-subscriber = { version = "0.3", features = [
+  "env-filter",
+  "tracing-log",
+] }
 wiremock = "0.5"
diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs
index 6769e62b..bfdd9821 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -19,22 +19,20 @@ use std::cmp::min;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
+use std::path::Path;
+use std::path::PathBuf;
+use std::time::Duration;
 
 use async_trait::async_trait;
-use bb8::PooledConnection;
-use futures::executor::block_on;
+use futures::StreamExt;
 use log::debug;
-use openssh::RemoteChild;
-use openssh::Session;
+use openssh::KnownHosts;
 use openssh::SessionBuilder;
-use openssh::Stdio;
 use openssh_sftp_client::Sftp;
-use owning_ref::OwningHandle;
-use tokio::sync::OnceCell;
+use openssh_sftp_client::SftpOptions;
 
 use super::error::is_not_found;
 use super::error::is_sftp_protocol_error;
-use super::error::SftpError;
 use super::pager::SftpPager;
 use super::utils::SftpReader;
 use super::writer::SftpWriter;
@@ -44,6 +42,10 @@ use crate::*;
 
 /// SFTP services support. (only works on unix)
 ///
+/// Warning: Maximum number of file holdings is depend on the remote system configuration.
+/// For example, the default value is 255 in macos, and 1024 in linux. If you want to open
+/// lots of files, you should pay attention to close the file after using it.
+///
 /// # Capabilities
 ///
 /// This service can be used to:
@@ -66,6 +68,7 @@ use crate::*;
 /// - `root`: Set the work directory for backend, default to `/home/$USER/`
 /// - `user`: Set the login user
 /// - `key`: Set the public key for login
+/// - `known_hosts_strategy`: Set the strategy for known hosts, default to `Strict`
 ///
 /// It doesn't support password login, you can use public key instead.
 ///
@@ -86,7 +89,7 @@ use crate::*;
 ///     // create backend builder
 ///     let mut builder = Sftp::default();
 ///
-///     builder.endpoint("127.0.0.1").user("test").password("test");
+///     builder.endpoint("127.0.0.1").user("test").key("test_key");
 ///
 ///     let op: Operator = Operator::new(builder)?.finish();
 ///     let _obj: Object = op.object("test_file");
@@ -100,6 +103,7 @@ pub struct SftpBuilder {
     root: Option<String>,
     user: Option<String>,
     key: Option<String>,
+    known_hosts_strategy: Option<String>,
 }
 
 impl Debug for SftpBuilder {
@@ -155,6 +159,21 @@ impl SftpBuilder {
 
         self
     }
+
+    /// set known_hosts strategy for sftp backend.
+    /// available values:
+    /// - Strict (default)
+    /// - Accept
+    /// - Add
+    pub fn known_hosts_strategy(&mut self, strategy: &str) -> &mut Self {
+        self.known_hosts_strategy = if strategy.is_empty() {
+            None
+        } else {
+            Some(strategy.to_string())
+        };
+
+        self
+    }
 }
 
 impl Builder for SftpBuilder {
@@ -179,6 +198,25 @@ impl Builder for SftpBuilder {
             .map(|r| normalize_root(r.as_str()))
             .unwrap_or(format!("/home/{}/", user));
 
+        let known_hosts_strategy = match &self.known_hosts_strategy {
+            Some(v) => {
+                let v = v.to_lowercase();
+                if v == "strict" {
+                    KnownHosts::Strict
+                } else if v == "accept" {
+                    KnownHosts::Accept
+                } else if v == "add" {
+                    KnownHosts::Add
+                } else {
+                    return Err(Error::new(
+                        ErrorKind::ConfigInvalid,
+                        format!("unknown known_hosts strategy: {}", v).as_str(),
+                    ));
+                }
+            }
+            None => KnownHosts::Strict,
+        };
+
         debug!("sftp backend finished: {:?}", &self);
 
         Ok(SftpBackend {
@@ -186,7 +224,8 @@ impl Builder for SftpBuilder {
             root,
             user,
             key: self.key.clone(),
-            sftp: OnceCell::new(),
+            known_hosts_strategy,
+            client: tokio::sync::OnceCell::new(),
         })
     }
 
@@ -197,86 +236,21 @@ impl Builder for SftpBuilder {
         map.get("endpoint").map(|v| builder.endpoint(v));
         map.get("user").map(|v| builder.user(v));
         map.get("key").map(|v| builder.key(v));
+        map.get("known_hosts_strategy")
+            .map(|v| builder.known_hosts_strategy(v));
 
         builder
     }
 }
 
-#[derive(Clone)]
-pub struct Manager {
-    endpoint: String,
-    user: String,
-    key: Option<String>,
-}
-
-pub struct Connection {
-    // the remote child owns the ref to session, so we need to use owning handle
-    // The session will only create one child, so we can make sure the child can live
-    // as long as the session. (the session will be dropped when the connection is dropped)
-    // Related: https://stackoverflow.com/a/47260399
-    child: OwningHandle<Box<Session>, Box<RemoteChild<'static>>>,
-    pub sftp: Sftp,
-}
-
-#[async_trait]
-impl bb8::ManageConnection for Manager {
-    type Connection = Connection;
-    type Error = SftpError;
-
-    async fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {
-        let mut session = SessionBuilder::default();
-
-        session.user(self.user.clone());
-
-        if let Some(key) = &self.key {
-            session.keyfile(key);
-        }
-
-        let session = session.connect(self.endpoint.clone()).await?;
-
-        let sess = Box::new(session);
-        let mut oref = OwningHandle::new_with_fn(sess, unsafe {
-            |x| {
-                Box::new(
-                    block_on(
-                        (*x).subsystem("sftp")
-                            .stdin(Stdio::piped())
-                            .stdout(Stdio::piped())
-                            .spawn(),
-                    )
-                    .unwrap(),
-                )
-            }
-        });
-
-        let sftp = Sftp::new(
-            oref.stdin().take().unwrap(),
-            oref.stdout().take().unwrap(),
-            Default::default(),
-        )
-        .await?;
-
-        Ok(Connection { child: oref, sftp })
-    }
-
-    async fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {
-        conn.child.session().check().await?;
-        Ok(())
-    }
-
-    /// Always allow reuse conn.
-    fn has_broken(&self, _: &mut Self::Connection) -> bool {
-        false
-    }
-}
-
 /// Backend is used to serve `Accessor` support for sftp.
 pub struct SftpBackend {
     endpoint: String,
     root: String,
     user: String,
     key: Option<String>,
-    sftp: OnceCell<bb8::Pool<Manager>>,
+    known_hosts_strategy: KnownHosts,
+    client: tokio::sync::OnceCell<Sftp>,
 }
 
 impl Debug for SftpBackend {
@@ -291,7 +265,7 @@ impl Accessor for SftpBackend {
     type BlockingReader = ();
     type Writer = SftpWriter;
     type BlockingWriter = ();
-    type Pager = SftpPager;
+    type Pager = Option<SftpPager>;
     type BlockingPager = ();
 
     fn info(&self) -> AccessorInfo {
@@ -318,18 +292,14 @@ impl Accessor for SftpBackend {
     }
 
     async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
-        let client = self.sftp_connect().await?;
-        let mut fs = client.sftp.fs();
-        fs.set_cwd(self.root.clone());
+        let client = self.connect().await?;
+        let mut fs = client.fs();
+        fs.set_cwd(&self.root);
 
-        let paths: Vec<&str> = path.split_inclusive('/').collect();
-        let mut current = self.root.clone();
+        let paths = Path::new(&path).components();
+        let mut current = PathBuf::from(&self.root);
         for p in paths {
-            if p.is_empty() {
-                continue;
-            }
-
-            current.push_str(p);
+            current = current.join(p);
             let res = fs.create_dir(p).await;
 
             if let Err(e) = res {
@@ -338,20 +308,20 @@ impl Accessor for SftpBackend {
                     return Err(e.into());
                 }
             }
-            fs.set_cwd(current.clone());
+            fs.set_cwd(&current);
         }
 
         return Ok(RpCreateDir::default());
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        let client = self.sftp_connect().await?;
+        let client = self.connect().await?;
 
-        let mut fs = client.sftp.fs();
-        fs.set_cwd(self.root.clone());
+        let mut fs = client.fs();
+        fs.set_cwd(&self.root);
         let path = fs.canonicalize(path).await?;
 
-        let mut file = client.sftp.open(path.as_path()).await?;
+        let mut file = client.open(path.as_path()).await?;
 
         let total_length = file.metadata().await?.len().ok_or(Error::new(
             ErrorKind::NotFound,
@@ -377,7 +347,7 @@ impl Accessor for SftpBackend {
             (None, None) => (0, total_length),
         };
 
-        let r = SftpReader::new(self.sftp_connect_owned().await?, path, start, end).await?;
+        let r = SftpReader::new(file, start, end).await?;
 
         Ok((RpRead::new(end - start), r))
     }
@@ -394,18 +364,21 @@ impl Accessor for SftpBackend {
             self.create_dir(dir, OpCreateDir::default()).await?;
         }
 
-        let path = format!("{}{}", self.root, path);
+        let client = self.connect().await?;
 
-        Ok((
-            RpWrite::new(),
-            SftpWriter::new(self.sftp_connect_owned().await?, path),
-        ))
+        let mut fs = client.fs();
+        fs.set_cwd(&self.root);
+        let path = fs.canonicalize(path).await?;
+
+        let file = client.create(&path).await?;
+
+        Ok((RpWrite::new(), SftpWriter::new(file)))
     }
 
     async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
-        let client = self.sftp_connect().await?;
-        let mut fs = client.sftp.fs();
-        fs.set_cwd(self.root.clone());
+        let client = self.connect().await?;
+        let mut fs = client.fs();
+        fs.set_cwd(&self.root);
 
         let meta = fs.metadata(path).await?;
 
@@ -413,14 +386,14 @@ impl Accessor for SftpBackend {
     }
 
     async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
-        let client = self.sftp_connect().await?;
+        let client = self.connect().await?;
 
-        let mut fs = client.sftp.fs();
-        fs.set_cwd(self.root.clone());
+        let mut fs = client.fs();
+        fs.set_cwd(&self.root);
 
         if path.ends_with('/') {
             let file_path = format!("./{}", path);
-            let dir = match fs.open_dir(file_path.clone()).await {
+            let mut dir = match fs.open_dir(&file_path).await {
                 Ok(dir) => dir,
                 Err(e) => {
                     if is_not_found(&e) {
@@ -431,15 +404,23 @@ impl Accessor for SftpBackend {
                 }
             }
             .read_dir()
-            .await?;
+            .boxed();
 
-            for file in &dir {
-                let file_name = file.filename().to_str().unwrap();
-                if file_name == "." || file_name == ".." {
+            while let Some(file) = dir.next().await {
+                let file = file?;
+                let file_name = file.filename().to_str();
+                if file_name == Some(".") || file_name == Some("..") {
                     continue;
                 }
-                let file_path = format!("{}{}", path, file_name);
-                self.delete(file_path.as_str(), OpDelete::default()).await?;
+                let file_path = Path::new(&self.root).join(file.filename());
+                self.delete(
+                    file_path.to_str().ok_or(Error::new(
+                        ErrorKind::Unexpected,
+                        "unable to convert file path to str",
+                    ))?,
+                    OpDelete::default(),
+                )
+                .await?;
             }
 
             match fs.remove_dir(path).await {
@@ -461,58 +442,102 @@ impl Accessor for SftpBackend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
-        let client = self.sftp_connect().await?;
-        let mut fs = client.sftp.fs();
-        fs.set_cwd(self.root.clone());
+        let client = self.connect().await?;
+        let mut fs = client.fs();
+        fs.set_cwd(&self.root);
 
         let file_path = format!("./{}", path);
 
-        let mut dir = match fs.open_dir(file_path.clone()).await {
+        let dir = match fs.open_dir(&file_path).await {
             Ok(dir) => dir,
             Err(e) => {
                 if is_not_found(&e) {
-                    return Ok((RpList::default(), SftpPager::empty()));
+                    return Ok((RpList::default(), None));
                 } else {
                     return Err(e.into());
                 }
             }
-        };
-        let dir = dir.read_dir().await?;
+        }
+        .read_dir();
 
         Ok((
             RpList::default(),
-            SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+            Some(SftpPager::new(dir, path.to_owned(), args.limit())),
         ))
     }
 }
 
 impl SftpBackend {
-    async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
-        let pool = self
-            .sftp
-            .get_or_try_init(|| async {
-                let manager = Manager {
-                    endpoint: self.endpoint.clone(),
-                    user: self.user.clone(),
-                    key: self.key.clone(),
-                };
-
-                bb8::Pool::builder().max_size(10).build(manager).await
+    async fn connect(&self) -> Result<&Sftp> {
+        let sftp = self
+            .client
+            .get_or_try_init(|| {
+                Box::pin(connect_sftp(
+                    self.endpoint.as_str(),
+                    self.root.clone(),
+                    self.user.clone(),
+                    self.key.clone(),
+                    self.known_hosts_strategy.clone(),
+                ))
             })
             .await?;
 
-        Ok(pool)
+        Ok(sftp)
+    }
+}
+
+async fn connect_sftp(
+    endpoint: &str,
+    root: String,
+    user: String,
+    key: Option<String>,
+    known_hosts_strategy: KnownHosts,
+) -> Result<Sftp> {
+    let mut session = SessionBuilder::default();
+
+    session.user(user);
+
+    if let Some(key) = &key {
+        session.keyfile(key);
     }
 
-    pub async fn sftp_connect(&self) -> Result<PooledConnection<'_, Manager>> {
-        let conn = self.pool().await?.get().await?;
+    // set control directory to avoid temp files in root directory when panic
+    if let Some(dir) = dirs::runtime_dir() {
+        session.control_directory(dir);
+    }
 
-        Ok(conn)
+    #[cfg(target_os = "macos")]
+    {
+        let _ = std::fs::create_dir("/private/tmp/.opendal/");
+        session.control_directory("/private/tmp/.opendal/");
     }
 
-    pub async fn sftp_connect_owned(&self) -> Result<PooledConnection<'static, Manager>> {
-        let conn = self.pool().await?.get_owned().await?;
+    session.server_alive_interval(Duration::from_secs(5));
+    session.known_hosts_check(known_hosts_strategy);
 
-        Ok(conn)
+    let session = session.connect(&endpoint).await?;
+
+    let sftp = Sftp::from_session(session, SftpOptions::default()).await?;
+
+    let mut fs = sftp.fs();
+    fs.set_cwd("/");
+
+    let paths = Path::new(&root).components();
+    let mut current = PathBuf::from("/");
+    for p in paths {
+        current = current.join(p);
+        let res = fs.create_dir(p).await;
+
+        if let Err(e) = res {
+            // ignore error if dir already exists
+            if !is_sftp_protocol_error(&e) {
+                return Err(e.into());
+            }
+        }
+        fs.set_cwd(&current);
     }
+
+    debug!("sftp connection created at {}", root);
+
+    Ok(sftp)
 }
diff --git a/core/src/services/sftp/error.rs b/core/src/services/sftp/error.rs
index 0e7dc2ef..45178712 100644
--- a/core/src/services/sftp/error.rs
+++ b/core/src/services/sftp/error.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bb8::RunError;
 use openssh::Error as SshError;
 use openssh_sftp_client::{error::SftpErrorKind, Error as SftpClientError};
 
@@ -71,17 +70,6 @@ impl From<SftpError> for Error {
     }
 }
 
-impl From<RunError<SftpError>> for Error {
-    fn from(e: RunError<SftpError>) -> Self {
-        match e {
-            RunError::User(err) => err.into(),
-            RunError::TimedOut => {
-                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
-            }
-        }
-    }
-}
-
 pub(super) fn is_not_found(e: &SftpClientError) -> bool {
     matches!(e, SftpClientError::SftpError(SftpErrorKind::NoSuchFile, _))
 }
diff --git a/core/src/services/sftp/pager.rs b/core/src/services/sftp/pager.rs
index e6b557c2..81c0e22b 100644
--- a/core/src/services/sftp/pager.rs
+++ b/core/src/services/sftp/pager.rs
@@ -15,35 +15,32 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::pin::Pin;
+
 use async_trait::async_trait;
+use futures::StreamExt;
 use openssh_sftp_client::fs::DirEntry;
+use openssh_sftp_client::fs::ReadDir;
 
 use crate::raw::oio;
 use crate::Result;
 
 pub struct SftpPager {
-    dir: Box<[DirEntry]>,
-    path: String,
-    limit: Option<usize>,
-    complete: bool,
+    dir: Pin<Box<ReadDir>>,
+    prefix: String,
+    limit: usize,
 }
 
 impl SftpPager {
-    pub fn new(dir: Box<[DirEntry]>, path: String, limit: Option<usize>) -> Self {
-        Self {
-            dir,
-            path,
-            limit,
-            complete: false,
-        }
-    }
+    pub fn new(dir: ReadDir, path: String, limit: Option<usize>) -> Self {
+        let prefix = if path == "/" { "".to_owned() } else { path };
+
+        let limit = limit.unwrap_or(usize::MAX);
 
-    pub fn empty() -> Self {
-        Self {
-            dir: Box::new([]),
-            path: String::new(),
-            limit: None,
-            complete: true,
+        SftpPager {
+            dir: Box::pin(dir),
+            prefix,
+            limit,
         }
     }
 }
@@ -51,41 +48,28 @@ impl SftpPager {
 #[async_trait]
 impl oio::Page for SftpPager {
     async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.complete {
+        if self.limit == 0 {
             return Ok(None);
         }
 
-        // when listing the root directory, the prefix should be empty
-        if self.path == "/" {
-            self.path = "".to_owned();
-        }
-
-        let iter = self
-            .dir
-            .iter()
-            .filter(|e| {
-                // filter out "." and ".."
-                e.filename().to_str().unwrap() != "." && e.filename().to_str().unwrap() != ".."
-            })
-            .map(|e| map_entry(self.path.clone(), e.clone()));
-
-        let v: Vec<oio::Entry> = if let Some(limit) = self.limit {
-            iter.take(limit).collect()
-        } else {
-            iter.collect()
-        };
-
-        self.complete = true;
+        let item = self.dir.next().await;
 
-        if v.is_empty() {
-            Ok(None)
-        } else {
-            Ok(Some(v))
+        match item {
+            Some(Ok(e)) => {
+                if e.filename().to_str() == Some(".") || e.filename().to_str() == Some("..") {
+                    self.next().await
+                } else {
+                    self.limit -= 1;
+                    Ok(Some(vec![map_entry(self.prefix.as_str(), e.clone())]))
+                }
+            }
+            Some(Err(e)) => Err(e.into()),
+            None => Ok(None),
         }
     }
 }
 
-fn map_entry(prefix: String, value: DirEntry) -> oio::Entry {
+fn map_entry(prefix: &str, value: DirEntry) -> oio::Entry {
     let path = format!(
         "{}{}{}",
         prefix,
diff --git a/core/src/services/sftp/utils.rs b/core/src/services/sftp/utils.rs
index 91387aad..303aee70 100644
--- a/core/src/services/sftp/utils.rs
+++ b/core/src/services/sftp/utils.rs
@@ -16,19 +16,18 @@
 // under the License.
 
 use std::io::SeekFrom;
-use std::path::PathBuf;
 use std::pin::Pin;
 use std::task::Context;
 use std::task::Poll;
 
 use async_compat::Compat;
-use bb8::PooledConnection;
-use futures::executor::block_on;
+use futures::AsyncBufRead;
+use futures::AsyncRead;
+use futures::AsyncSeek;
+use openssh_sftp_client::file::File;
 use openssh_sftp_client::file::TokioCompatFile;
 use openssh_sftp_client::metadata::MetaData as SftpMeta;
-use owning_ref::OwningHandle;
 
-use super::backend::Manager;
 use crate::raw::oio;
 use crate::raw::oio::into_reader::FdReader;
 use crate::raw::oio::ReadExt;
@@ -36,45 +35,61 @@ use crate::EntryMode;
 use crate::Metadata;
 use crate::Result;
 
-pub struct SftpReader {
-    // similar situation to connection struct
-    // We can make sure the file can live as long as the connection.
-    file: OwningHandle<
-        Box<PooledConnection<'static, Manager>>,
-        Box<FdReader<Compat<TokioCompatFile<'static>>>>,
-    >,
+pub struct SftpReaderInner {
+    file: Pin<Box<Compat<TokioCompatFile>>>,
 }
+pub type SftpReader = FdReader<SftpReaderInner>;
 
-impl SftpReader {
-    pub async fn new(
-        conn: PooledConnection<'static, Manager>,
-        path: PathBuf,
-        start: u64,
-        end: u64,
-    ) -> Result<Self> {
-        let mut file = OwningHandle::new_with_fn(Box::new(conn), |conn| unsafe 
{
-            let file = block_on((*conn).sftp.open(path)).unwrap();
-            let f = Compat::new(TokioCompatFile::from(file));
-            Box::new(oio::into_reader::from_fd(f, start, end))
-        });
+impl SftpReaderInner {
+    pub async fn new(file: File) -> Self {
+        let file = Compat::new(file.into());
+        Self {
+            file: Box::pin(file),
+        }
+    }
+}
 
-        file.seek(SeekFrom::Start(0)).await?;
+impl SftpReader {
+    /// Create a new reader from a file, starting at the given offset and 
ending at the given offset.
+    pub async fn new(file: File, start: u64, end: u64) -> Result<Self> {
+        let file = SftpReaderInner::new(file).await;
+        let mut r = oio::into_reader::from_fd(file, start, end);
+        r.seek(SeekFrom::Start(0)).await?;
+        Ok(r)
+    }
+}
 
-        Ok(SftpReader { file })
+impl AsyncRead for SftpReaderInner {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+        buf: &mut [u8],
+    ) -> Poll<std::io::Result<usize>> {
+        let this = self.get_mut();
+        Pin::new(&mut this.file).poll_read(cx, buf)
     }
 }
 
-impl oio::Read for SftpReader {
-    fn poll_read(&mut self, cx: &mut Context, buf: &mut [u8]) -> 
Poll<Result<usize>> {
-        Pin::new(&mut *self.file).poll_read(cx, buf)
+impl AsyncBufRead for SftpReaderInner {
+    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> 
Poll<std::io::Result<&[u8]>> {
+        let this = self.get_mut();
+        Pin::new(&mut this.file).poll_fill_buf(cx)
     }
 
-    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> 
Poll<Result<u64>> {
-        Pin::new(&mut *self.file).poll_seek(cx, pos)
+    fn consume(self: Pin<&mut Self>, amt: usize) {
+        let this = self.get_mut();
+        Pin::new(&mut this.file).consume(amt)
     }
+}
 
-    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<bytes::Bytes>>> {
-        Pin::new(&mut *self.file).poll_next(cx)
+impl AsyncSeek for SftpReaderInner {
+    fn poll_seek(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+        pos: SeekFrom,
+    ) -> Poll<std::io::Result<u64>> {
+        let this = self.get_mut();
+        Pin::new(&mut this.file).poll_seek(cx, pos)
     }
 }
 
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 2fa5215d..48126ca8 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -16,30 +16,26 @@
 // under the License.
 
 use async_trait::async_trait;
-use bb8::PooledConnection;
 use bytes::Bytes;
+use openssh_sftp_client::file::File;
 
-use super::backend::Manager;
 use crate::raw::oio;
 use crate::{Error, ErrorKind, Result};
 
 pub struct SftpWriter {
-    conn: PooledConnection<'static, Manager>,
-    path: String,
+    file: File,
 }
 
 impl SftpWriter {
-    pub fn new(conn: PooledConnection<'static, Manager>, path: String) -> Self 
{
-        SftpWriter { conn, path }
+    pub fn new(file: File) -> Self {
+        SftpWriter { file }
     }
 }
 
 #[async_trait]
 impl oio::Write for SftpWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let mut file = self.conn.sftp.create(&self.path).await?;
-
-        file.write_all(&bs).await?;
+        self.file.write_all(&bs).await?;
 
         Ok(())
     }
diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs
index 9a4671db..8f30bbdc 100644
--- a/core/tests/behavior/utils.rs
+++ b/core/tests/behavior/utils.rs
@@ -42,6 +42,7 @@ pub fn init_service<B: Builder>() -> Option<Operator> {
     let _ = tracing_subscriber::fmt()
         .pretty()
         .with_test_writer()
+        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
         .try_init();
     let _ = dotenvy::dotenv();
 


Reply via email to