This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-writer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 9800b6d3c511868265be6779f191148bab2fc8de
Author: Xuanwo <[email protected]>
AuthorDate: Wed Apr 19 16:44:08 2023 +0800

    Remove append
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/concurrent_limit.rs |  8 ----
 core/src/layers/error_context.rs    | 16 --------
 core/src/layers/logging.rs          | 66 ------------------------------
 core/src/layers/metrics.rs          | 23 -----------
 core/src/layers/minitrace.rs        | 16 --------
 core/src/layers/oteltrace.rs        |  8 ----
 core/src/layers/prometheus.rs       | 33 ---------------
 core/src/layers/retry.rs            | 35 ----------------
 core/src/layers/tracing.rs          | 16 --------
 core/src/raw/adapters/kv/backend.rs | 44 ++++++++------------
 core/src/raw/oio/cursor.rs          |  6 +++
 core/src/raw/oio/write.rs           | 32 ---------------
 core/src/services/azblob/writer.rs  |  9 -----
 core/src/services/azdfs/writer.rs   |  9 -----
 core/src/services/fs/writer.rs      | 22 ----------
 core/src/services/ftp/writer.rs     |  9 -----
 core/src/services/gcs/writer.rs     |  8 +---
 core/src/services/ghac/writer.rs    |  9 -----
 core/src/services/hdfs/writer.rs    | 25 ------------
 core/src/services/ipmfs/writer.rs   |  9 -----
 core/src/services/obs/writer.rs     |  9 -----
 core/src/services/oss/backend.rs    |  2 +-
 core/src/services/oss/writer.rs     | 10 ++---
 core/src/services/s3/backend.rs     |  2 +-
 core/src/services/s3/writer.rs      | 10 ++---
 core/src/services/wasabi/backend.rs |  2 +-
 core/src/services/wasabi/writer.rs  | 80 ++++++++++++++-----------------------
 core/src/services/webdav/writer.rs  |  9 -----
 core/src/services/webhdfs/writer.rs |  9 -----
 core/src/types/writer.rs            | 26 ++++--------
 core/tests/behavior/write.rs        |  6 +--
 31 files changed, 75 insertions(+), 493 deletions(-)

diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs
index 99e947b3..c1ccb76d 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -316,10 +316,6 @@ impl<R: oio::Write> oio::Write for 
ConcurrentLimitWrapper<R> {
         self.inner.write(bs).await
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs).await
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await
     }
@@ -334,10 +330,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
ConcurrentLimitWrapper<R> {
         self.inner.write(bs)
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs)
-    }
-
     fn close(&mut self) -> Result<()> {
         self.inner.close()
     }
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 29724613..5417a040 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -411,14 +411,6 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
         })
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs).await.map_err(|err| {
-            err.with_operation(WriteOperation::Append)
-                .with_context("service", self.scheme)
-                .with_context("path", &self.path)
-        })
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await.map_err(|err| {
             err.with_operation(WriteOperation::Append)
@@ -445,14 +437,6 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for 
ErrorContextWrapper<T> {
         })
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs).map_err(|err| {
-            err.with_operation(WriteOperation::BlockingAppend)
-                .with_context("service", self.scheme)
-                .with_context("path", &self.path)
-        })
-    }
-
     fn close(&mut self) -> Result<()> {
         self.inner.close().map_err(|err| {
             err.with_operation(WriteOperation::BlockingClose)
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 706287ad..abecc48f 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1386,39 +1386,6 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        match self.inner.append(bs).await {
-            Ok(_) => {
-                self.written += size as u64;
-                trace!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={} -> data write {}B",
-                    self.scheme,
-                    WriteOperation::Append,
-                    self.path,
-                    self.written,
-                    size
-                );
-                Ok(())
-            }
-            Err(err) => {
-                if let Some(lvl) = self.failure_level {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} written={} -> data write failed: {err:?}",
-                        self.scheme,
-                        WriteOperation::Append,
-                        self.path,
-                        self.written,
-                    )
-                }
-                Err(err)
-            }
-        }
-    }
-
     async fn abort(&mut self) -> Result<()> {
         match self.inner.abort().await {
             Ok(_) => {
@@ -1504,39 +1471,6 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for 
LoggingWriter<W> {
         }
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        match self.inner.append(bs) {
-            Ok(_) => {
-                self.written += size as u64;
-                trace!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={} -> data write {}B",
-                    self.scheme,
-                    WriteOperation::BlockingAppend,
-                    self.path,
-                    self.written,
-                    size
-                );
-                Ok(())
-            }
-            Err(err) => {
-                if let Some(lvl) = self.failure_level {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} written={} -> data write failed: {err:?}",
-                        self.scheme,
-                        WriteOperation::BlockingAppend,
-                        self.path,
-                        self.written,
-                    )
-                }
-                Err(err)
-            }
-        }
-    }
-
     fn close(&mut self) -> Result<()> {
         match self.inner.close() {
             Ok(_) => Ok(()),
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 2aff9003..6bdfe19f 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -925,18 +925,6 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
             })
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        self.inner
-            .append(bs)
-            .await
-            .map(|_| self.bytes += size as u64)
-            .map_err(|err| {
-                self.handle.increment_errors_total(self.op, err.kind());
-                err
-            })
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await.map_err(|err| {
             self.handle.increment_errors_total(self.op, err.kind());
@@ -964,17 +952,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
MetricWrapper<R> {
             })
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        self.inner
-            .append(bs)
-            .map(|_| self.bytes += size as u64)
-            .map_err(|err| {
-                self.handle.increment_errors_total(self.op, err.kind());
-                err
-            })
-    }
-
     fn close(&mut self) -> Result<()> {
         self.inner.close().map_err(|err| {
             self.handle.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 3e91bac6..35c48d27 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -337,16 +337,6 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
             .await
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner
-            .append(bs)
-            .in_span(Span::enter_with_parent(
-                WriteOperation::Append.into_static(),
-                &self.span,
-            ))
-            .await
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner
             .abort()
@@ -375,12 +365,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
MinitraceWrapper<R> {
         self.inner.write(bs)
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _span =
-            
Span::enter_with_parent(WriteOperation::BlockingAppend.into_static(), 
&self.span);
-        self.inner.append(bs)
-    }
-
     fn close(&mut self) -> Result<()> {
         let _span =
             
Span::enter_with_parent(WriteOperation::BlockingClose.into_static(), 
&self.span);
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index c607e975..f9f3a931 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -339,10 +339,6 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
         self.inner.write(bs).await
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs).await
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await
     }
@@ -357,10 +353,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
OtelTraceWrapper<R> {
         self.inner.write(bs)
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs)
-    }
-
     fn close(&mut self) -> Result<()> {
         self.inner.close()
     }
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 4689c383..6919abc4 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -720,23 +720,6 @@ impl<R: oio::Write> oio::Write for 
PrometheusMetricWrapper<R> {
             })
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        self.inner
-            .append(bs)
-            .await
-            .map(|_| {
-                self.stats
-                    .bytes_total
-                    .with_label_values(&[&self.scheme, 
Operation::Write.into_static()])
-                    .observe(size as f64)
-            })
-            .map_err(|err| {
-                self.stats.increment_errors_total(self.op, err.kind());
-                err
-            })
-    }
-
     async fn abort(&mut self) -> Result<()> {
         self.inner.abort().await.map_err(|err| {
             self.stats.increment_errors_total(self.op, err.kind());
@@ -769,22 +752,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
PrometheusMetricWrapper<R> {
             })
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-        self.inner
-            .append(bs)
-            .map(|_| {
-                self.stats
-                    .bytes_total
-                    .with_label_values(&[&self.scheme, 
Operation::BlockingWrite.into_static()])
-                    .observe(size as f64)
-            })
-            .map_err(|err| {
-                self.stats.increment_errors_total(self.op, err.kind());
-                err
-            })
-    }
-
     fn close(&mut self) -> Result<()> {
         self.inner.close().map_err(|err| {
             self.stats.increment_errors_total(self.op, err.kind());
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 401bd53e..497c1ca8 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -651,27 +651,6 @@ impl<R: oio::Write> oio::Write for RetryWrapper<R> {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let mut backoff = self.builder.build();
-
-        loop {
-            match self.inner.append(bs.clone()).await {
-                Ok(v) => return Ok(v),
-                Err(e) if !e.is_temporary() => return Err(e),
-                Err(e) => match backoff.next() {
-                    None => return Err(e),
-                    Some(dur) => {
-                        warn!(target: "opendal::service",
-                              "operation={} path={} -> pager retry after {}s: error={:?}",
-                              WriteOperation::Append, self.path, dur.as_secs_f64(), e);
-                        tokio::time::sleep(dur).await;
-                        continue;
-                    }
-                },
-            }
-        }
-    }
-
     async fn abort(&mut self) -> Result<()> {
         let mut backoff = self.builder.build();
 
@@ -730,20 +709,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
RetryWrapper<R> {
             .map_err(|e| e.set_persistent())
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        { || self.inner.append(bs.clone()) }
-            .retry(&self.builder)
-            .when(|e| e.is_temporary())
-            .notify(move |err, dur| {
-                warn!(
-                target: "opendal::service",
-                "operation={} -> pager retry after {}s: error={:?}",
-                WriteOperation::BlockingAppend, dur.as_secs_f64(), err)
-            })
-            .call()
-            .map_err(|e| e.set_persistent())
-    }
-
     fn close(&mut self) -> Result<()> {
         { || self.inner.close() }
             .retry(&self.builder)
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 8f9b8a5d..2782d049 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -344,14 +344,6 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> {
         self.inner.write(bs).await
     }
 
-    #[tracing::instrument(
-        parent = &self.span,
-        level = "trace",
-        skip_all)]
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs).await
-    }
-
     #[tracing::instrument(
         parent = &self.span,
         level = "trace",
@@ -378,14 +370,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
TracingWrapper<R> {
         self.inner.write(bs)
     }
 
-    #[tracing::instrument(
-        parent = &self.span,
-        level = "trace",
-        skip_all)]
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.inner.append(bs)
-    }
-
     #[tracing::instrument(
         parent = &self.span,
         level = "trace",
diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs
index 15dc7beb..dae293e0 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -107,13 +107,27 @@ impl<S: Adapter> Accessor for Backend<S> {
         Ok((RpRead::new(bs.len() as u64), oio::Cursor::from(bs)))
     }
 
-    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        if args.content_length().is_none() {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "write without content length is not supported",
+            ));
+        }
+
         let p = build_abs_path(&self.root, path);
 
         Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
     }
 
-    fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
+        if args.content_length().is_none() {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "write without content length is not supported",
+            ));
+        }
+
         let p = build_abs_path(&self.root, path);
 
         Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p)))
@@ -264,35 +278,17 @@ impl<S> KvWriter<S> {
             buf: None,
         }
     }
-
-    fn extend_buf(&mut self, bs: Bytes) {
-        if let Some(buf) = self.buf.as_mut() {
-            buf.extend(bs);
-        } else {
-            self.buf = Some(bs.into())
-        }
-    }
 }
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
+    // TODO: we need to support append in the future.
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         self.buf = Some(bs.into());
 
         Ok(())
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        if let Err(e) = self.kv.append(&self.path, 
bs.to_vec().as_slice()).await {
-            if e.kind() == ErrorKind::Unsupported {
-                self.extend_buf(bs);
-            } else {
-                return Err(e);
-            }
-        }
-        Ok(())
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
@@ -316,12 +312,6 @@ impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
         Ok(())
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.extend_buf(bs);
-
-        Ok(())
-    }
-
     fn close(&mut self) -> Result<()> {
         if let Some(buf) = self.buf.as_deref() {
             self.kv.blocking_set(&self.path, buf)?;
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index f8db0539..0889e45e 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -148,6 +148,12 @@ pub struct VectorCursor {
     size: usize,
 }
 
+impl Default for VectorCursor {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl VectorCursor {
     /// Create a new vector cursor.
     pub fn new() -> Self {
diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write.rs
index c74e0d90..a2437312 100644
--- a/core/src/raw/oio/write.rs
+++ b/core/src/raw/oio/write.rs
@@ -99,9 +99,6 @@ pub trait Write: Unpin + Send + Sync {
     /// Please make sure `write` is safe to re-enter.
     async fn write(&mut self, bs: Bytes) -> Result<()>;
 
-    /// Append bytes to the writer.
-    async fn append(&mut self, bs: Bytes) -> Result<()>;
-
     /// Abort the pending writer.
     async fn abort(&mut self) -> Result<()>;
 
@@ -117,15 +114,6 @@ impl Write for () {
         unimplemented!("write is required to be implemented for oio::Write")
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
@@ -149,10 +137,6 @@ impl<T: Write + ?Sized> Write for Box<T> {
         (**self).write(bs).await
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        (**self).append(bs).await
-    }
-
     async fn abort(&mut self) -> Result<()> {
         (**self).abort().await
     }
@@ -170,9 +154,6 @@ pub trait BlockingWrite: Send + Sync + 'static {
     /// Write whole content at once.
     fn write(&mut self, bs: Bytes) -> Result<()>;
 
-    /// Append content at tailing.
-    fn append(&mut self, bs: Bytes) -> Result<()>;
-
     /// Close the writer and make sure all data has been flushed.
     fn close(&mut self) -> Result<()>;
 }
@@ -184,15 +165,6 @@ impl BlockingWrite for () {
         unimplemented!("write is required to be implemented for 
oio::BlockingWrite")
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     fn close(&mut self) -> Result<()> {
         Err(Error::new(
             ErrorKind::Unsupported,
@@ -208,10 +180,6 @@ impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
         (**self).write(bs)
     }
 
-    fn append(&mut self, bs: Bytes) -> Result<()> {
-        (**self).append(bs)
-    }
-
     fn close(&mut self) -> Result<()> {
         (**self).close()
     }
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index d74af141..9e6bdb40 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -65,15 +65,6 @@ impl oio::Write for AzblobWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index b9f6faf2..a6efaf08 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -87,15 +87,6 @@ impl oio::Write for AzdfsWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 33dcb2d5..03d6d824 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -54,17 +54,6 @@ impl oio::Write for FsWriter<tokio::fs::File> {
     /// File could be partial written, so we will seek to start to make sure
     /// we write the same content.
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.f.rewind().await.map_err(parse_io_error)?;
-        self.f.write_all(&bs).await.map_err(parse_io_error)?;
-
-        Ok(())
-    }
-
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
         self.f
             .seek(SeekFrom::Start(self.pos))
             .await
@@ -101,17 +90,6 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> {
     /// File could be partial written, so we will seek to start to make sure
     /// we write the same content.
     fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.f.rewind().map_err(parse_io_error)?;
-        self.f.write_all(&bs).map_err(parse_io_error)?;
-
-        Ok(())
-    }
-
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
-    fn append(&mut self, bs: Bytes) -> Result<()> {
         self.f
             .seek(SeekFrom::Start(self.pos))
             .map_err(parse_io_error)?;
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index dec8c473..653dc003 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -53,15 +53,6 @@ impl oio::Write for FtpWriter {
         Ok(())
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 19fbd6d5..609b34ef 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -138,14 +138,14 @@ impl oio::Write for GcsWriter {
                     return self.write_oneshot(bs).await;
                 } else {
                     let location = self.initiate_upload().await?;
-                    self.location = Some(location.clone());
+                    self.location = Some(location);
                     self.location.as_deref().unwrap()
                 }
             }
         };
 
         // Ignore empty bytes
-        if bs.len() == 0 {
+        if bs.is_empty() {
             return Ok(());
         }
 
@@ -172,10 +172,6 @@ impl oio::Write for GcsWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.write(bs).await
-    }
-
     // TODO: we can cancel the upload by sending a DELETE request to the 
location
     async fn abort(&mut self) -> Result<()> {
         Ok(())
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index b9e4eefd..8ff12c8b 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -62,15 +62,6 @@ impl oio::Write for GhacWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 6062d305..d045c063 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -46,20 +46,6 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
     /// File could be partial written, so we will seek to start to make sure
     /// we write the same content.
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.f
-            .seek(SeekFrom::Start(0))
-            .await
-            .map_err(parse_io_error)?;
-        self.f.write_all(&bs).await.map_err(parse_io_error)?;
-
-        Ok(())
-    }
-
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
         self.f
             .seek(SeekFrom::Start(self.pos))
             .await
@@ -90,17 +76,6 @@ impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
     /// File could be partial written, so we will seek to start to make sure
     /// we write the same content.
     fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.f.rewind().map_err(parse_io_error)?;
-        self.f.write_all(&bs).map_err(parse_io_error)?;
-
-        Ok(())
-    }
-
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
-    fn append(&mut self, bs: Bytes) -> Result<()> {
         self.f
             .seek(SeekFrom::Start(self.pos))
             .map_err(parse_io_error)?;
diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs
index 4394c751..4f240000 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -55,15 +55,6 @@ impl oio::Write for IpmfsWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index 21c685c7..d71c7893 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -65,15 +65,6 @@ impl oio::Write for ObsWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 4e298fb6..c314586c 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -409,7 +409,7 @@ impl Accessor for OssBackend {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         Ok((
             RpWrite::default(),
-            OssWriter::new(self.core.clone(), &path, args),
+            OssWriter::new(self.core.clone(), path, args),
         ))
     }
 
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 095ba6e4..d4cbe721 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -94,7 +94,7 @@ impl OssWriter {
                     
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
                 Ok(result.upload_id)
             }
-            _ => return Err(parse_error(resp).await?),
+            _ => Err(parse_error(resp).await?),
         }
     }
 
@@ -144,14 +144,14 @@ impl oio::Write for OssWriter {
                     return self.write_oneshot(bs).await;
                 } else {
                     let upload_id = self.initiate_upload().await?;
-                    self.upload_id = Some(upload_id.clone());
+                    self.upload_id = Some(upload_id);
                     self.upload_id.as_deref().unwrap()
                 }
             }
         };
 
         // Ignore empty bytes
-        if bs.len() == 0 {
+        if bs.is_empty() {
             return Ok(());
         }
 
@@ -178,10 +178,6 @@ impl oio::Write for OssWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        todo!()
-    }
-
     // TODO: we can cancel the upload by sending an abort request.
     async fn abort(&mut self) -> Result<()> {
         Err(Error::new(
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 7eb7b1f9..66940674 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -961,7 +961,7 @@ impl Accessor for S3Backend {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         Ok((
             RpWrite::default(),
-            S3Writer::new(self.core.clone(), &path, args),
+            S3Writer::new(self.core.clone(), path, args),
         ))
     }
 
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index 5887cafc..39248cde 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -106,7 +106,7 @@ impl S3Writer {
 
                 Ok(result.upload_id)
             }
-            _ => return Err(parse_error(resp).await?),
+            _ => Err(parse_error(resp).await?),
         }
     }
 
@@ -162,14 +162,14 @@ impl oio::Write for S3Writer {
                     return self.write_oneshot(bs).await;
                 } else {
                     let upload_id = self.initiate_upload().await?;
-                    self.upload_id = Some(upload_id.clone());
+                    self.upload_id = Some(upload_id);
                     self.upload_id.as_deref().unwrap()
                 }
             }
         };
 
         // Ignore empty bytes
-        if bs.len() == 0 {
+        if bs.is_empty() {
             return Ok(());
         }
 
@@ -196,10 +196,6 @@ impl oio::Write for S3Writer {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        todo!()
-    }
-
     async fn abort(&mut self) -> Result<()> {
         let upload_id = if let Some(upload_id) = &self.upload_id {
             upload_id
diff --git a/core/src/services/wasabi/backend.rs b/core/src/services/wasabi/backend.rs
index 49ed6bc3..0cdbe3a5 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -951,7 +951,7 @@ impl Accessor for WasabiBackend {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         Ok((
             RpWrite::default(),
-            WasabiWriter::new(self.core.clone(), args, path.to_string(), None),
+            WasabiWriter::new(self.core.clone(), args, path.to_string()),
         ))
     }
 
diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs
index 90d193df..f9b998b4 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -32,68 +32,50 @@ pub struct WasabiWriter {
 
     op: OpWrite,
     path: String,
-
-    upload_id: Option<String>,
 }
 
 impl WasabiWriter {
-    pub fn new(
-        core: Arc<WasabiCore>,
-        op: OpWrite,
-        path: String,
-        upload_id: Option<String>,
-    ) -> Self {
-        WasabiWriter {
-            core,
-
-            op,
-            path,
-            upload_id,
-        }
+    pub fn new(core: Arc<WasabiCore>, op: OpWrite, path: String) -> Self {
+        WasabiWriter { core, op, path }
     }
 }
 
 #[async_trait]
 impl oio::Write for WasabiWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        debug_assert!(
-            self.upload_id.is_none(),
-            "Writer initiated with upload id, but users trying to call write, must be buggy"
-        );
-
-        let resp = self
-            .core
-            .put_object(
-                &self.path,
-                Some(bs.len()),
-                self.op.content_type(),
-                self.op.content_disposition(),
-                self.op.cache_control(),
-                AsyncBody::Bytes(bs),
-            )
-            .await?;
+        if self.op.content_length().unwrap_or_default() == bs.len() as u64 {
+            let resp = self
+                .core
+                .put_object(
+                    &self.path,
+                    Some(bs.len()),
+                    self.op.content_type(),
+                    self.op.content_disposition(),
+                    self.op.cache_control(),
+                    AsyncBody::Bytes(bs),
+                )
+                .await?;
 
-        match resp.status() {
-            StatusCode::CREATED | StatusCode::OK => {
-                resp.into_body().consume().await?;
-                Ok(())
+            match resp.status() {
+                StatusCode::CREATED | StatusCode::OK => {
+                    resp.into_body().consume().await?;
+                    Ok(())
+                }
+                _ => Err(parse_error(resp).await?),
             }
-            _ => Err(parse_error(resp).await?),
-        }
-    }
-
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let resp = self
-            .core
-            .append_object(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))
-            .await?;
+        } else {
+            let resp = self
+                .core
+                .append_object(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))
+                .await?;
 
-        match resp.status() {
-            StatusCode::CREATED | StatusCode::OK | StatusCode::NO_CONTENT => {
-                resp.into_body().consume().await?;
-                Ok(())
+            match resp.status() {
+                StatusCode::CREATED | StatusCode::OK | StatusCode::NO_CONTENT => {
+                    resp.into_body().consume().await?;
+                    Ok(())
+                }
+                _ => Err(parse_error(resp).await?),
             }
-            _ => Err(parse_error(resp).await?),
         }
     }
 
diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs
index 0fa180c9..89ddd7b8 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -63,15 +63,6 @@ impl oio::Write for WebdavWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index f3ab7633..16448141 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -63,15 +63,6 @@ impl oio::Write for WebhdfsWriter {
         }
     }
 
-    async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
-
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
-    }
-
     async fn abort(&mut self) -> Result<()> {
         Ok(())
     }
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 3705f4d6..68fe446f 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -60,17 +60,13 @@ impl Writer {
         })
     }
 
-    /// Append data into writer.
-    ///
-    /// It is highly recommended to align the length of the input bytes
-    /// into blocks of 4MiB (except the last block) for better performance
-    /// and compatibility.
-    pub async fn append(&mut self, bs: impl Into<Bytes>) -> Result<()> {
+    /// Write into inner writer.
+    pub async fn write(&mut self, bs: impl Into<Bytes>) -> Result<()> {
         if let State::Idle(Some(w)) = &mut self.state {
-            w.append(bs.into()).await
+            w.write(bs.into()).await
         } else {
             unreachable!(
-                "writer state invalid while append, expect Idle, actual {}",
+                "writer state invalid while abort, expect Idle, actual {}",
                 self.state
             );
         }
@@ -132,7 +128,7 @@ impl AsyncWrite for Writer {
                     let bs = Bytes::from(buf.to_vec());
                     let size = bs.len();
                     let fut = async move {
-                        w.append(bs).await?;
+                        w.write(bs).await?;
                         Ok((size, w))
                     };
                     self.state = State::Write(Box::pin(fut));
@@ -204,15 +200,6 @@ impl BlockingWriter {
         Ok(BlockingWriter { inner: w })
     }
 
-    /// Append data into writer.
-    ///
-    /// It is highly recommended to align the length of the input bytes
-    /// into blocks of 4MiB (except the last block) for better performance
-    /// and compatibility.
-    pub fn append(&mut self, bs: impl Into<Bytes>) -> Result<()> {
-        self.inner.append(bs.into())
-    }
-
     /// Close the writer and make sure all data have been stored.
     pub fn close(&mut self) -> Result<()> {
         self.inner.close()
@@ -222,7 +209,8 @@ impl BlockingWriter {
 impl io::Write for BlockingWriter {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
         let size = buf.len();
-        self.append(Bytes::from(buf.to_vec()))
+        self.inner
+            .write(Bytes::from(buf.to_vec()))
             .map(|_| size)
             .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
     }
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index 91c9ec08..c0d24c4a 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -592,7 +592,7 @@ pub async fn test_abort_writer(op: Operator) -> Result<()> {
         }
     };
 
-    if let Err(e) = writer.append(content).await {
+    if let Err(e) = writer.write(content).await {
         assert_eq!(e.kind(), ErrorKind::Unsupported);
         return Ok(());
     }
@@ -700,8 +700,8 @@ pub async fn test_append(op: Operator) -> Result<()> {
         }
         Err(err) => return Err(err.into()),
     };
-    w.append(content_a.clone()).await?;
-    w.append(content_b.clone()).await?;
+    w.write(content_a.clone()).await?;
+    w.write(content_b.clone()).await?;
     w.close().await?;
 
     let meta = op.stat(&path).await.expect("stat must succeed");


Reply via email to