This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new f021b71e5 fix(integrations/unftp-sbe): avoid copy flush amplification 
(#7217)
f021b71e5 is described below

commit f021b71e5a886a312cb9886c4a7645b60d389713
Author: Xuanwo <[email protected]>
AuthorDate: Mon Feb 23 20:20:15 2026 +0800

    fix(integrations/unftp-sbe): avoid copy flush amplification (#7217)
---
 integrations/unftp-sbe/src/lib.rs | 23 ++++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/integrations/unftp-sbe/src/lib.rs 
b/integrations/unftp-sbe/src/lib.rs
index bb767f454..55e4d01ab 100644
--- a/integrations/unftp-sbe/src/lib.rs
+++ b/integrations/unftp-sbe/src/lib.rs
@@ -64,6 +64,7 @@ use opendal::Operator;
 use unftp_core::auth::UserDetail;
 use unftp_core::storage::{self, Error, StorageBackend};
 
+use tokio::io::AsyncReadExt;
 use tokio::io::AsyncWriteExt;
 use tokio_util::compat::{FuturesAsyncReadCompatExt, 
FuturesAsyncWriteCompatExt};
 
@@ -137,6 +138,25 @@ fn convert_path(path: &Path) -> storage::Result<&str> {
     })
 }
 
+async fn copy_read_write_loop<R, W>(input: &mut R, output: &mut W) -> 
std::io::Result<u64>
+where
+    R: tokio::io::AsyncRead + Unpin + ?Sized,
+    W: tokio::io::AsyncWrite + Unpin + ?Sized,
+{
+    let mut copied = 0u64;
+    let mut buf = [0u8; 8 * 1024];
+
+    loop {
+        let n = input.read(&mut buf).await?;
+        if n == 0 {
+            return Ok(copied);
+        }
+
+        output.write_all(&buf[..n]).await?;
+        copied += n as u64;
+    }
+}
+
 #[async_trait::async_trait]
 impl<User: UserDetail> StorageBackend<User> for OpendalStorage {
     type Metadata = OpendalMetadata;
@@ -214,7 +234,8 @@ impl<User: UserDetail> StorageBackend<User> for 
OpendalStorage {
             .map_err(convert_err)?
             .into_futures_async_write()
             .compat_write();
-        let copy_result = tokio::io::copy(&mut input, &mut w).await;
+        // Avoid `tokio::io::copy`'s pending-read flush path and keep 
buffering policy explicit.
+        let copy_result = copy_read_write_loop(&mut input, &mut w).await;
         let shutdown_result = w.shutdown().await;
         match (copy_result, shutdown_result) {
             (Ok(len), Ok(())) => Ok(len),

Reply via email to