This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 1bb63ab67 refactor: Make oio::Write accept Buf instead (#3021)
1bb63ab67 is described below
commit 1bb63ab676293cb20cb90fbb177d58bd25340c8c
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 00:59:16 2023 +0800
refactor: Make oio::Write accept Buf instead (#3021)
---
core/benches/oio/utils.rs | 4 +-
core/benches/oio/write.rs | 4 +-
core/src/layers/blocking.rs | 2 +-
core/src/layers/complete.rs | 33 ++--
core/src/layers/concurrent_limit.rs | 4 +-
core/src/layers/error_context.rs | 4 +-
core/src/layers/logging.rs | 11 +-
core/src/layers/madsim.rs | 2 +-
core/src/layers/metrics.rs | 8 +-
core/src/layers/minitrace.rs | 4 +-
core/src/layers/oteltrace.rs | 4 +-
core/src/layers/prometheus.rs | 4 +-
core/src/layers/retry.rs | 8 +-
core/src/layers/throttle.rs | 8 +-
core/src/layers/timeout.rs | 4 +-
core/src/layers/tracing.rs | 4 +-
core/src/raw/adapters/kv/backend.rs | 24 ++-
core/src/raw/adapters/typed_kv/backend.rs | 52 +++---
core/src/raw/oio/buf.rs | 220 +++++++++++++++++++++++
core/src/raw/oio/mod.rs | 3 +
core/src/raw/oio/write/api.rs | 14 +-
core/src/raw/oio/write/append_object_write.rs | 13 +-
core/src/raw/oio/write/compose_write.rs | 5 +-
core/src/raw/oio/write/exact_buf_write.rs | 54 +++---
core/src/raw/oio/write/multipart_upload_write.rs | 9 +-
core/src/raw/oio/write/one_shot_write.rs | 12 +-
core/src/services/azblob/writer.rs | 11 +-
core/src/services/azdfs/writer.rs | 15 +-
core/src/services/cos/writer.rs | 12 +-
core/src/services/dropbox/writer.rs | 9 +-
core/src/services/fs/writer.rs | 37 +---
core/src/services/ftp/writer.rs | 8 +-
core/src/services/gcs/writer.rs | 12 +-
core/src/services/gdrive/writer.rs | 10 +-
core/src/services/ghac/writer.rs | 14 +-
core/src/services/hdfs/writer.rs | 37 +---
core/src/services/ipmfs/writer.rs | 10 +-
core/src/services/obs/writer.rs | 12 +-
core/src/services/onedrive/writer.rs | 10 +-
core/src/services/oss/writer.rs | 12 +-
core/src/services/s3/writer.rs | 13 +-
core/src/services/sftp/writer.rs | 10 +-
core/src/services/supabase/writer.rs | 12 +-
core/src/services/vercel_artifacts/writer.rs | 9 +-
core/src/services/wasabi/writer.rs | 9 +-
core/src/services/webdav/writer.rs | 8 +-
core/src/services/webhdfs/writer.rs | 9 +-
core/src/types/operator/blocking_operator.rs | 8 +-
core/src/types/operator/operator.rs | 10 +-
core/src/types/writer.rs | 42 ++---
50 files changed, 519 insertions(+), 334 deletions(-)
diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index bde28481e..1671057f0 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -26,8 +26,8 @@ pub struct BlackHoleWriter;
#[async_trait]
impl oio::Write for BlackHoleWriter {
- async fn write(&mut self, bs: Bytes) -> opendal::Result<u64> {
- Ok(bs.len() as u64)
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) ->
opendal::Result<usize> {
+ Ok(bs.remaining())
}
async fn abort(&mut self) -> opendal::Result<()> {
diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index a227d4901..8da2a6dd0 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -48,8 +48,8 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
let mut bs = content.clone();
while !bs.is_empty() {
- let n = w.write(bs.clone()).await.unwrap();
- bs.advance(n as usize);
+ let n = w.write(&bs).await.unwrap();
+ bs.advance(n);
}
w.close().await.unwrap();
})
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 6e530023c..6beb8881b 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -196,7 +196,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for
BlockingWrapper<I> {
}
impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.handle.block_on(self.inner.write(bs))
}
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 83bf12d6a..048905bae 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -711,11 +711,15 @@ impl<W> oio::Write for CompleteWriter<W>
where
W: oio::Write,
{
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let n = bs.len();
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let w = self.inner.as_mut().ok_or_else(|| {
+ Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
+ })?;
+ let n = w.write(bs).await?;
+ self.written += n as u64;
if let Some(size) = self.size {
- if self.written + n as u64 > size {
+ if self.written > size {
return Err(Error::new(
ErrorKind::ContentTruncated,
&format!(
@@ -726,11 +730,6 @@ where
}
}
- let w = self.inner.as_mut().ok_or_else(|| {
- Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
- })?;
- let n = w.write(bs).await?;
- self.written += n;
Ok(n)
}
@@ -773,11 +772,15 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
where
W: oio::BlockingWrite,
{
- fn write(&mut self, bs: Bytes) -> Result<u64> {
- let n = bs.len();
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let w = self.inner.as_mut().ok_or_else(|| {
+ Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
+ })?;
+ let n = w.write(bs)?;
+ self.written += n as u64;
if let Some(size) = self.size {
- if self.written + n as u64 > size {
+ if self.written > size {
return Err(Error::new(
ErrorKind::ContentTruncated,
&format!(
@@ -788,13 +791,7 @@ where
}
}
- let w = self.inner.as_mut().ok_or_else(|| {
- Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
- })?;
-
- w.write(bs)?;
- self.written += n as u64;
- Ok(n as u64)
+ Ok(n)
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/layers/concurrent_limit.rs
b/core/src/layers/concurrent_limit.rs
index 99fa4e413..bdc473384 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -285,7 +285,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
ConcurrentLimitWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner.write(bs).await
}
@@ -299,7 +299,7 @@ impl<R: oio::Write> oio::Write for
ConcurrentLimitWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner.write(bs)
}
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index e325cd0f7..15d7a0dd0 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -403,7 +403,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for
ErrorContextWrapper<T> {
#[async_trait::async_trait]
impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner.write(bs).await.map_err(|err| {
err.with_operation(WriteOperation::Write)
.with_context("service", self.scheme)
@@ -429,7 +429,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
}
impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner.write(bs).map_err(|err| {
err.with_operation(WriteOperation::BlockingWrite)
.with_context("service", self.scheme)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 1f6d7456a..f0cea32e8 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1252,10 +1252,10 @@ impl<W> LoggingWriter<W> {
#[async_trait]
impl<W: oio::Write> oio::Write for LoggingWriter<W> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
match self.inner.write(bs).await {
Ok(n) => {
- self.written += n;
+ self.written += n as u64;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data write
{}B",
@@ -1349,11 +1349,10 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}
impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len();
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
match self.inner.write(bs) {
Ok(n) => {
- self.written += n;
+ self.written += n as u64;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data write
{}B",
@@ -1361,7 +1360,7 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for
LoggingWriter<W> {
WriteOperation::BlockingWrite,
self.path,
self.written,
- size
+ n
);
Ok(n)
}
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 432e4bcf4..b0b5cf179 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -302,7 +302,7 @@ pub struct MadsimWriter {
#[async_trait]
impl oio::Write for MadsimWriter {
- async fn write(&mut self, bs: Bytes) -> crate::Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> crate::Result<usize> {
#[cfg(madsim)]
{
let req = Request::Write(self.path.to_string(), bs);
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 36e4957aa..b16f87a40 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -847,12 +847,12 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
MetricWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for MetricWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner
.write(bs)
.await
.map(|n| {
- self.bytes += n;
+ self.bytes += n as u64;
n
})
.map_err(|err| {
@@ -877,11 +877,11 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner
.write(bs)
.map(|n| {
- self.bytes += n;
+ self.bytes += n as u64;
n
})
.map_err(|err| {
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 149410ac8..f54f40593 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -337,7 +337,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
MinitraceWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner
.write(bs)
.in_span(Span::enter_with_parent(
@@ -369,7 +369,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
let _g = self.span.set_local_parent();
let _span =
LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
self.inner.write(bs)
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 0722a11a7..cceca3a69 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -313,7 +313,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
OtelTraceWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner.write(bs).await
}
@@ -327,7 +327,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner.write(bs)
}
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 0d8f99c22..2e294c9e5 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -662,7 +662,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
PrometheusMetricWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner
.write(bs)
.await
@@ -695,7 +695,7 @@ impl<R: oio::Write> oio::Write for
PrometheusMetricWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner
.write(bs)
.map(|n| {
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 3e207df9a..3746008ab 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -872,11 +872,11 @@ impl<R: oio::BlockingRead, I: RetryInterceptor>
oio::BlockingRead for RetryWrapp
#[async_trait]
impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
let mut backoff = self.builder.build();
loop {
- match self.inner.write(bs.clone()).await {
+ match self.inner.write(bs).await {
Ok(v) => return Ok(v),
Err(e) if !e.is_temporary() => return Err(e),
Err(e) => match backoff.next() {
@@ -952,8 +952,8 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for
RetryWrapper<R, I> {
}
impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for
RetryWrapper<R, I> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
- { || self.inner.write(bs.clone()) }
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ { || self.inner.write(bs) }
.retry(&self.builder)
.when(|e| e.is_temporary())
.notify(|err, dur| {
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 021d1cad9..aea7b6381 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -216,8 +216,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
ThrottleWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap();
loop {
match self.limiter.check_n(buf_length) {
@@ -251,8 +251,8 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for ThrottleWrapper<R> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
- let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let buf_length = NonZeroU32::new(bs.remaining() as u32).unwrap();
loop {
match self.limiter.check_n(buf_length) {
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 68ac5a41d..c02a4d58a 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -322,8 +322,8 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
#[async_trait]
impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let timeout = self.io_timeout(bs.len() as u64);
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let timeout = self.io_timeout(bs.remaining() as u64);
tokio::time::timeout(timeout, self.inner.write(bs))
.await
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index c5d006980..b829f8376 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -324,7 +324,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner.write(bs).await
}
@@ -350,7 +350,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for
TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
- fn write(&mut self, bs: Bytes) -> Result<u64> {
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.inner.write(bs)
}
diff --git a/core/src/raw/adapters/kv/backend.rs
b/core/src/raw/adapters/kv/backend.rs
index c0823d59e..513bda67e 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -18,7 +18,6 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
use super::Adapter;
use crate::raw::*;
@@ -390,11 +389,14 @@ impl<S> KvWriter<S> {
#[async_trait]
impl<S: Adapter> oio::Write for KvWriter<S> {
// TODO: we need to support append in the future.
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len();
- self.buf = Some(bs.into());
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.chunk().len();
- Ok(size as u64)
+ let mut buf = self.buf.take().unwrap_or_else(||
Vec::with_capacity(size));
+ buf.extend_from_slice(bs.chunk());
+ self.buf = Some(buf);
+
+ Ok(size)
}
async fn abort(&mut self) -> Result<()> {
@@ -414,11 +416,15 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
}
impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len();
- self.buf = Some(bs.into());
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.chunk().len();
+
+ let mut buf = self.buf.take().unwrap_or_else(||
Vec::with_capacity(size));
+ buf.extend_from_slice(bs.chunk());
+
+ self.buf = Some(buf);
- Ok(size as u64)
+ Ok(size)
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/adapters/typed_kv/backend.rs
b/core/src/raw/adapters/typed_kv/backend.rs
index 948c2b5ad..1e0864de7 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -22,7 +22,6 @@ use bytes::Bytes;
use super::Adapter;
use super::Value;
-use crate::raw::oio::VectorCursor;
use crate::raw::*;
use crate::*;
@@ -363,7 +362,7 @@ pub struct KvWriter<S> {
path: String,
op: OpWrite,
- buf: VectorCursor,
+ buf: Option<Vec<u8>>,
}
impl<S> KvWriter<S> {
@@ -372,11 +371,13 @@ impl<S> KvWriter<S> {
kv,
path,
op,
- buf: VectorCursor::new(),
+ buf: None,
}
}
- fn build(&self) -> Value {
+ fn build(&mut self) -> Value {
+ let value = self.buf.take().map(Bytes::from).unwrap_or_default();
+
let mut metadata = Metadata::new(EntryMode::FILE);
if let Some(v) = self.op.cache_control() {
metadata.set_cache_control(v);
@@ -390,49 +391,58 @@ impl<S> KvWriter<S> {
if let Some(v) = self.op.content_length() {
metadata.set_content_length(v);
} else {
- metadata.set_content_length(self.buf.len() as u64);
+ metadata.set_content_length(value.len() as u64);
}
- Value {
- metadata,
- value: self.buf.peak_all(),
- }
+ Value { metadata, value }
}
}
#[async_trait]
impl<S: Adapter> oio::Write for KvWriter<S> {
// TODO: we need to support append in the future.
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len();
- self.buf.push(bs);
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.chunk().len();
+
+ let mut buf = self.buf.take().unwrap_or_else(||
Vec::with_capacity(size));
+ buf.extend_from_slice(bs.chunk());
+
+ self.buf = Some(buf);
- Ok(size as u64)
+ Ok(size)
}
async fn abort(&mut self) -> Result<()> {
- self.buf.clear();
-
+ self.buf = None;
Ok(())
}
async fn close(&mut self) -> Result<()> {
- self.kv.set(&self.path, self.build()).await?;
+ let kv = self.kv.clone();
+ let value = self.build();
+
+ kv.set(&self.path, value).await?;
Ok(())
}
}
impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len();
- self.buf.push(bs);
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.chunk().len();
+
+ let mut buf = self.buf.take().unwrap_or_else(||
Vec::with_capacity(size));
+ buf.extend_from_slice(bs.chunk());
+
+ self.buf = Some(buf);
- Ok(size as u64)
+ Ok(size)
}
fn close(&mut self) -> Result<()> {
- self.kv.blocking_set(&self.path, self.build())?;
+ let kv = self.kv.clone();
+ let value = self.build();
+ kv.blocking_set(&self.path, value)?;
Ok(())
}
}
diff --git a/core/src/raw/oio/buf.rs b/core/src/raw/oio/buf.rs
new file mode 100644
index 000000000..36d67a2e3
--- /dev/null
+++ b/core/src/raw/oio/buf.rs
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{Bytes, BytesMut};
+use std::{cmp, ptr};
+
+/// WriteBuf is used in [`oio::Write`] to provide a trait similar to
[`bytes::Buf`].
+///
+/// The biggest difference is that `WriteBuf`'s `copy_to_slice` and `copy_to_bytes` only need
+/// `&self`, whereas the `bytes::Buf` versions require `&mut self`.
+pub trait WriteBuf: Send + Sync {
+ /// Returns the number of bytes between the current position and the end
of the buffer.
+ ///
+ /// This value is greater than or equal to the length of the slice
returned by chunk().
+ ///
+ /// # Notes
+ ///
+ /// Implementations of remaining should ensure that the return value does
not change unless a
+ /// call is made to advance or any other function that is documented to
change the Buf's
+ /// current position.
+ fn remaining(&self) -> usize;
+
+ /// Returns a slice starting at the current position and of length between
0 and
+ /// Buf::remaining(). Note that this can return shorter slice (this allows
non-continuous
+ /// internal representation).
+ ///
+ /// # Notes
+ ///
+ /// This function should never panic. Once the end of the buffer is
reached, i.e.,
+ /// Buf::remaining returns 0, calls to chunk() should return an empty
slice.
+ fn chunk(&self) -> &[u8];
+
+ /// Advance the internal cursor of the Buf
+ ///
+ /// The next call to chunk() will return a slice starting cnt bytes
further into the underlying buffer.
+ ///
+ /// # Panics
+ /// This function may panic if cnt > self.remaining().
+ fn advance(&mut self, cnt: usize);
+
+ /// Copies current chunk into dst.
+ ///
+ /// Returns the number of bytes copied.
+ ///
+ /// # Notes
+ ///
+ /// Users should not assume the number of bytes copied equals `remaining()`.
+ fn copy_to_slice(&self, dst: &mut [u8]) -> usize {
+ let src = self.chunk();
+ let size = cmp::min(src.len(), dst.len());
+
+ // # Safety
+ //
+ // `src` and `dst` are guaranteed to have enough space for `size` bytes.
+ unsafe {
+ ptr::copy_nonoverlapping(src.as_ptr(), dst.as_mut_ptr(), size);
+ }
+
+ size
+ }
+
+ /// Copies current chunk into a bytes.
+ ///
+ /// This function may be optimized by the underlying type to avoid actual
copies.
+ /// For example, Bytes implementation will do a shallow copy (ref-count
increment).
+ ///
+ /// # Notes
+ ///
+ /// Users should not assume the length of the returned bytes equals `remaining()`.
+ fn copy_to_bytes(&self, len: usize) -> Bytes {
+ let src = self.chunk();
+ let size = cmp::min(src.len(), len);
+
+ let mut ret = BytesMut::with_capacity(size);
+ ret.extend_from_slice(&src[..size]);
+ ret.freeze()
+ }
+}
+
+macro_rules! deref_forward_buf {
+ () => {
+ fn remaining(&self) -> usize {
+ (**self).remaining()
+ }
+
+ fn chunk(&self) -> &[u8] {
+ (**self).chunk()
+ }
+
+ fn advance(&mut self, cnt: usize) {
+ (**self).advance(cnt)
+ }
+
+ fn copy_to_slice(&self, dst: &mut [u8]) -> usize {
+ (**self).copy_to_slice(dst)
+ }
+
+ fn copy_to_bytes(&self, len: usize) -> Bytes {
+ (**self).copy_to_bytes(len)
+ }
+ };
+}
+
+impl<T: WriteBuf + ?Sized> WriteBuf for &mut T {
+ deref_forward_buf!();
+}
+
+impl<T: WriteBuf + ?Sized> WriteBuf for Box<T> {
+ deref_forward_buf!();
+}
+
+impl WriteBuf for &[u8] {
+ #[inline]
+ fn remaining(&self) -> usize {
+ self.len()
+ }
+
+ #[inline]
+ fn chunk(&self) -> &[u8] {
+ self
+ }
+
+ #[inline]
+ fn advance(&mut self, cnt: usize) {
+ *self = &self[cnt..];
+ }
+}
+
+impl<T: AsRef<[u8]> + Send + Sync> WriteBuf for std::io::Cursor<T> {
+ fn remaining(&self) -> usize {
+ let len = self.get_ref().as_ref().len();
+ let pos = self.position();
+
+ if pos >= len as u64 {
+ return 0;
+ }
+
+ len - pos as usize
+ }
+
+ fn chunk(&self) -> &[u8] {
+ let len = self.get_ref().as_ref().len();
+ let pos = self.position();
+
+ if pos >= len as u64 {
+ return &[];
+ }
+
+ &self.get_ref().as_ref()[pos as usize..]
+ }
+
+ fn advance(&mut self, cnt: usize) {
+ let pos = (self.position() as usize)
+ .checked_add(cnt)
+ .expect("overflow");
+
+ assert!(pos <= self.get_ref().as_ref().len());
+ self.set_position(pos as u64);
+ }
+}
+
+impl WriteBuf for Bytes {
+ #[inline]
+ fn remaining(&self) -> usize {
+ self.len()
+ }
+
+ #[inline]
+ fn chunk(&self) -> &[u8] {
+ self
+ }
+
+ #[inline]
+ fn advance(&mut self, cnt: usize) {
+ bytes::Buf::advance(self, cnt)
+ }
+
+ #[inline]
+ fn copy_to_bytes(&self, len: usize) -> Bytes {
+ let size = cmp::min(self.len(), len);
+ self.slice(..size)
+ }
+}
+
+impl WriteBuf for BytesMut {
+ #[inline]
+ fn remaining(&self) -> usize {
+ self.len()
+ }
+
+ #[inline]
+ fn chunk(&self) -> &[u8] {
+ self
+ }
+
+ #[inline]
+ fn advance(&mut self, cnt: usize) {
+ bytes::Buf::advance(self, cnt)
+ }
+
+ #[inline]
+ fn copy_to_bytes(&self, len: usize) -> Bytes {
+ let size = cmp::min(self.len(), len);
+ Bytes::copy_from_slice(&self[..size])
+ }
+}
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index 1b24bec9c..8c353ca50 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -41,3 +41,6 @@ pub use cursor::VectorCursor;
mod entry;
pub use entry::Entry;
+
+mod buf;
+pub use buf::WriteBuf;
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index eaf3ba0de..622303ff4 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -19,8 +19,8 @@ use std::fmt::Display;
use std::fmt::Formatter;
use async_trait::async_trait;
-use bytes::Bytes;
+use crate::raw::*;
use crate::*;
/// WriteOperation is the name for APIs of Writer.
@@ -81,7 +81,7 @@ pub trait Write: Unpin + Send + Sync {
///
/// It's possible that `n < bs.len()`, caller should pass the remaining
bytes
/// repeatedly until all bytes has been written.
- async fn write(&mut self, bs: Bytes) -> Result<u64>;
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize>;
/// Abort the pending writer.
async fn abort(&mut self) -> Result<()>;
@@ -92,7 +92,7 @@ pub trait Write: Unpin + Send + Sync {
#[async_trait]
impl Write for () {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
let _ = bs;
unimplemented!("write is required to be implemented for oio::Write")
@@ -118,7 +118,7 @@ impl Write for () {
/// To make Writer work as expected, we must add this impl.
#[async_trait]
impl<T: Write + ?Sized> Write for Box<T> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
(**self).write(bs).await
}
@@ -137,14 +137,14 @@ pub type BlockingWriter = Box<dyn BlockingWrite>;
/// BlockingWrite is the trait that OpenDAL returns to callers.
pub trait BlockingWrite: Send + Sync + 'static {
/// Write whole content at once.
- fn write(&mut self, bs: Bytes) -> Result<u64>;
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize>;
/// Close the writer and make sure all data has been flushed.
fn close(&mut self) -> Result<()>;
}
impl BlockingWrite for () {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
let _ = bs;
unimplemented!("write is required to be implemented for
oio::BlockingWrite")
@@ -162,7 +162,7 @@ impl BlockingWrite for () {
///
/// To make BlockingWriter work as expected, we must add this impl.
impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
(**self).write(bs)
}
diff --git a/core/src/raw/oio/write/append_object_write.rs
b/core/src/raw/oio/write/append_object_write.rs
index d6bb819df..2a5e30609 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -16,7 +16,6 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use crate::raw::*;
use crate::*;
@@ -78,15 +77,19 @@ impl<W> oio::Write for AppendObjectWriter<W>
where
W: AppendObjectWrite,
{
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
let offset = self.offset().await?;
- let size = bs.len() as u64;
+ let size = bs.remaining();
self.inner
- .append(offset, size, AsyncBody::Bytes(bs))
+ .append(
+ offset,
+ size as u64,
+ AsyncBody::Bytes(bs.copy_to_bytes(size)),
+ )
.await
- .map(|_| self.offset = Some(offset + size))?;
+ .map(|_| self.offset = Some(offset + size as u64))?;
Ok(size)
}
diff --git a/core/src/raw/oio/write/compose_write.rs
b/core/src/raw/oio/write/compose_write.rs
index dac833fcf..e2c6638af 100644
--- a/core/src/raw/oio/write/compose_write.rs
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -39,7 +39,6 @@
//! type_alias_impl_trait has been stabilized.
use async_trait::async_trait;
-use bytes::Bytes;
use crate::raw::*;
use crate::*;
@@ -56,7 +55,7 @@ pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> {
#[async_trait]
impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
match self {
Self::One(one) => one.write(bs).await,
Self::Two(two) => two.write(bs).await,
@@ -94,7 +93,7 @@ pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write,
THREE: oio::Write> {
impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
for ThreeWaysWriter<ONE, TWO, THREE>
{
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
match self {
Self::One(one) => one.write(bs).await,
Self::Two(two) => two.write(bs).await,
diff --git a/core/src/raw/oio/write/exact_buf_write.rs
b/core/src/raw/oio/write/exact_buf_write.rs
index 7ba788e3d..d4a868472 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -18,8 +18,9 @@
use std::cmp::min;
use async_trait::async_trait;
-use bytes::{Buf, Bytes, BytesMut};
+use bytes::{Bytes, BytesMut};
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;
@@ -61,21 +62,20 @@ enum Buffer {
#[async_trait]
impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
- async fn write(&mut self, mut bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn WriteBuf) -> Result<usize> {
loop {
match &mut self.buffer {
Buffer::Empty => {
- if bs.len() >= self.buffer_size {
- bs.truncate(self.buffer_size);
- self.buffer = Buffer::Consuming(bs);
- return Ok(self.buffer_size as u64);
+ if bs.remaining() >= self.buffer_size {
+ self.buffer =
Buffer::Consuming(bs.copy_to_bytes(self.buffer_size));
+ return Ok(self.buffer_size);
}
- let size = bs.len() as u64;
- let mut fill = BytesMut::with_capacity(bs.len());
- fill.extend_from_slice(&bs);
+ let chunk = bs.chunk();
+ let mut fill = BytesMut::with_capacity(chunk.len());
+ fill.extend_from_slice(chunk);
self.buffer = Buffer::Filling(fill);
- return Ok(size);
+ return Ok(chunk.len());
}
Buffer::Filling(fill) => {
if fill.len() >= self.buffer_size {
@@ -83,18 +83,17 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
continue;
}
- let size = min(self.buffer_size - fill.len(), bs.len());
- fill.extend_from_slice(&bs[..size]);
- bs.advance(size);
- return Ok(size as u64);
+ let size = min(self.buffer_size - fill.len(),
bs.chunk().len());
+ fill.extend_from_slice(&bs.chunk()[..size]);
+ return Ok(size);
}
Buffer::Consuming(consume) => {
// Make sure filled buffer has been flushed.
//
// TODO: maybe we can re-fill it after a successful write.
while !consume.is_empty() {
- let n = self.inner.write(consume.clone()).await?;
- consume.advance(n as usize);
+ let n = self.inner.write(consume).await?;
+ consume.advance(n);
}
self.buffer = Buffer::Empty;
}
@@ -120,8 +119,8 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
//
// TODO: maybe we can re-fill it after a successful write.
while !consume.is_empty() {
- let n = self.inner.write(consume.clone()).await?;
- consume.advance(n as usize);
+ let n = self.inner.write(&consume).await?;
+ consume.advance(n);
}
self.buffer = Buffer::Empty;
break;
@@ -152,11 +151,14 @@ mod tests {
#[async_trait]
impl Write for MockWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- debug!("test_fuzz_exact_buf_writer: flush size: {}", bs.len());
-
- self.buf.extend_from_slice(&bs);
- Ok(bs.len() as u64)
+ async fn write(&mut self, bs: &dyn WriteBuf) -> Result<usize> {
+ debug!(
+ "test_fuzz_exact_buf_writer: flush size: {}",
+ bs.chunk().len()
+ );
+
+ self.buf.extend_from_slice(bs.chunk());
+ Ok(bs.chunk().len())
}
async fn abort(&mut self) -> Result<()> {
@@ -184,8 +186,8 @@ mod tests {
let mut bs = Bytes::from(expected.clone());
while !bs.is_empty() {
- let n = w.write(bs.clone()).await?;
- bs.advance(n as usize);
+ let n = w.write(&bs).await?;
+ bs.advance(n);
}
w.close().await?;
@@ -223,7 +225,7 @@ mod tests {
let mut bs = Bytes::from(content.clone());
while !bs.is_empty() {
- let n = writer.write(bs.clone()).await?;
+ let n = writer.write(&bs).await?;
bs.advance(n as usize);
}
}
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs
b/core/src/raw/oio/write/multipart_upload_write.rs
index 13d1b4aa2..39660a2f7 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -16,7 +16,6 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use crate::raw::*;
use crate::*;
@@ -120,22 +119,22 @@ impl<W> oio::Write for MultipartUploadWriter<W>
where
W: MultipartUploadWrite,
{
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
let upload_id = self.upload_id().await?;
- let size = bs.len();
+ let size = bs.remaining();
self.inner
.write_part(
&upload_id,
self.parts.len(),
size as u64,
- AsyncBody::Bytes(bs),
+ AsyncBody::Bytes(bs.copy_to_bytes(size)),
)
.await
.map(|v| self.parts.push(v))?;
- Ok(size as u64)
+ Ok(size)
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/oio/write/one_shot_write.rs
b/core/src/raw/oio/write/one_shot_write.rs
index 3681242ce..a02c64445 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -16,7 +16,6 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use crate::raw::*;
use crate::*;
@@ -32,7 +31,7 @@ pub trait OneShotWrite: Send + Sync + Unpin {
/// write_once write all data at once.
///
/// Implementations should make sure that the data is written correctly at
once.
- async fn write_once(&self, size: u64, stream: oio::Streamer) -> Result<()>;
+ async fn write_once(&self, body: &dyn oio::WriteBuf) -> Result<()>;
}
/// OneShotWrite is used to implement [`Write`] based on one shot.
@@ -49,12 +48,9 @@ impl<W: OneShotWrite> OneShotWriter<W> {
#[async_trait]
impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let cursor = oio::Cursor::from(bs);
-
- let size = cursor.len() as u64;
- self.inner.write_once(size, Box::new(cursor)).await?;
-
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
+ self.inner.write_once(bs).await?;
Ok(size)
}
diff --git a/core/src/services/azblob/writer.rs
b/core/src/services/azblob/writer.rs
index b088aa57a..9745b1387 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -18,7 +18,6 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::core::AzblobCore;
@@ -161,11 +160,12 @@ impl AzblobWriter {
#[async_trait]
impl oio::Write for AzblobWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len() as u64;
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
if self.op.append() {
- self.append_oneshot(size, AsyncBody::Bytes(bs)).await?;
+ self.append_oneshot(size as u64,
AsyncBody::Bytes(bs.copy_to_bytes(size)))
+ .await?;
} else {
if self.op.content_length().is_none() {
return Err(Error::new(
@@ -174,7 +174,8 @@ impl oio::Write for AzblobWriter {
));
}
- self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
+ self.write_oneshot(size as u64,
AsyncBody::Bytes(bs.copy_to_bytes(size)))
+ .await?;
}
Ok(size)
diff --git a/core/src/services/azdfs/writer.rs
b/core/src/services/azdfs/writer.rs
index e56f9eca9..f9bec7346 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -18,7 +18,6 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::core::AzdfsCore;
@@ -41,9 +40,7 @@ impl AzdfsWriter {
#[async_trait]
impl oio::Write for AzdfsWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len() as u64;
-
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
let mut req = self.core.azdfs_create_request(
&self.path,
"file",
@@ -68,9 +65,13 @@ impl oio::Write for AzdfsWriter {
}
}
- let mut req =
- self.core
- .azdfs_update_request(&self.path, Some(bs.len()),
AsyncBody::Bytes(bs))?;
+ let size = bs.remaining();
+
+ let mut req = self.core.azdfs_update_request(
+ &self.path,
+ Some(size),
+ AsyncBody::Bytes(bs.copy_to_bytes(size)),
+ )?;
self.core.sign(&mut req).await?;
diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs
index af2a6ebd0..e0cc8be73 100644
--- a/core/src/services/cos/writer.rs
+++ b/core/src/services/cos/writer.rs
@@ -18,12 +18,10 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Buf;
use http::StatusCode;
use super::core::*;
use super::error::parse_error;
-use crate::raw::oio::Streamer;
use crate::raw::*;
use crate::*;
@@ -52,14 +50,15 @@ impl CosWriter {
#[async_trait]
impl oio::OneShotWrite for CosWriter {
- async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+ async fn write_once(&self, buf: &dyn oio::WriteBuf) -> Result<()> {
+ let size = buf.remaining();
let mut req = self.core.cos_put_object_request(
&self.path,
- Some(size),
+ Some(size as u64),
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
- AsyncBody::Stream(stream),
+ AsyncBody::Bytes(buf.copy_to_bytes(size)),
)?;
self.core.sign(&mut req).await?;
@@ -98,7 +97,8 @@ impl oio::MultipartUploadWrite for CosWriter {
let bs = resp.into_body().bytes().await?;
let result: InitiateMultipartUploadResult =
-
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+ quick_xml::de::from_reader(bytes::Buf::reader(bs))
+ .map_err(new_xml_deserialize_error)?;
Ok(result.upload_id)
}
diff --git a/core/src/services/dropbox/writer.rs
b/core/src/services/dropbox/writer.rs
index 00998c77d..c39e95eaf 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -18,7 +18,6 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::core::DropboxCore;
@@ -40,8 +39,8 @@ impl DropboxWriter {
#[async_trait]
impl oio::Write for DropboxWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len();
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
let resp = self
.core
@@ -49,14 +48,14 @@ impl oio::Write for DropboxWriter {
&self.path,
Some(size),
self.op.content_type(),
- AsyncBody::Bytes(bs),
+ AsyncBody::Bytes(bs.copy_to_bytes(size)),
)
.await?;
let status = resp.status();
match status {
StatusCode::OK => {
resp.into_body().consume().await?;
- Ok(size as u64)
+ Ok(size)
}
_ => Err(parse_error(resp).await?),
}
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 4a9a0471d..13e03ae2c 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -15,14 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use std::io::Seek;
-use std::io::SeekFrom;
use std::io::Write;
use std::path::PathBuf;
use async_trait::async_trait;
-use bytes::Bytes;
-use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWriteExt;
use super::error::parse_io_error;
@@ -33,7 +29,6 @@ pub struct FsWriter<F> {
target_path: PathBuf,
tmp_path: Option<PathBuf>,
f: F,
- pos: u64,
}
impl<F> FsWriter<F> {
@@ -42,28 +37,14 @@ impl<F> FsWriter<F> {
target_path,
tmp_path,
f,
- pos: 0,
}
}
}
#[async_trait]
impl oio::Write for FsWriter<tokio::fs::File> {
- /// # Notes
- ///
- /// File could be partial written, so we will seek to start to make sure
- /// we write the same content.
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len() as u64;
-
- self.f
- .seek(SeekFrom::Start(self.pos))
- .await
- .map_err(parse_io_error)?;
- self.f.write_all(&bs).await.map_err(parse_io_error)?;
- self.pos += size;
-
- Ok(size)
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ self.f.write(bs.chunk()).await.map_err(parse_io_error)
}
async fn abort(&mut self) -> Result<()> {
@@ -87,18 +68,8 @@ impl oio::Write for FsWriter<tokio::fs::File> {
}
impl oio::BlockingWrite for FsWriter<std::fs::File> {
- /// # Notes
- ///
- /// File could be partial written, so we will seek to start to make sure
- /// we write the same content.
- fn write(&mut self, bs: Bytes) -> Result<u64> {
- self.f
- .seek(SeekFrom::Start(self.pos))
- .map_err(parse_io_error)?;
- self.f.write_all(&bs).map_err(parse_io_error)?;
- self.pos += bs.len() as u64;
-
- Ok(bs.len() as u64)
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ self.f.write(bs.chunk()).map_err(parse_io_error)
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 3488255f9..2b0673d67 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -16,7 +16,6 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use futures::AsyncWriteExt;
use super::backend::FtpBackend;
@@ -41,8 +40,9 @@ impl FtpWriter {
#[async_trait]
impl oio::Write for FtpWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len();
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
+ let bs = bs.copy_to_bytes(size);
let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
@@ -52,7 +52,7 @@ impl oio::Write for FtpWriter {
ftp_stream.finalize_put_stream(data_stream).await?;
- Ok(size as u64)
+ Ok(size)
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 68e13b27f..d8dbe7597 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -118,16 +118,16 @@ impl GcsWriter {
#[async_trait]
impl oio::Write for GcsWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len() as u64;
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
let location = match &self.location {
Some(location) => location,
None => {
- if self.op.content_length().unwrap_or_default() == bs.len() as
u64
- && self.written == 0
+ if self.op.content_length().unwrap_or_default() == size as u64
&& self.written == 0
{
- self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
+ self.write_oneshot(size as u64,
AsyncBody::Bytes(bs.copy_to_bytes(size)))
+ .await?;
return Ok(size);
} else {
@@ -138,7 +138,7 @@ impl oio::Write for GcsWriter {
}
};
- self.buffer.push(bs);
+ self.buffer.push(bs.copy_to_bytes(size));
// Return directly if the buffer is not full
if self.buffer.len() <= self.write_fixed_size {
return Ok(size);
diff --git a/core/src/services/gdrive/writer.rs
b/core/src/services/gdrive/writer.rs
index 8d69bf8e2..c77a8989d 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -95,12 +95,14 @@ impl GdriveWriter {
#[async_trait]
impl oio::Write for GdriveWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len() as u64;
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
if self.file_id.is_none() {
- self.write_create(size, bs).await?;
+ self.write_create(size as u64, bs.copy_to_bytes(size))
+ .await?;
} else {
- self.write_overwrite(size, bs).await?;
+ self.write_overwrite(size as u64, bs.copy_to_bytes(size))
+ .await?;
}
Ok(size)
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index a7db6acab..13b27ccf4 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -16,7 +16,6 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use super::backend::GhacBackend;
use super::error::parse_error;
@@ -42,18 +41,23 @@ impl GhacWriter {
#[async_trait]
impl oio::Write for GhacWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len() as u64;
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
+
let req = self
.backend
- .ghac_upload(self.cache_id, size, AsyncBody::Bytes(bs))
+ .ghac_upload(
+ self.cache_id,
+ size as u64,
+ AsyncBody::Bytes(bs.copy_to_bytes(size)),
+ )
.await?;
let resp = self.backend.client.send(req).await?;
if resp.status().is_success() {
resp.into_body().consume().await?;
- self.size += size;
+ self.size += size as u64;
Ok(size)
} else {
Err(parse_error(resp)
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 90499e47c..47cdb53fe 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -18,7 +18,6 @@
use std::io::Write;
use async_trait::async_trait;
-use bytes::Bytes;
use futures::AsyncWriteExt;
use super::error::parse_io_error;
@@ -27,35 +26,18 @@ use crate::*;
pub struct HdfsWriter<F> {
f: F,
- /// The position of current written bytes in the buffer.
- ///
- /// We will maintain the posstion in pos to make sure the buffer is
written correctly.
- pos: usize,
}
impl<F> HdfsWriter<F> {
pub fn new(f: F) -> Self {
- Self { f, pos: 0 }
+ Self { f }
}
}
#[async_trait]
impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len();
-
- while self.pos < bs.len() {
- let n = self
- .f
- .write(&bs[self.pos..])
- .await
- .map_err(parse_io_error)?;
- self.pos += n;
- }
- // Reset pos to 0 for next write.
- self.pos = 0;
-
- Ok(size as u64)
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ self.f.write(bs.chunk()).await.map_err(parse_io_error)
}
async fn abort(&mut self) -> Result<()> {
@@ -73,17 +55,8 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
}
impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
- fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len();
-
- while self.pos < bs.len() {
- let n = self.f.write(&bs[self.pos..]).map_err(parse_io_error)?;
- self.pos += n;
- }
- // Reset pos to 0 for next write.
- self.pos = 0;
-
- Ok(size as u64)
+ fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ self.f.write(bs.chunk()).map_err(parse_io_error)
}
fn close(&mut self) -> Result<()> {
diff --git a/core/src/services/ipmfs/writer.rs
b/core/src/services/ipmfs/writer.rs
index d3f98e77b..6b6ab6a4d 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -16,7 +16,6 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::backend::IpmfsBackend;
@@ -38,9 +37,12 @@ impl IpmfsWriter {
#[async_trait]
impl oio::Write for IpmfsWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len() as u64;
- let resp = self.backend.ipmfs_write(&self.path, bs).await?;
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
+ let resp = self
+ .backend
+ .ipmfs_write(&self.path, bs.copy_to_bytes(size))
+ .await?;
let status = resp.status();
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index d3b1e119f..38dc660db 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -18,13 +18,11 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Buf;
use http::StatusCode;
use super::core::*;
use super::error::parse_error;
use crate::raw::oio::MultipartUploadPart;
-use crate::raw::oio::Streamer;
use crate::raw::*;
use crate::*;
@@ -53,13 +51,14 @@ impl ObsWriter {
#[async_trait]
impl oio::OneShotWrite for ObsWriter {
- async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+ async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+ let size = bs.remaining();
let mut req = self.core.obs_put_object_request(
&self.path,
- Some(size),
+ Some(size as u64),
self.op.content_type(),
self.op.cache_control(),
- AsyncBody::Stream(stream),
+ AsyncBody::Bytes(bs.copy_to_bytes(size)),
)?;
self.core.sign(&mut req).await?;
@@ -93,7 +92,8 @@ impl oio::MultipartUploadWrite for ObsWriter {
let bs = resp.into_body().bytes().await?;
let result: InitiateMultipartUploadResult =
-
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+ quick_xml::de::from_reader(bytes::Buf::reader(bs))
+ .map_err(new_xml_deserialize_error)?;
Ok(result.upload_id)
}
diff --git a/core/src/services/onedrive/writer.rs
b/core/src/services/onedrive/writer.rs
index abb846871..d9eabb8eb 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -16,7 +16,6 @@
// under the License.
use async_trait::async_trait;
-use bytes::Buf;
use bytes::Bytes;
use http::StatusCode;
@@ -46,8 +45,9 @@ impl OneDriveWriter {
#[async_trait]
impl oio::Write for OneDriveWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len();
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
+ let bs = bs.copy_to_bytes(size);
if size <= Self::MAX_SIMPLE_SIZE {
self.write_simple(bs).await?;
@@ -55,7 +55,7 @@ impl oio::Write for OneDriveWriter {
self.write_chunked(bs).await?;
}
- Ok(size as u64)
+ Ok(size)
}
async fn abort(&mut self) -> Result<()> {
@@ -167,7 +167,7 @@ impl OneDriveWriter {
StatusCode::OK => {
let bs = resp.into_body().bytes().await?;
let result: OneDriveUploadSessionCreationResponseBody =
-
serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
+
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
Ok(result)
}
_ => Err(parse_error(resp).await?),
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 27aa09011..8fd7ac656 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -18,12 +18,10 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Buf;
use http::StatusCode;
use super::core::*;
use super::error::parse_error;
-use crate::raw::oio::Streamer;
use crate::raw::*;
use crate::*;
@@ -52,14 +50,15 @@ impl OssWriter {
#[async_trait]
impl oio::OneShotWrite for OssWriter {
- async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+ async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+ let size = bs.remaining();
let mut req = self.core.oss_put_object_request(
&self.path,
- Some(size),
+ Some(size as u64),
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
- AsyncBody::Stream(stream),
+ AsyncBody::Bytes(bs.copy_to_bytes(size)),
false,
)?;
@@ -100,7 +99,8 @@ impl oio::MultipartUploadWrite for OssWriter {
let bs = resp.into_body().bytes().await?;
let result: InitiateMultipartUploadResult =
-
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+ quick_xml::de::from_reader(bytes::Buf::reader(bs))
+ .map_err(new_xml_deserialize_error)?;
Ok(result.upload_id)
}
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index a27341d71..7c5ddf924 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -18,12 +18,10 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Buf;
use http::StatusCode;
use super::core::*;
use super::error::parse_error;
-use crate::raw::oio::Streamer;
use crate::raw::*;
use crate::*;
@@ -49,14 +47,16 @@ impl S3Writer {
#[async_trait]
impl oio::OneShotWrite for S3Writer {
- async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+ async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+ let size = bs.remaining();
+
let mut req = self.core.s3_put_object_request(
&self.path,
- Some(size),
+ Some(size as u64),
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
- AsyncBody::Stream(stream),
+ AsyncBody::Bytes(bs.copy_to_bytes(size)),
)?;
self.core.sign(&mut req).await?;
@@ -95,7 +95,8 @@ impl oio::MultipartUploadWrite for S3Writer {
let bs = resp.into_body().bytes().await?;
let result: InitiateMultipartUploadResult =
-
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+ quick_xml::de::from_reader(bytes::Buf::reader(bs))
+ .map_err(new_xml_deserialize_error)?;
Ok(result.upload_id)
}
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 2df725ab2..90e605152 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -16,13 +16,10 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use openssh_sftp_client::file::File;
use crate::raw::oio;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Result;
+use crate::*;
pub struct SftpWriter {
file: File,
@@ -36,9 +33,8 @@ impl SftpWriter {
#[async_trait]
impl oio::Write for SftpWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len() as u64;
- self.file.write_all(&bs).await?;
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = self.file.write(bs.chunk()).await?;
Ok(size)
}
diff --git a/core/src/services/supabase/writer.rs
b/core/src/services/supabase/writer.rs
index 5c57d7a95..95a2ee92f 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -67,14 +67,10 @@ impl SupabaseWriter {
#[async_trait]
impl oio::Write for SupabaseWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- if bs.is_empty() {
- return Ok(9);
- }
-
- let size = bs.len();
- self.upload(bs).await?;
- Ok(size as u64)
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
+ self.upload(bs.copy_to_bytes(size)).await?;
+ Ok(size)
}
async fn abort(&mut self) -> Result<()> {
diff --git a/core/src/services/vercel_artifacts/writer.rs
b/core/src/services/vercel_artifacts/writer.rs
index efb223df6..36e62b734 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -16,7 +16,6 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::backend::VercelArtifactsBackend;
@@ -39,15 +38,15 @@ impl VercelArtifactsWriter {
#[async_trait]
impl oio::Write for VercelArtifactsWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len();
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
let resp = self
.backend
.vercel_artifacts_put(
self.path.as_str(),
self.op.content_length().unwrap(),
- AsyncBody::Bytes(bs),
+ AsyncBody::Bytes(bs.copy_to_bytes(size)),
)
.await?;
@@ -56,7 +55,7 @@ impl oio::Write for VercelArtifactsWriter {
match status {
StatusCode::OK | StatusCode::ACCEPTED => {
resp.into_body().consume().await?;
- Ok(size as u64)
+ Ok(size)
}
_ => Err(parse_error(resp).await?),
}
diff --git a/core/src/services/wasabi/writer.rs
b/core/src/services/wasabi/writer.rs
index 55d898ccc..d0509ebbc 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -18,7 +18,6 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::core::*;
@@ -41,8 +40,8 @@ impl WasabiWriter {
#[async_trait]
impl oio::Write for WasabiWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len();
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
let resp = self
.core
@@ -52,14 +51,14 @@ impl oio::Write for WasabiWriter {
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
- AsyncBody::Bytes(bs),
+ AsyncBody::Bytes(bs.copy_to_bytes(size)),
)
.await?;
match resp.status() {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
- Ok(size as u64)
+ Ok(size)
}
_ => Err(parse_error(resp).await?),
}
diff --git a/core/src/services/webdav/writer.rs
b/core/src/services/webdav/writer.rs
index 30b9827f2..413fe891a 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -16,7 +16,6 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::backend::WebdavBackend;
@@ -62,10 +61,11 @@ impl WebdavWriter {
#[async_trait]
impl oio::Write for WebdavWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len() as u64;
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
- self.write_oneshot(size, AsyncBody::Bytes(bs)).await?;
+ self.write_oneshot(size as u64,
AsyncBody::Bytes(bs.copy_to_bytes(size)))
+ .await?;
Ok(size)
}
diff --git a/core/src/services/webhdfs/writer.rs
b/core/src/services/webhdfs/writer.rs
index 8dcbc9dc3..34e6b20fa 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -16,7 +16,6 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::backend::WebhdfsBackend;
@@ -39,8 +38,8 @@ impl WebhdfsWriter {
#[async_trait]
impl oio::Write for WebhdfsWriter {
- async fn write(&mut self, bs: Bytes) -> Result<u64> {
- let size = bs.len();
+ async fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+ let size = bs.remaining();
let req = self
.backend
@@ -48,7 +47,7 @@ impl oio::Write for WebhdfsWriter {
&self.path,
Some(size),
self.op.content_type(),
- AsyncBody::Bytes(bs),
+ AsyncBody::Bytes(bs.copy_to_bytes(size)),
)
.await?;
@@ -58,7 +57,7 @@ impl oio::Write for WebhdfsWriter {
match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
- Ok(size as u64)
+ Ok(size)
}
_ => Err(parse_error(resp).await?),
}
diff --git a/core/src/types/operator/blocking_operator.rs
b/core/src/types/operator/blocking_operator.rs
index c1d34f87d..3468f8b6b 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -20,6 +20,7 @@ use std::io::Read;
use bytes::Bytes;
use super::operator_functions::*;
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;
@@ -550,7 +551,7 @@ impl BlockingOperator {
self.inner().clone(),
path,
(OpWrite::default().with_content_length(bs.len() as u64), bs),
- |inner, path, (args, bs)| {
+ |inner, path, (args, mut bs)| {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "write path is a
directory")
@@ -561,7 +562,10 @@ impl BlockingOperator {
}
let (_, mut w) = inner.blocking_write(&path, args)?;
- w.write(bs)?;
+ while bs.remaining() > 0 {
+ let n = w.write(&bs)?;
+ bs.advance(n);
+ }
w.close()?;
Ok(())
diff --git a/core/src/types/operator/operator.rs
b/core/src/types/operator/operator.rs
index ab97a2df2..b51c2bc60 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -17,7 +17,7 @@
use std::time::Duration;
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
use futures::stream;
use futures::AsyncReadExt;
use futures::Stream;
@@ -716,7 +716,7 @@ impl Operator {
self.inner().clone(),
path,
(OpWrite::default().with_content_length(bs.len() as u64), bs),
- |inner, path, (args, bs)| {
+ |inner, path, (args, mut bs)| {
let fut = async move {
if !validate_path(&path, EntryMode::FILE) {
return Err(Error::new(
@@ -729,7 +729,11 @@ impl Operator {
}
let (_, mut w) = inner.write(&path, args).await?;
- w.write(bs).await?;
+ while bs.remaining() > 0 {
+ let n = w.write(&bs).await?;
+ bs.advance(n);
+ }
+
w.close().await?;
Ok(())
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 3ae7f6f4c..0474d6ea4 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -22,13 +22,14 @@ use std::task::ready;
use std::task::Context;
use std::task::Poll;
-use bytes::{Buf, Bytes};
+use bytes::Bytes;
use futures::future::BoxFuture;
use futures::AsyncWrite;
use futures::FutureExt;
use futures::TryStreamExt;
use crate::raw::oio::Write;
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;
@@ -91,10 +92,9 @@ impl Writer {
};
let mut bs = bs.into();
-
- while !bs.is_empty() {
- let n = w.write(bs.clone()).await?;
- bs.advance(n as usize);
+ while bs.remaining() > 0 {
+ let n = w.write(&bs).await?;
+ bs.advance(n);
}
Ok(())
@@ -149,10 +149,10 @@ impl Writer {
let mut written = 0;
while let Some(bs) = sink_from.try_next().await? {
let mut bs = bs.into();
- while bs.has_remaining() {
- let n = w.write(bs.clone()).await?;
- bs.advance(n as usize);
- written += n;
+ while bs.remaining() > 0 {
+ let n = w.write(&bs).await?;
+ bs.advance(n);
+ written += n as u64;
}
}
Ok(written)
@@ -262,10 +262,11 @@ impl AsyncWrite for Writer {
let mut w = w
.take()
.expect("invalid state of writer: Idle state with
empty write");
- let bs = Bytes::from(buf.to_vec());
+ // FIXME: This will copy the buf every time, we should avoid
this.
+ let bs = Bytes::copy_from_slice(buf);
let fut = async move {
- let n = w.write(bs).await?;
- Ok((n as usize, w))
+ let n = w.write(&bs).await?;
+ Ok((n, w))
};
self.state = State::Write(Box::pin(fut));
}
@@ -334,10 +335,11 @@ impl tokio::io::AsyncWrite for Writer {
let mut w = w
.take()
.expect("invalid state of writer: Idle state with
empty write");
- let bs = Bytes::from(buf.to_vec());
+ // FIXME: This will copy the buf every time, we should avoid
this.
+ let bs = Bytes::copy_from_slice(buf);
let fut = async move {
- let n = w.write(bs).await?;
- Ok((n as usize, w))
+ let n = w.write(&bs).await?;
+ Ok((n, w))
};
self.state = State::Write(Box::pin(fut));
}
@@ -416,10 +418,9 @@ impl BlockingWriter {
/// Write into inner writer.
pub fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
let mut bs = bs.into();
-
- while !bs.is_empty() {
- let n = self.inner.write(bs.clone())?;
- bs.advance(n as usize);
+ while bs.remaining() > 0 {
+ let n = self.inner.write(&bs)?;
+ bs.advance(n);
}
Ok(())
@@ -434,8 +435,7 @@ impl BlockingWriter {
impl io::Write for BlockingWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner
- .write(Bytes::from(buf.to_vec()))
- .map(|n| n as usize)
+ .write(&buf)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
}