This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 6e171fddee refactor(core)!: Make oio::Write always write all given
buffer (#4880)
6e171fddee is described below
commit 6e171fddeef675abc65b135747ee17d093781035
Author: Xuanwo <[email protected]>
AuthorDate: Fri Jul 12 13:48:48 2024 +0800
refactor(core)!: Make oio::Write always write all given buffer (#4880)
* Remove returning n in write
Signed-off-by: Xuanwo <[email protected]>
* Fix build
Signed-off-by: Xuanwo <[email protected]>
* Fix tests
Signed-off-by: Xuanwo <[email protected]>
* Fix write
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/layers/async_backtrace.rs | 4 +-
core/src/layers/await_tree.rs | 4 +-
core/src/layers/blocking.rs | 2 +-
core/src/layers/complete.rs | 7 +-
core/src/layers/concurrent_limit.rs | 4 +-
core/src/layers/dtrace.rs | 14 ++-
core/src/layers/error_context.rs | 14 ++-
core/src/layers/logging.rs | 25 +++---
core/src/layers/metrics.rs | 17 ++--
core/src/layers/minitrace.rs | 4 +-
core/src/layers/oteltrace.rs | 4 +-
core/src/layers/prometheus.rs | 20 +++--
core/src/layers/prometheus_client.rs | 16 ++--
core/src/layers/retry.rs | 8 +-
core/src/layers/throttle.rs | 4 +-
core/src/layers/timeout.rs | 2 +-
core/src/layers/tracing.rs | 4 +-
core/src/raw/adapters/kv/backend.rs | 10 +--
core/src/raw/adapters/typed_kv/backend.rs | 10 +--
core/src/raw/enum_utils.rs | 4 +-
core/src/raw/oio/write/api.rs | 30 ++-----
core/src/raw/oio/write/append_write.rs | 8 +-
core/src/raw/oio/write/block_write.rs | 10 +--
core/src/raw/oio/write/multipart_write.rs | 10 +--
core/src/raw/oio/write/one_shot_write.rs | 5 +-
core/src/raw/oio/write/position_write.rs | 10 +--
core/src/raw/oio/write/range_write.rs | 10 +--
core/src/services/aliyun_drive/writer.rs | 6 +-
core/src/services/alluxio/writer.rs | 7 +-
core/src/services/compfs/writer.rs | 24 +++---
core/src/services/fs/writer.rs | 19 +++--
core/src/services/ftp/writer.rs | 25 +++---
core/src/services/ghac/writer.rs | 4 +-
core/src/services/hdfs/writer.rs | 18 +++-
core/src/services/hdfs_native/writer.rs | 2 +-
core/src/services/sftp/writer.rs | 13 ++-
core/src/types/blocking_write/std_writer.rs | 5 +-
core/src/types/context/write.rs | 127 +++++-----------------------
core/src/types/write/writer.rs | 9 +-
39 files changed, 221 insertions(+), 298 deletions(-)
diff --git a/core/src/layers/async_backtrace.rs
b/core/src/layers/async_backtrace.rs
index 6d77591ca0..290171c29c 100644
--- a/core/src/layers/async_backtrace.rs
+++ b/core/src/layers/async_backtrace.rs
@@ -169,7 +169,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
AsyncBacktraceWrapper<R> {
impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> {
#[async_backtrace::framed]
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs).await
}
@@ -185,7 +185,7 @@ impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R>
{
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for AsyncBacktraceWrapper<R> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs)
}
diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs
index 7bb20d42fb..58fd73e8fe 100644
--- a/core/src/layers/await_tree.rs
+++ b/core/src/layers/await_tree.rs
@@ -191,7 +191,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
AwaitTreeWrapper<R> {
}
impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
- fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> +
MaybeSend {
+ fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> +
MaybeSend {
self.inner
.write(bs)
.instrument_await(format!("opendal::{}",
WriteOperation::Write.into_static()))
@@ -211,7 +211,7 @@ impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for AwaitTreeWrapper<R> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs)
}
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 70830981c8..7293ced965 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -288,7 +288,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for
BlockingWrapper<I> {
}
impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
self.handle.block_on(self.inner.write(bs))
}
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index c2156a0418..68b0340aa7 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -654,7 +654,7 @@ impl<W> oio::Write for CompleteWriter<W>
where
W: oio::Write,
{
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
})?;
@@ -689,13 +689,12 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
where
W: oio::BlockingWrite,
{
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
})?;
- let n = w.write(bs)?;
- Ok(n)
+ w.write(bs)
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/concurrent_limit.rs
b/core/src/layers/concurrent_limit.rs
index a1a61ad01d..87ad19b50c 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -262,7 +262,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
ConcurrentLimitWrapper<R> {
}
impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs).await
}
@@ -276,7 +276,7 @@ impl<R: oio::Write> oio::Write for
ConcurrentLimitWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs)
}
diff --git a/core/src/layers/dtrace.rs b/core/src/layers/dtrace.rs
index c71277ed9a..51b001313b 100644
--- a/core/src/layers/dtrace.rs
+++ b/core/src/layers/dtrace.rs
@@ -379,15 +379,14 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
DtraceLayerWrapper<R> {
}
impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, writer_write_start, c_path.as_ptr());
self.inner
.write(bs)
.await
- .map(|n| {
- probe_lazy!(opendal, writer_write_ok, c_path.as_ptr(), n);
- n
+ .map(|_| {
+ probe_lazy!(opendal, writer_write_ok, c_path.as_ptr());
})
.map_err(|err| {
probe_lazy!(opendal, writer_write_error, c_path.as_ptr());
@@ -427,14 +426,13 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for DtraceLayerWrapper<R> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, blocking_writer_write_start, c_path.as_ptr());
self.inner
.write(bs)
- .map(|n| {
- probe_lazy!(opendal, blocking_writer_write_ok,
c_path.as_ptr(), n);
- n
+ .map(|_| {
+ probe_lazy!(opendal, blocking_writer_write_ok,
c_path.as_ptr());
})
.map_err(|err| {
probe_lazy!(opendal, blocking_writer_write_error,
c_path.as_ptr());
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 86ae9dba80..cabe84b053 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -385,14 +385,13 @@ impl<T: oio::BlockingRead> oio::BlockingRead for
ErrorContextWrapper<T> {
}
impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();
self.inner
.write(bs)
.await
- .map(|n| {
- self.processed += n as u64;
- n
+ .map(|_| {
+ self.processed += size as u64;
})
.map_err(|err| {
err.with_operation(WriteOperation::Write)
@@ -423,13 +422,12 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T>
{
}
impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();
self.inner
.write(bs)
- .map(|n| {
- self.processed += n as u64;
- n
+ .map(|_| {
+ self.processed += size as u64;
})
.map_err(|err| {
err.with_operation(WriteOperation::BlockingWrite)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 1e0d80d264..507745c6d1 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1072,21 +1072,20 @@ impl<W> LoggingWriter<W> {
}
impl<W: oio::Write> oio::Write for LoggingWriter<W> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
- match self.inner.write(bs.clone()).await {
- Ok(n) => {
- self.written += n as u64;
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
+ let size = bs.len();
+ match self.inner.write(bs).await {
+ Ok(_) => {
trace!(
target: LOGGING_TARGET,
- "service={} operation={} path={} written={}B -> input data
{}B, write {}B",
+ "service={} operation={} path={} written={}B -> data write
{}B",
self.ctx.scheme,
WriteOperation::Write,
self.path,
self.written,
- bs.len(),
- n,
+ size,
);
- Ok(n)
+ Ok(())
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
@@ -1170,21 +1169,19 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}
impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
match self.inner.write(bs.clone()) {
- Ok(n) => {
- self.written += n as u64;
+ Ok(_) => {
trace!(
target: LOGGING_TARGET,
- "service={} operation={} path={} written={}B -> input data
{}B, write {}B",
+ "service={} operation={} path={} written={}B -> data write
{}B",
self.ctx.scheme,
WriteOperation::BlockingWrite,
self.path,
self.written,
bs.len(),
- n
);
- Ok(n)
+ Ok(())
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 47d7a8a936..decbad81a8 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -785,17 +785,17 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
MetricWrapper<R> {
}
impl<R: oio::Write> oio::Write for MetricWrapper<R> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let start = Instant::now();
+ let size = bs.len();
self.inner
.write(bs)
.await
- .map(|n| {
- self.bytes_counter.increment(n as u64);
+ .map(|_| {
+ self.bytes_counter.increment(size as u64);
self.requests_duration_seconds
.record(start.elapsed().as_secs_f64());
- n
})
.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
@@ -819,12 +819,13 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
+ let size = bs.len();
+
self.inner
.write(bs)
- .map(|n| {
- self.bytes_counter.increment(n as u64);
- n
+ .map(|_| {
+ self.bytes_counter.increment(size as u64);
})
.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 983d18b6b8..bca4a2e584 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -306,7 +306,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
MinitraceWrapper<R> {
}
impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
- fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> +
MaybeSend {
+ fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> +
MaybeSend {
let _g = self.span.set_local_parent();
let _span =
LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static());
self.inner.write(bs)
@@ -326,7 +326,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
let _g = self.span.set_local_parent();
let _span =
LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
self.inner.write(bs)
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 6fb5d582ba..b04ad22fdd 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -284,7 +284,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
OtelTraceWrapper<R> {
}
impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
- fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> +
MaybeSend {
+ fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> +
MaybeSend {
self.inner.write(bs)
}
@@ -298,7 +298,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs)
}
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index ee740d55c4..a4359ead72 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -742,7 +742,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
PrometheusMetricWrapper<R> {
}
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
+ let size = bs.len();
+
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::Write.into_static(),
@@ -758,12 +760,12 @@ impl<R: oio::Write> oio::Write for
PrometheusMetricWrapper<R> {
timer.observe_duration();
match res {
- Ok(n) => {
+ Ok(_) => {
self.stats
.bytes_total
.with_label_values(&labels)
- .observe(n as f64);
- Ok(n)
+ .observe(size as f64);
+ Ok(())
}
Err(err) => {
self.stats.increment_errors_total(self.op, err.kind());
@@ -822,7 +824,9 @@ impl<R: oio::Write> oio::Write for
PrometheusMetricWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
+ let size = bs.len();
+
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::BlockingWrite.into_static(),
@@ -838,12 +842,12 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for
PrometheusMetricWrapper<R> {
timer.observe_duration();
match res {
- Ok(n) => {
+ Ok(_) => {
self.stats
.bytes_total
.with_label_values(&labels)
- .observe(n as f64);
- Ok(n)
+ .observe(size as f64);
+ Ok(())
}
Err(err) => {
self.stats.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/prometheus_client.rs
b/core/src/layers/prometheus_client.rs
index 463e409275..58fef7e894 100644
--- a/core/src/layers/prometheus_client.rs
+++ b/core/src/layers/prometheus_client.rs
@@ -627,24 +627,24 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
PrometheusMetricWrapper<R> {
}
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let start = Instant::now();
+ let size = bs.len();
self.inner
.write(bs)
.await
- .map(|n| {
+ .map(|_| {
self.metrics.observe_bytes_total(
self.scheme,
WriteOperation::Write.into_static(),
- n,
+ size,
);
self.metrics.observe_request_duration(
self.scheme,
WriteOperation::Write.into_static(),
start.elapsed(),
);
- n
})
.map_err(|err| {
self.metrics.increment_errors_total(
@@ -704,23 +704,23 @@ impl<R: oio::Write> oio::Write for
PrometheusMetricWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
let start = Instant::now();
+ let size = bs.len();
self.inner
.write(bs)
- .map(|n| {
+ .map(|_| {
self.metrics.observe_bytes_total(
self.scheme,
WriteOperation::BlockingWrite.into_static(),
- n,
+ size,
);
self.metrics.observe_request_duration(
self.scheme,
WriteOperation::BlockingWrite.into_static(),
start.elapsed(),
);
- n
})
.map_err(|err| {
self.metrics.increment_errors_total(
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index c48307fdbc..4d42098394 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -626,7 +626,7 @@ impl<R: oio::BlockingRead, I: RetryInterceptor>
oio::BlockingRead for RetryWrapp
}
impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
use backon::RetryableWithContext;
let inner = self.take_inner()?;
@@ -694,7 +694,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for
RetryWrapper<R, I> {
}
impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for
RetryWrapper<R, I> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
{ || self.inner.as_mut().unwrap().write(bs.clone()) }
.retry(&self.builder)
.when(|e| e.is_temporary())
@@ -938,8 +938,8 @@ mod tests {
struct MockWriter {}
impl oio::Write for MockWriter {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
- Ok(bs.len())
+ async fn write(&mut self, _: Buffer) -> Result<()> {
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 3b66e327f3..f73f33d3ca 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -191,7 +191,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
ThrottleWrapper<R> {
}
impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
loop {
@@ -226,7 +226,7 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
loop {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 246049dfbf..1cbc0c5ac1 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -350,7 +350,7 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
}
impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let fut = self.inner.write(bs);
Self::io_timeout(self.timeout, WriteOperation::Write.into_static(),
fut).await
}
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 4ba829a6ec..4a2dc4bc00 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -286,7 +286,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
- fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> +
MaybeSend {
+ fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> +
MaybeSend {
self.inner.write(bs)
}
@@ -312,7 +312,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for
TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs)
}
diff --git a/core/src/raw/adapters/kv/backend.rs
b/core/src/raw/adapters/kv/backend.rs
index bb08e4cb19..625e7ea982 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -242,10 +242,9 @@ impl<S> KvWriter<S> {
unsafe impl<S: Adapter> Sync for KvWriter<S> {}
impl<S: Adapter> oio::Write for KvWriter<S> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
- let ret = bs.len();
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
self.buffer.push(bs);
- Ok(ret)
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
@@ -260,10 +259,9 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
}
impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
- let ret = bs.len();
+ fn write(&mut self, bs: Buffer) -> Result<()> {
self.buffer.push(bs);
- Ok(ret)
+ Ok(())
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/adapters/typed_kv/backend.rs
b/core/src/raw/adapters/typed_kv/backend.rs
index ecce2eb879..fd6271691b 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -275,12 +275,11 @@ impl<S> KvWriter<S> {
}
impl<S: Adapter> oio::Write for KvWriter<S> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
- let size = bs.len();
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let mut buf = self.buf.take().unwrap_or_default();
buf.push(bs);
self.buf = Some(buf);
- Ok(size)
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
@@ -303,12 +302,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
}
impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
- let size = bs.len();
+ fn write(&mut self, bs: Buffer) -> Result<()> {
let mut buf = self.buf.take().unwrap_or_default();
buf.push(bs);
self.buf = Some(buf);
- Ok(size)
+ Ok(())
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs
index c22411904d..111da78be0 100644
--- a/core/src/raw/enum_utils.rs
+++ b/core/src/raw/enum_utils.rs
@@ -70,7 +70,7 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead>
oio::BlockingRead for TwoWa
}
impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWays<ONE, TWO> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
match self {
Self::One(v) => v.write(bs).await,
Self::Two(v) => v.write(bs).await,
@@ -129,7 +129,7 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead, THREE:
oio::BlockingRead> o
impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
for ThreeWays<ONE, TWO, THREE>
{
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
match self {
Self::One(v) => v.write(bs).await,
Self::Two(v) => v.write(bs).await,
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index e6c7c05918..4ec53adab6 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -77,31 +77,19 @@ pub trait Write: Unpin + Send + Sync {
///
/// # Behavior
///
- /// - `Ok(n)` means `n` bytes has been written successfully.
+ /// - `Ok(())` means all bytes has been written successfully.
/// - `Err(err)` means error happens and no bytes has been written.
- ///
- /// It's possible that `n < bs.len()`, caller should pass the remaining
bytes
- /// repeatedly until all bytes has been written.
- #[cfg(not(target_arch = "wasm32"))]
- fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> +
MaybeSend;
- #[cfg(target_arch = "wasm32")]
- fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>>;
+ fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> +
MaybeSend;
/// Close the writer and make sure all data has been flushed.
- #[cfg(not(target_arch = "wasm32"))]
fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend;
- #[cfg(target_arch = "wasm32")]
- fn close(&mut self) -> impl Future<Output = Result<()>>;
/// Abort the pending writer.
- #[cfg(not(target_arch = "wasm32"))]
fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend;
- #[cfg(target_arch = "wasm32")]
- fn abort(&mut self) -> impl Future<Output = Result<()>>;
}
impl Write for () {
- async fn write(&mut self, _: Buffer) -> Result<usize> {
+ async fn write(&mut self, _: Buffer) -> Result<()> {
unimplemented!("write is required to be implemented for oio::Write")
}
@@ -121,7 +109,7 @@ impl Write for () {
}
pub trait WriteDyn: Unpin + Send + Sync {
- fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<usize>>;
+ fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<()>>;
fn close_dyn(&mut self) -> BoxedFuture<Result<()>>;
@@ -129,7 +117,7 @@ pub trait WriteDyn: Unpin + Send + Sync {
}
impl<T: Write + ?Sized> WriteDyn for T {
- fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<usize>> {
+ fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture<Result<()>> {
Box::pin(self.write(bs))
}
@@ -143,7 +131,7 @@ impl<T: Write + ?Sized> WriteDyn for T {
}
impl<T: WriteDyn + ?Sized> Write for Box<T> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
self.deref_mut().write_dyn(bs).await
}
@@ -170,14 +158,14 @@ pub trait BlockingWrite: Send + Sync + 'static {
///
/// It's possible that `n < bs.len()`, caller should pass the remaining
bytes
/// repeatedly until all bytes has been written.
- fn write(&mut self, bs: Buffer) -> Result<usize>;
+ fn write(&mut self, bs: Buffer) -> Result<()>;
/// Close the writer and make sure all data has been flushed.
fn close(&mut self) -> Result<()>;
}
impl BlockingWrite for () {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
let _ = bs;
unimplemented!("write is required to be implemented for
oio::BlockingWrite")
@@ -195,7 +183,7 @@ impl BlockingWrite for () {
///
/// To make BlockingWriter work as expected, we must add this impl.
impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, bs: Buffer) -> Result<()> {
(**self).write(bs)
}
diff --git a/core/src/raw/oio/write/append_write.rs
b/core/src/raw/oio/write/append_write.rs
index 2f48b68307..06c72cc5e2 100644
--- a/core/src/raw/oio/write/append_write.rs
+++ b/core/src/raw/oio/write/append_write.rs
@@ -80,7 +80,7 @@ impl<W> oio::Write for AppendWriter<W>
where
W: AppendWrite,
{
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let offset = match self.offset {
Some(offset) => offset,
None => {
@@ -91,12 +91,10 @@ where
};
let size = bs.len();
- self.inner
- .append(offset, size as u64, Buffer::from(bs.to_bytes()))
- .await?;
+ self.inner.append(offset, size as u64, bs).await?;
// Update offset after succeed.
self.offset = Some(offset + size as u64);
- Ok(size)
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/block_write.rs
b/core/src/raw/oio/write/block_write.rs
index 99c76562ca..cd0ec43b45 100644
--- a/core/src/raw/oio/write/block_write.rs
+++ b/core/src/raw/oio/write/block_write.rs
@@ -162,10 +162,10 @@ impl<W> oio::Write for BlockWriter<W>
where
W: BlockWrite,
{
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
if !self.started && self.cache.is_none() {
- let size = self.fill_cache(bs);
- return Ok(size);
+ self.fill_cache(bs);
+ return Ok(());
}
// The block upload process has been started.
@@ -181,8 +181,8 @@ where
})
.await?;
self.cache = None;
- let size = self.fill_cache(bs);
- Ok(size)
+ self.fill_cache(bs);
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/multipart_write.rs
b/core/src/raw/oio/write/multipart_write.rs
index 0d893d7cb3..44a33c7a4b 100644
--- a/core/src/raw/oio/write/multipart_write.rs
+++ b/core/src/raw/oio/write/multipart_write.rs
@@ -203,14 +203,14 @@ impl<W> oio::Write for MultipartWriter<W>
where
W: MultipartWrite,
{
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let upload_id = match self.upload_id.clone() {
Some(v) => v,
None => {
// Fill cache with the first write.
if self.cache.is_none() {
- let size = self.fill_cache(bs);
- return Ok(size);
+ self.fill_cache(bs);
+ return Ok(());
}
let upload_id = self.w.initiate_part().await?;
@@ -234,8 +234,8 @@ where
.await?;
self.cache = None;
self.next_part_number += 1;
- let size = self.fill_cache(bs);
- Ok(size)
+ self.fill_cache(bs);
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/one_shot_write.rs
b/core/src/raw/oio/write/one_shot_write.rs
index cd056c1461..938973c33a 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -50,16 +50,15 @@ impl<W: OneShotWrite> OneShotWriter<W> {
}
impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
match &self.buffer {
Some(_) => Err(Error::new(
ErrorKind::Unsupported,
"OneShotWriter doesn't support multiple write",
)),
None => {
- let size = bs.len();
self.buffer = Some(bs);
- Ok(size)
+ Ok(())
}
}
}
diff --git a/core/src/raw/oio/write/position_write.rs
b/core/src/raw/oio/write/position_write.rs
index 3dbf5c93ef..5aa5ff3294 100644
--- a/core/src/raw/oio/write/position_write.rs
+++ b/core/src/raw/oio/write/position_write.rs
@@ -124,10 +124,10 @@ impl<W: PositionWrite> PositionWriter<W> {
}
impl<W: PositionWrite> oio::Write for PositionWriter<W> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
if self.cache.is_none() {
- let size = self.fill_cache(bs);
- return Ok(size);
+ let _ = self.fill_cache(bs);
+ return Ok(());
}
let bytes = self.cache.clone().expect("pending write must exist");
@@ -144,8 +144,8 @@ impl<W: PositionWrite> oio::Write for PositionWriter<W> {
.await?;
self.cache = None;
self.next_offset += length;
- let size = self.fill_cache(bs);
- Ok(size)
+ let _ = self.fill_cache(bs);
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/range_write.rs
b/core/src/raw/oio/write/range_write.rs
index 67ae619dd9..f44f06ad9c 100644
--- a/core/src/raw/oio/write/range_write.rs
+++ b/core/src/raw/oio/write/range_write.rs
@@ -155,14 +155,14 @@ impl<W: RangeWrite> RangeWriter<W> {
}
impl<W: RangeWrite> oio::Write for RangeWriter<W> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let location = match self.location.clone() {
Some(location) => location,
None => {
// Fill cache with the first write.
if self.cache.is_none() {
- let size = self.fill_cache(bs);
- return Ok(size);
+ self.fill_cache(bs);
+ return Ok(());
}
let location = self.w.initiate_range().await?;
@@ -187,8 +187,8 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
.await?;
self.cache = None;
self.next_offset += length;
- let size = self.fill_cache(bs);
- Ok(size)
+ self.fill_cache(bs);
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/aliyun_drive/writer.rs
b/core/src/services/aliyun_drive/writer.rs
index 30ca2ef94e..461764541f 100644
--- a/core/src/services/aliyun_drive/writer.rs
+++ b/core/src/services/aliyun_drive/writer.rs
@@ -51,7 +51,7 @@ impl AliyunDriveWriter {
}
impl oio::Write for AliyunDriveWriter {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let (upload_id, file_id) = match (self.upload_id.as_ref(),
self.file_id.as_ref()) {
(Some(upload_id), Some(file_id)) => (upload_id, file_id),
_ => {
@@ -94,8 +94,6 @@ impl oio::Write for AliyunDriveWriter {
return Err(Error::new(ErrorKind::Unexpected, "cannot find
upload_url"));
};
- let size = bs.len();
-
if let Err(err) = self.core.upload(upload_url, bs).await {
if err.kind() != ErrorKind::AlreadyExists {
return Err(err);
@@ -104,7 +102,7 @@ impl oio::Write for AliyunDriveWriter {
self.part_number += 1;
- Ok(size)
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/alluxio/writer.rs
b/core/src/services/alluxio/writer.rs
index e5ca807b60..f452b6b10b 100644
--- a/core/src/services/alluxio/writer.rs
+++ b/core/src/services/alluxio/writer.rs
@@ -43,7 +43,7 @@ impl AlluxioWriter {
}
impl oio::Write for AlluxioWriter {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let stream_id = match self.stream_id {
Some(stream_id) => stream_id,
None => {
@@ -52,9 +52,8 @@ impl oio::Write for AlluxioWriter {
stream_id
}
};
- self.core
- .write(stream_id, Buffer::from(bs.to_bytes()))
- .await
+ self.core.write(stream_id, bs).await?;
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/compfs/writer.rs
b/core/src/services/compfs/writer.rs
index 2e12ea2dc7..749cab10ff 100644
--- a/core/src/services/compfs/writer.rs
+++ b/core/src/services/compfs/writer.rs
@@ -15,13 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-use std::{io::Cursor, sync::Arc};
-
-use compio::{buf::buf_try, fs::File, io::AsyncWrite};
-
use super::core::CompfsCore;
use crate::raw::*;
use crate::*;
+use compio::io::AsyncWriteExt;
+use compio::{buf::buf_try, fs::File};
+use std::{io::Cursor, sync::Arc};
#[derive(Debug)]
pub struct CompfsWriter {
@@ -36,17 +35,22 @@ impl CompfsWriter {
}
impl oio::Write for CompfsWriter {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ /// FIXME
+ ///
+ /// the write_all doesn't work correctly if `bs` is non-contiguous.
+ ///
+ /// The IoBuf::buf_len() only returns the length of the current buffer.
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let mut file = self.file.clone();
- let n = self
- .core
+ self.core
.exec(move || async move {
- let (n, _) = buf_try!(@try file.write(bs).await);
- Ok(n)
+ buf_try!(@try file.write_all(bs).await);
+ Ok(())
})
.await?;
- Ok(n)
+
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index b9e84d736c..d0d1bc6674 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -53,11 +53,15 @@ impl<F> FsWriter<F> {
unsafe impl<F> Sync for FsWriter<F> {}
impl oio::Write for FsWriter<tokio::fs::File> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, mut bs: Buffer) -> Result<()> {
let f = self.f.as_mut().expect("FsWriter must be initialized");
- // TODO: use write_vectored instead.
- f.write(bs.chunk()).await.map_err(new_std_io_error)
+ while bs.has_remaining() {
+ let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
+ bs.advance(n);
+ }
+
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
@@ -88,10 +92,15 @@ impl oio::Write for FsWriter<tokio::fs::File> {
}
impl oio::BlockingWrite for FsWriter<std::fs::File> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, mut bs: Buffer) -> Result<()> {
let f = self.f.as_mut().expect("FsWriter must be initialized");
- f.write(bs.chunk()).map_err(new_std_io_error)
+ while bs.has_remaining() {
+ let n = f.write(bs.chunk()).map_err(new_std_io_error)?;
+ bs.advance(n);
+ }
+
+ Ok(())
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index ec21609c18..f7f6864dd9 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -53,7 +53,7 @@ impl FtpWriter {
}
impl oio::Write for FtpWriter {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, mut bs: Buffer) -> Result<()> {
let path = if let Some(tmp_path) = &self.tmp_path {
tmp_path
} else {
@@ -69,17 +69,20 @@ impl oio::Write for FtpWriter {
));
}
- let size = self
- .data_stream
- .as_mut()
- .unwrap()
- .write(bs.chunk())
- .await
- .map_err(|err| {
- Error::new(ErrorKind::Unexpected, "copy from ftp
stream").set_source(err)
- })?;
+ while bs.has_remaining() {
+ let n = self
+ .data_stream
+ .as_mut()
+ .unwrap()
+ .write(bs.chunk())
+ .await
+ .map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "copy from ftp
stream").set_source(err)
+ })?;
+ bs.advance(n);
+ }
- Ok(size)
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index 00bce7fac3..3b58dd0fc2 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -38,7 +38,7 @@ impl GhacWriter {
}
impl oio::Write for GhacWriter {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();
let offset = self.size;
@@ -61,7 +61,7 @@ impl oio::Write for GhacWriter {
}
self.size += size as u64;
- Ok(size)
+ Ok(())
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 96708c1ab1..0f60014f41 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -53,10 +53,15 @@ impl<F> HdfsWriter<F> {
}
impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, mut bs: Buffer) -> Result<()> {
let f = self.f.as_mut().expect("HdfsWriter must be initialized");
- f.write(bs.chunk()).await.map_err(new_std_io_error)
+ while bs.has_remaining() {
+ let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
+ bs.advance(n);
+ }
+
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
@@ -82,9 +87,14 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
}
impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
- fn write(&mut self, bs: Buffer) -> Result<usize> {
+ fn write(&mut self, mut bs: Buffer) -> Result<()> {
let f = self.f.as_mut().expect("HdfsWriter must be initialized");
- f.write(bs.chunk()).map_err(new_std_io_error)
+ while bs.has_remaining() {
+ let n = f.write(bs.chunk()).map_err(new_std_io_error)?;
+ bs.advance(n);
+ }
+
+ Ok(())
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/hdfs_native/writer.rs
b/core/src/services/hdfs_native/writer.rs
index e6fb0205e4..4cab45b3be 100644
--- a/core/src/services/hdfs_native/writer.rs
+++ b/core/src/services/hdfs_native/writer.rs
@@ -31,7 +31,7 @@ impl HdfsNativeWriter {
}
impl oio::Write for HdfsNativeWriter {
- async fn write(&mut self, _bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, _bs: Buffer) -> Result<()> {
todo!()
}
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 69bff86245..9b86c78957 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -39,8 +39,17 @@ impl SftpWriter {
}
impl oio::Write for SftpWriter {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
- self.file.write(bs.chunk()).await.map_err(new_std_io_error)
+ async fn write(&mut self, mut bs: Buffer) -> Result<()> {
+ while bs.has_remaining() {
+ let n = self
+ .file
+ .write(bs.chunk())
+ .await
+ .map_err(new_std_io_error)?;
+ bs.advance(n);
+ }
+
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/types/blocking_write/std_writer.rs
b/core/src/types/blocking_write/std_writer.rs
index 5b18467e36..91accbde53 100644
--- a/core/src/types/blocking_write/std_writer.rs
+++ b/core/src/types/blocking_write/std_writer.rs
@@ -103,10 +103,9 @@ impl Write for StdWriter {
return Ok(());
};
- let n = w
- .write(Buffer::from(bs))
+ w.write(Buffer::from(bs))
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other,
err))?;
- self.buf.advance(n);
+ self.buf.clean();
}
}
}
diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs
index 557341da3d..6e2464cfce 100644
--- a/core/src/types/context/write.rs
+++ b/core/src/types/context/write.rs
@@ -18,7 +18,6 @@
use crate::raw::oio::Write;
use crate::raw::*;
use crate::*;
-use bytes::Buf;
use std::sync::Arc;
/// WriteContext holds the immutable context for give write operation.
@@ -136,7 +135,9 @@ impl WriteGenerator<oio::Writer> {
/// Write the entire buffer into writer.
pub async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
let Some(chunk_size) = self.chunk_size else {
- return self.w.write_dyn(bs).await;
+ let size = bs.len();
+ self.w.write_dyn(bs).await?;
+ return Ok(size);
};
if self.buffer.len() + bs.len() < chunk_size {
@@ -153,10 +154,8 @@ impl WriteGenerator<oio::Writer> {
if !self.exact {
let fill_size = bs.len();
self.buffer.push(bs);
- let mut buf = self.buffer.take().collect();
- let written = self.w.write_dyn(buf.clone()).await?;
- buf.advance(written);
- self.buffer.push(buf);
+ let buf = self.buffer.take().collect();
+ self.w.write_dyn(buf).await?;
return Ok(fill_size);
}
@@ -167,10 +166,8 @@ impl WriteGenerator<oio::Writer> {
// Action:
// - write existing buffer in chunk_size to make more rooms for
writing data.
if self.buffer.len() >= chunk_size {
- let mut buf = self.buffer.take().collect();
- let written = self.w.write_dyn(buf.clone()).await?;
- buf.advance(written);
- self.buffer.push(buf);
+ let buf = self.buffer.take().collect();
+ self.w.write_dyn(buf).await?;
}
// Condition
@@ -192,8 +189,8 @@ impl WriteGenerator<oio::Writer> {
break;
}
- let written =
self.w.write_dyn(self.buffer.clone().collect()).await?;
- self.buffer.advance(written);
+ let buf = self.buffer.take().collect();
+ self.w.write_dyn(buf).await?;
}
self.w.close().await
@@ -225,7 +222,9 @@ impl WriteGenerator<oio::BlockingWriter> {
/// Write the entire buffer into writer.
pub fn write(&mut self, mut bs: Buffer) -> Result<usize> {
let Some(chunk_size) = self.chunk_size else {
- return self.w.write(bs);
+ let size = bs.len();
+ self.w.write(bs)?;
+ return Ok(size);
};
if self.buffer.len() + bs.len() < chunk_size {
@@ -242,10 +241,8 @@ impl WriteGenerator<oio::BlockingWriter> {
if !self.exact {
let fill_size = bs.len();
self.buffer.push(bs);
- let mut buf = self.buffer.take().collect();
- let written = self.w.write(buf.clone())?;
- buf.advance(written);
- self.buffer.push(buf);
+ let buf = self.buffer.take().collect();
+ self.w.write(buf)?;
return Ok(fill_size);
}
@@ -256,10 +253,8 @@ impl WriteGenerator<oio::BlockingWriter> {
// Action:
// - write existing buffer in chunk_size to make more rooms for
writing data.
if self.buffer.len() >= chunk_size {
- let mut buf = self.buffer.take().collect();
- let written = self.w.write(buf.clone())?;
- buf.advance(written);
- self.buffer.push(buf);
+ let buf = self.buffer.take().collect();
+ self.w.write(buf)?;
}
// Condition
@@ -281,8 +276,8 @@ impl WriteGenerator<oio::BlockingWriter> {
break;
}
- let written = self.w.write(self.buffer.clone().collect())?;
- self.buffer.advance(written);
+ let buf = self.buffer.take().collect();
+ self.w.write(buf)?;
}
self.w.close()
@@ -293,8 +288,8 @@ impl WriteGenerator<oio::BlockingWriter> {
mod tests {
use super::*;
use crate::raw::oio::Write;
- use bytes::Buf;
use bytes::Bytes;
+ use bytes::{Buf, BufMut};
use log::debug;
use pretty_assertions::assert_eq;
use rand::thread_rng;
@@ -309,13 +304,12 @@ mod tests {
}
impl Write for MockWriter {
- async fn write(&mut self, bs: Buffer) -> Result<usize> {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
debug!("test_fuzz_exact_buf_writer: flush size: {}", &bs.len());
- let chunk = bs.chunk();
let mut buf = self.buf.lock().await;
- buf.extend_from_slice(chunk);
- Ok(chunk.len())
+ buf.put(bs);
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
@@ -478,83 +472,6 @@ mod tests {
Ok(())
}
- struct PartialWriter {
- buf: Arc<Mutex<Vec<u8>>>,
- }
-
- impl Write for PartialWriter {
- async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
- let mut buf = self.buf.lock().await;
-
- if Buffer::count(&bs) > 1 {
- // Always leaves last buffer for non-contiguous buffer.
- let mut written = 0;
- while Buffer::count(&bs) > 1 {
- let chunk = bs.chunk();
- buf.extend_from_slice(chunk);
- written += chunk.len();
- bs.advance(chunk.len());
- }
- Ok(written)
- } else {
- let chunk = bs.chunk();
- buf.extend_from_slice(chunk);
- Ok(chunk.len())
- }
- }
-
- async fn close(&mut self) -> Result<()> {
- Ok(())
- }
-
- async fn abort(&mut self) -> Result<()> {
- Ok(())
- }
- }
-
- #[tokio::test]
- async fn test_inexact_buf_writer_partial_send() -> Result<()> {
- let _ = tracing_subscriber::fmt()
- .pretty()
- .with_test_writer()
- .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
- .try_init();
-
- let buf = Arc::new(Mutex::new(vec![]));
- let mut w = WriteGenerator::new(
- Box::new(PartialWriter { buf: buf.clone() }),
- Some(10),
- false,
- );
-
- let mut rng = thread_rng();
- let mut expected = vec![];
-
- let mut new_content = |size| {
- let mut content = vec![0; size];
- rng.fill_bytes(&mut content);
- expected.extend_from_slice(&content);
- Bytes::from(content)
- };
-
- // content < chunk size.
- let content = new_content(5);
- assert_eq!(5, w.write(content.into()).await?);
- // Non-contiguous buffer.
- let content = Buffer::from(vec![new_content(3), new_content(2)]);
- assert_eq!(5, w.write(content).await?);
-
- w.close().await?;
-
- let buf = buf.lock().await;
- assert_eq!(buf.len(), expected.len());
- assert_eq!(
- format!("{:x}", Sha256::digest(&*buf)),
- format!("{:x}", Sha256::digest(&expected))
- );
- Ok(())
- }
-
#[tokio::test]
async fn test_fuzz_exact_buf_writer() -> Result<()> {
let _ = tracing_subscriber::fmt()
diff --git a/core/src/types/write/writer.rs b/core/src/types/write/writer.rs
index dc81d3d716..114ec886aa 100644
--- a/core/src/types/write/writer.rs
+++ b/core/src/types/write/writer.rs
@@ -141,6 +141,7 @@ impl Writer {
let n = self.inner.write(bs.clone()).await?;
bs.advance(n);
}
+
Ok(())
}
@@ -153,12 +154,8 @@ impl Writer {
/// Optimize this function to avoid unnecessary copy.
pub async fn write_from(&mut self, bs: impl Buf) -> Result<()> {
let mut bs = bs;
- let mut bs = Buffer::from(bs.copy_to_bytes(bs.remaining()));
- while !bs.is_empty() {
- let n = self.inner.write(bs.clone()).await?;
- bs.advance(n);
- }
- Ok(())
+ let bs = Buffer::from(bs.copy_to_bytes(bs.remaining()));
+ self.write(bs).await
}
/// Abort the writer and clean up all written data.