This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new fda9670d6 refactor: Remove the requirement of passing `content_length`
to writer (#3044)
fda9670d6 is described below
commit fda9670d6eba11e61175e65a29e6ee3f14120c33
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 13 10:16:36 2023 +0800
refactor: Remove the requirement of passing `content_length` to writer
(#3044)
* Polish
Signed-off-by: Xuanwo <[email protected]>
* Save work
Signed-off-by: Xuanwo <[email protected]>
* delay write for oneshot
Signed-off-by: Xuanwo <[email protected]>
* Fix doc test
Signed-off-by: Xuanwo <[email protected]>
* Add comments
Signed-off-by: Xuanwo <[email protected]>
* Fix naming
Signed-off-by: Xuanwo <[email protected]>
* Fix ftp
Signed-off-by: Xuanwo <[email protected]>
* Add check for append
Signed-off-by: Xuanwo <[email protected]>
* Fix build
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
.github/workflows/service_test_webhdfs.yml | 4 +-
core/fuzz/fuzz_writer.rs | 2 +-
core/src/layers/complete.rs | 86 +++++++--------------------
core/src/raw/adapters/kv/backend.rs | 18 +-----
core/src/raw/adapters/typed_kv/api.rs | 8 +--
core/src/raw/adapters/typed_kv/backend.rs | 7 +--
core/src/raw/oio/write/one_shot_write.rs | 68 ++++++++++++++-------
core/src/raw/ops.rs | 35 +++--------
core/src/services/azblob/writer.rs | 6 +-
core/src/services/azdfs/backend.rs | 7 ---
core/src/services/azdfs/writer.rs | 15 ++---
core/src/services/cos/backend.rs | 4 +-
core/src/services/dropbox/backend.rs | 6 --
core/src/services/dropbox/writer.rs | 9 ++-
core/src/services/fs/backend.rs | 2 +-
core/src/services/ftp/backend.rs | 20 +++----
core/src/services/ftp/writer.rs | 60 +++++--------------
core/src/services/gcs/backend.rs | 4 +-
core/src/services/gdrive/backend.rs | 9 +--
core/src/services/gdrive/writer.rs | 4 +-
core/src/services/ghac/backend.rs | 9 +--
core/src/services/ipmfs/backend.rs | 15 ++---
core/src/services/ipmfs/writer.rs | 5 +-
core/src/services/obs/backend.rs | 4 +-
core/src/services/onedrive/backend.rs | 7 ---
core/src/services/onedrive/writer.rs | 4 +-
core/src/services/oss/backend.rs | 5 +-
core/src/services/s3/backend.rs | 5 +-
core/src/services/sftp/backend.rs | 3 +-
core/src/services/supabase/backend.rs | 7 ---
core/src/services/supabase/writer.rs | 11 ++--
core/src/services/vercel_artifacts/backend.rs | 7 ---
core/src/services/vercel_artifacts/writer.rs | 11 ++--
core/src/services/wasabi/backend.rs | 7 ---
core/src/services/wasabi/writer.rs | 10 ++--
core/src/services/webdav/backend.rs | 7 ---
core/src/services/webdav/writer.rs | 10 ++--
core/src/services/webhdfs/backend.rs | 7 ---
core/src/services/webhdfs/writer.rs | 9 +--
core/src/types/capability.rs | 71 ++++++++++------------
core/src/types/operator/blocking_operator.rs | 2 +-
core/src/types/operator/operator.rs | 2 +-
core/src/types/operator/operator_functions.rs | 28 ++-------
core/src/types/operator/operator_futures.rs | 41 ++-----------
core/src/types/writer.rs | 42 +++++++++----
core/tests/behavior/write.rs | 18 +++---
46 files changed, 266 insertions(+), 455 deletions(-)
diff --git a/.github/workflows/service_test_webhdfs.yml
b/.github/workflows/service_test_webhdfs.yml
index 3487c2db6..f32d9d2d3 100644
--- a/.github/workflows/service_test_webhdfs.yml
+++ b/.github/workflows/service_test_webhdfs.yml
@@ -37,7 +37,7 @@ concurrency:
cancel-in-progress: true
jobs:
- hdfs:
+ webhdfs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
@@ -76,7 +76,7 @@ jobs:
OPENDAL_WEBHDFS_ROOT: /
OPENDAL_WEBHDFS_ENDPOINT: http://127.0.0.1:9870
- hdfs_with_list_batch_disabled:
+ webhdfs_with_list_batch_disabled:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
diff --git a/core/fuzz/fuzz_writer.rs b/core/fuzz/fuzz_writer.rs
index 2f992b919..ff1eb0bdb 100644
--- a/core/fuzz/fuzz_writer.rs
+++ b/core/fuzz/fuzz_writer.rs
@@ -107,7 +107,7 @@ async fn fuzz_writer(op: Operator, input: FuzzInput) ->
Result<()> {
let checker = WriteChecker::new(total_size);
- let mut writer = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
+ let mut writer = op.writer_with(&path).buffer(8 * 1024 * 1024).await?;
for chunk in &checker.chunks {
writer.write(chunk.clone()).await?;
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index cc767f90e..1abed7bbd 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -426,12 +426,17 @@ impl<A: Accessor> LayeredAccessor for
CompleteReaderAccessor<A> {
if !capability.write {
return new_capability_unsupported_error(Operation::Write);
}
+ if args.append() && !capability.write_can_append {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ "write with append enabled is not supported",
+ ));
+ }
- let size = args.content_length();
self.inner
.write(path, args)
.await
- .map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
+ .map(|(rp, w)| (rp, CompleteWriter::new(w)))
}
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
@@ -439,11 +444,16 @@ impl<A: Accessor> LayeredAccessor for
CompleteReaderAccessor<A> {
if !capability.write || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingWrite);
}
+ if args.append() && !capability.write_can_append {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ "write with append enabled is not supported",
+ ));
+ }
- let size = args.content_length();
self.inner
.blocking_write(path, args)
- .map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
+ .map(|(rp, w)| (rp, CompleteWriter::new(w)))
}
async fn create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
@@ -681,17 +691,11 @@ where
pub struct CompleteWriter<W> {
inner: Option<W>,
- size: Option<u64>,
- written: u64,
}
impl<W> CompleteWriter<W> {
- pub fn new(inner: W, size: Option<u64>) -> CompleteWriter<W> {
- CompleteWriter {
- inner: Some(inner),
- size,
- written: 0,
- }
+ pub fn new(inner: W) -> CompleteWriter<W> {
+ CompleteWriter { inner: Some(inner) }
}
}
@@ -717,52 +721,27 @@ where
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
})?;
let n = ready!(w.poll_write(cx, bs))?;
- self.written += n as u64;
-
- if let Some(size) = self.size {
- if self.written > size {
- return Poll::Ready(Err(Error::new(
- ErrorKind::ContentTruncated,
- &format!(
- "writer got too much data, expect: {size}, actual: {}",
- self.written + n as u64
- ),
- )));
- }
- }
Poll::Ready(Ok(n))
}
- fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
})?;
- ready!(w.poll_abort(cx))?;
+ ready!(w.poll_close(cx))?;
self.inner = None;
Poll::Ready(Ok(()))
}
- fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
- if let Some(size) = self.size {
- if self.written < size {
- return Poll::Ready(Err(Error::new(
- ErrorKind::ContentIncomplete,
- &format!(
- "writer got too less data, expect: {size}, actual: {}",
- self.written
- ),
- )));
- }
- }
-
+ fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
})?;
- ready!(w.poll_close(cx))?;
+ ready!(w.poll_abort(cx))?;
self.inner = None;
Poll::Ready(Ok(()))
@@ -778,36 +757,11 @@ where
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
})?;
let n = w.write(bs)?;
- self.written += n as u64;
-
- if let Some(size) = self.size {
- if self.written > size {
- return Err(Error::new(
- ErrorKind::ContentTruncated,
- &format!(
- "writer got too much data, expect: {size}, actual: {}",
- self.written + n as u64
- ),
- ));
- }
- }
Ok(n)
}
fn close(&mut self) -> Result<()> {
- if let Some(size) = self.size {
- if self.written < size {
- return Err(Error::new(
- ErrorKind::ContentIncomplete,
- &format!(
- "writer got too less data, expect: {size}, actual: {}",
- self.written
- ),
- ));
- }
- }
-
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
})?;
diff --git a/core/src/raw/adapters/kv/backend.rs
b/core/src/raw/adapters/kv/backend.rs
index db82e1cd6..f7a40e698 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -143,27 +143,13 @@ impl<S: Adapter> Accessor for Backend<S> {
Ok((RpRead::new(bs.len() as u64), oio::Cursor::from(bs)))
}
- async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
+ async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
let p = build_abs_path(&self.root, path);
Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
}
- fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
+ fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
let p = build_abs_path(&self.root, path);
Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
diff --git a/core/src/raw/adapters/typed_kv/api.rs
b/core/src/raw/adapters/typed_kv/api.rs
index 3217db576..7a1b929a0 100644
--- a/core/src/raw/adapters/typed_kv/api.rs
+++ b/core/src/raw/adapters/typed_kv/api.rs
@@ -121,13 +121,13 @@ impl Value {
/// by Typed KV Operator.
#[derive(Copy, Clone, Default)]
pub struct Capability {
- /// If typed_kv operator supports get natively, it will be true.
+ /// If typed_kv operator supports get natively.
pub get: bool,
- /// If typed_kv operator supports set natively, it will be true.
+ /// If typed_kv operator supports set natively.
pub set: bool,
- /// If typed_kv operator supports delete natively, it will be true.
+ /// If typed_kv operator supports delete natively.
pub delete: bool,
- /// If typed_kv operator supports scan natively, it will be true.
+ /// If typed_kv operator supports scan natively.
pub scan: bool,
}
diff --git a/core/src/raw/adapters/typed_kv/backend.rs
b/core/src/raw/adapters/typed_kv/backend.rs
index e04822cdd..6d515b01a 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -393,6 +393,8 @@ impl<S> KvWriter<S> {
let value = self.buf.take().map(Bytes::from).unwrap_or_default();
let mut metadata = Metadata::new(EntryMode::FILE);
+ metadata.set_content_length(value.len() as u64);
+
if let Some(v) = self.op.cache_control() {
metadata.set_cache_control(v);
}
@@ -402,11 +404,6 @@ impl<S> KvWriter<S> {
if let Some(v) = self.op.content_type() {
metadata.set_content_type(v);
}
- if let Some(v) = self.op.content_length() {
- metadata.set_content_length(v);
- } else {
- metadata.set_content_length(value.len() as u64);
- }
Value { metadata, value }
}
diff --git a/core/src/raw/oio/write/one_shot_write.rs
b/core/src/raw/oio/write/one_shot_write.rs
index c56679e19..4efb3655b 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -20,7 +20,6 @@ use std::task::Context;
use std::task::Poll;
use async_trait::async_trait;
-use bytes::Bytes;
use futures::future::BoxFuture;
use crate::raw::*;
@@ -37,17 +36,18 @@ pub trait OneShotWrite: Send + Sync + Unpin + 'static {
/// write_once write all data at once.
///
/// Implementations should make sure that the data is written correctly at
once.
- async fn write_once(&self, bs: Bytes) -> Result<()>;
+ async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()>;
}
/// OneShotWrite is used to implement [`Write`] based on one shot.
pub struct OneShotWriter<W: OneShotWrite> {
state: State<W>,
+ buffer: Option<oio::ChunkedBytes>,
}
enum State<W> {
Idle(Option<W>),
- Write(BoxFuture<'static, (W, Result<usize>)>),
+ Write(BoxFuture<'static, (W, Result<()>)>),
}
/// # Safety
@@ -60,45 +60,73 @@ impl<W: OneShotWrite> OneShotWriter<W> {
pub fn new(inner: W) -> Self {
Self {
state: State::Idle(Some(inner)),
+ buffer: None,
}
}
}
#[async_trait]
impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
- fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) ->
Poll<Result<usize>> {
+ fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) ->
Poll<Result<usize>> {
+ loop {
+ match &mut self.state {
+ State::Idle(_) => {
+ return match &self.buffer {
+ Some(_) => Poll::Ready(Err(Error::new(
+ ErrorKind::Unsupported,
+ "OneShotWriter doesn't support multiple write",
+ ))),
+ None => {
+ let size = bs.remaining();
+ let bs = bs.vectored_bytes(size);
+ self.buffer =
Some(oio::ChunkedBytes::from_vec(bs));
+ Poll::Ready(Ok(size))
+ }
+ }
+ }
+ State::Write(_) => {
+ unreachable!("OneShotWriter must not go into State::Write
during poll_write")
+ }
+ }
+ }
+ }
+
+ fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
loop {
match &mut self.state {
State::Idle(w) => {
let w = w.take().expect("writer must be valid");
- let size = bs.remaining();
- let bs = bs.bytes(size);
- let fut = async move {
- let res = w.write_once(bs).await;
+ match self.buffer.clone() {
+ Some(bs) => {
+ let fut = Box::pin(async move {
+ let res = w.write_once(&bs).await;
- (w, res.map(|_| size))
- };
+ (w, res)
+ });
+ self.state = State::Write(fut);
+ }
+ None => {
+ let fut = Box::pin(async move {
+ let res = w.write_once(&"".as_bytes()).await;
- self.state = State::Write(Box::pin(fut));
+ (w, res)
+ });
+ self.state = State::Write(fut);
+ }
+ };
}
State::Write(fut) => {
- let (w, size) = ready!(fut.as_mut().poll(cx));
+ let (w, res) = ready!(fut.as_mut().poll(cx));
self.state = State::Idle(Some(w));
- return Poll::Ready(size);
+ return Poll::Ready(res);
}
}
}
}
fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
- Poll::Ready(Err(Error::new(
- ErrorKind::Unsupported,
- "OneShotWriter doesn't support abort since all content has been
flushed",
- )))
- }
-
- fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+ self.buffer = None;
Poll::Ready(Ok(()))
}
}
diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
index 0404ed468..1e60d329e 100644
--- a/core/src/raw/ops.rs
+++ b/core/src/raw/ops.rs
@@ -405,9 +405,8 @@ impl OpStat {
#[derive(Debug, Clone, Default)]
pub struct OpWrite {
append: bool,
+ buffer: Option<usize>,
- buffer_size: Option<usize>,
- content_length: Option<u64>,
content_type: Option<String>,
content_disposition: Option<String>,
cache_control: Option<String>,
@@ -440,39 +439,23 @@ impl OpWrite {
self
}
- /// Get the buffer size from op.
+ /// Get the buffer from op.
///
- /// The buffer size is used by service to decide the buffer size of the
underlying writer.
- pub fn buffer_size(&self) -> Option<usize> {
- self.buffer_size
+ /// The buffer is used by service to decide the buffer size of the
underlying writer.
+ pub fn buffer(&self) -> Option<usize> {
+ self.buffer
}
- /// Set the buffer size of op.
+ /// Set the buffer of op.
///
- /// If buffer size is set, the data will be buffered by the underlying
writer.
+ /// If buffer is set, the data will be buffered by the underlying writer.
///
/// ## NOTE
///
/// Service could have their own minimum buffer size while perform write
operations like
/// multipart uploads. So the buffer size may be larger than the given
buffer size.
- pub fn with_buffer_size(mut self, buffer_size: usize) -> Self {
- self.buffer_size = Some(buffer_size);
- self
- }
-
- /// Get the content length from op.
- ///
- /// The content length is the total length of the data to be written.
- pub fn content_length(&self) -> Option<u64> {
- self.content_length
- }
-
- /// Set the content length of op.
- ///
- /// If the content length is not set, the content length will be
- /// calculated automatically by buffering part of data.
- pub fn with_content_length(mut self, content_length: u64) -> Self {
- self.content_length = Some(content_length);
+ pub fn with_buffer(mut self, buffer: usize) -> Self {
+ self.buffer = Some(buffer);
self
}
diff --git a/core/src/services/azblob/writer.rs
b/core/src/services/azblob/writer.rs
index 029278ad5..301c9f733 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -18,7 +18,6 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::core::AzblobCore;
@@ -46,13 +45,14 @@ impl AzblobWriter {
#[async_trait]
impl oio::OneShotWrite for AzblobWriter {
- async fn write_once(&self, bs: Bytes) -> Result<()> {
+ async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+ let bs =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
let mut req = self.core.azblob_put_blob_request(
&self.path,
Some(bs.len() as u64),
self.op.content_type(),
self.op.cache_control(),
- AsyncBody::Bytes(bs),
+ AsyncBody::ChunkedBytes(bs),
)?;
self.core.sign(&mut req).await?;
diff --git a/core/src/services/azdfs/backend.rs
b/core/src/services/azdfs/backend.rs
index 1c0dd3a8c..c729ab754 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -296,13 +296,6 @@ impl Accessor for AzdfsBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
Ok((
RpWrite::default(),
oio::OneShotWriter::new(AzdfsWriter::new(self.core.clone(), args,
path.to_string())),
diff --git a/core/src/services/azdfs/writer.rs
b/core/src/services/azdfs/writer.rs
index 9e2a962f2..0424fe494 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -18,11 +18,11 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::core::AzdfsCore;
use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;
@@ -41,7 +41,7 @@ impl AzdfsWriter {
#[async_trait]
impl oio::OneShotWrite for AzdfsWriter {
- async fn write_once(&self, bs: Bytes) -> Result<()> {
+ async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
let mut req = self.core.azdfs_create_request(
&self.path,
"file",
@@ -66,11 +66,12 @@ impl oio::OneShotWrite for AzdfsWriter {
}
}
- let size = bs.len();
-
- let mut req =
- self.core
- .azdfs_update_request(&self.path, Some(size),
AsyncBody::Bytes(bs))?;
+ let bs =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
+ let mut req = self.core.azdfs_update_request(
+ &self.path,
+ Some(bs.len()),
+ AsyncBody::ChunkedBytes(bs),
+ )?;
self.core.sign(&mut req).await?;
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index eced1ccc9..897c4c10a 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -272,10 +272,10 @@ impl Accessor for CosBackend {
write: true,
write_can_append: true,
+ write_can_multi: true,
write_with_content_type: true,
write_with_cache_control: true,
write_with_content_disposition: true,
- write_without_content_length: true,
delete: true,
create_dir: true,
@@ -342,7 +342,7 @@ impl Accessor for CosBackend {
CosWriters::One(oio::MultipartUploadWriter::new(writer))
};
- let w = if let Some(buffer_size) = args.buffer_size() {
+ let w = if let Some(buffer_size) = args.buffer() {
let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
let w = oio::ExactBufWriter::new(w, buffer_size);
diff --git a/core/src/services/dropbox/backend.rs
b/core/src/services/dropbox/backend.rs
index 029f1fe7f..8400b3300 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -106,12 +106,6 @@ impl Accessor for DropboxBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
Ok((
RpWrite::default(),
oio::OneShotWriter::new(DropboxWriter::new(
diff --git a/core/src/services/dropbox/writer.rs
b/core/src/services/dropbox/writer.rs
index 3a5c6cdd7..b5a4a69af 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -18,7 +18,6 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::core::DropboxCore;
@@ -40,16 +39,16 @@ impl DropboxWriter {
#[async_trait]
impl oio::OneShotWrite for DropboxWriter {
- async fn write_once(&self, bs: Bytes) -> Result<()> {
- let size = bs.len();
+ async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+ let bs =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
let resp = self
.core
.dropbox_update(
&self.path,
- Some(size),
+ Some(bs.len()),
self.op.content_type(),
- AsyncBody::Bytes(bs),
+ AsyncBody::ChunkedBytes(bs),
)
.await?;
let status = resp.status();
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 950cfd76e..b1f1fdf49 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -263,7 +263,7 @@ impl Accessor for FsBackend {
write: true,
write_can_append: true,
- write_without_content_length: true,
+ write_can_multi: true,
create_dir: true,
delete: true,
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index ed960c5f7..2fe8ffe9a 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -42,6 +42,7 @@ use super::pager::FtpPager;
use super::util::FtpReader;
use super::writer::FtpWriter;
use crate::raw::*;
+use crate::services::ftp::writer::FtpWriters;
use crate::*;
/// FTP and FTPS services support.
@@ -264,7 +265,7 @@ impl Debug for FtpBackend {
impl Accessor for FtpBackend {
type Reader = FtpReader;
type BlockingReader = ();
- type Writer = FtpWriter;
+ type Writer = FtpWriters;
type BlockingWriter = ();
type Pager = FtpPager;
type BlockingPager = ();
@@ -351,14 +352,7 @@ impl Accessor for FtpBackend {
Ok((RpRead::new(size), FtpReader::new(r, ftp_stream)))
}
- async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
+ async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
// Ensure the parent dir exists.
let parent = get_parent(path);
let paths: Vec<&str> = parent.split('/').collect();
@@ -381,10 +375,10 @@ impl Accessor for FtpBackend {
}
}
- Ok((
- RpWrite::new(),
- FtpWriter::new(self.clone(), path.to_string()),
- ))
+ let w = FtpWriter::new(self.clone(), path.to_string());
+ let w = oio::OneShotWriter::new(w);
+
+ Ok((RpWrite::new(), w))
}
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 4b2658907..79ae8e55d 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -15,24 +15,19 @@
// specific language governing permissions and limitations
// under the License.
-use std::task::ready;
-use std::task::Context;
-use std::task::Poll;
-
use async_trait::async_trait;
-use futures::future::BoxFuture;
use futures::AsyncWriteExt;
-use futures::FutureExt;
use super::backend::FtpBackend;
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;
+pub type FtpWriters = oio::OneShotWriter<FtpWriter>;
+
pub struct FtpWriter {
backend: FtpBackend,
path: String,
-
- fut: Option<BoxFuture<'static, Result<usize>>>,
}
/// # TODO
@@ -42,11 +37,7 @@ pub struct FtpWriter {
/// After we can use data stream, we should return it directly.
impl FtpWriter {
pub fn new(backend: FtpBackend, path: String) -> Self {
- FtpWriter {
- backend,
- path,
- fut: None,
- }
+ FtpWriter { backend, path }
}
}
@@ -56,39 +47,18 @@ impl FtpWriter {
unsafe impl Sync for FtpWriter {}
#[async_trait]
-impl oio::Write for FtpWriter {
- fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) ->
Poll<Result<usize>> {
- loop {
- if let Some(fut) = self.fut.as_mut() {
- let res = ready!(fut.poll_unpin(cx));
- self.fut = None;
- return Poll::Ready(res);
- }
-
- let size = bs.remaining();
- let bs = bs.bytes(size);
+impl oio::OneShotWrite for FtpWriter {
+ async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+ let size = bs.remaining();
+ let bs = bs.bytes(size);
- let path = self.path.clone();
- let backend = self.backend.clone();
- let fut = async move {
- let mut ftp_stream =
backend.ftp_connect(Operation::Write).await?;
- let mut data_stream =
ftp_stream.append_with_stream(&path).await?;
- data_stream.write_all(&bs).await.map_err(|err| {
- Error::new(ErrorKind::Unexpected, "copy from ftp
stream").set_source(err)
- })?;
-
- ftp_stream.finalize_put_stream(data_stream).await?;
- Ok(size)
- };
- self.fut = Some(Box::pin(fut));
- }
- }
-
- fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
- Poll::Ready(Ok(()))
- }
+ let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
+ let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
+ data_stream.write_all(&bs).await.map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "copy from ftp
stream").set_source(err)
+ })?;
- fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
- Poll::Ready(Ok(()))
+ ftp_stream.finalize_put_stream(data_stream).await?;
+ Ok(())
}
}
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 9e26cdce1..20e7a1f5e 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -362,8 +362,8 @@ impl Accessor for GcsBackend {
read_with_if_none_match: true,
write: true,
+ write_can_multi: true,
write_with_content_type: true,
- write_without_content_length: true,
delete: true,
copy: true,
@@ -423,7 +423,7 @@ impl Accessor for GcsBackend {
let w = GcsWriter::new(self.core.clone(), path, args.clone());
let w = oio::RangeWriter::new(w);
- let w = if let Some(buffer_size) = args.buffer_size() {
+ let w = if let Some(buffer_size) = args.buffer() {
// FIXME: we should align with 256KiB instead.
let buffer_size = max(DEFAULT_WRITE_FIXED_SIZE, buffer_size);
diff --git a/core/src/services/gdrive/backend.rs
b/core/src/services/gdrive/backend.rs
index ea67ed200..66c181d2b 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -166,14 +166,7 @@ impl Accessor for GdriveBackend {
}
}
- async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
+ async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
// As Google Drive allows files have the same name, we need to check
if the file exists.
// If the file exists, we will keep its ID and update it.
let mut file_id: Option<String> = None;
diff --git a/core/src/services/gdrive/writer.rs
b/core/src/services/gdrive/writer.rs
index e70f1754e..445ecb36f 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -23,6 +23,7 @@ use http::StatusCode;
use super::core::GdriveCore;
use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;
@@ -88,7 +89,8 @@ impl GdriveWriter {
#[async_trait]
impl oio::OneShotWrite for GdriveWriter {
- async fn write_once(&self, bs: Bytes) -> Result<()> {
+ async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+ let bs = bs.bytes(bs.remaining());
let size = bs.len();
if self.file_id.is_none() {
self.write_create(size as u64, bs).await?;
diff --git a/core/src/services/ghac/backend.rs
b/core/src/services/ghac/backend.rs
index 8adce69a3..e78fdb405 100644
--- a/core/src/services/ghac/backend.rs
+++ b/core/src/services/ghac/backend.rs
@@ -404,14 +404,7 @@ impl Accessor for GhacBackend {
}
}
- async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
+ async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
let req = self.ghac_reserve(path).await?;
let resp = self.client.send(req).await?;
diff --git a/core/src/services/ipmfs/backend.rs
b/core/src/services/ipmfs/backend.rs
index eda6d8895..e7999767b 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -21,7 +21,6 @@ use std::str;
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
use http::Request;
use http::Response;
use http::StatusCode;
@@ -121,14 +120,7 @@ impl Accessor for IpmfsBackend {
}
}
- async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
+ async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
Ok((
RpWrite::default(),
oio::OneShotWriter::new(IpmfsWriter::new(self.clone(),
path.to_string())),
@@ -290,7 +282,7 @@ impl IpmfsBackend {
pub async fn ipmfs_write(
&self,
path: &str,
- body: Bytes,
+ body: oio::ChunkedBytes,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_rooted_abs_path(&self.root, path);
@@ -300,7 +292,8 @@ impl IpmfsBackend {
percent_encode_path(&p)
);
- let multipart =
Multipart::new().part(FormDataPart::new("data").content(body));
+ let multipart = Multipart::new()
+ .part(FormDataPart::new("data").stream(body.len() as u64,
Box::new(body)));
let req: http::request::Builder = Request::post(url);
let req = multipart.apply(req)?;
diff --git a/core/src/services/ipmfs/writer.rs
b/core/src/services/ipmfs/writer.rs
index e6a32ac1b..96d9dab18 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -16,11 +16,11 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::backend::IpmfsBackend;
use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;
@@ -38,7 +38,8 @@ impl IpmfsWriter {
#[async_trait]
impl oio::OneShotWrite for IpmfsWriter {
- async fn write_once(&self, bs: Bytes) -> Result<()> {
+ async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+ let bs =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
let resp = self.backend.ipmfs_write(&self.path, bs).await?;
let status = resp.status();
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 9cca757a8..7caa6d749 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -279,9 +279,9 @@ impl Accessor for ObsBackend {
write: true,
write_can_append: true,
+ write_can_multi: true,
write_with_content_type: true,
write_with_cache_control: true,
- write_without_content_length: true,
delete: true,
create_dir: true,
@@ -380,7 +380,7 @@ impl Accessor for ObsBackend {
ObsWriters::One(oio::MultipartUploadWriter::new(writer))
};
- let w = if let Some(buffer_size) = args.buffer_size() {
+ let w = if let Some(buffer_size) = args.buffer() {
let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
let w = oio::ExactBufWriter::new(w, buffer_size);
diff --git a/core/src/services/onedrive/backend.rs
b/core/src/services/onedrive/backend.rs
index 2b32abbaf..0001d8685 100644
--- a/core/src/services/onedrive/backend.rs
+++ b/core/src/services/onedrive/backend.rs
@@ -103,13 +103,6 @@ impl Accessor for OnedriveBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
let path = build_rooted_abs_path(&self.root, path);
Ok((
diff --git a/core/src/services/onedrive/writer.rs
b/core/src/services/onedrive/writer.rs
index 5ad92ac63..326035acb 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -23,6 +23,7 @@ use super::backend::OnedriveBackend;
use super::error::parse_error;
use super::graph_model::OneDriveUploadSessionCreationRequestBody;
use super::graph_model::OneDriveUploadSessionCreationResponseBody;
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;
@@ -45,7 +46,8 @@ impl OneDriveWriter {
#[async_trait]
impl oio::OneShotWrite for OneDriveWriter {
- async fn write_once(&self, bs: Bytes) -> Result<()> {
+ async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+ let bs = bs.bytes(bs.remaining());
let size = bs.len();
if size <= Self::MAX_SIMPLE_SIZE {
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 6d027ba5b..9022079bc 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -404,10 +404,11 @@ impl Accessor for OssBackend {
write: true,
write_can_append: true,
+ write_can_multi: true,
write_with_cache_control: true,
write_with_content_type: true,
write_with_content_disposition: true,
- write_without_content_length: true,
+
delete: true,
create_dir: true,
copy: true,
@@ -478,7 +479,7 @@ impl Accessor for OssBackend {
OssWriters::One(oio::MultipartUploadWriter::new(writer))
};
- let w = if let Some(buffer_size) = args.buffer_size() {
+ let w = if let Some(buffer_size) = args.buffer() {
let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
let w = oio::ExactBufWriter::new(w, buffer_size);
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index aa2b95eb8..89a8330b8 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -913,9 +913,10 @@ impl Accessor for S3Backend {
read_with_override_content_type: true,
write: true,
+ write_can_multi: true,
write_with_cache_control: true,
write_with_content_type: true,
- write_without_content_length: true,
+
create_dir: true,
delete: true,
copy: true,
@@ -979,7 +980,7 @@ impl Accessor for S3Backend {
let w = oio::MultipartUploadWriter::new(writer);
- let w = if let Some(buffer_size) = args.buffer_size() {
+ let w = if let Some(buffer_size) = args.buffer() {
let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size))
diff --git a/core/src/services/sftp/backend.rs
b/core/src/services/sftp/backend.rs
index 931edcb23..0f9b7f3bd 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -243,7 +243,8 @@ impl Accessor for SftpBackend {
read_can_seek: true,
write: true,
- write_without_content_length: true,
+ write_can_multi: true,
+
create_dir: true,
delete: true,
diff --git a/core/src/services/supabase/backend.rs
b/core/src/services/supabase/backend.rs
index 402c3abe7..a5d0d3db1 100644
--- a/core/src/services/supabase/backend.rs
+++ b/core/src/services/supabase/backend.rs
@@ -224,13 +224,6 @@ impl Accessor for SupabaseBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
Ok((
RpWrite::default(),
oio::OneShotWriter::new(SupabaseWriter::new(self.core.clone(),
path, args)),
diff --git a/core/src/services/supabase/writer.rs
b/core/src/services/supabase/writer.rs
index c7fc6c8f8..b6929c4f5 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -18,11 +18,11 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::core::*;
use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;
@@ -45,13 +45,14 @@ impl SupabaseWriter {
#[async_trait]
impl oio::OneShotWrite for SupabaseWriter {
- async fn write_once(&self, bs: Bytes) -> Result<()> {
- let size = bs.len();
+ async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+ let bs =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
+
let mut req = self.core.supabase_upload_object_request(
&self.path,
- Some(size),
+ Some(bs.len()),
self.op.content_type(),
- AsyncBody::Bytes(bs),
+ AsyncBody::ChunkedBytes(bs),
)?;
self.core.sign(&mut req)?;
diff --git a/core/src/services/vercel_artifacts/backend.rs
b/core/src/services/vercel_artifacts/backend.rs
index 4b88be664..9a0ae95cf 100644
--- a/core/src/services/vercel_artifacts/backend.rs
+++ b/core/src/services/vercel_artifacts/backend.rs
@@ -84,13 +84,6 @@ impl Accessor for VercelArtifactsBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
Ok((
RpWrite::default(),
oio::OneShotWriter::new(VercelArtifactsWriter::new(
diff --git a/core/src/services/vercel_artifacts/writer.rs
b/core/src/services/vercel_artifacts/writer.rs
index f62d76cf5..058e57041 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -16,7 +16,6 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::backend::VercelArtifactsBackend;
@@ -43,12 +42,16 @@ impl VercelArtifactsWriter {
#[async_trait]
impl oio::OneShotWrite for VercelArtifactsWriter {
- async fn write_once(&self, bs: Bytes) -> Result<()> {
- let size = bs.len();
+ async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+ let bs =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
let resp = self
.backend
- .vercel_artifacts_put(self.path.as_str(), size as u64,
AsyncBody::Bytes(bs))
+ .vercel_artifacts_put(
+ self.path.as_str(),
+ bs.len() as u64,
+ AsyncBody::ChunkedBytes(bs),
+ )
.await?;
let status = resp.status();
diff --git a/core/src/services/wasabi/backend.rs
b/core/src/services/wasabi/backend.rs
index 835492bad..dcb91f2ea 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -750,13 +750,6 @@ impl Accessor for WasabiBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
Ok((
RpWrite::default(),
oio::OneShotWriter::new(WasabiWriter::new(self.core.clone(), args,
path.to_string())),
diff --git a/core/src/services/wasabi/writer.rs
b/core/src/services/wasabi/writer.rs
index f9e27334e..cd76bf82c 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -18,11 +18,11 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::core::*;
use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;
@@ -41,18 +41,18 @@ impl WasabiWriter {
#[async_trait]
impl oio::OneShotWrite for WasabiWriter {
- async fn write_once(&self, bs: Bytes) -> Result<()> {
- let size = bs.len();
+ async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+ let bs =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
let resp = self
.core
.put_object(
&self.path,
- Some(size),
+ Some(bs.len()),
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
- AsyncBody::Bytes(bs),
+ AsyncBody::ChunkedBytes(bs),
)
.await?;
diff --git a/core/src/services/webdav/backend.rs
b/core/src/services/webdav/backend.rs
index cca8728ba..1f9aa1019 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -275,13 +275,6 @@ impl Accessor for WebdavBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
self.ensure_parent_path(path).await?;
let p = build_abs_path(&self.root, path);
diff --git a/core/src/services/webdav/writer.rs
b/core/src/services/webdav/writer.rs
index 5b6ea319f..42de4fc0e 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -16,11 +16,11 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::backend::WebdavBackend;
use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;
@@ -39,17 +39,17 @@ impl WebdavWriter {
#[async_trait]
impl oio::OneShotWrite for WebdavWriter {
- async fn write_once(&self, bs: Bytes) -> Result<()> {
- let size = bs.len();
+ async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+ let bs =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
let resp = self
.backend
.webdav_put(
&self.path,
- Some(size as u64),
+ Some(bs.len() as u64),
self.op.content_type(),
self.op.content_disposition(),
- AsyncBody::Bytes(bs),
+ AsyncBody::ChunkedBytes(bs),
)
.await?;
diff --git a/core/src/services/webhdfs/backend.rs
b/core/src/services/webhdfs/backend.rs
index e8a3a8f04..b1bfed8ae 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -474,13 +474,6 @@ impl Accessor for WebhdfsBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- if args.content_length().is_none() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write without content length is not supported",
- ));
- }
-
Ok((
RpWrite::default(),
oio::OneShotWriter::new(WebhdfsWriter::new(self.clone(), args,
path.to_string())),
diff --git a/core/src/services/webhdfs/writer.rs
b/core/src/services/webhdfs/writer.rs
index b323c0173..ddc8dd328 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -16,11 +16,11 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::backend::WebhdfsBackend;
use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;
@@ -39,14 +39,15 @@ impl WebhdfsWriter {
#[async_trait]
impl oio::OneShotWrite for WebhdfsWriter {
- async fn write_once(&self, bs: Bytes) -> Result<()> {
- let size = bs.len();
+ /// Using `bytes` instead of `vectored_bytes` to allow the request to be
redirected.
+ async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+ let bs = bs.bytes(bs.remaining());
let req = self
.backend
.webhdfs_create_object_request(
&self.path,
- Some(size),
+ Some(bs.len()),
self.op.content_type(),
AsyncBody::Bytes(bs),
)
diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs
index 6d3389c2b..b568b3ee8 100644
--- a/core/src/types/capability.rs
+++ b/core/src/types/capability.rs
@@ -46,90 +46,85 @@ use std::fmt::Debug;
/// - Operation with limitations should be named like `batch_max_operations`.
#[derive(Copy, Clone, Default)]
pub struct Capability {
- /// If operator supports stat , it will be true.
+ /// If operator supports stat.
pub stat: bool,
- /// If operator supports stat with if match , it will be true.
+ /// If operator supports stat with if match.
pub stat_with_if_match: bool,
- /// If operator supports stat with if none match , it will be true.
+ /// If operator supports stat with if none match.
pub stat_with_if_none_match: bool,
- /// If operator supports read , it will be true.
+ /// If operator supports read.
pub read: bool,
- /// If operator supports seek on returning reader , it will
- /// be true.
+ /// If operator supports seek on returning reader.
pub read_can_seek: bool,
- /// If operator supports next on returning reader , it will
- /// be true.
+ /// If operator supports next on returning reader.
pub read_can_next: bool,
- /// If operator supports read with range , it will be true.
+ /// If operator supports read with range.
pub read_with_range: bool,
- /// If operator supports read with if match , it will be true.
+ /// If operator supports read with if match.
pub read_with_if_match: bool,
- /// If operator supports read with if none match , it will be true.
+ /// If operator supports read with if none match.
pub read_with_if_none_match: bool,
- /// if operator supports read with override cache control , it will be
true.
+ /// If operator supports read with override cache control.
pub read_with_override_cache_control: bool,
- /// if operator supports read with override content disposition , it will
be true.
+ /// If operator supports read with override content disposition.
pub read_with_override_content_disposition: bool,
- /// if operator supports read with override content type , it will be true.
+ /// If operator supports read with override content type.
pub read_with_override_content_type: bool,
- /// If operator supports write , it will be true.
+ /// If operator supports write.
pub write: bool,
- /// If operator supports write by append, it will be true.
+ /// If operator supports write that can be called multiple times.
+ pub write_can_multi: bool,
+ /// If operator supports write by append.
pub write_can_append: bool,
- /// If operator supports write with without content length, it will
- /// be true.
- ///
- /// This feature also be called as `Unsized` write or streaming write.
- pub write_without_content_length: bool,
- /// If operator supports write with content type , it will be true.
+ /// If operator supports write with content type.
pub write_with_content_type: bool,
- /// If operator supports write with content disposition , it will be true.
+ /// If operator supports write with content disposition.
pub write_with_content_disposition: bool,
- /// If operator supports write with cache control , it will be true.
+ /// If operator supports write with cache control.
pub write_with_cache_control: bool,
- /// If operator supports create dir , it will be true.
+ /// If operator supports create dir.
pub create_dir: bool,
- /// If operator supports delete , it will be true.
+ /// If operator supports delete.
pub delete: bool,
- /// If operator supports copy , it will be true.
+ /// If operator supports copy.
pub copy: bool,
- /// If operator supports rename , it will be true.
+ /// If operator supports rename.
pub rename: bool,
- /// If operator supports list , it will be true.
+ /// If operator supports list.
pub list: bool,
- /// If backend supports list with limit, it will be true.
+ /// If backend supports list with limit.
pub list_with_limit: bool,
- /// If backend supports list with start after, it will be true.
+ /// If backend supports list with start after.
pub list_with_start_after: bool,
/// If backend support list with using slash as delimiter.
pub list_with_delimiter_slash: bool,
/// If backend supports list without delimiter.
pub list_without_delimiter: bool,
- /// If operator supports presign , it will be true.
+ /// If operator supports presign.
pub presign: bool,
- /// If operator supports presign read , it will be true.
+ /// If operator supports presign read.
pub presign_read: bool,
- /// If operator supports presign stat , it will be true.
+ /// If operator supports presign stat.
pub presign_stat: bool,
- /// If operator supports presign write , it will be true.
+ /// If operator supports presign write.
pub presign_write: bool,
- /// If operator supports batch , it will be true.
+ /// If operator supports batch.
pub batch: bool,
- /// If operator supports batch delete , it will be true.
+ /// If operator supports batch delete.
pub batch_delete: bool,
/// The max operations that operator supports in batch.
pub batch_max_operations: Option<usize>,
- /// If operator supports blocking , it will be true.
+ /// If operator supports blocking.
pub blocking: bool,
}
diff --git a/core/src/types/operator/blocking_operator.rs
b/core/src/types/operator/blocking_operator.rs
index 3468f8b6b..22a6208f1 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -550,7 +550,7 @@ impl BlockingOperator {
FunctionWrite(OperatorFunction::new(
self.inner().clone(),
path,
- (OpWrite::default().with_content_length(bs.len() as u64), bs),
+ (OpWrite::default(), bs),
|inner, path, (args, mut bs)| {
if !validate_path(&path, EntryMode::FILE) {
return Err(
diff --git a/core/src/types/operator/operator.rs
b/core/src/types/operator/operator.rs
index 771ffd677..8c8539bfd 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -717,7 +717,7 @@ impl Operator {
let fut = FutureWrite(OperatorFuture::new(
self.inner().clone(),
path,
- (OpWrite::default().with_content_length(bs.len() as u64), bs),
+ (OpWrite::default(), bs),
|inner, path, (args, mut bs)| {
let fut = async move {
if !validate_path(&path, EntryMode::FILE) {
diff --git a/core/src/types/operator/operator_functions.rs
b/core/src/types/operator/operator_functions.rs
index 49a0e7e85..367c5b93f 100644
--- a/core/src/types/operator/operator_functions.rs
+++ b/core/src/types/operator/operator_functions.rs
@@ -96,19 +96,8 @@ impl FunctionWrite {
///
/// Service could have their own minimum buffer size while perform write
operations like
/// multipart uploads. So the buffer size may be larger than the given
buffer size.
- pub fn buffer_size(mut self, v: usize) -> Self {
- self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs));
- self
- }
-
- /// Set the content length of op.
- ///
- /// If the content length is not set, the content length will be
- /// calculated automatically by buffering part of data.
- pub fn content_length(mut self, v: u64) -> Self {
- self.0 = self
- .0
- .map_args(|(args, bs)| (args.with_content_length(v), bs));
+ pub fn buffer(mut self, v: usize) -> Self {
+ self.0 = self.0.map_args(|(args, bs)| (args.with_buffer(v), bs));
self
}
@@ -173,17 +162,8 @@ impl FunctionWriter {
///
/// Service could have their own minimum buffer size while perform write
operations like
/// multipart uploads. So the buffer size may be larger than the given
buffer size.
- pub fn buffer_size(mut self, v: usize) -> Self {
- self.0 = self.0.map_args(|args| args.with_buffer_size(v));
- self
- }
-
- /// Set the content length of op.
- ///
- /// If the content length is not set, the content length will be
- /// calculated automatically by buffering part of data.
- pub fn content_length(mut self, v: u64) -> Self {
- self.0 = self.0.map_args(|args| args.with_content_length(v));
+ pub fn buffer(mut self, v: usize) -> Self {
+ self.0 = self.0.map_args(|args| args.with_buffer(v));
self
}
diff --git a/core/src/types/operator/operator_futures.rs
b/core/src/types/operator/operator_futures.rs
index 7f2df677a..63c013f05 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -213,17 +213,6 @@ impl Future for FuturePresignRead {
pub struct FuturePresignWrite(pub(crate) OperatorFuture<(OpWrite, Duration),
PresignedRequest>);
impl FuturePresignWrite {
- /// Set the content length of op.
- ///
- /// If the content length is not set, the content length will be
- /// calculated automatically by buffering part of data.
- pub fn content_length(mut self, v: u64) -> Self {
- self.0 = self
- .0
- .map_args(|(args, dur)| (args.with_content_length(v), dur));
- self
- }
-
/// Set the content type of option
pub fn content_type(mut self, v: &str) -> Self {
self.0 = self
@@ -403,19 +392,8 @@ impl FutureWrite {
///
/// Service could have their own minimum buffer size while perform write
operations like
/// multipart uploads. So the buffer size may be larger than the given
buffer size.
- pub fn buffer_size(mut self, v: usize) -> Self {
- self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs));
- self
- }
-
- /// Set the content length of op.
- ///
- /// If the content length is not set, the content length will be
- /// calculated automatically by buffering part of data.
- pub fn content_length(mut self, v: u64) -> Self {
- self.0 = self
- .0
- .map_args(|(args, bs)| (args.with_content_length(v), bs));
+ pub fn buffer(mut self, v: usize) -> Self {
+ self.0 = self.0.map_args(|(args, bs)| (args.with_buffer(v), bs));
self
}
@@ -462,7 +440,7 @@ impl FutureWriter {
///
/// If the append mode is set, the data will be appended to the end of the
file.
///
- /// # Notes
+ /// ## Notes
///
/// Service could return `Unsupported` if the underlying storage does not
support append.
pub fn append(mut self, v: bool) -> Self {
@@ -478,17 +456,8 @@ impl FutureWriter {
///
/// Service could have their own minimum buffer size while perform write
operations like
/// multipart uploads. So the buffer size may be larger than the given
buffer size.
- pub fn buffer_size(mut self, v: usize) -> Self {
- self.0 = self.0.map_args(|args| args.with_buffer_size(v));
- self
- }
-
- /// Set the content length of op.
- ///
- /// If the content length is not set, the content length will be
- /// calculated automatically by buffering part of data.
- pub fn content_length(mut self, v: u64) -> Self {
- self.0 = self.0.map_args(|args| args.with_content_length(v));
+ pub fn buffer(mut self, v: usize) -> Self {
+ self.0 = self.0.map_args(|args| args.with_buffer(v));
self
}
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 56e87c5fb..b350f9625 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -38,20 +38,39 @@ use crate::*;
/// Please make sure either `close` or `abort` has been called before
/// dropping the writer otherwise the data could be lost.
///
-/// ## Notes
+/// ## Usage
+///
+/// ### Write Multiple Chunks
+///
+/// Some services support writing multiple chunks of data into the given path.
Services that don't
+/// support writing multiple chunks will return [`ErrorKind::Unsupported`] error
when calling `write`
+/// a second time.
+///
+/// ```no_build
+/// let mut w = op.writer("path/to/file").await?;
+/// w.write(bs).await?;
+/// w.write(bs).await?;
+/// w.close().await?
+/// ```
+///
+/// Our writer also provides [`Writer::sink`] and [`Writer::copy`] support.
+///
+/// Besides, our writer implements [`AsyncWrite`] and
[`tokio::io::AsyncWrite`].
+///
+/// ### Write with append enabled
///
-/// Writer can be used in two ways:
+/// Writer also supports writing with append enabled. This is useful when
users want to append
+/// some data to the end of the file.
///
-/// - Sized: write data with a known size by specify the content length.
-/// - Unsized: write data with an unknown size, also known as streaming.
+/// - If file doesn't exist, it will be created and just like calling `write`.
+/// - If file exists, data will be appended to the end of the file.
///
-/// All services will support `sized` writer and provide special optimization
if
-/// the given data size is the same as the content length, allowing them to
-/// be written in one request.
+/// Possible Errors:
///
-/// Some services also supports `unsized` writer. They MAY buffer part of the
data
-/// and flush them into storage at needs. And finally, the file will be
available
-/// after `close` has been called.
+/// - Some services store normal files and appendable files in different ways.
Trying to append
+/// to a non-appendable file could return [`ErrorKind::ConditionNotMatch`]
error.
+/// - Services that don't support append will return
[`ErrorKind::Unsupported`] error when
+/// creating a writer with `append` enabled.
pub struct Writer {
inner: oio::Writer,
}
@@ -105,7 +124,6 @@ impl Writer {
/// async fn sink_example(op: Operator) -> Result<()> {
/// let mut w = op
/// .writer_with("path/to/file")
- /// .content_length(2 * 4096)
/// .await?;
/// let stream = stream::iter(vec![vec![0; 4096], vec![1;
4096]]).map(Ok);
/// w.sink(stream).await?;
@@ -154,7 +172,7 @@ impl Writer {
///
/// #[tokio::main]
/// async fn copy_example(op: Operator) -> Result<()> {
- /// let mut w =
op.writer_with("path/to/file").content_length(4096).await?;
+ /// let mut w = op.writer_with("path/to/file").await?;
/// let reader = Cursor::new(vec![0; 4096]);
/// w.copy(reader).await?;
/// w.close().await?;
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index 60125243a..f20cec326 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -1111,7 +1111,7 @@ pub async fn test_delete_stream(op: Operator) ->
Result<()> {
/// Append data into writer
pub async fn test_writer_write(op: Operator) -> Result<()> {
- if !(op.info().full_capability().write_without_content_length) {
+ if !(op.info().full_capability().write_can_multi) {
return Ok(());
}
@@ -1148,7 +1148,7 @@ pub async fn test_writer_write(op: Operator) ->
Result<()> {
/// Streaming data into writer
pub async fn test_writer_sink(op: Operator) -> Result<()> {
let cap = op.info().full_capability();
- if !(cap.write && cap.write_without_content_length) {
+ if !(cap.write && cap.write_can_multi) {
return Ok(());
}
@@ -1158,7 +1158,7 @@ pub async fn test_writer_sink(op: Operator) -> Result<()>
{
let content_b = gen_fixed_bytes(size);
let stream = stream::iter(vec![content_a.clone(),
content_b.clone()]).map(Ok);
- let mut w = op.writer_with(&path).buffer_size(5 * 1024 * 1024).await?;
+ let mut w = op.writer_with(&path).buffer(5 * 1024 * 1024).await?;
w.sink(stream).await?;
w.close().await?;
@@ -1185,7 +1185,7 @@ pub async fn test_writer_sink(op: Operator) -> Result<()>
{
/// Reading data into writer
pub async fn test_writer_copy(op: Operator) -> Result<()> {
let cap = op.info().full_capability();
- if !(cap.write && cap.write_without_content_length) {
+ if !(cap.write && cap.write_can_multi) {
return Ok(());
}
@@ -1194,7 +1194,7 @@ pub async fn test_writer_copy(op: Operator) -> Result<()>
{
let content_a = gen_fixed_bytes(size);
let content_b = gen_fixed_bytes(size);
- let mut w = op.writer_with(&path).buffer_size(5 * 1024 * 1024).await?;
+ let mut w = op.writer_with(&path).buffer(5 * 1024 * 1024).await?;
let mut content = Bytes::from([content_a.clone(),
content_b.clone()].concat());
while !content.is_empty() {
@@ -1226,7 +1226,7 @@ pub async fn test_writer_copy(op: Operator) -> Result<()>
{
/// Copy data from reader to writer
pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
- if !(op.info().full_capability().write_without_content_length) {
+ if !(op.info().full_capability().write_can_multi) {
return Ok(());
}
@@ -1234,7 +1234,7 @@ pub async fn test_writer_futures_copy(op: Operator) ->
Result<()> {
let (content, size): (Vec<u8>, usize) =
gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
- let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
+ let mut w = op.writer_with(&path).buffer(8 * 1024 * 1024).await?;
// Wrap a buf reader here to make sure content is read in 1MiB chunks.
let mut cursor = BufReader::with_capacity(1024 * 1024,
Cursor::new(content.clone()));
@@ -1258,7 +1258,7 @@ pub async fn test_writer_futures_copy(op: Operator) ->
Result<()> {
/// Add test for unsized writer
pub async fn test_fuzz_unsized_writer(op: Operator) -> Result<()> {
- if !op.info().full_capability().write_without_content_length {
+ if !op.info().full_capability().write_can_multi {
warn!("{op:?} doesn't support write without content length, test
skip");
return Ok(());
}
@@ -1267,7 +1267,7 @@ pub async fn test_fuzz_unsized_writer(op: Operator) ->
Result<()> {
let mut fuzzer = ObjectWriterFuzzer::new(&path, None);
- let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
+ let mut w = op.writer_with(&path).buffer(8 * 1024 * 1024).await?;
for _ in 0..100 {
match fuzzer.fuzz() {