This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch xuanwo/unftp-sbe-copy-loop-mem in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 4fb5d5d9c988473b8b258c3eefb8950a2ddd7bae Author: Xuanwo <[email protected]> AuthorDate: Mon Feb 23 14:22:30 2026 +0800 fix(integrations/unftp-sbe): avoid copy flush amplification --- integrations/unftp-sbe/src/lib.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/integrations/unftp-sbe/src/lib.rs b/integrations/unftp-sbe/src/lib.rs index 80c443e55..58642a1d4 100644 --- a/integrations/unftp-sbe/src/lib.rs +++ b/integrations/unftp-sbe/src/lib.rs @@ -64,6 +64,7 @@ use libunftp::auth::UserDetail; use libunftp::storage::{self, Error, StorageBackend}; use opendal::Operator; +use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; @@ -137,6 +138,25 @@ fn convert_path(path: &Path) -> storage::Result<&str> { }) } +async fn copy_read_write_loop<R, W>(input: &mut R, output: &mut W) -> std::io::Result<u64> +where + R: tokio::io::AsyncRead + Unpin + ?Sized, + W: tokio::io::AsyncWrite + Unpin + ?Sized, +{ + let mut copied = 0u64; + let mut buf = [0u8; 8 * 1024]; + + loop { + let n = input.read(&mut buf).await?; + if n == 0 { + return Ok(copied); + } + + output.write_all(&buf[..n]).await?; + copied += n as u64; + } +} + #[async_trait::async_trait] impl<User: UserDetail> StorageBackend<User> for OpendalStorage { type Metadata = OpendalMetadata; @@ -214,7 +234,8 @@ impl<User: UserDetail> StorageBackend<User> for OpendalStorage { .map_err(convert_err)? .into_futures_async_write() .compat_write(); - let copy_result = tokio::io::copy(&mut input, &mut w).await; + // Avoid `tokio::io::copy`'s pending-read flush path and keep buffering policy explicit. + let copy_result = copy_read_write_loop(&mut input, &mut w).await; let shutdown_result = w.shutdown().await; match (copy_result, shutdown_result) { (Ok(len), Ok(())) => Ok(len),
