This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new f021b71e5 fix(integrations/unftp-sbe): avoid copy flush amplification
(#7217)
f021b71e5 is described below
commit f021b71e5a886a312cb9886c4a7645b60d389713
Author: Xuanwo <[email protected]>
AuthorDate: Mon Feb 23 20:20:15 2026 +0800
fix(integrations/unftp-sbe): avoid copy flush amplification (#7217)
---
integrations/unftp-sbe/src/lib.rs | 23 ++++++++++++++++++++++-
1 file changed, 22 insertions(+), 1 deletion(-)
diff --git a/integrations/unftp-sbe/src/lib.rs
b/integrations/unftp-sbe/src/lib.rs
index bb767f454..55e4d01ab 100644
--- a/integrations/unftp-sbe/src/lib.rs
+++ b/integrations/unftp-sbe/src/lib.rs
@@ -64,6 +64,7 @@ use opendal::Operator;
use unftp_core::auth::UserDetail;
use unftp_core::storage::{self, Error, StorageBackend};
+use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio_util::compat::{FuturesAsyncReadCompatExt,
FuturesAsyncWriteCompatExt};
@@ -137,6 +138,25 @@ fn convert_path(path: &Path) -> storage::Result<&str> {
})
}
+async fn copy_read_write_loop<R, W>(input: &mut R, output: &mut W) ->
std::io::Result<u64>
+where
+ R: tokio::io::AsyncRead + Unpin + ?Sized,
+ W: tokio::io::AsyncWrite + Unpin + ?Sized,
+{
+ let mut copied = 0u64;
+ let mut buf = [0u8; 8 * 1024];
+
+ loop {
+ let n = input.read(&mut buf).await?;
+ if n == 0 {
+ return Ok(copied);
+ }
+
+ output.write_all(&buf[..n]).await?;
+ copied += n as u64;
+ }
+}
+
#[async_trait::async_trait]
impl<User: UserDetail> StorageBackend<User> for OpendalStorage {
type Metadata = OpendalMetadata;
@@ -214,7 +234,8 @@ impl<User: UserDetail> StorageBackend<User> for
OpendalStorage {
.map_err(convert_err)?
.into_futures_async_write()
.compat_write();
- let copy_result = tokio::io::copy(&mut input, &mut w).await;
+ // Avoid `tokio::io::copy`'s pending-read flush path and keep
buffering policy explicit.
+ let copy_result = copy_read_write_loop(&mut input, &mut w).await;
let shutdown_result = w.shutdown().await;
match (copy_result, shutdown_result) {
(Ok(len), Ok(())) => Ok(len),