This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-writer in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 9800b6d3c511868265be6779f191148bab2fc8de Author: Xuanwo <[email protected]> AuthorDate: Wed Apr 19 16:44:08 2023 +0800 Remove append Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/concurrent_limit.rs | 8 ---- core/src/layers/error_context.rs | 16 -------- core/src/layers/logging.rs | 66 ------------------------------ core/src/layers/metrics.rs | 23 ----------- core/src/layers/minitrace.rs | 16 -------- core/src/layers/oteltrace.rs | 8 ---- core/src/layers/prometheus.rs | 33 --------------- core/src/layers/retry.rs | 35 ---------------- core/src/layers/tracing.rs | 16 -------- core/src/raw/adapters/kv/backend.rs | 44 ++++++++------------ core/src/raw/oio/cursor.rs | 6 +++ core/src/raw/oio/write.rs | 32 --------------- core/src/services/azblob/writer.rs | 9 ----- core/src/services/azdfs/writer.rs | 9 ----- core/src/services/fs/writer.rs | 22 ---------- core/src/services/ftp/writer.rs | 9 ----- core/src/services/gcs/writer.rs | 8 +--- core/src/services/ghac/writer.rs | 9 ----- core/src/services/hdfs/writer.rs | 25 ------------ core/src/services/ipmfs/writer.rs | 9 ----- core/src/services/obs/writer.rs | 9 ----- core/src/services/oss/backend.rs | 2 +- core/src/services/oss/writer.rs | 10 ++--- core/src/services/s3/backend.rs | 2 +- core/src/services/s3/writer.rs | 10 ++--- core/src/services/wasabi/backend.rs | 2 +- core/src/services/wasabi/writer.rs | 80 ++++++++++++++----------------------- core/src/services/webdav/writer.rs | 9 ----- core/src/services/webhdfs/writer.rs | 9 ----- core/src/types/writer.rs | 26 ++++-------- core/tests/behavior/write.rs | 6 +-- 31 files changed, 75 insertions(+), 493 deletions(-) diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 99e947b3..c1ccb76d 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -316,10 +316,6 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { self.inner.write(bs).await } - async fn append(&mut self, bs: Bytes) -> Result<()> { - self.inner.append(bs).await - } - async fn abort(&mut self) -> Result<()> { self.inner.abort().await } @@ -334,10 +330,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> { self.inner.write(bs) } - fn append(&mut self, bs: Bytes) -> Result<()> { - self.inner.append(bs) - } - fn close(&mut self) -> Result<()> { self.inner.close() } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 29724613..5417a040 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -411,14 +411,6 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { }) } - async fn append(&mut self, bs: Bytes) -> Result<()> { - self.inner.append(bs).await.map_err(|err| { - err.with_operation(WriteOperation::Append) - .with_context("service", self.scheme) - .with_context("path", &self.path) - }) - } - async fn abort(&mut self) -> Result<()> { self.inner.abort().await.map_err(|err| { err.with_operation(WriteOperation::Append) @@ -445,14 +437,6 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> { }) } - fn append(&mut self, bs: Bytes) -> Result<()> { - self.inner.append(bs).map_err(|err| { - err.with_operation(WriteOperation::BlockingAppend) - .with_context("service", self.scheme) - .with_context("path", &self.path) - }) - } - fn close(&mut self) -> Result<()> { self.inner.close().map_err(|err| { err.with_operation(WriteOperation::BlockingClose) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 706287ad..abecc48f 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1386,39 +1386,6 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); - match self.inner.append(bs).await { - Ok(_) => { - self.written += size as u64; - trace!( - target: LOGGING_TARGET, - "service={} operation={} path={} written={} -> data write {}B", - self.scheme, - WriteOperation::Append, - self.path, - self.written, - size - ); - Ok(()) - } - Err(err) => { - if let Some(lvl) = self.failure_level { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} written={} -> data write failed: {err:?}", - self.scheme, - WriteOperation::Append, - self.path, - self.written, - ) - } - Err(err) - } - } - } - async fn abort(&mut self) -> Result<()> { match self.inner.abort().await { Ok(_) => { @@ -1504,39 +1471,6 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> { } } - fn append(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); - match self.inner.append(bs) { - Ok(_) => { - self.written += size as u64; - trace!( - target: LOGGING_TARGET, - "service={} operation={} path={} written={} -> data write {}B", - self.scheme, - WriteOperation::BlockingAppend, - self.path, - self.written, - size - ); - Ok(()) - } - Err(err) => { - if let Some(lvl) = self.failure_level { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} written={} -> data write failed: {err:?}", - self.scheme, - WriteOperation::BlockingAppend, - self.path, - self.written, - ) - } - Err(err) - } - } - } - fn close(&mut self) -> Result<()> { match self.inner.close() { Ok(_) => Ok(()), diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 2aff9003..6bdfe19f 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -925,18 +925,6 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> { }) } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); - self.inner - .append(bs) - .await - .map(|_| self.bytes += size as u64) - .map_err(|err| { - self.handle.increment_errors_total(self.op, err.kind()); - err - }) - } - async fn abort(&mut self) -> Result<()> { self.inner.abort().await.map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); @@ -964,17 +952,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> { }) } - fn append(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); - self.inner - .append(bs) - .map(|_| self.bytes += size as u64) - .map_err(|err| { - self.handle.increment_errors_total(self.op, err.kind()); - err - }) - } - fn close(&mut self) -> Result<()> { self.inner.close().map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 3e91bac6..35c48d27 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -337,16 +337,6 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { .await } - async fn append(&mut self, bs: Bytes) -> Result<()> { - self.inner - .append(bs) - .in_span(Span::enter_with_parent( - WriteOperation::Append.into_static(), - &self.span, - )) - .await - } - async fn abort(&mut self) -> Result<()> { self.inner .abort() @@ -375,12 +365,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> { self.inner.write(bs) } - fn append(&mut self, bs: Bytes) -> Result<()> { - let _span = - Span::enter_with_parent(WriteOperation::BlockingAppend.into_static(), &self.span); - self.inner.append(bs) - } - fn close(&mut self) -> Result<()> { let _span = Span::enter_with_parent(WriteOperation::BlockingClose.into_static(), &self.span); diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index c607e975..f9f3a931 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -339,10 +339,6 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { self.inner.write(bs).await } - async fn append(&mut self, bs: Bytes) -> Result<()> { - self.inner.append(bs).await - } - async fn abort(&mut self) -> Result<()> { self.inner.abort().await } @@ -357,10 +353,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> { self.inner.write(bs) } - fn append(&mut self, bs: Bytes) -> Result<()> { - self.inner.append(bs) - } - fn close(&mut self) -> Result<()> { self.inner.close() } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 4689c383..6919abc4 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -720,23 +720,6 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { }) } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); - self.inner - .append(bs) - .await - .map(|_| { - self.stats - .bytes_total - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .observe(size as f64) - }) - .map_err(|err| { - self.stats.increment_errors_total(self.op, err.kind()); - err - }) - } - async fn abort(&mut self) -> Result<()> { self.inner.abort().await.map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); @@ -769,22 +752,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> { }) } - fn append(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); - self.inner - .append(bs) - .map(|_| { - self.stats - .bytes_total - .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) - .observe(size as f64) - }) - .map_err(|err| { - self.stats.increment_errors_total(self.op, err.kind()); - err - }) - } - fn close(&mut self) -> Result<()> { self.inner.close().map_err(|err| { self.stats.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 401bd53e..497c1ca8 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -651,27 +651,6 @@ impl<R: oio::Write> oio::Write for RetryWrapper<R> { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let mut backoff = self.builder.build(); - - loop { - match self.inner.append(bs.clone()).await { - Ok(v) => return Ok(v), - Err(e) if !e.is_temporary() => return Err(e), - Err(e) => match backoff.next() { - None => return Err(e), - Some(dur) => { - warn!(target: "opendal::service", - "operation={} path={} -> pager retry after {}s: error={:?}", - WriteOperation::Append, self.path, dur.as_secs_f64(), e); - tokio::time::sleep(dur).await; - continue; - } - }, - } - } - } - async fn abort(&mut self) -> Result<()> { let mut backoff = self.builder.build(); @@ -730,20 +709,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for RetryWrapper<R> { .map_err(|e| e.set_persistent()) } - fn append(&mut self, bs: Bytes) -> Result<()> { - { || self.inner.append(bs.clone()) } - .retry(&self.builder) - .when(|e| e.is_temporary()) - .notify(move |err, dur| { - warn!( - target: "opendal::service", - "operation={} -> pager retry after {}s: error={:?}", - WriteOperation::BlockingAppend, dur.as_secs_f64(), err) - }) - .call() - .map_err(|e| e.set_persistent()) - } - fn close(&mut self) -> Result<()> { { || self.inner.close() } .retry(&self.builder) diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 8f9b8a5d..2782d049 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -344,14 +344,6 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { self.inner.write(bs).await } - #[tracing::instrument( - parent = &self.span, - level = "trace", - skip_all)] - async fn append(&mut self, bs: Bytes) -> Result<()> { - self.inner.append(bs).await - } - #[tracing::instrument( parent = &self.span, level = "trace", @@ -378,14 +370,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for TracingWrapper<R> { self.inner.write(bs) } - #[tracing::instrument( - parent = &self.span, - level = "trace", - skip_all)] - fn append(&mut self, bs: Bytes) -> Result<()> { - self.inner.append(bs) - } - #[tracing::instrument( parent = &self.span, level = "trace", diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 15dc7beb..dae293e0 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -107,13 +107,27 @@ impl<S: Adapter> Accessor for Backend<S> { Ok((RpRead::new(bs.len() as u64), oio::Cursor::from(bs))) } - async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + if args.content_length().is_none() { + return Err(Error::new( + ErrorKind::Unsupported, + "write without content length is not supported", + )); + } + let p = build_abs_path(&self.root, path); Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p))) } - fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { + fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { + if args.content_length().is_none() { + return Err(Error::new( + ErrorKind::Unsupported, + "write without content length is not supported", + )); + } + let p = build_abs_path(&self.root, path); Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p))) @@ -264,35 +278,17 @@ impl<S> KvWriter<S> { buf: None, } } - - fn extend_buf(&mut self, bs: Bytes) { - if let Some(buf) = self.buf.as_mut() { - buf.extend(bs); - } else { - self.buf = Some(bs.into()) - } - } } #[async_trait] impl<S: Adapter> oio::Write for KvWriter<S> { + // TODO: we need to support append in the future. async fn write(&mut self, bs: Bytes) -> Result<()> { self.buf = Some(bs.into()); Ok(()) } - async fn append(&mut self, bs: Bytes) -> Result<()> { - if let Err(e) = self.kv.append(&self.path, bs.to_vec().as_slice()).await { - if e.kind() == ErrorKind::Unsupported { - self.extend_buf(bs); - } else { - return Err(e); - } - } - Ok(()) - } - async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, @@ -316,12 +312,6 @@ impl<S: Adapter> oio::BlockingWrite for KvWriter<S> { Ok(()) } - fn append(&mut self, bs: Bytes) -> Result<()> { - self.extend_buf(bs); - - Ok(()) - } - fn close(&mut self) -> Result<()> { if let Some(buf) = self.buf.as_deref() { self.kv.blocking_set(&self.path, buf)?; diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index f8db0539..0889e45e 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -148,6 +148,12 @@ pub struct VectorCursor { size: usize, } +impl Default for VectorCursor { + fn default() -> Self { + Self::new() + } +} + impl VectorCursor { /// Create a new vector cursor. pub fn new() -> Self { diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write.rs index c74e0d90..a2437312 100644 --- a/core/src/raw/oio/write.rs +++ b/core/src/raw/oio/write.rs @@ -99,9 +99,6 @@ pub trait Write: Unpin + Send + Sync { /// Please make sure `write` is safe to re-enter. async fn write(&mut self, bs: Bytes) -> Result<()>; - /// Append bytes to the writer. - async fn append(&mut self, bs: Bytes) -> Result<()>; - /// Abort the pending writer. async fn abort(&mut self) -> Result<()>; @@ -117,15 +114,6 @@ impl Write for () { unimplemented!("write is required to be implemented for oio::Write") } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let _ = bs; - - Err(Error::new( - ErrorKind::Unsupported, - "output writer doesn't support append", - )) - } - async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, @@ -149,10 +137,6 @@ impl<T: Write + ?Sized> Write for Box<T> { (**self).write(bs).await } - async fn append(&mut self, bs: Bytes) -> Result<()> { - (**self).append(bs).await - } - async fn abort(&mut self) -> Result<()> { (**self).abort().await } @@ -170,9 +154,6 @@ pub trait BlockingWrite: Send + Sync + 'static { /// Write whole content at once. fn write(&mut self, bs: Bytes) -> Result<()>; - /// Append content at tailing. - fn append(&mut self, bs: Bytes) -> Result<()>; - /// Close the writer and make sure all data has been flushed. fn close(&mut self) -> Result<()>; } @@ -184,15 +165,6 @@ impl BlockingWrite for () { unimplemented!("write is required to be implemented for oio::BlockingWrite") } - fn append(&mut self, bs: Bytes) -> Result<()> { - let _ = bs; - - Err(Error::new( - ErrorKind::Unsupported, - "output writer doesn't support append", - )) - } - fn close(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, @@ -208,10 +180,6 @@ impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> { (**self).write(bs) } - fn append(&mut self, bs: Bytes) -> Result<()> { - (**self).append(bs) - } - fn close(&mut self) -> Result<()> { (**self).close() } diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index d74af141..9e6bdb40 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -65,15 +65,6 @@ impl oio::Write for AzblobWriter { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let _ = bs; - - Err(Error::new( - ErrorKind::Unsupported, - "output writer doesn't support append", - )) - } - async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index b9f6faf2..a6efaf08 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -87,15 +87,6 @@ impl oio::Write for AzdfsWriter { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let _ = bs; - - Err(Error::new( - ErrorKind::Unsupported, - "output writer doesn't support append", - )) - } - async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 33dcb2d5..03d6d824 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -54,17 +54,6 @@ impl oio::Write for FsWriter<tokio::fs::File> { /// File could be partial written, so we will seek to start to make sure /// we write the same content. async fn write(&mut self, bs: Bytes) -> Result<()> { - self.f.rewind().await.map_err(parse_io_error)?; - self.f.write_all(&bs).await.map_err(parse_io_error)?; - - Ok(()) - } - - /// # Notes - /// - /// File could be partial written, so we will seek to start to make sure - /// we write the same content. - async fn append(&mut self, bs: Bytes) -> Result<()> { self.f .seek(SeekFrom::Start(self.pos)) .await @@ -101,17 +90,6 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> { /// File could be partial written, so we will seek to start to make sure /// we write the same content. fn write(&mut self, bs: Bytes) -> Result<()> { - self.f.rewind().map_err(parse_io_error)?; - self.f.write_all(&bs).map_err(parse_io_error)?; - - Ok(()) - } - - /// # Notes - /// - /// File could be partial written, so we will seek to start to make sure - /// we write the same content. - fn append(&mut self, bs: Bytes) -> Result<()> { self.f .seek(SeekFrom::Start(self.pos)) .map_err(parse_io_error)?; diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index dec8c473..653dc003 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -53,15 +53,6 @@ impl oio::Write for FtpWriter { Ok(()) } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let _ = bs; - - Err(Error::new( - ErrorKind::Unsupported, - "output writer doesn't support append", - )) - } - async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 19fbd6d5..609b34ef 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -138,14 +138,14 @@ impl oio::Write for GcsWriter { return self.write_oneshot(bs).await; } else { let location = self.initiate_upload().await?; - self.location = Some(location.clone()); + self.location = Some(location); self.location.as_deref().unwrap() } } }; // Ignore empty bytes - if bs.len() == 0 { + if bs.is_empty() { return Ok(()); } @@ -172,10 +172,6 @@ impl oio::Write for GcsWriter { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - self.write(bs).await - } - // TODO: we can cancel the upload by sending a DELETE request to the location async fn abort(&mut self) -> Result<()> { Ok(()) diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index b9e4eefd..8ff12c8b 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -62,15 +62,6 @@ impl oio::Write for GhacWriter { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let _ = bs; - - Err(Error::new( - ErrorKind::Unsupported, - "output writer doesn't support append", - )) - } - async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 6062d305..d045c063 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -46,20 +46,6 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> { /// File could be partial written, so we will seek to start to make sure /// we write the same content. async fn write(&mut self, bs: Bytes) -> Result<()> { - self.f - .seek(SeekFrom::Start(0)) - .await - .map_err(parse_io_error)?; - self.f.write_all(&bs).await.map_err(parse_io_error)?; - - Ok(()) - } - - /// # Notes - /// - /// File could be partial written, so we will seek to start to make sure - /// we write the same content. - async fn append(&mut self, bs: Bytes) -> Result<()> { self.f .seek(SeekFrom::Start(self.pos)) .await @@ -90,17 +76,6 @@ impl oio::BlockingWrite for HdfsWriter<hdrs::File> { /// File could be partial written, so we will seek to start to make sure /// we write the same content. fn write(&mut self, bs: Bytes) -> Result<()> { - self.f.rewind().map_err(parse_io_error)?; - self.f.write_all(&bs).map_err(parse_io_error)?; - - Ok(()) - } - - /// # Notes - /// - /// File could be partial written, so we will seek to start to make sure - /// we write the same content. - fn append(&mut self, bs: Bytes) -> Result<()> { self.f .seek(SeekFrom::Start(self.pos)) .map_err(parse_io_error)?; diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 4394c751..4f240000 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -55,15 +55,6 @@ impl oio::Write for IpmfsWriter { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let _ = bs; - - Err(Error::new( - ErrorKind::Unsupported, - "output writer doesn't support append", - )) - } - async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index 21c685c7..d71c7893 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -65,15 +65,6 @@ impl oio::Write for ObsWriter { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let _ = bs; - - Err(Error::new( - ErrorKind::Unsupported, - "output writer doesn't support append", - )) - } - async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 4e298fb6..c314586c 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -409,7 +409,7 @@ impl Accessor for OssBackend { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { Ok(( RpWrite::default(), - OssWriter::new(self.core.clone(), &path, args), + OssWriter::new(self.core.clone(), path, args), )) } diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index 095ba6e4..d4cbe721 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -94,7 +94,7 @@ impl OssWriter { quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; Ok(result.upload_id) } - _ => return Err(parse_error(resp).await?), + _ => Err(parse_error(resp).await?), } } @@ -144,14 +144,14 @@ impl oio::Write for OssWriter { return self.write_oneshot(bs).await; } else { let upload_id = self.initiate_upload().await?; - self.upload_id = Some(upload_id.clone()); + self.upload_id = Some(upload_id); self.upload_id.as_deref().unwrap() } } }; // Ignore empty bytes - if bs.len() == 0 { + if bs.is_empty() { return Ok(()); } @@ -178,10 +178,6 @@ impl oio::Write for OssWriter { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - todo!() - } - // TODO: we can cancel the upload by sending an abort request. async fn abort(&mut self) -> Result<()> { Err(Error::new( diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 7eb7b1f9..66940674 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -961,7 +961,7 @@ impl Accessor for S3Backend { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { Ok(( RpWrite::default(), - S3Writer::new(self.core.clone(), &path, args), + S3Writer::new(self.core.clone(), path, args), )) } diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index 5887cafc..39248cde 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -106,7 +106,7 @@ impl S3Writer { Ok(result.upload_id) } - _ => return Err(parse_error(resp).await?), + _ => Err(parse_error(resp).await?), } } @@ -162,14 +162,14 @@ impl oio::Write for S3Writer { return self.write_oneshot(bs).await; } else { let upload_id = self.initiate_upload().await?; - self.upload_id = Some(upload_id.clone()); + self.upload_id = Some(upload_id); self.upload_id.as_deref().unwrap() } } }; // Ignore empty bytes - if bs.len() == 0 { + if bs.is_empty() { return Ok(()); } @@ -196,10 +196,6 @@ impl oio::Write for S3Writer { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - todo!() - } - async fn abort(&mut self) -> Result<()> { let upload_id = if let Some(upload_id) = &self.upload_id { upload_id diff --git a/core/src/services/wasabi/backend.rs b/core/src/services/wasabi/backend.rs index 49ed6bc3..0cdbe3a5 100644 --- a/core/src/services/wasabi/backend.rs +++ b/core/src/services/wasabi/backend.rs @@ -951,7 +951,7 @@ impl Accessor for WasabiBackend { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { Ok(( RpWrite::default(), - WasabiWriter::new(self.core.clone(), args, path.to_string(), None), + WasabiWriter::new(self.core.clone(), args, path.to_string()), )) } diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index 90d193df..f9b998b4 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -32,68 +32,50 @@ pub struct WasabiWriter { op: OpWrite, path: String, - - upload_id: Option<String>, } impl WasabiWriter { - pub fn new( - core: Arc<WasabiCore>, - op: OpWrite, - path: String, - upload_id: Option<String>, - ) -> Self { - WasabiWriter { - core, - - op, - path, - upload_id, - } + pub fn new(core: Arc<WasabiCore>, op: OpWrite, path: String) -> Self { + WasabiWriter { core, op, path } } } #[async_trait] impl oio::Write for WasabiWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - debug_assert!( - self.upload_id.is_none(), - "Writer initiated with upload id, but users trying to call write, must be buggy" - ); - - let resp = self - .core - .put_object( - &self.path, - Some(bs.len()), - self.op.content_type(), - self.op.content_disposition(), - self.op.cache_control(), - AsyncBody::Bytes(bs), - ) - .await?; + if self.op.content_length().unwrap_or_default() == bs.len() as u64 { + let resp = self + .core + .put_object( + &self.path, + Some(bs.len()), + self.op.content_type(), + self.op.content_disposition(), + self.op.cache_control(), + AsyncBody::Bytes(bs), + ) + .await?; - match resp.status() { - StatusCode::CREATED | StatusCode::OK => { - resp.into_body().consume().await?; - Ok(()) + match resp.status() { + StatusCode::CREATED | StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), } - _ => Err(parse_error(resp).await?), - } - } - - async fn append(&mut self, bs: Bytes) -> Result<()> { - let resp = self - .core - .append_object(&self.path, Some(bs.len()), AsyncBody::Bytes(bs)) - .await?; + } else { + let resp = self + .core + .append_object(&self.path, Some(bs.len()), AsyncBody::Bytes(bs)) + .await?; - match resp.status() { - StatusCode::CREATED | StatusCode::OK | StatusCode::NO_CONTENT => { - resp.into_body().consume().await?; - Ok(()) + match resp.status() { + StatusCode::CREATED | StatusCode::OK | StatusCode::NO_CONTENT => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), } - _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index 0fa180c9..89ddd7b8 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -63,15 +63,6 @@ impl oio::Write for WebdavWriter { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let _ = bs; - - Err(Error::new( - ErrorKind::Unsupported, - "output writer doesn't support append", - )) - } - async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index f3ab7633..16448141 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -63,15 +63,6 @@ impl oio::Write for WebhdfsWriter { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let _ = bs; - - Err(Error::new( - ErrorKind::Unsupported, - "output writer doesn't support append", - )) - } - async fn abort(&mut self) -> Result<()> { Ok(()) } diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 3705f4d6..68fe446f 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -60,17 +60,13 @@ impl Writer { }) } - /// Append data into writer. - /// - /// It is highly recommended to align the length of the input bytes - /// into blocks of 4MiB (except the last block) for better performance - /// and compatibility. - pub async fn append(&mut self, bs: impl Into<Bytes>) -> Result<()> { + /// Write into inner writer. + pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> { if let State::Idle(Some(w)) = &mut self.state { - w.append(bs.into()).await + w.write(bs.into()).await } else { unreachable!( - "writer state invalid while append, expect Idle, actual {}", + "writer state invalid while abort, expect Idle, actual {}", self.state ); } @@ -132,7 +128,7 @@ impl AsyncWrite for Writer { let bs = Bytes::from(buf.to_vec()); let size = bs.len(); let fut = async move { - w.append(bs).await?; + w.write(bs).await?; Ok((size, w)) }; self.state = State::Write(Box::pin(fut)); @@ -204,15 +200,6 @@ impl BlockingWriter { Ok(BlockingWriter { inner: w }) } - /// Append data into writer. - /// - /// It is highly recommended to align the length of the input bytes - /// into blocks of 4MiB (except the last block) for better performance - /// and compatibility. - pub fn append(&mut self, bs: impl Into<Bytes>) -> Result<()> { - self.inner.append(bs.into()) - } - /// Close the writer and make sure all data have been stored. pub fn close(&mut self) -> Result<()> { self.inner.close() @@ -222,7 +209,8 @@ impl BlockingWriter { impl io::Write for BlockingWriter { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { let size = buf.len(); - self.append(Bytes::from(buf.to_vec())) + self.inner + .write(Bytes::from(buf.to_vec())) .map(|_| size) .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) } diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs index 91c9ec08..c0d24c4a 100644 --- a/core/tests/behavior/write.rs +++ b/core/tests/behavior/write.rs @@ -592,7 +592,7 @@ pub async fn test_abort_writer(op: Operator) -> Result<()> { } }; - if let Err(e) = writer.append(content).await { + if let Err(e) = writer.write(content).await { assert_eq!(e.kind(), ErrorKind::Unsupported); return Ok(()); } @@ -700,8 +700,8 @@ pub async fn test_append(op: Operator) -> Result<()> { } Err(err) => return Err(err.into()), }; - w.append(content_a.clone()).await?; - w.append(content_b.clone()).await?; + w.write(content_a.clone()).await?; + w.write(content_b.clone()).await?; w.close().await?; let meta = op.stat(&path).await.expect("stat must succeed");
