NobodyXu commented on code in PR #2192:
URL: 
https://github.com/apache/incubator-opendal/pull/2192#discussion_r1186011952


##########
core/src/services/sftp/backend.rs:
##########
@@ -465,43 +466,94 @@ impl Accessor for SftpBackend {
                     return Err(e.into());
                 }
             }
-        };
-        let dir = dir.read_dir().await?;
+        }
+        .read_dir();
 
         Ok((
             RpList::default(),
-            SftpPager::new(dir.into_inner(), path.to_owned(), args.limit()),
+            SftpPager::new(client, dir, path.to_owned(), args.limit()),
         ))
     }
 }
 
 impl SftpBackend {
-    async fn pool(&self) -> Result<&bb8::Pool<Manager>> {
-        let pool = self
-            .sftp
-            .get_or_try_init(|| async {
-                let manager = Manager {
-                    endpoint: self.endpoint.clone(),
-                    user: self.user.clone(),
-                    key: self.key.clone(),
-                };
-
-                bb8::Pool::builder().max_size(10).build(manager).await
-            })
-            .await?;
-
-        Ok(pool)
-    }
+    async fn connect(&self) -> std::result::Result<Connection, Error> {
+        let mut session = SessionBuilder::default();
 
-    pub async fn sftp_connect(&self) -> Result<PooledConnection<'_, Manager>> {
-        let conn = self.pool().await?.get().await?;
+        session.user(self.user.clone());
 
-        Ok(conn)
-    }
+        if let Some(key) = &self.key {
+            session.keyfile(key);
+        }
+
+        // set control directory to avoid temp files in root directory when panic
+        session.control_directory("/tmp");
+        session.server_alive_interval(Duration::from_secs(5));
+        session.known_hosts_check(self.known_hosts_strategy.clone());
 
-    pub async fn sftp_connect_owned(&self) -> Result<PooledConnection<'static, Manager>> {
-        let conn = self.pool().await?.get_owned().await?;
+        // when connection > 10, it will wait others to finish
+        let permit = self.cnt.clone().acquire_owned().await.map_err(|_| {
+            Error::new(ErrorKind::Unexpected, "failed to acquire connection permit")
+        })?;
+
+        let session = session.connect(&self.endpoint).await?;
+
+        let sess = Box::new(session);
+        let mut oref = OwningHandle::new_with_fn(sess, unsafe {
+            |x| {
+                Box::new(
+                    block_on(
+                        (*x).subsystem("sftp")
+                            .stdin(Stdio::piped())
+                            .stdout(Stdio::piped())
+                            .spawn(),
+                    )
+                    .unwrap(),
+                )
+            }
+        });
+
+        let sftp = Sftp::new(
+            oref.stdin().take().unwrap(),
+            oref.stdout().take().unwrap(),
+            Default::default(),
+        )
+        .await?;

Review Comment:
   @silver-ymz This should work for you:
   
   ```rust
           let once_cell = std::sync::Arc::new(once_cell::sync::OnceCell::new());
           let once_cell_cloned = std::sync::Arc::clone(&once_cell);
   
           let mut future = Box::pin(async move {
               let res = session.subsystem("sftp")
                   .stdin(Stdio::piped())
                   .stdout(Stdio::piped())
                   .stderr(Stdio::null())
                   .spawn()
                   .await;
               let mut child = match res {
                   Ok(child) => child,
                   Err(err) => {
                       once_cell_cloned.set(Err(err)).unwrap(); // Err
                       drop(once_cell_cloned);
                       return;
                   }
               };
   
               let stdin = child.stdin().take().unwrap();
               let stdout = child.stdout().take().unwrap();
   
               once_cell_cloned.set(Ok((stdin, stdout))).unwrap(); // Ok
   
               // drop it so that once_cell will be the sole reference into it
               // and can use Arc::get_mut
               drop(once_cell_cloned);
   
               // Wait forever until being dropped
               std::future::pending().await;
   
               // Use child, session after await to keep them alive
               drop(child);
               drop(session);
           });
   
           let fut = future.as_mut();
           let (stdin, stdout) = std::future::poll_fn(|cx| {
               loop {
                   fut.as_mut().poll(cx);
                   if let Some(once_cell) = Arc::get_mut(&mut once_cell) {
                       // future must have set some value before dropping
                       // once_cell_cloned
                       break once_cell.take().unwrap()
                   }
               }
           }).await?;
           let sftp = Sftp::new_with_auxiliary(
               stdin,
               stdout,
               Default::default(),
               openssh_sftp_client::SftpAuxiliaryData::PinnedFuture(future),
           )
           .await?;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to