This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 56e0c6ad refactor: Polish Writer API by merging append and write
together (#2036)
56e0c6ad is described below
commit 56e0c6addb6d7564387051a251efba24477f0128
Author: Xuanwo <[email protected]>
AuthorDate: Wed Apr 19 18:10:11 2023 +0800
refactor: Polish Writer API by merging append and write together (#2036)
* Save work
Signed-off-by: Xuanwo <[email protected]>
* Remove append
Signed-off-by: Xuanwo <[email protected]>
* Add test for copy
Signed-off-by: Xuanwo <[email protected]>
* Fix tests
Signed-off-by: Xuanwo <[email protected]>
* Fix typo
Signed-off-by: Xuanwo <[email protected]>
* Fix unit tests
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/layers/concurrent_limit.rs | 8 --
core/src/layers/error_context.rs | 16 ---
core/src/layers/logging.rs | 66 ----------
core/src/layers/metrics.rs | 23 ----
core/src/layers/minitrace.rs | 16 ---
core/src/layers/oteltrace.rs | 8 --
core/src/layers/prometheus.rs | 33 -----
core/src/layers/retry.rs | 35 ------
core/src/layers/tracing.rs | 16 ---
core/src/raw/adapters/kv/backend.rs | 44 +++----
core/src/raw/oio/cursor.rs | 171 ++++++++++++++++++++++++++
core/src/raw/oio/mod.rs | 1 +
core/src/raw/oio/write.rs | 60 +++------
core/src/services/azblob/backend.rs | 4 +-
core/src/services/azblob/writer.rs | 9 --
core/src/services/azdfs/backend.rs | 4 +-
core/src/services/azdfs/writer.rs | 9 --
core/src/services/fs/writer.rs | 22 ----
core/src/services/ftp/backend.rs | 4 +-
core/src/services/ftp/writer.rs | 9 --
core/src/services/gcs/backend.rs | 25 +---
core/src/services/gcs/core.rs | 10 +-
core/src/services/gcs/writer.rs | 176 +++++++++++++++++----------
core/src/services/ghac/backend.rs | 4 +-
core/src/services/ghac/writer.rs | 9 --
core/src/services/hdfs/writer.rs | 25 ----
core/src/services/ipmfs/backend.rs | 4 +-
core/src/services/ipmfs/writer.rs | 9 --
core/src/services/obs/backend.rs | 10 +-
core/src/services/obs/core.rs | 5 -
core/src/services/obs/writer.rs | 10 --
core/src/services/oss/backend.rs | 18 +--
core/src/services/oss/writer.rs | 108 +++++++++++++---
core/src/services/s3/backend.rs | 32 +----
core/src/services/s3/core.rs | 5 -
core/src/services/s3/writer.rs | 131 ++++++++++++++++----
core/src/services/wasabi/backend.rs | 2 +-
core/src/services/wasabi/writer.rs | 80 +++++-------
core/src/services/webdav/backend.rs | 4 +-
core/src/services/webdav/writer.rs | 9 --
core/src/services/webhdfs/backend.rs | 4 +-
core/src/services/webhdfs/writer.rs | 9 --
core/src/types/operator/blocking_operator.rs | 22 ++--
core/src/types/operator/operator.rs | 26 ++--
core/src/types/ops.rs | 32 ++---
core/src/types/writer.rs | 97 ++++++++++++---
core/tests/behavior/utils.rs | 11 ++
core/tests/behavior/write.rs | 50 ++++++--
48 files changed, 756 insertions(+), 729 deletions(-)
diff --git a/core/src/layers/concurrent_limit.rs
b/core/src/layers/concurrent_limit.rs
index 99e947b3..c1ccb76d 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -316,10 +316,6 @@ impl<R: oio::Write> oio::Write for
ConcurrentLimitWrapper<R> {
self.inner.write(bs).await
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- self.inner.append(bs).await
- }
-
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}
@@ -334,10 +330,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for
ConcurrentLimitWrapper<R> {
self.inner.write(bs)
}
- fn append(&mut self, bs: Bytes) -> Result<()> {
- self.inner.append(bs)
- }
-
fn close(&mut self) -> Result<()> {
self.inner.close()
}
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 29724613..5417a040 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -411,14 +411,6 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
})
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- self.inner.append(bs).await.map_err(|err| {
- err.with_operation(WriteOperation::Append)
- .with_context("service", self.scheme)
- .with_context("path", &self.path)
- })
- }
-
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await.map_err(|err| {
err.with_operation(WriteOperation::Append)
@@ -445,14 +437,6 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for
ErrorContextWrapper<T> {
})
}
- fn append(&mut self, bs: Bytes) -> Result<()> {
- self.inner.append(bs).map_err(|err| {
- err.with_operation(WriteOperation::BlockingAppend)
- .with_context("service", self.scheme)
- .with_context("path", &self.path)
- })
- }
-
fn close(&mut self) -> Result<()> {
self.inner.close().map_err(|err| {
err.with_operation(WriteOperation::BlockingClose)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 706287ad..abecc48f 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1386,39 +1386,6 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let size = bs.len();
- match self.inner.append(bs).await {
- Ok(_) => {
- self.written += size as u64;
- trace!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} written={} -> data write
{}B",
- self.scheme,
- WriteOperation::Append,
- self.path,
- self.written,
- size
- );
- Ok(())
- }
- Err(err) => {
- if let Some(lvl) = self.failure_level {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} written={} -> data
write failed: {err:?}",
- self.scheme,
- WriteOperation::Append,
- self.path,
- self.written,
- )
- }
- Err(err)
- }
- }
- }
-
async fn abort(&mut self) -> Result<()> {
match self.inner.abort().await {
Ok(_) => {
@@ -1504,39 +1471,6 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for
LoggingWriter<W> {
}
}
- fn append(&mut self, bs: Bytes) -> Result<()> {
- let size = bs.len();
- match self.inner.append(bs) {
- Ok(_) => {
- self.written += size as u64;
- trace!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} written={} -> data write
{}B",
- self.scheme,
- WriteOperation::BlockingAppend,
- self.path,
- self.written,
- size
- );
- Ok(())
- }
- Err(err) => {
- if let Some(lvl) = self.failure_level {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} written={} -> data
write failed: {err:?}",
- self.scheme,
- WriteOperation::BlockingAppend,
- self.path,
- self.written,
- )
- }
- Err(err)
- }
- }
- }
-
fn close(&mut self) -> Result<()> {
match self.inner.close() {
Ok(_) => Ok(()),
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 2aff9003..6bdfe19f 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -925,18 +925,6 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
})
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let size = bs.len();
- self.inner
- .append(bs)
- .await
- .map(|_| self.bytes += size as u64)
- .map_err(|err| {
- self.handle.increment_errors_total(self.op, err.kind());
- err
- })
- }
-
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
@@ -964,17 +952,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for
MetricWrapper<R> {
})
}
- fn append(&mut self, bs: Bytes) -> Result<()> {
- let size = bs.len();
- self.inner
- .append(bs)
- .map(|_| self.bytes += size as u64)
- .map_err(|err| {
- self.handle.increment_errors_total(self.op, err.kind());
- err
- })
- }
-
fn close(&mut self) -> Result<()> {
self.inner.close().map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 3e91bac6..35c48d27 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -337,16 +337,6 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
.await
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- self.inner
- .append(bs)
- .in_span(Span::enter_with_parent(
- WriteOperation::Append.into_static(),
- &self.span,
- ))
- .await
- }
-
async fn abort(&mut self) -> Result<()> {
self.inner
.abort()
@@ -375,12 +365,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for
MinitraceWrapper<R> {
self.inner.write(bs)
}
- fn append(&mut self, bs: Bytes) -> Result<()> {
- let _span =
-
Span::enter_with_parent(WriteOperation::BlockingAppend.into_static(),
&self.span);
- self.inner.append(bs)
- }
-
fn close(&mut self) -> Result<()> {
let _span =
Span::enter_with_parent(WriteOperation::BlockingClose.into_static(),
&self.span);
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index c607e975..f9f3a931 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -339,10 +339,6 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
self.inner.write(bs).await
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- self.inner.append(bs).await
- }
-
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}
@@ -357,10 +353,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for
OtelTraceWrapper<R> {
self.inner.write(bs)
}
- fn append(&mut self, bs: Bytes) -> Result<()> {
- self.inner.append(bs)
- }
-
fn close(&mut self) -> Result<()> {
self.inner.close()
}
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 4689c383..6919abc4 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -720,23 +720,6 @@ impl<R: oio::Write> oio::Write for
PrometheusMetricWrapper<R> {
})
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let size = bs.len();
- self.inner
- .append(bs)
- .await
- .map(|_| {
- self.stats
- .bytes_total
- .with_label_values(&[&self.scheme,
Operation::Write.into_static()])
- .observe(size as f64)
- })
- .map_err(|err| {
- self.stats.increment_errors_total(self.op, err.kind());
- err
- })
- }
-
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await.map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
@@ -769,22 +752,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for
PrometheusMetricWrapper<R> {
})
}
- fn append(&mut self, bs: Bytes) -> Result<()> {
- let size = bs.len();
- self.inner
- .append(bs)
- .map(|_| {
- self.stats
- .bytes_total
- .with_label_values(&[&self.scheme,
Operation::BlockingWrite.into_static()])
- .observe(size as f64)
- })
- .map_err(|err| {
- self.stats.increment_errors_total(self.op, err.kind());
- err
- })
- }
-
fn close(&mut self) -> Result<()> {
self.inner.close().map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 401bd53e..497c1ca8 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -651,27 +651,6 @@ impl<R: oio::Write> oio::Write for RetryWrapper<R> {
}
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let mut backoff = self.builder.build();
-
- loop {
- match self.inner.append(bs.clone()).await {
- Ok(v) => return Ok(v),
- Err(e) if !e.is_temporary() => return Err(e),
- Err(e) => match backoff.next() {
- None => return Err(e),
- Some(dur) => {
- warn!(target: "opendal::service",
- "operation={} path={} -> pager retry after {}s:
error={:?}",
- WriteOperation::Append, self.path,
dur.as_secs_f64(), e);
- tokio::time::sleep(dur).await;
- continue;
- }
- },
- }
- }
- }
-
async fn abort(&mut self) -> Result<()> {
let mut backoff = self.builder.build();
@@ -730,20 +709,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for
RetryWrapper<R> {
.map_err(|e| e.set_persistent())
}
- fn append(&mut self, bs: Bytes) -> Result<()> {
- { || self.inner.append(bs.clone()) }
- .retry(&self.builder)
- .when(|e| e.is_temporary())
- .notify(move |err, dur| {
- warn!(
- target: "opendal::service",
- "operation={} -> pager retry after {}s: error={:?}",
- WriteOperation::BlockingAppend, dur.as_secs_f64(), err)
- })
- .call()
- .map_err(|e| e.set_persistent())
- }
-
fn close(&mut self) -> Result<()> {
{ || self.inner.close() }
.retry(&self.builder)
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 8f9b8a5d..2782d049 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -344,14 +344,6 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
self.inner.write(bs).await
}
- #[tracing::instrument(
- parent = &self.span,
- level = "trace",
- skip_all)]
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- self.inner.append(bs).await
- }
-
#[tracing::instrument(
parent = &self.span,
level = "trace",
@@ -378,14 +370,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for
TracingWrapper<R> {
self.inner.write(bs)
}
- #[tracing::instrument(
- parent = &self.span,
- level = "trace",
- skip_all)]
- fn append(&mut self, bs: Bytes) -> Result<()> {
- self.inner.append(bs)
- }
-
#[tracing::instrument(
parent = &self.span,
level = "trace",
diff --git a/core/src/raw/adapters/kv/backend.rs
b/core/src/raw/adapters/kv/backend.rs
index 15dc7beb..dae293e0 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -107,13 +107,27 @@ impl<S: Adapter> Accessor for Backend<S> {
Ok((RpRead::new(bs.len() as u64), oio::Cursor::from(bs)))
}
- async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ if args.content_length().is_none() {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ "write without content length is not supported",
+ ));
+ }
+
let p = build_abs_path(&self.root, path);
Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
}
- fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
+ fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
+ if args.content_length().is_none() {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ "write without content length is not supported",
+ ));
+ }
+
let p = build_abs_path(&self.root, path);
Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
@@ -264,35 +278,17 @@ impl<S> KvWriter<S> {
buf: None,
}
}
-
- fn extend_buf(&mut self, bs: Bytes) {
- if let Some(buf) = self.buf.as_mut() {
- buf.extend(bs);
- } else {
- self.buf = Some(bs.into())
- }
- }
}
#[async_trait]
impl<S: Adapter> oio::Write for KvWriter<S> {
+ // TODO: we need to support append in the future.
async fn write(&mut self, bs: Bytes) -> Result<()> {
self.buf = Some(bs.into());
Ok(())
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- if let Err(e) = self.kv.append(&self.path,
bs.to_vec().as_slice()).await {
- if e.kind() == ErrorKind::Unsupported {
- self.extend_buf(bs);
- } else {
- return Err(e);
- }
- }
- Ok(())
- }
-
async fn abort(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
@@ -316,12 +312,6 @@ impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
Ok(())
}
- fn append(&mut self, bs: Bytes) -> Result<()> {
- self.extend_buf(bs);
-
- Ok(())
- }
-
fn close(&mut self) -> Result<()> {
if let Some(buf) = self.buf.as_deref() {
self.kv.blocking_set(&self.path, buf)?;
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index c0282ffd..1e34c9b9 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -15,12 +15,15 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::VecDeque;
use std::io::Read;
use std::io::SeekFrom;
use std::task::Context;
use std::task::Poll;
+use bytes::Buf;
use bytes::Bytes;
+use bytes::BytesMut;
use crate::raw::*;
use crate::*;
@@ -138,3 +141,171 @@ impl oio::BlockingRead for Cursor {
}
}
}
+
+/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Read`]
+pub struct VectorCursor {
+ inner: VecDeque<Bytes>,
+ size: usize,
+}
+
+impl Default for VectorCursor {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl VectorCursor {
+ /// Create a new vector cursor.
+ pub fn new() -> Self {
+ Self {
+ inner: VecDeque::new(),
+ size: 0,
+ }
+ }
+
+ /// Returns `true` if current vector is empty.
+ pub fn is_empty(&self) -> bool {
+ self.size == 0
+ }
+
+ /// Return current bytes size of current vector.
+ pub fn len(&self) -> usize {
+ self.size
+ }
+
+    /// Push new bytes into the vector cursor.
+ pub fn push(&mut self, bs: Bytes) {
+ self.size += bs.len();
+ self.inner.push_back(bs);
+ }
+
+ /// Pop a bytes from vector cursor.
+ pub fn pop(&mut self) {
+ let bs = self.inner.pop_back();
+ self.size -= bs.expect("pop bytes must exist").len()
+ }
+
+ /// Clear the entire vector.
+ pub fn clear(&mut self) {
+ self.inner.clear();
+ self.size = 0;
+ }
+
+ /// Peak will read and copy exactly n bytes from current cursor
+    /// without changing its content.
+ ///
+ /// This function is useful if you want to read a fixed size
+ /// content to make sure it aligned.
+ ///
+ /// # Panics
+ ///
+ /// Panics if n is larger than current size.
+ ///
+ /// # TODO
+ ///
+ /// Optimize to avoid data copy.
+ pub fn peak_exact(&self, n: usize) -> Bytes {
+ assert!(n <= self.size, "peak size must smaller than current size");
+
+ // Avoid data copy if n is smaller than first chunk.
+ if self.inner[0].len() >= n {
+ return self.inner[0].slice(..n);
+ }
+
+ let mut bs = BytesMut::with_capacity(n);
+ let mut n = n;
+ for b in &self.inner {
+ if n == 0 {
+ break;
+ }
+ let len = b.len().min(n);
+ bs.extend_from_slice(&b[..len]);
+ n -= len;
+ }
+ bs.freeze()
+ }
+
+ /// peak_at_least will read and copy at least n bytes from current
+    /// cursor without changing its content.
+ ///
+    /// This function is useful if you only want to make sure the
+    /// returned bytes are at least n bytes long.
+ ///
+ /// # Panics
+ ///
+ /// Panics if n is larger than current size.
+ ///
+ /// # TODO
+ ///
+ /// Optimize to avoid data copy.
+ pub fn peak_at_least(&self, n: usize) -> Bytes {
+ assert!(n <= self.size, "peak size must smaller than current size");
+
+ // Avoid data copy if n is smaller than first chunk.
+ if self.inner[0].len() >= n {
+ return self.inner[0].clone();
+ }
+
+ let mut bs = BytesMut::with_capacity(n);
+ let mut n = n;
+ for b in &self.inner {
+ if n == 0 {
+ break;
+ }
+ let len = b.len().min(n);
+ bs.extend_from_slice(&b[..len]);
+ n -= len;
+ }
+ bs.freeze()
+ }
+
+ /// Take will consume n bytes from current cursor.
+ ///
+ /// # Panics
+ ///
+ /// Panics if n is larger than current size.
+ pub fn take(&mut self, n: usize) {
+        assert!(n <= self.size, "take size must be smaller than current size");
+
+ // Update current size
+ self.size -= n;
+
+ let mut n = n;
+ while n > 0 {
+ assert!(!self.inner.is_empty(), "inner must not be empty");
+
+ if self.inner[0].len() <= n {
+ n -= self.inner[0].len();
+ self.inner.pop_front();
+ } else {
+ self.inner[0].advance(n);
+ n = 0;
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_vector_cursor() {
+ let mut vc = VectorCursor::new();
+
+ vc.push(Bytes::from("hello"));
+ vc.push(Bytes::from("world"));
+
+ assert_eq!(vc.peak_exact(1), Bytes::from("h"));
+ assert_eq!(vc.peak_exact(1), Bytes::from("h"));
+ assert_eq!(vc.peak_exact(4), Bytes::from("hell"));
+ assert_eq!(vc.peak_exact(10), Bytes::from("helloworld"));
+
+ vc.take(1);
+ assert_eq!(vc.peak_exact(1), Bytes::from("e"));
+ vc.take(1);
+ assert_eq!(vc.peak_exact(1), Bytes::from("l"));
+ vc.take(5);
+ assert_eq!(vc.peak_exact(1), Bytes::from("r"));
+ }
+}
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index f82af4ca..56b48dda 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -43,6 +43,7 @@ pub use write::Writer;
mod cursor;
pub use cursor::Cursor;
+pub use cursor::VectorCursor;
mod into_streamable;
pub use into_streamable::into_streamable_reader;
diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write.rs
index 05f93a93..a2437312 100644
--- a/core/src/raw/oio/write.rs
+++ b/core/src/raw/oio/write.rs
@@ -76,23 +76,30 @@ impl From<WriteOperation> for &'static str {
pub type Writer = Box<dyn Write>;
/// Write is the trait that OpenDAL returns to callers.
+///
+/// # Notes
+///
+/// There are two possible cases:
+///
+/// - Sized: The total size of the object is known in advance.
+/// - Unsized: The total size of the object is unknown in advance.
+///
+/// And it's possible that the given bs length is less than the total
+/// content length. Users will call write multiple times to write
+/// the whole data.
#[async_trait]
pub trait Write: Unpin + Send + Sync {
- /// Write whole content at once.
+    /// Write given bytes into writer.
+ ///
+ /// # Notes
+ ///
+ /// It's possible that the given bs length is less than the total
+ /// content length. And users will call write multiple times.
///
- /// To append multiple bytes together, use `append` instead.
+ /// Please make sure `write` is safe to re-enter.
async fn write(&mut self, bs: Bytes) -> Result<()>;
- /// Append bytes to the writer.
- ///
- /// It is highly recommended to align the length of the input bytes
- /// into blocks of 4MiB (except the last block) for better performance
- /// and compatibility.
- async fn append(&mut self, bs: Bytes) -> Result<()>;
-
- /// Abort the pending appendable writer.
- /// #note
- /// This method is only applicable to writers opened in append mode.
+ /// Abort the pending writer.
async fn abort(&mut self) -> Result<()>;
/// Close the writer and make sure all data has been flushed.
@@ -107,15 +114,6 @@ impl Write for () {
unimplemented!("write is required to be implemented for oio::Write")
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let _ = bs;
-
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support append",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
@@ -139,10 +137,6 @@ impl<T: Write + ?Sized> Write for Box<T> {
(**self).write(bs).await
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- (**self).append(bs).await
- }
-
async fn abort(&mut self) -> Result<()> {
(**self).abort().await
}
@@ -160,9 +154,6 @@ pub trait BlockingWrite: Send + Sync + 'static {
/// Write whole content at once.
fn write(&mut self, bs: Bytes) -> Result<()>;
- /// Append content at tailing.
- fn append(&mut self, bs: Bytes) -> Result<()>;
-
/// Close the writer and make sure all data has been flushed.
fn close(&mut self) -> Result<()>;
}
@@ -174,15 +165,6 @@ impl BlockingWrite for () {
unimplemented!("write is required to be implemented for
oio::BlockingWrite")
}
- fn append(&mut self, bs: Bytes) -> Result<()> {
- let _ = bs;
-
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support append",
- ))
- }
-
fn close(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
@@ -198,10 +180,6 @@ impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
(**self).write(bs)
}
- fn append(&mut self, bs: Bytes) -> Result<()> {
- (**self).append(bs)
- }
-
fn close(&mut self) -> Result<()> {
(**self).close()
}
diff --git a/core/src/services/azblob/backend.rs
b/core/src/services/azblob/backend.rs
index dfd7ca18..659d9f5e 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -500,10 +500,10 @@ impl Accessor for AzblobBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.append() {
+ if args.content_length().is_none() {
return Err(Error::new(
ErrorKind::Unsupported,
- "append write is not supported",
+ "write without content length is not supported",
));
}
diff --git a/core/src/services/azblob/writer.rs
b/core/src/services/azblob/writer.rs
index d74af141..9e6bdb40 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -65,15 +65,6 @@ impl oio::Write for AzblobWriter {
}
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let _ = bs;
-
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support append",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/azdfs/backend.rs
b/core/src/services/azdfs/backend.rs
index 5c0b68db..1dbe605e 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -354,10 +354,10 @@ impl Accessor for AzdfsBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.append() {
+ if args.content_length().is_none() {
return Err(Error::new(
ErrorKind::Unsupported,
- "append write is not supported",
+ "write without content length is not supported",
));
}
diff --git a/core/src/services/azdfs/writer.rs
b/core/src/services/azdfs/writer.rs
index b9f6faf2..a6efaf08 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -87,15 +87,6 @@ impl oio::Write for AzdfsWriter {
}
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let _ = bs;
-
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support append",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 33dcb2d5..03d6d824 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -54,17 +54,6 @@ impl oio::Write for FsWriter<tokio::fs::File> {
/// File could be partial written, so we will seek to start to make sure
/// we write the same content.
async fn write(&mut self, bs: Bytes) -> Result<()> {
- self.f.rewind().await.map_err(parse_io_error)?;
- self.f.write_all(&bs).await.map_err(parse_io_error)?;
-
- Ok(())
- }
-
- /// # Notes
- ///
- /// File could be partial written, so we will seek to start to make sure
- /// we write the same content.
- async fn append(&mut self, bs: Bytes) -> Result<()> {
self.f
.seek(SeekFrom::Start(self.pos))
.await
@@ -101,17 +90,6 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> {
/// File could be partial written, so we will seek to start to make sure
/// we write the same content.
fn write(&mut self, bs: Bytes) -> Result<()> {
- self.f.rewind().map_err(parse_io_error)?;
- self.f.write_all(&bs).map_err(parse_io_error)?;
-
- Ok(())
- }
-
- /// # Notes
- ///
- /// File could be partial written, so we will seek to start to make sure
- /// we write the same content.
- fn append(&mut self, bs: Bytes) -> Result<()> {
self.f
.seek(SeekFrom::Start(self.pos))
.map_err(parse_io_error)?;
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index f509eff2..5dd4b0ff 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -388,10 +388,10 @@ impl Accessor for FtpBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.append() {
+ if args.content_length().is_none() {
return Err(Error::new(
ErrorKind::Unsupported,
- "append write is not supported",
+ "write without content length is not supported",
));
}
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index dec8c473..653dc003 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -53,15 +53,6 @@ impl oio::Write for FtpWriter {
Ok(())
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let _ = bs;
-
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support append",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 7e865dcb..aba03063 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -410,32 +410,9 @@ impl Accessor for GcsBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- let upload_location = if args.append() {
- let resp = self.core.gcs_initiate_resumable_upload(path).await?;
- let status = resp.status();
-
- match status {
- StatusCode::OK => {
- let bs = parse_location(resp.headers())
- .expect("Failed to retrieve location of resumable
upload");
- if let Some(location) = bs {
- Some(String::from(location))
- } else {
- return Err(Error::new(
- ErrorKind::NotFound,
- "location is not in the response header",
- ));
- }
- }
- _ => return Err(parse_error(resp).await?),
- }
- } else {
- None
- };
-
Ok((
RpWrite::default(),
- GcsWriter::new(self.core.clone(), args, path.to_string(),
upload_location),
+ GcsWriter::new(self.core.clone(), path, args),
))
}
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 18376acd..907d0ef4 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -322,7 +322,7 @@ impl GcsCore {
&self,
location: &str,
size: u64,
- written_bytes: u64,
+ written: u64,
is_last_part: bool,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
@@ -331,12 +331,12 @@ impl GcsCore {
let range_header = if is_last_part {
format!(
"bytes {}-{}/{}",
- written_bytes,
- written_bytes + size - 1,
- written_bytes + size
+ written,
+ written + size - 1,
+ written + size
)
} else {
- format!("bytes {}-{}/*", written_bytes, written_bytes + size - 1)
+ format!("bytes {}-{}/*", written, written + size - 1)
};
req = req
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index b47b2051..c4ed1f6e 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -29,37 +29,40 @@ use crate::*;
pub struct GcsWriter {
core: Arc<GcsCore>,
-
- op: OpWrite,
path: String,
+ op: OpWrite,
+
location: Option<String>,
- written_bytes: u64,
- is_last_part_written: bool,
- last: Option<Bytes>,
+ written: u64,
+ buffer: oio::VectorCursor,
+ buffer_size: usize,
}
impl GcsWriter {
- pub fn new(
- core: Arc<GcsCore>,
- op: OpWrite,
- path: String,
- upload_location: Option<String>,
- ) -> Self {
+ pub fn new(core: Arc<GcsCore>, path: &str, op: OpWrite) -> Self {
GcsWriter {
core,
+ path: path.to_string(),
op,
- path,
- location: upload_location,
- written_bytes: 0,
- is_last_part_written: false,
- last: None,
+
+ location: None,
+ written: 0,
+ buffer: oio::VectorCursor::new(),
+ // The chunk size should be a multiple of 256 KiB
+ // (256 x 1024 bytes), unless it's the last chunk
+ // that completes the upload.
+ //
+ // Larger chunk sizes typically make uploads faster,
+ // but note that there's a tradeoff between speed and
+ // memory usage. It's recommended that you use at least
+ // 8 MiB for the chunk size.
+ //
+ // TODO: allow this value to be configured.
+ buffer_size: 8 * 1024 * 1024,
}
}
-}
-#[async_trait]
-impl oio::Write for GcsWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write_oneshot(&self, bs: Bytes) -> Result<()> {
let mut req = self.core.gcs_insert_object_request(
&percent_encode_path(&self.path),
Some(bs.len()),
@@ -82,80 +85,119 @@ impl oio::Write for GcsWriter {
}
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let location = if let Some(location) = &self.location {
- location
- } else {
- return Ok(());
- };
+ async fn initiate_upload(&self) -> Result<String> {
+ let resp = self.core.gcs_initiate_resumable_upload(&self.path).await?;
+ let status = resp.status();
- let result = if let Some(last) = &self.last {
- let bytes_to_upload = last.slice(0..last.len());
- let part_size = bytes_to_upload.len() as u64;
- let is_last_part = part_size % (256 * 1024) != 0;
- let mut req = self.core.gcs_upload_in_resumable_upload(
- location,
- part_size,
- self.written_bytes,
- is_last_part,
- AsyncBody::Bytes(bytes_to_upload),
- )?;
-
- self.core.sign(&mut req).await?;
-
- let resp = self.core.send(req).await?;
-
- let status = resp.status();
-
- match status {
- StatusCode::OK | StatusCode::PERMANENT_REDIRECT => {
- if is_last_part {
- self.is_last_part_written = true
- } else {
- self.written_bytes += part_size;
- }
- Ok(())
+ match status {
+ StatusCode::OK => {
+ let bs = parse_location(resp.headers())?;
+ if let Some(location) = bs {
+ Ok(location.to_string())
+ } else {
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ "location is not in the response header",
+ ))
+ }
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn write_part(&self, location: &str, bs: Bytes) -> Result<()> {
+ let mut req = self.core.gcs_upload_in_resumable_upload(
+ location,
+ bs.len() as u64,
+ self.written,
+ false,
+ AsyncBody::Bytes(bs),
+ )?;
+
+ self.core.sign(&mut req).await?;
+
+ let resp = self.core.send(req).await?;
+
+ let status = resp.status();
+ match status {
+ StatusCode::OK | StatusCode::PERMANENT_REDIRECT => Ok(()),
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+}
+
+#[async_trait]
+impl oio::Write for GcsWriter {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ let location = match &self.location {
+ Some(location) => location,
+ None => {
+                if self.op.content_length().unwrap_or_default() == bs.len() as u64
+                    && self.written == 0
+                {
+ return self.write_oneshot(bs).await;
+ } else {
+ let location = self.initiate_upload().await?;
+ self.location = Some(location);
+ self.location.as_deref().unwrap()
}
- _ => Err(parse_error(resp).await?),
}
- } else {
- Ok(())
};
- self.last = Some(bs.slice(0..bs.len()));
- return result;
+ // Ignore empty bytes
+ if bs.is_empty() {
+ return Ok(());
+ }
+
+ self.buffer.push(bs);
+ // Return directly if the buffer is not full
+ if self.buffer.len() <= self.buffer_size {
+ return Ok(());
+ }
+
+ let bs = self.buffer.peak_exact(self.buffer_size);
+
+ match self.write_part(location, bs).await {
+ Ok(_) => {
+ self.buffer.take(self.buffer_size);
+ self.written += self.buffer_size as u64;
+ Ok(())
+ }
+ Err(e) => {
+ // If the upload fails, we should pop the given bs to make sure
+ // write is re-enter safe.
+ self.buffer.pop();
+ Err(e)
+ }
+ }
}
+    // TODO: we can cancel the upload by sending a DELETE request to the location
async fn abort(&mut self) -> Result<()> {
Ok(())
}
async fn close(&mut self) -> Result<()> {
- if self.is_last_part_written {
- return Ok(());
- }
-
let location = if let Some(location) = &self.location {
location
} else {
return Ok(());
};
- let bs = self
- .last
- .as_ref()
- .expect("failed to get the previously uploaded part");
+ let bs = self.buffer.peak_exact(self.buffer.len());
let resp = self
.core
-            .gcs_complete_resumable_upload(location, self.written_bytes, bs.slice(0..bs.len()))
+ .gcs_complete_resumable_upload(location, self.written, bs)
.await?;
let status = resp.status();
-
match status {
StatusCode::OK => {
resp.into_body().consume().await?;
+
+ self.location = None;
+ self.buffer.clear();
Ok(())
}
_ => Err(parse_error(resp).await?),
diff --git a/core/src/services/ghac/backend.rs
b/core/src/services/ghac/backend.rs
index 159b0022..c5afeb81 100644
--- a/core/src/services/ghac/backend.rs
+++ b/core/src/services/ghac/backend.rs
@@ -391,10 +391,10 @@ impl Accessor for GhacBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.append() {
+ if args.content_length().is_none() {
return Err(Error::new(
ErrorKind::Unsupported,
- "append write is not supported",
+ "write without content length is not supported",
));
}
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index b9e4eefd..8ff12c8b 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -62,15 +62,6 @@ impl oio::Write for GhacWriter {
}
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let _ = bs;
-
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support append",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 6062d305..d045c063 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -46,20 +46,6 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
/// File could be partial written, so we will seek to start to make sure
/// we write the same content.
async fn write(&mut self, bs: Bytes) -> Result<()> {
- self.f
- .seek(SeekFrom::Start(0))
- .await
- .map_err(parse_io_error)?;
- self.f.write_all(&bs).await.map_err(parse_io_error)?;
-
- Ok(())
- }
-
- /// # Notes
- ///
- /// File could be partial written, so we will seek to start to make sure
- /// we write the same content.
- async fn append(&mut self, bs: Bytes) -> Result<()> {
self.f
.seek(SeekFrom::Start(self.pos))
.await
@@ -90,17 +76,6 @@ impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
/// File could be partial written, so we will seek to start to make sure
/// we write the same content.
fn write(&mut self, bs: Bytes) -> Result<()> {
- self.f.rewind().map_err(parse_io_error)?;
- self.f.write_all(&bs).map_err(parse_io_error)?;
-
- Ok(())
- }
-
- /// # Notes
- ///
- /// File could be partial written, so we will seek to start to make sure
- /// we write the same content.
- fn append(&mut self, bs: Bytes) -> Result<()> {
self.f
.seek(SeekFrom::Start(self.pos))
.map_err(parse_io_error)?;
diff --git a/core/src/services/ipmfs/backend.rs
b/core/src/services/ipmfs/backend.rs
index c2e59f84..9e9377b8 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -110,10 +110,10 @@ impl Accessor for IpmfsBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.append() {
+ if args.content_length().is_none() {
return Err(Error::new(
ErrorKind::Unsupported,
- "append write is not supported",
+ "write without content length is not supported",
));
}
diff --git a/core/src/services/ipmfs/writer.rs
b/core/src/services/ipmfs/writer.rs
index 4394c751..4f240000 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -55,15 +55,6 @@ impl oio::Write for IpmfsWriter {
}
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let _ = bs;
-
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support append",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 3412999e..c8223d1a 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -315,9 +315,9 @@ impl Accessor for ObsBackend {
}
async fn create_dir(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
- let mut req =
- self.core
-                .obs_put_object_request(path, Some(0), None, None, AsyncBody::Empty)?;
+ let mut req = self
+ .core
+ .obs_put_object_request(path, Some(0), None, AsyncBody::Empty)?;
self.core.sign(&mut req).await?;
@@ -352,10 +352,10 @@ impl Accessor for ObsBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.append() {
+ if args.content_length().is_none() {
return Err(Error::new(
ErrorKind::Unsupported,
- "append write is not supported",
+ "write without content length is not supported",
));
}
diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs
index c61f1f02..191ec95b 100644
--- a/core/src/services/obs/core.rs
+++ b/core/src/services/obs/core.rs
@@ -116,7 +116,6 @@ impl ObsCore {
path: &str,
size: Option<usize>,
content_type: Option<&str>,
- if_match: Option<&str>,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
@@ -125,10 +124,6 @@ impl ObsCore {
let mut req = Request::put(&url);
- if let Some(if_match) = if_match {
- req = req.header(IF_MATCH, if_match);
- }
-
if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size)
}
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index 080e0919..d71c7893 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -47,7 +47,6 @@ impl oio::Write for ObsWriter {
&self.path,
Some(bs.len()),
self.op.content_type(),
- self.op.if_match(),
AsyncBody::Bytes(bs),
)?;
@@ -66,15 +65,6 @@ impl oio::Write for ObsWriter {
}
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let _ = bs;
-
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support append",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index b3077c7b..c314586c 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -407,25 +407,9 @@ impl Accessor for OssBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- let upload_id = if args.append() {
- let resp = self.core.oss_initiate_upload(path, &args).await?;
- match resp.status() {
- StatusCode::OK => {
- let bs = resp.into_body().bytes().await?;
- let result: InitiateMultipartUploadResult =
- quick_xml::de::from_reader(bs.reader())
- .map_err(new_xml_deserialize_error)?;
- Some(result.upload_id)
- }
- _ => return Err(parse_error(resp).await?),
- }
- } else {
- None
- };
-
Ok((
RpWrite::default(),
-            OssWriter::new(self.core.clone(), args, path.to_string(), upload_id),
+ OssWriter::new(self.core.clone(), path, args),
))
}
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 3c56226c..1f730cfa 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -18,7 +18,7 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
use http::StatusCode;
use super::core::*;
@@ -33,24 +33,33 @@ pub struct OssWriter {
op: OpWrite,
path: String,
upload_id: Option<String>,
+
parts: Vec<MultipartUploadPart>,
+ buffer: oio::VectorCursor,
+ buffer_size: usize,
}
impl OssWriter {
-    pub fn new(core: Arc<OssCore>, op: OpWrite, path: String, upload_id: Option<String>) -> Self {
+ pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
OssWriter {
core,
+ path: path.to_string(),
op,
- path,
- upload_id,
+
+ upload_id: None,
parts: vec![],
+ buffer: oio::VectorCursor::new(),
+ // The part size must be 5 MiB to 5 GiB. There is no minimum
+ // size limit on the last part of your multipart upload.
+ //
+            // We pick the default value as 8 MiB for better throughput.
+ //
+ // TODO: allow this value to be configured.
+ buffer_size: 8 * 1024 * 1024,
}
}
-}
-#[async_trait]
-impl oio::Write for OssWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ async fn write_oneshot(&self, bs: Bytes) -> Result<()> {
let mut req = self.core.oss_put_object_request(
&self.path,
Some(bs.len()),
@@ -76,10 +85,20 @@ impl oio::Write for OssWriter {
}
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let upload_id = self.upload_id.as_ref().expect(
-            "Writer doesn't have upload id, but users trying to call append, must be buggy",
- );
+ async fn initiate_upload(&self) -> Result<String> {
+ let resp = self.core.oss_initiate_upload(&self.path, &self.op).await?;
+ match resp.status() {
+ StatusCode::OK => {
+ let bs = resp.into_body().bytes().await?;
+ let result: InitiateMultipartUploadResult =
+                    quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+ Ok(result.upload_id)
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+    async fn write_part(&self, upload_id: &str, bs: Bytes) -> Result<MultipartUploadPart> {
// Aliyun OSS requires part number must between [1..=10000]
let part_number = self.parts.len() + 1;
let mut req = self
@@ -108,13 +127,59 @@ impl oio::Write for OssWriter {
})?
.to_string();
resp.into_body().consume().await?;
- self.parts.push(MultipartUploadPart { part_number, etag });
- Ok(())
+ Ok(MultipartUploadPart { part_number, etag })
}
_ => Err(parse_error(resp).await?),
}
}
+}
+#[async_trait]
+impl oio::Write for OssWriter {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ let upload_id = match &self.upload_id {
+ Some(upload_id) => upload_id,
+ None => {
+                if self.op.content_length().unwrap_or_default() == bs.len() as u64 {
+ return self.write_oneshot(bs).await;
+ } else {
+ let upload_id = self.initiate_upload().await?;
+ self.upload_id = Some(upload_id);
+ self.upload_id.as_deref().unwrap()
+ }
+ }
+ };
+
+ // Ignore empty bytes
+ if bs.is_empty() {
+ return Ok(());
+ }
+
+ self.buffer.push(bs);
+ // Return directly if the buffer is not full
+ if self.buffer.len() <= self.buffer_size {
+ return Ok(());
+ }
+
+ let bs = self.buffer.peak_at_least(self.buffer_size);
+ let size = bs.len();
+
+ match self.write_part(upload_id, bs).await {
+ Ok(part) => {
+ self.buffer.take(size);
+ self.parts.push(part);
+ Ok(())
+ }
+ Err(e) => {
+ // If the upload fails, we should pop the given bs to make sure
+ // write is re-enter safe.
+ self.buffer.pop();
+ Err(e)
+ }
+ }
+ }
+
+ // TODO: we can cancel the upload by sending an abort request.
async fn abort(&mut self) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
@@ -129,6 +194,21 @@ impl oio::Write for OssWriter {
return Ok(());
};
+ // Make sure internal buffer has been flushed.
+ if !self.buffer.is_empty() {
+ let bs = self.buffer.peak_exact(self.buffer.len());
+
+ match self.write_part(upload_id, bs).await {
+ Ok(part) => {
+ self.buffer.clear();
+ self.parts.push(part);
+ }
+ Err(e) => {
+ return Err(e);
+ }
+ }
+ }
+
let resp = self
.core
.oss_complete_multipart_upload_request(&self.path, upload_id,
false, &self.parts)
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index da2f1181..66940674 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -959,39 +959,9 @@ impl Accessor for S3Backend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- let upload_id = if args.append() {
- let resp = self
- .core
- .s3_initiate_multipart_upload(
- path,
- args.content_type(),
- args.content_disposition(),
- args.cache_control(),
- args.if_match(),
- )
- .await?;
-
- let status = resp.status();
-
- match status {
- StatusCode::OK => {
- let bs = resp.into_body().bytes().await?;
-
- let result: InitiateMultipartUploadResult =
- quick_xml::de::from_reader(bs.reader())
- .map_err(new_xml_deserialize_error)?;
-
- Some(result.upload_id)
- }
- _ => return Err(parse_error(resp).await?),
- }
- } else {
- None
- };
-
Ok((
RpWrite::default(),
-            S3Writer::new(self.core.clone(), args, path.to_string(), upload_id),
+ S3Writer::new(self.core.clone(), path, args),
))
}
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index c618b87d..4f1c36a2 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -489,7 +489,6 @@ impl S3Core {
content_type: Option<&str>,
content_disposition: Option<&str>,
cache_control: Option<&str>,
- if_match: Option<&str>,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
@@ -509,10 +508,6 @@ impl S3Core {
req = req.header(CACHE_CONTROL, cache_control)
}
- if let Some(if_match) = if_match {
- req = req.header(IF_MATCH, if_match)
- }
-
// Set storage class header
if let Some(v) = &self.default_storage_class {
req =
req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index d75906f9..9e80bad2 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -18,7 +18,7 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
use http::StatusCode;
use super::core::*;
@@ -32,32 +32,34 @@ pub struct S3Writer {
op: OpWrite,
path: String,
-
upload_id: Option<String>,
+
parts: Vec<CompleteMultipartUploadRequestPart>,
+ buffer: oio::VectorCursor,
+ buffer_size: usize,
}
impl S3Writer {
-    pub fn new(core: Arc<S3Core>, op: OpWrite, path: String, upload_id: Option<String>) -> Self {
+ pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
S3Writer {
core,
-
+ path: path.to_string(),
op,
- path,
- upload_id,
+
+ upload_id: None,
parts: vec![],
+ buffer: oio::VectorCursor::new(),
+ // The part size must be 5 MiB to 5 GiB. There is no minimum
+ // size limit on the last part of your multipart upload.
+ //
+            // We pick the default value as 8 MiB for better throughput.
+ //
+ // TODO: allow this value to be configured.
+ buffer_size: 8 * 1024 * 1024,
}
}
-}
-
-#[async_trait]
-impl oio::Write for S3Writer {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
- debug_assert!(
- self.upload_id.is_none(),
-            "Writer initiated with upload id, but users trying to call write, must be buggy"
- );
+ async fn write_oneshot(&self, bs: Bytes) -> Result<()> {
let mut req = self.core.s3_put_object_request(
&self.path,
Some(bs.len()),
@@ -82,10 +84,37 @@ impl oio::Write for S3Writer {
}
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let upload_id = self.upload_id.as_ref().expect(
-            "Writer doesn't have upload id, but users trying to call append, must be buggy",
- );
+ async fn initiate_upload(&self) -> Result<String> {
+ let resp = self
+ .core
+ .s3_initiate_multipart_upload(
+ &self.path,
+ self.op.content_type(),
+ self.op.content_disposition(),
+ self.op.cache_control(),
+ )
+ .await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let bs = resp.into_body().bytes().await?;
+
+ let result: InitiateMultipartUploadResult =
+                    quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+
+ Ok(result.upload_id)
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn write_part(
+ &self,
+ upload_id: &str,
+ bs: Bytes,
+ ) -> Result<CompleteMultipartUploadRequestPart> {
// AWS S3 requires part number must between [1..=10000]
let part_number = self.parts.len() + 1;
@@ -116,12 +145,55 @@ impl oio::Write for S3Writer {
resp.into_body().consume().await?;
- self.parts
- .push(CompleteMultipartUploadRequestPart { part_number,
etag });
+ Ok(CompleteMultipartUploadRequestPart { part_number, etag })
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+}
+
+#[async_trait]
+impl oio::Write for S3Writer {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ let upload_id = match &self.upload_id {
+ Some(upload_id) => upload_id,
+ None => {
+                if self.op.content_length().unwrap_or_default() == bs.len() as u64 {
+ return self.write_oneshot(bs).await;
+ } else {
+ let upload_id = self.initiate_upload().await?;
+ self.upload_id = Some(upload_id);
+ self.upload_id.as_deref().unwrap()
+ }
+ }
+ };
+
+ // Ignore empty bytes
+ if bs.is_empty() {
+ return Ok(());
+ }
+
+ self.buffer.push(bs);
+ // Return directly if the buffer is not full
+ if self.buffer.len() <= self.buffer_size {
+ return Ok(());
+ }
+
+ let bs = self.buffer.peak_at_least(self.buffer_size);
+ let size = bs.len();
+ match self.write_part(upload_id, bs).await {
+ Ok(part) => {
+ self.buffer.take(size);
+ self.parts.push(part);
Ok(())
}
- _ => Err(parse_error(resp).await?),
+ Err(e) => {
+ // If the upload fails, we should pop the given bs to make sure
+ // write is re-enter safe.
+ self.buffer.pop();
+ Err(e)
+ }
}
}
@@ -153,6 +225,21 @@ impl oio::Write for S3Writer {
return Ok(());
};
+ // Make sure internal buffer has been flushed.
+ if !self.buffer.is_empty() {
+ let bs = self.buffer.peak_exact(self.buffer.len());
+
+ match self.write_part(upload_id, bs).await {
+ Ok(part) => {
+ self.buffer.clear();
+ self.parts.push(part);
+ }
+ Err(e) => {
+ return Err(e);
+ }
+ }
+ }
+
let resp = self
.core
.s3_complete_multipart_upload(&self.path, upload_id, &self.parts)
diff --git a/core/src/services/wasabi/backend.rs
b/core/src/services/wasabi/backend.rs
index 49ed6bc3..0cdbe3a5 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -951,7 +951,7 @@ impl Accessor for WasabiBackend {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
Ok((
RpWrite::default(),
- WasabiWriter::new(self.core.clone(), args, path.to_string(), None),
+ WasabiWriter::new(self.core.clone(), args, path.to_string()),
))
}
diff --git a/core/src/services/wasabi/writer.rs
b/core/src/services/wasabi/writer.rs
index 90d193df..f9b998b4 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -32,68 +32,50 @@ pub struct WasabiWriter {
op: OpWrite,
path: String,
-
- upload_id: Option<String>,
}
impl WasabiWriter {
- pub fn new(
- core: Arc<WasabiCore>,
- op: OpWrite,
- path: String,
- upload_id: Option<String>,
- ) -> Self {
- WasabiWriter {
- core,
-
- op,
- path,
- upload_id,
- }
+ pub fn new(core: Arc<WasabiCore>, op: OpWrite, path: String) -> Self {
+ WasabiWriter { core, op, path }
}
}
#[async_trait]
impl oio::Write for WasabiWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
- debug_assert!(
- self.upload_id.is_none(),
- "Writer initiated with upload id, but users trying to call write,
must be buggy"
- );
-
- let resp = self
- .core
- .put_object(
- &self.path,
- Some(bs.len()),
- self.op.content_type(),
- self.op.content_disposition(),
- self.op.cache_control(),
- AsyncBody::Bytes(bs),
- )
- .await?;
+ if self.op.content_length().unwrap_or_default() == bs.len() as u64 {
+ let resp = self
+ .core
+ .put_object(
+ &self.path,
+ Some(bs.len()),
+ self.op.content_type(),
+ self.op.content_disposition(),
+ self.op.cache_control(),
+ AsyncBody::Bytes(bs),
+ )
+ .await?;
- match resp.status() {
- StatusCode::CREATED | StatusCode::OK => {
- resp.into_body().consume().await?;
- Ok(())
+ match resp.status() {
+ StatusCode::CREATED | StatusCode::OK => {
+ resp.into_body().consume().await?;
+ Ok(())
+ }
+ _ => Err(parse_error(resp).await?),
}
- _ => Err(parse_error(resp).await?),
- }
- }
-
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let resp = self
- .core
- .append_object(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))
- .await?;
+ } else {
+ let resp = self
+ .core
+                .append_object(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))
+ .await?;
- match resp.status() {
- StatusCode::CREATED | StatusCode::OK | StatusCode::NO_CONTENT => {
- resp.into_body().consume().await?;
- Ok(())
+ match resp.status() {
+                StatusCode::CREATED | StatusCode::OK | StatusCode::NO_CONTENT => {
+ resp.into_body().consume().await?;
+ Ok(())
+ }
+ _ => Err(parse_error(resp).await?),
}
- _ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/webdav/backend.rs
b/core/src/services/webdav/backend.rs
index 23032855..2f872229 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -300,10 +300,10 @@ impl Accessor for WebdavBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.append() {
+ if args.content_length().is_none() {
return Err(Error::new(
ErrorKind::Unsupported,
- "append write is not supported",
+ "write without content length is not supported",
));
}
diff --git a/core/src/services/webdav/writer.rs
b/core/src/services/webdav/writer.rs
index 0fa180c9..89ddd7b8 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -63,15 +63,6 @@ impl oio::Write for WebdavWriter {
}
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let _ = bs;
-
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support append",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/services/webhdfs/backend.rs
b/core/src/services/webhdfs/backend.rs
index 094bef24..3a8bb7c2 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -519,10 +519,10 @@ impl Accessor for WebhdfsBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.append() {
+ if args.content_length().is_none() {
return Err(Error::new(
ErrorKind::Unsupported,
- "append write is not supported",
+ "write without content length is not supported",
));
}
diff --git a/core/src/services/webhdfs/writer.rs
b/core/src/services/webhdfs/writer.rs
index f3ab7633..16448141 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -63,15 +63,6 @@ impl oio::Write for WebhdfsWriter {
}
}
- async fn append(&mut self, bs: Bytes) -> Result<()> {
- let _ = bs;
-
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support append",
- ))
- }
-
async fn abort(&mut self) -> Result<()> {
Ok(())
}
diff --git a/core/src/types/operator/blocking_operator.rs
b/core/src/types/operator/blocking_operator.rs
index c6bae2dd..6b7a48df 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -443,7 +443,12 @@ impl BlockingOperator {
/// # }
/// ```
pub fn write(&self, path: &str, bs: impl Into<Bytes>) -> Result<()> {
- self.write_with(path, OpWrite::new(), bs)
+ let bs = bs.into();
+ self.write_with(
+ path,
+ OpWrite::new().with_content_length(bs.len() as u64),
+ bs,
+ )
}
/// Copy a file from `from` to `to`.
@@ -594,8 +599,11 @@ impl BlockingOperator {
);
}
- let (_, mut w) = self.inner().blocking_write(&path, args)?;
- w.write(bs.into())?;
+ let bs = bs.into();
+ let (_, mut w) = self
+ .inner()
+ .blocking_write(&path, args.with_content_length(bs.len() as u64))?;
+ w.write(bs)?;
w.close()?;
Ok(())
@@ -618,8 +626,8 @@ impl BlockingOperator {
///
/// # fn test(op: BlockingOperator) -> Result<()> {
/// let mut w = op.writer("path/to/file")?;
- /// w.append(vec![0; 4096])?;
- /// w.append(vec![1; 4096])?;
+ /// w.write(vec![0; 4096])?;
+ /// w.write(vec![1; 4096])?;
/// w.close()?;
/// # Ok(())
/// # }
@@ -636,8 +644,8 @@ impl BlockingOperator {
);
}
- let op = OpWrite::default().with_append();
- BlockingWriter::create_dir(self.inner().clone(), &path, op)
+ let op = OpWrite::default();
+ BlockingWriter::create(self.inner().clone(), &path, op)
}
/// Delete given path.
diff --git a/core/src/types/operator/operator.rs
b/core/src/types/operator/operator.rs
index 8156dcc8..ad24a542 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -631,7 +631,13 @@ impl Operator {
/// # }
/// ```
pub async fn write(&self, path: &str, bs: impl Into<Bytes>) -> Result<()> {
- self.write_with(path, OpWrite::new(), bs).await
+ let bs = bs.into();
+ self.write_with(
+ path,
+ OpWrite::new().with_content_length(bs.len() as u64),
+ bs,
+ )
+ .await
}
/// Copy a file from `from` to `to`.
@@ -769,8 +775,8 @@ impl Operator {
/// # #[tokio::main]
/// # async fn test(op: Operator) -> Result<()> {
/// let mut w = op.writer("path/to/file").await?;
- /// w.append(vec![0; 4096]).await?;
- /// w.append(vec![1; 4096]).await?;
+ /// w.write(vec![0; 4096]).await?;
+ /// w.write(vec![1; 4096]).await?;
/// w.close().await?;
/// # Ok(())
/// # }
@@ -799,8 +805,8 @@ impl Operator {
/// # async fn test(op: Operator) -> Result<()> {
/// let args =
OpWrite::new().with_content_type("application/octet-stream");
/// let mut w = op.writer_with("path/to/file", args).await?;
- /// w.append(vec![0; 4096]).await?;
- /// w.append(vec![1; 4096]).await?;
+ /// w.write(vec![0; 4096]).await?;
+ /// w.write(vec![1; 4096]).await?;
/// w.close().await?;
/// # Ok(())
/// # }
@@ -817,7 +823,7 @@ impl Operator {
);
}
-        Writer::create_dir(self.inner().clone(), &path, args.with_append()).await
+ Writer::create(self.inner().clone(), &path, args).await
}
/// Write data with extra options.
@@ -854,8 +860,12 @@ impl Operator {
);
}
- let (_, mut w) = self.inner().write(&path, args).await?;
- w.write(bs.into()).await?;
+ let bs = bs.into();
+ let (_, mut w) = self
+ .inner()
+ .write(&path, args.with_content_length(bs.len() as u64))
+ .await?;
+ w.write(bs).await?;
w.close().await?;
Ok(())
diff --git a/core/src/types/ops.rs b/core/src/types/ops.rs
index 616316a5..63180d5a 100644
--- a/core/src/types/ops.rs
+++ b/core/src/types/ops.rs
@@ -320,12 +320,10 @@ impl OpStat {
/// Args for `write` operation.
#[derive(Debug, Clone, Default)]
pub struct OpWrite {
- append: bool,
-
+ content_length: Option<u64>,
content_type: Option<String>,
content_disposition: Option<String>,
cache_control: Option<String>,
- if_match: Option<String>,
}
impl OpWrite {
@@ -336,13 +334,20 @@ impl OpWrite {
Self::default()
}
- pub(crate) fn with_append(mut self) -> Self {
- self.append = true;
- self
+ /// Get the content length from op.
+ ///
+ /// The content length is the total length of the data to be written.
+ pub fn content_length(&self) -> Option<u64> {
+ self.content_length
}
- pub(crate) fn append(&self) -> bool {
- self.append
+ /// Set the content length of op.
+ ///
+ /// If the content length is not set, the content length will be
+ /// calculated automatically by buffering part of data.
+ pub fn with_content_length(mut self, content_length: u64) -> Self {
+ self.content_length = Some(content_length);
+ self
}
/// Get the content type from option
@@ -377,17 +382,6 @@ impl OpWrite {
self.cache_control = Some(cache_control.to_string());
self
}
-
- /// Set the If-Match of the option
- pub fn with_if_match(mut self, if_match: &str) -> Self {
- self.if_match = Some(if_match.to_string());
- self
- }
-
- /// Get If-Match from option
- pub fn if_match(&self) -> Option<&str> {
- self.if_match.as_deref()
- }
}
/// Args for `copy` operation.
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index dc4d2bfd..321988b9 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -52,7 +52,7 @@ impl Writer {
///
/// We don't want to expose those details to users so keep this function
/// in crate only.
-    pub(crate) async fn create_dir(acc: FusedAccessor, path: &str, op: OpWrite) -> Result<Self> {
+    pub(crate) async fn create(acc: FusedAccessor, path: &str, op: OpWrite) -> Result<Self> {
let (_, w) = acc.write(path, op).await?;
Ok(Writer {
@@ -60,17 +60,13 @@ impl Writer {
})
}
- /// Append data into writer.
- ///
- /// It is highly recommended to align the length of the input bytes
- /// into blocks of 4MiB (except the last block) for better performance
- /// and compatibility.
- pub async fn append(&mut self, bs: impl Into<Bytes>) -> Result<()> {
+ /// Write into inner writer.
+ pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
if let State::Idle(Some(w)) = &mut self.state {
- w.append(bs.into()).await
+ w.write(bs.into()).await
} else {
unreachable!(
- "writer state invalid while append, expect Idle, actual {}",
+ "writer state invalid while write, expect Idle, actual {}",
self.state
);
}
@@ -132,7 +128,7 @@ impl AsyncWrite for Writer {
let bs = Bytes::from(buf.to_vec());
let size = bs.len();
let fut = async move {
- w.append(bs).await?;
+ w.write(bs).await?;
Ok((size, w))
};
self.state = State::Write(Box::pin(fut));
@@ -184,6 +180,72 @@ impl AsyncWrite for Writer {
}
}
+impl tokio::io::AsyncWrite for Writer {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ loop {
+ match &mut self.state {
+ State::Idle(w) => {
+ let mut w = w
+ .take()
+ .expect("invalid state of writer: Idle state with
empty write");
+ let bs = Bytes::from(buf.to_vec());
+ let size = bs.len();
+ let fut = async move {
+ w.write(bs).await?;
+ Ok((size, w))
+ };
+ self.state = State::Write(Box::pin(fut));
+ }
+ State::Write(fut) => match ready!(fut.poll_unpin(cx)) {
+ Ok((size, w)) => {
+ self.state = State::Idle(Some(w));
+ return Poll::Ready(Ok(size));
+ }
+ Err(err) => return
Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+ },
+ State::Close(_) => {
+ unreachable!("invalid state of writer: poll_write with
State::Close")
+ }
+ };
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) ->
Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<io::Result<()>> {
+ loop {
+ match &mut self.state {
+ State::Idle(w) => {
+ let mut w = w
+ .take()
+ .expect("invalid state of writer: Idle state with
empty write");
+ let fut = async move {
+ w.close().await?;
+ Ok(w)
+ };
+ self.state = State::Close(Box::pin(fut));
+ }
+ State::Write(_) => {
+ unreachable!("invalid state of writer: poll_close with
State::Write")
+ }
+ State::Close(fut) => match ready!(fut.poll_unpin(cx)) {
+ Ok(w) => {
+ self.state = State::Idle(Some(w));
+ return Poll::Ready(Ok(()));
+ }
+ Err(err) => return
Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
+ },
+ }
+ }
+ }
+}
+
/// BlockingWriter is designed to write data into given path in a blocking
/// manner.
pub struct BlockingWriter {
@@ -198,19 +260,15 @@ impl BlockingWriter {
///
/// We don't want to expose those details to users so keep this function
/// in crate only.
- pub(crate) fn create_dir(acc: FusedAccessor, path: &str, op: OpWrite) ->
Result<Self> {
+ pub(crate) fn create(acc: FusedAccessor, path: &str, op: OpWrite) ->
Result<Self> {
let (_, w) = acc.blocking_write(path, op)?;
Ok(BlockingWriter { inner: w })
}
- /// Append data into writer.
- ///
- /// It is highly recommended to align the length of the input bytes
- /// into blocks of 4MiB (except the last block) for better performance
- /// and compatibility.
- pub fn append(&mut self, bs: impl Into<Bytes>) -> Result<()> {
- self.inner.append(bs.into())
+ /// Write into inner writer.
+ pub fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
+ self.inner.write(bs.into())
}
/// Close the writer and make sure all data have been stored.
@@ -222,7 +280,8 @@ impl BlockingWriter {
impl io::Write for BlockingWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let size = buf.len();
- self.append(Bytes::from(buf.to_vec()))
+ self.inner
+ .write(Bytes::from(buf.to_vec()))
.map(|_| size)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
}
diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs
index 44190997..b909d436 100644
--- a/core/tests/behavior/utils.rs
+++ b/core/tests/behavior/utils.rs
@@ -25,6 +25,7 @@ use log::debug;
use opendal::layers::LoggingLayer;
use opendal::layers::RetryLayer;
use opendal::*;
+use rand::distributions::uniform::SampleRange;
use rand::prelude::*;
use sha2::Digest;
use sha2::Sha256;
@@ -88,6 +89,16 @@ pub fn gen_bytes() -> (Vec<u8>, usize) {
(content, size)
}
+pub fn gen_bytes_with_range(range: impl SampleRange<usize>) -> (Vec<u8>,
usize) {
+ let mut rng = thread_rng();
+
+ let size = rng.gen_range(range);
+ let mut content = vec![0; size];
+ rng.fill_bytes(&mut content);
+
+ (content, size)
+}
+
pub fn gen_fixed_bytes(size: usize) -> Vec<u8> {
let mut rng = thread_rng();
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index 91c9ec08..10082b57 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -97,8 +97,9 @@ macro_rules! behavior_write_tests {
test_delete_with_special_chars,
test_delete_not_existing,
test_delete_stream,
- test_append,
- test_abort_writer,
+ test_writer_write,
+ test_writer_abort,
+ test_writer_futures_copy,
);
)*
};
@@ -580,7 +581,7 @@ pub async fn test_read_with_special_chars(op: Operator) ->
Result<()> {
}
// Aborting an unfinished writer should succeed (or report Unsupported).
-pub async fn test_abort_writer(op: Operator) -> Result<()> {
+pub async fn test_writer_abort(op: Operator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
let (content, _) = gen_bytes();
@@ -592,7 +593,7 @@ pub async fn test_abort_writer(op: Operator) -> Result<()> {
}
};
- if let Err(e) = writer.append(content).await {
+ if let Err(e) = writer.write(content).await {
assert_eq!(e.kind(), ErrorKind::Unsupported);
return Ok(());
}
@@ -685,8 +686,8 @@ pub async fn test_delete_stream(op: Operator) -> Result<()>
{
Ok(())
}
-// Append write
-pub async fn test_append(op: Operator) -> Result<()> {
+// Append data into writer
+pub async fn test_writer_write(op: Operator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
let size = 5 * 1024 * 1024; // write file with 5 MiB
let content_a = gen_fixed_bytes(size);
@@ -700,8 +701,8 @@ pub async fn test_append(op: Operator) -> Result<()> {
}
Err(err) => return Err(err.into()),
};
- w.append(content_a.clone()).await?;
- w.append(content_b.clone()).await?;
+ w.write(content_a.clone()).await?;
+ w.write(content_b.clone()).await?;
w.close().await?;
let meta = op.stat(&path).await.expect("stat must succeed");
@@ -723,3 +724,36 @@ pub async fn test_append(op: Operator) -> Result<()> {
op.delete(&path).await.expect("delete must succeed");
Ok(())
}
+
+// copy data from reader to writer
+pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
+ let path = uuid::Uuid::new_v4().to_string();
+ let (content, size): (Vec<u8>, usize) =
+ gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
+
+ let mut w = match op.writer(&path).await {
+ Ok(w) => w,
+ Err(err) if err.kind() == ErrorKind::Unsupported => {
+ warn!("service doesn't support writer");
+ return Ok(());
+ }
+ Err(err) => return Err(err.into()),
+ };
+
+ futures::io::copy(&mut content.as_slice(), &mut w).await?;
+ w.close().await?;
+
+ let meta = op.stat(&path).await.expect("stat must succeed");
+ assert_eq!(meta.content_length(), size as u64);
+
+ let bs = op.read(&path).await?;
+ assert_eq!(bs.len(), size, "read size");
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&bs[..size])),
+ format!("{:x}", Sha256::digest(content)),
+ "read content"
+ );
+
+ op.delete(&path).await.expect("delete must succeed");
+ Ok(())
+}