This is an automated email from the ASF dual-hosted git repository.

suyanhanx pushed a commit to branch append
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit aed4bb8193f921543e4577ade33ce13a3b03761f
Author: suyanhanx <[email protected]>
AuthorDate: Sat May 13 17:22:43 2023 +0800

    feat(core): append
    
    Signed-off-by: suyanhanx <[email protected]>
---
 core/src/layers/complete.rs                   |  55 ++++++++++
 core/src/layers/concurrent_limit.rs           |  26 +++++
 core/src/layers/error_context.rs              |  42 ++++++++
 core/src/layers/immutable_index.rs            |   5 +
 core/src/layers/logging.rs                    | 138 ++++++++++++++++++++++++++
 core/src/layers/retry.rs                      |  65 ++++++++++++
 core/src/layers/type_eraser.rs                |   8 ++
 core/src/raw/accessor.rs                      |  26 +++++
 core/src/raw/adapters/kv/backend.rs           |   1 +
 core/src/raw/adapters/typed_kv/backend.rs     |   1 +
 core/src/raw/layer.rs                         |   9 ++
 core/src/raw/oio/mod.rs                       |   3 +
 core/src/raw/oio/write.rs                     |  91 ++++++++++++++++-
 core/src/raw/operation.rs                     |   3 +
 core/src/raw/rps.rs                           |  11 ++
 core/src/services/azblob/backend.rs           |   1 +
 core/src/services/azdfs/backend.rs            |   1 +
 core/src/services/fs/backend.rs               |   1 +
 core/src/services/ftp/backend.rs              |   1 +
 core/src/services/gcs/backend.rs              |   1 +
 core/src/services/gdrive/backend.rs           |   1 +
 core/src/services/ghac/backend.rs             |   1 +
 core/src/services/hdfs/backend.rs             |   1 +
 core/src/services/http/backend.rs             |   1 +
 core/src/services/ipfs/backend.rs             |   1 +
 core/src/services/ipmfs/backend.rs            |   1 +
 core/src/services/obs/backend.rs              |   1 +
 core/src/services/onedrive/backend.rs         |   1 +
 core/src/services/oss/backend.rs              |   1 +
 core/src/services/s3/backend.rs               |   1 +
 core/src/services/supabase/backend.rs         |   1 +
 core/src/services/vercel_artifacts/backend.rs |   1 +
 core/src/services/wasabi/backend.rs           |   1 +
 core/src/services/webdav/backend.rs           |   1 +
 core/src/services/webhdfs/backend.rs          |   1 +
 core/src/types/capability.rs                  |   3 +
 core/src/types/ops.rs                         |  11 ++
 37 files changed, 516 insertions(+), 2 deletions(-)

diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index eec72d56..cea2a117 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -327,6 +327,7 @@ impl<A: Accessor> LayeredAccessor for 
CompleteReaderAccessor<A> {
     type BlockingReader = CompleteReader<A, A::BlockingReader>;
     type Writer = CompleteWriter<A::Writer>;
     type BlockingWriter = CompleteWriter<A::BlockingWriter>;
+    type Appender = CompleteAppender<A::Appender>;
     type Pager = CompletePager<A, A::Pager>;
     type BlockingPager = CompletePager<A, A::BlockingPager>;
 
@@ -375,6 +376,13 @@ impl<A: Accessor> LayeredAccessor for 
CompleteReaderAccessor<A> {
             .map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
     }
 
+    async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, 
Self::Appender)> {
+        self.inner
+            .append(path, args)
+            .await
+            .map(|(rp, a)| (rp, CompleteAppender::new(a)))
+    }
+
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Pager)> {
         self.complete_list(path, args).await
     }
@@ -646,3 +654,50 @@ where
         Ok(())
     }
 }
+
+pub struct CompleteAppender<A> {
+    inner: Option<A>,
+}
+
+impl<A> CompleteAppender<A> {
+    pub fn new(inner: A) -> CompleteAppender<A> {
+        CompleteAppender {
+            inner: Some(inner),
+        }
+    }
+}
+
+/// Check if the appender has been closed while debug_assertions enabled.
+/// This code will never be executed in release mode.
+#[cfg(debug_assertions)]
+impl<A> Drop for CompleteAppender<A> {
+    fn drop(&mut self) {
+        if !self.inner.is_some() {
+            // Do we need to panic here?
+            log::warn!("appender has not been closed, must be a bug")
+        }
+    }
+}
+
+#[async_trait]
+impl<A> oio::Append for CompleteAppender<A>
+where A: oio::Append
+{
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        let a = self.inner.as_mut().ok_or_else(|| {
+            Error::new(ErrorKind::Unexpected, "appender has been closed or 
aborted")
+        })?;
+
+        a.append(bs).await
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        let a = self.inner.as_mut().ok_or_else(|| {
+            Error::new(ErrorKind::Unexpected, "appender has been closed or 
aborted")
+        })?;
+
+        a.close().await?;
+        self.inner = None;
+        Ok(())
+    }
+}
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index 07c4cfc4..5228eace 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -87,6 +87,7 @@ impl<A: Accessor> LayeredAccessor for 
ConcurrentLimitAccessor<A> {
     type BlockingReader = ConcurrentLimitWrapper<A::BlockingReader>;
     type Writer = ConcurrentLimitWrapper<A::Writer>;
     type BlockingWriter = ConcurrentLimitWrapper<A::BlockingWriter>;
+    type Appender = ConcurrentLimitWrapper<A::Appender>;
     type Pager = ConcurrentLimitWrapper<A::Pager>;
     type BlockingPager = ConcurrentLimitWrapper<A::BlockingPager>;
 
@@ -132,6 +133,20 @@ impl<A: Accessor> LayeredAccessor for 
ConcurrentLimitAccessor<A> {
             .map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit)))
     }
 
+    async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, 
Self::Appender)> {
+        let permit = self
+            .semaphore
+            .clone()
+            .acquire_owned()
+            .await
+            .expect("semaphore must be valid");
+
+        self.inner
+            .append(path, args)
+            .await
+            .map(|(rp, a)| (rp, ConcurrentLimitWrapper::new(a, permit)))
+    }
+
     async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
         let _permit = self
             .semaphore
@@ -309,6 +324,17 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
ConcurrentLimitWrapper<R> {
     }
 }
 
+#[async_trait]
+impl<R: oio::Append> oio::Append for ConcurrentLimitWrapper<R> {
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        self.inner.append(bs).await
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        self.inner.close().await
+    }
+}
+
 #[async_trait]
 impl<R: oio::Page> oio::Page for ConcurrentLimitWrapper<R> {
     async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 194b3168..a23b78dd 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -26,6 +26,7 @@ use bytes::Bytes;
 use futures::TryFutureExt;
 
 use crate::ops::*;
+use crate::raw::oio::AppendOperation;
 use crate::raw::oio::PageOperation;
 use crate::raw::oio::ReadOperation;
 use crate::raw::oio::WriteOperation;
@@ -71,6 +72,7 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> 
{
     type BlockingReader = ErrorContextWrapper<A::BlockingReader>;
     type Writer = ErrorContextWrapper<A::Writer>;
     type BlockingWriter = ErrorContextWrapper<A::BlockingWriter>;
+    type Appender = ErrorContextWrapper<A::Appender>;
     type Pager = ErrorContextWrapper<A::Pager>;
     type BlockingPager = ErrorContextWrapper<A::BlockingPager>;
 
@@ -138,6 +140,27 @@ impl<A: Accessor> LayeredAccessor for 
ErrorContextAccessor<A> {
             .await
     }
 
+    async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, 
Self::Appender)> {
+        self.inner
+            .append(path, args)
+            .map_ok(|(rp, os)| {
+                (
+                    rp,
+                    ErrorContextWrapper {
+                        scheme: self.meta.scheme(),
+                        path: path.to_string(),
+                        inner: os,
+                    },
+                )
+            })
+            .map_err(|err| {
+                err.with_operation(Operation::Append)
+                    .with_context("service", self.meta.scheme())
+                    .with_context("path", path)
+            })
+            .await
+    }
+
     async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> 
{
         self.inner
             .copy(from, to, args)
@@ -447,6 +470,25 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for 
ErrorContextWrapper<T> {
     }
 }
 
+#[async_trait::async_trait]
+impl<T: oio::Append> oio::Append for ErrorContextWrapper<T> {
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        self.inner.append(bs).await.map_err(|err| {
+            err.with_operation(AppendOperation::Append)
+                .with_context("service", self.scheme)
+                .with_context("path", &self.path)
+        })
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        self.inner.close().await.map_err(|err| {
+            err.with_operation(AppendOperation::Close)
+                .with_context("service", self.scheme)
+                .with_context("path", &self.path)
+        })
+    }
+}
+
 #[async_trait::async_trait]
 impl<T: oio::Page> oio::Page for ErrorContextWrapper<T> {
     async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
diff --git a/core/src/layers/immutable_index.rs 
b/core/src/layers/immutable_index.rs
index 01b4d8f0..70f3842d 100644
--- a/core/src/layers/immutable_index.rs
+++ b/core/src/layers/immutable_index.rs
@@ -139,6 +139,7 @@ impl<A: Accessor> LayeredAccessor for 
ImmutableIndexAccessor<A> {
     type BlockingReader = A::BlockingReader;
     type Writer = A::Writer;
     type BlockingWriter = A::BlockingWriter;
+    type Appender = A::Appender;
     type Pager = ImmutableDir;
     type BlockingPager = ImmutableDir;
 
@@ -194,6 +195,10 @@ impl<A: Accessor> LayeredAccessor for 
ImmutableIndexAccessor<A> {
         self.inner.blocking_write(path, args)
     }
 
+    async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, 
Self::Appender)> {
+        self.inner.append(path, args).await
+    }
+
     fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::BlockingPager)> {
         let mut path = path;
         if path == "/" {
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index cbbc8ea1..0fbfdd4c 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -190,6 +190,7 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
     type BlockingReader = LoggingReader<A::BlockingReader>;
     type Writer = LoggingWriter<A::Writer>;
     type BlockingWriter = LoggingWriter<A::BlockingWriter>;
+    type Appender = LoggingAppender<A::Appender>;
     type Pager = LoggingPager<A::Pager>;
     type BlockingPager = LoggingPager<A::BlockingPager>;
 
@@ -340,6 +341,51 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
             })
     }
 
+    async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, 
Self::Appender)> {
+        debug!(
+            target: LOGGING_TARGET,
+            "service={} operation={} path={} -> started",
+            self.scheme,
+            Operation::Append,
+            path
+        );
+
+        self.inner
+            .append(path, args)
+            .await
+            .map(|(rp, a)| {
+                debug!(
+                    target: LOGGING_TARGET,
+                    "service={} operation={} path={} -> start appending",
+                    self.scheme,
+                    Operation::Append,
+                    path,
+                );
+                let a = LoggingAppender::new(
+                    self.scheme,
+                    Operation::Append,
+                    path,
+                    a,
+                    self.failure_level,
+                );
+                (rp, a)
+            })
+            .map_err(|err| {
+                if let Some(lvl) = self.err_level(&err) {
+                    log!(
+                        target: LOGGING_TARGET,
+                        lvl,
+                        "service={} operation={} path={} -> {}: {err:?}",
+                        self.scheme,
+                        Operation::Append,
+                        path,
+                        self.err_status(&err)
+                    )
+                };
+                err
+            })
+    }
+
     async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> 
{
         debug!(
             target: LOGGING_TARGET,
@@ -1424,6 +1470,98 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for 
LoggingWriter<W> {
     }
 }
 
+pub struct LoggingAppender<A> {
+    scheme: Scheme,
+    op: Operation,
+    path: String,
+
+    failure_level: Option<Level>,
+
+    inner: A,
+}
+
+impl<A> LoggingAppender<A> {
+    fn new(
+        scheme: Scheme,
+        op: Operation,
+        path: &str,
+        appender: A,
+        failure_level: Option<Level>,
+    ) -> Self {
+        Self {
+            scheme,
+            op,
+            path: path.to_string(),
+
+            failure_level,
+
+            inner: appender,
+        }
+    }
+}
+
+#[async_trait]
+impl<A: oio::Append> oio::Append for LoggingAppender<A>  {
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        let len = bs.len();
+
+        match self.inner.append(bs).await {
+            Ok(_) => {
+                trace!(
+                    target: LOGGING_TARGET,
+                    "service={} operation={} path={} -> data append {}B",
+                    self.scheme,
+                    self.op,
+                    self.path,
+                    len
+                );
+                Ok(())
+            }
+            Err(err) => {
+                if let Some(lvl) = self.failure_level {
+                    log!(
+                        target: LOGGING_TARGET,
+                        lvl,
+                        "service={} operation={} path={} -> data append 
failed: {err:?}",
+                        self.scheme,
+                        self.op,
+                        self.path,
+                    )
+                }
+                Err(err)
+            }
+        }
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        match self.inner.close().await {
+            Ok(_) => {
+                debug!(
+                    target: LOGGING_TARGET,
+                    "service={} operation={} path={} -> data appended 
finished",
+                    self.scheme,
+                    self.op,
+                    self.path,
+                );
+                Ok(())
+            }
+            Err(err) => {
+                if let Some(lvl) = self.failure_level {
+                    log!(
+                        target: LOGGING_TARGET,
+                        lvl,
+                        "service={} operation={} path={} -> data appender 
close failed: {err:?}",
+                        self.scheme,
+                        self.op,
+                        self.path,
+                    )
+                }
+                Err(err)
+            }
+        }
+    }
+}
+
 pub struct LoggingPager<P> {
     scheme: Scheme,
     path: String,
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 8ebd3838..8fcf615f 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -35,6 +35,7 @@ use futures::FutureExt;
 use log::warn;
 
 use crate::ops::*;
+use crate::raw::oio::AppendOperation;
 use crate::raw::oio::PageOperation;
 use crate::raw::oio::ReadOperation;
 use crate::raw::oio::WriteOperation;
@@ -161,6 +162,7 @@ impl<A: Accessor> LayeredAccessor for RetryAccessor<A> {
     type BlockingReader = RetryWrapper<A::BlockingReader>;
     type Writer = RetryWrapper<A::Writer>;
     type BlockingWriter = RetryWrapper<A::BlockingWriter>;
+    type Appender = RetryWrapper<A::Appender>;
     type Pager = RetryWrapper<A::Pager>;
     type BlockingPager = RetryWrapper<A::BlockingPager>;
 
@@ -219,6 +221,23 @@ impl<A: Accessor> LayeredAccessor for RetryAccessor<A> {
             .await
     }
 
+    async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, 
Self::Appender)> {
+        { || self.inner.append(path, args.clone()) }
+            .retry(&self.builder)
+            .when(|e| e.is_temporary())
+            .notify(|err, dur| {
+                warn!(
+                    target: "opendal::service",
+                    "operation={} -> retry after {}s: error={:?}",
+                    Operation::Append, dur.as_secs_f64(), err)
+            })
+            .map(|v| {
+                v.map(|(rp, r)| (rp, RetryWrapper::new(r, path, 
self.builder.clone())))
+                    .map_err(|e| e.set_persistent())
+            })
+            .await
+    }
+
     async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
         { || self.inner.stat(path, args.clone()) }
             .retry(&self.builder)
@@ -703,6 +722,51 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
RetryWrapper<R> {
     }
 }
 
+#[async_trait]
+impl<A: oio::Append> oio::Append for RetryWrapper<A> {
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        let mut backoff = self.builder.build();
+
+        loop {
+            match self.inner.append(bs.clone()).await {
+                Ok(v) => return Ok(v),
+                Err(e) if !e.is_temporary() => return Err(e),
+                Err(e) => match backoff.next() {
+                    None => return Err(e),
+                    Some(dur) => {
+                        warn!(target: "opendal::service",
+                              "operation={} path={} -> appender retry after 
{}s: error={:?}",
+                              AppendOperation::Append, self.path, 
dur.as_secs_f64(), e);
+                        tokio::time::sleep(dur).await;
+                        continue;
+                    }
+                },
+            }
+        }
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        let mut backoff = self.builder.build();
+
+        loop {
+            match self.inner.close().await {
+                Ok(v) => return Ok(v),
+                Err(e) if !e.is_temporary() => return Err(e),
+                Err(e) => match backoff.next() {
+                    None => return Err(e),
+                    Some(dur) => {
+                        warn!(target: "opendal::service",
+                              "operation={} path={} -> appender retry after 
{}s: error={:?}",
+                              AppendOperation::Close, self.path, 
dur.as_secs_f64(), e);
+                        tokio::time::sleep(dur).await;
+                        continue;
+                    }
+                },
+            }
+        }
+    }
+}
+
 #[async_trait]
 impl<P: oio::Page> oio::Page for RetryWrapper<P> {
     async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
@@ -790,6 +854,7 @@ mod tests {
         type BlockingReader = ();
         type Writer = ();
         type BlockingWriter = ();
+        type Appender = ();
         type Pager = MockPager;
         type BlockingPager = ();
 
diff --git a/core/src/layers/type_eraser.rs b/core/src/layers/type_eraser.rs
index e47124b7..fb236646 100644
--- a/core/src/layers/type_eraser.rs
+++ b/core/src/layers/type_eraser.rs
@@ -58,6 +58,7 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> {
     type BlockingReader = oio::BlockingReader;
     type Writer = oio::Writer;
     type BlockingWriter = oio::BlockingWriter;
+    type Appender = oio::Appender;
     type Pager = oio::Pager;
     type BlockingPager = oio::BlockingPager;
 
@@ -91,6 +92,13 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> {
             .map(|(rp, w)| (rp, Box::new(w) as oio::BlockingWriter))
     }
 
+    async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, 
Self::Appender)> {
+        self.inner
+            .append(path, args)
+            .await
+            .map(|(rp, a)| (rp, Box::new(a) as oio::Appender))
+    }
+
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Pager)> {
         self.inner
             .list(path, args)
diff --git a/core/src/raw/accessor.rs b/core/src/raw/accessor.rs
index 48e2ea64..8fb98d0c 100644
--- a/core/src/raw/accessor.rs
+++ b/core/src/raw/accessor.rs
@@ -71,6 +71,8 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static {
     /// BlockingPager is the associated pager that could return in
     /// `blocking_list` operation.
     type BlockingPager: oio::BlockingPage;
+    /// Appender is the associated appender that could return in `append` 
operation.
+    type Appender: oio::Append;
 
     /// Invoke the `info` operation to get metadata of accessor.
     ///
@@ -137,6 +139,23 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static {
         ))
     }
 
+    /// Invoke the `append` operation on the specified path, returns a
+    /// appended size if operate successful.
+    ///
+    ///  Require [`Capability::append`]
+    ///
+    /// # Behavior
+    ///
+    /// - Input path MUST be file path, DON'T NEED to check mode.
+    async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, 
Self::Appender)> {
+        let (_, _) = (path, args);
+
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "operation is not supported",
+        ))
+    }
+
     /// Invoke the `copy` operation on the specified `from` path and `to` path.
     ///
     /// Require [Capability::copy]
@@ -371,6 +390,7 @@ impl Accessor for () {
     type BlockingReader = ();
     type Writer = ();
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = ();
     type BlockingPager = ();
 
@@ -392,6 +412,7 @@ impl<T: Accessor + ?Sized> Accessor for Arc<T> {
     type BlockingReader = T::BlockingReader;
     type Writer = T::Writer;
     type BlockingWriter = T::BlockingWriter;
+    type Appender = T::Appender;
     type Pager = T::Pager;
     type BlockingPager = T::BlockingPager;
 
@@ -410,6 +431,10 @@ impl<T: Accessor + ?Sized> Accessor for Arc<T> {
         self.as_ref().write(path, args).await
     }
 
+    async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, 
Self::Appender)> {
+        self.as_ref().append(path, args).await
+    }
+
     async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> 
{
         self.as_ref().copy(from, to, args).await
     }
@@ -472,6 +497,7 @@ pub type FusedAccessor = Arc<
         BlockingReader = oio::BlockingReader,
         Writer = oio::Writer,
         BlockingWriter = oio::BlockingWriter,
+        Appender = oio::Appender,
         Pager = oio::Pager,
         BlockingPager = oio::BlockingPager,
     >,
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index 3798021a..ee8d32ac 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -63,6 +63,7 @@ impl<S: Adapter> Accessor for Backend<S> {
     type BlockingReader = oio::Cursor;
     type Writer = KvWriter<S>;
     type BlockingWriter = KvWriter<S>;
+    type Appender = ();
     type Pager = KvPager;
     type BlockingPager = KvPager;
 
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index 3bb3147a..e04cd202 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -59,6 +59,7 @@ impl<S: Adapter> Accessor for Backend<S> {
     type BlockingReader = oio::Cursor;
     type Writer = KvWriter<S>;
     type BlockingWriter = KvWriter<S>;
+    type Appender = ();
     type Pager = KvPager;
     type BlockingPager = KvPager;
 
diff --git a/core/src/raw/layer.rs b/core/src/raw/layer.rs
index 7535aa2d..a912c8ea 100644
--- a/core/src/raw/layer.rs
+++ b/core/src/raw/layer.rs
@@ -134,6 +134,7 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 
'static {
     type BlockingReader: oio::BlockingRead;
     type Writer: oio::Write;
     type BlockingWriter: oio::BlockingWrite;
+    type Appender: oio::Append;
     type Pager: oio::Page;
     type BlockingPager: oio::BlockingPage;
 
@@ -151,6 +152,8 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 
'static {
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)>;
 
+    async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, 
Self::Appender)>;
+
     async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> 
{
         self.inner().copy(from, to, args).await
     }
@@ -210,6 +213,7 @@ impl<L: LayeredAccessor> Accessor for L {
     type BlockingReader = L::BlockingReader;
     type Writer = L::Writer;
     type BlockingWriter = L::BlockingWriter;
+    type Appender = L::Appender;
     type Pager = L::Pager;
     type BlockingPager = L::BlockingPager;
 
@@ -229,6 +233,10 @@ impl<L: LayeredAccessor> Accessor for L {
         (self as &L).write(path, args).await
     }
 
+    async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, 
Self::Appender)> {
+        (self as &L).append(path, args).await
+    }
+
     async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> 
{
         (self as &L).copy(from, to, args).await
     }
@@ -323,6 +331,7 @@ mod tests {
         type BlockingReader = ();
         type Writer = ();
         type BlockingWriter = ();
+        type Appender = ();
         type Pager = ();
         type BlockingPager = ();
 
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index 56b48dda..600783f5 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -40,6 +40,9 @@ pub use write::BlockingWriter;
 pub use write::Write;
 pub use write::WriteOperation;
 pub use write::Writer;
+pub use write::Append;
+pub use write::AppendOperation;
+pub use write::Appender;
 
 mod cursor;
 pub use cursor::Cursor;
diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write.rs
index dade202c..49c0972f 100644
--- a/core/src/raw/oio/write.rs
+++ b/core/src/raw/oio/write.rs
@@ -123,8 +123,9 @@ impl Write for () {
     }
 }
 
-/// `Box<dyn Write>` won't implement `Write` automatically. To make Writer
-/// work as expected, we must add this impl.
+/// `Box<dyn Write>` won't implement `Write` automatically.
+///
+/// To make Writer work as expected, we must add this impl.
 #[async_trait]
 impl<T: Write + ?Sized> Write for Box<T> {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
@@ -168,6 +169,7 @@ impl BlockingWrite for () {
 }
 
 /// `Box<dyn BlockingWrite>` won't implement `BlockingWrite` automatically.
+///
 /// To make BlockingWriter work as expected, we must add this impl.
 impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
     fn write(&mut self, bs: Bytes) -> Result<()> {
@@ -178,3 +180,88 @@ impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
         (**self).close()
     }
 }
+
+/// AppendOperation is the name for APIs of Append.
+#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
+#[non_exhaustive]
+pub enum AppendOperation {
+    /// Operation for [`Append::append`]
+    Append,
+    /// Operation for [`Append::close`]
+    Close,
+}
+
+impl AppendOperation {
+    /// Convert self into static str.
+    pub fn into_static(self) -> &'static str {
+        self.into()
+    }
+}
+
+impl Display for AppendOperation {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.into_static())
+    }
+}
+
+impl From<AppendOperation> for &'static str {
+    fn from(v: AppendOperation) -> &'static str {
+        use AppendOperation::*;
+
+        match v {
+            Append => "Append::append",
+            Close => "Append::close",
+        }
+    }
+}
+
+/// Appender is a type erased [`Append`]
+pub type Appender = Box<dyn Append>;
+
+/// Append is the trait that OpenDAL returns to callers.
+///
+/// # Notes
+///
+/// Users will call `append` multiple times.
+#[async_trait]
+pub trait Append: Unpin + Send + Sync {
+    /// Append data to the end of file.
+    ///
+    /// Users will call `append` multiple times.
+    /// Please make sure `append` is safe to re-enter.
+    async fn append(&mut self, bs: Bytes) -> Result<()>;
+
+    /// Seal the file to mark it as unmodifiable.
+    async fn close(&mut self) -> Result<()>;
+}
+
+#[async_trait]
+impl Append for () {
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        let _ = bs;
+
+        unimplemented!("append is required to be implemented for oio::Append")
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "output appender doesn't support close",
+        ))
+    }
+}
+
+/// `Box<dyn Append>` won't implement `Append` automatically.
+///
+/// To make Appender work as expected, we must add this impl.
+#[async_trait]
+impl<T: Append + ?Sized> Append for Box<T> {
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        (**self).append(bs).await
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        (**self).close().await
+    }
+
+}
diff --git a/core/src/raw/operation.rs b/core/src/raw/operation.rs
index 0600ce26..105b75c9 100644
--- a/core/src/raw/operation.rs
+++ b/core/src/raw/operation.rs
@@ -31,6 +31,8 @@ pub enum Operation {
     Read,
     /// Operation for [`crate::raw::Accessor::write`]
     Write,
+    /// Operation for [`crate::raw::Accessor::append`]
+    Append,
     /// Operation for [`crate::raw::Accessor::copy`]
     Copy,
     /// Operation for [`crate::raw::Accessor::rename`]
@@ -87,6 +89,7 @@ impl From<Operation> for &'static str {
             Operation::CreateDir => "create_dir",
             Operation::Read => "read",
             Operation::Write => "write",
+            Operation::Append => "append",
             Operation::Copy => "copy",
             Operation::Rename => "rename",
             Operation::Stat => "stat",
diff --git a/core/src/raw/rps.rs b/core/src/raw/rps.rs
index bc45d145..7cd24611 100644
--- a/core/src/raw/rps.rs
+++ b/core/src/raw/rps.rs
@@ -195,6 +195,17 @@ impl RpWrite {
     }
 }
 
+/// Reply for `append` operation.
+#[derive(Debug, Clone, Default)]
+pub struct RpAppend {}
+
+impl RpAppend {
+    /// Create a new reply for `append`.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
 /// Reply for `copy` operation.
 #[derive(Debug, Clone, Default)]
 pub struct RpCopy {}
diff --git a/core/src/services/azblob/backend.rs 
b/core/src/services/azblob/backend.rs
index ac6d2e0f..9712c4c9 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -449,6 +449,7 @@ impl Accessor for AzblobBackend {
     type BlockingReader = ();
     type Writer = AzblobWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = AzblobPager;
     type BlockingPager = ();
 
diff --git a/core/src/services/azdfs/backend.rs 
b/core/src/services/azdfs/backend.rs
index 57807205..d428a780 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -304,6 +304,7 @@ impl Accessor for AzdfsBackend {
     type BlockingReader = ();
     type Writer = AzdfsWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = AzdfsPager;
     type BlockingPager = ();
 
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 6b5aaaea..282656d9 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -296,6 +296,7 @@ impl Accessor for FsBackend {
     type BlockingReader = oio::into_blocking_reader::FdReader<std::fs::File>;
     type Writer = FsWriter<tokio::fs::File>;
     type BlockingWriter = FsWriter<std::fs::File>;
+    type Appender = ();
     type Pager = Option<FsPager<tokio::fs::ReadDir>>;
     type BlockingPager = Option<FsPager<std::fs::ReadDir>>;
 
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index d5ae8ec0..3efd60b0 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -314,6 +314,7 @@ impl Accessor for FtpBackend {
     type BlockingReader = ();
     type Writer = FtpWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = FtpPager;
     type BlockingPager = ();
 
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 188b9713..dcc646eb 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -396,6 +396,7 @@ impl Accessor for GcsBackend {
     type BlockingReader = ();
     type Writer = GcsWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = GcsPager;
     type BlockingPager = ();
 
diff --git a/core/src/services/gdrive/backend.rs 
b/core/src/services/gdrive/backend.rs
index 03d8c83e..80242607 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -56,6 +56,7 @@ impl Accessor for GdriveBackend {
     type BlockingReader = ();
     type Writer = GdriveWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = ();
     type BlockingPager = ();
 
diff --git a/core/src/services/ghac/backend.rs 
b/core/src/services/ghac/backend.rs
index cd264bb3..380dd9a2 100644
--- a/core/src/services/ghac/backend.rs
+++ b/core/src/services/ghac/backend.rs
@@ -296,6 +296,7 @@ impl Accessor for GhacBackend {
     type BlockingReader = ();
     type Writer = GhacWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = ();
     type BlockingPager = ();
 
diff --git a/core/src/services/hdfs/backend.rs 
b/core/src/services/hdfs/backend.rs
index 81d76699..bce999bd 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -235,6 +235,7 @@ impl Accessor for HdfsBackend {
     type BlockingReader = oio::into_blocking_reader::FdReader<hdrs::File>;
     type Writer = HdfsWriter<hdrs::AsyncFile>;
     type BlockingWriter = HdfsWriter<hdrs::File>;
+    type Appender = ();
     type Pager = Option<HdfsPager>;
     type BlockingPager = Option<HdfsPager>;
 
diff --git a/core/src/services/http/backend.rs 
b/core/src/services/http/backend.rs
index 3aa60878..8be0f16a 100644
--- a/core/src/services/http/backend.rs
+++ b/core/src/services/http/backend.rs
@@ -256,6 +256,7 @@ impl Accessor for HttpBackend {
     type BlockingReader = ();
     type Writer = ();
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = ();
     type BlockingPager = ();
 
diff --git a/core/src/services/ipfs/backend.rs 
b/core/src/services/ipfs/backend.rs
index 78da66f9..2436c578 100644
--- a/core/src/services/ipfs/backend.rs
+++ b/core/src/services/ipfs/backend.rs
@@ -217,6 +217,7 @@ impl Accessor for IpfsBackend {
     type BlockingReader = ();
     type Writer = ();
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = DirStream;
     type BlockingPager = ();
 
diff --git a/core/src/services/ipmfs/backend.rs 
b/core/src/services/ipmfs/backend.rs
index 28129f86..43c69f8e 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -84,6 +84,7 @@ impl Accessor for IpmfsBackend {
     type BlockingReader = ();
     type Writer = IpmfsWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = IpmfsPager;
     type BlockingPager = ();
 
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 29a745ca..3a9e1b84 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -301,6 +301,7 @@ impl Accessor for ObsBackend {
     type BlockingReader = ();
     type Writer = ObsWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = ObsPager;
     type BlockingPager = ();
 
diff --git a/core/src/services/onedrive/backend.rs 
b/core/src/services/onedrive/backend.rs
index 0bf36f32..56c535ad 100644
--- a/core/src/services/onedrive/backend.rs
+++ b/core/src/services/onedrive/backend.rs
@@ -78,6 +78,7 @@ impl Accessor for OnedriveBackend {
     type BlockingReader = ();
     type Writer = OneDriveWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = OnedrivePager;
     type BlockingPager = ();
 
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 097da6d2..713236f1 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -441,6 +441,7 @@ impl Accessor for OssBackend {
     type BlockingReader = ();
     type Writer = OssWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = OssPager;
     type BlockingPager = ();
 
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 91ff14f1..51ecfccd 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -953,6 +953,7 @@ impl Accessor for S3Backend {
     type BlockingReader = ();
     type Writer = S3Writer;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = S3Pager;
     type BlockingPager = ();
 
diff --git a/core/src/services/supabase/backend.rs 
b/core/src/services/supabase/backend.rs
index 69621a33..34f31521 100644
--- a/core/src/services/supabase/backend.rs
+++ b/core/src/services/supabase/backend.rs
@@ -209,6 +209,7 @@ impl Accessor for SupabaseBackend {
     type BlockingReader = ();
     type Writer = SupabaseWriter;
     type BlockingWriter = ();
+    type Appender = ();
     // todo: implement Pager to support list and scan
     type Pager = ();
     type BlockingPager = ();
diff --git a/core/src/services/vercel_artifacts/backend.rs 
b/core/src/services/vercel_artifacts/backend.rs
index ea850303..c651af5e 100644
--- a/core/src/services/vercel_artifacts/backend.rs
+++ b/core/src/services/vercel_artifacts/backend.rs
@@ -51,6 +51,7 @@ impl Accessor for VercelArtifactsBackend {
     type BlockingReader = ();
     type Writer = VercelArtifactsWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = ();
     type BlockingPager = ();
 
diff --git a/core/src/services/wasabi/backend.rs 
b/core/src/services/wasabi/backend.rs
index fe2344c5..39e7c680 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -896,6 +896,7 @@ impl Accessor for WasabiBackend {
     type BlockingReader = ();
     type Writer = WasabiWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = WasabiPager;
     type BlockingPager = ();
 
diff --git a/core/src/services/webdav/backend.rs 
b/core/src/services/webdav/backend.rs
index 78b6463a..68b54019 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -262,6 +262,7 @@ impl Accessor for WebdavBackend {
     type BlockingReader = ();
     type Writer = WebdavWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = WebdavPager;
     type BlockingPager = ();
 
diff --git a/core/src/services/webhdfs/backend.rs 
b/core/src/services/webhdfs/backend.rs
index 10d5f066..01c6801c 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -463,6 +463,7 @@ impl Accessor for WebhdfsBackend {
     type BlockingReader = ();
     type Writer = WebhdfsWriter;
     type BlockingWriter = ();
+    type Appender = ();
     type Pager = WebhdfsPager;
     type BlockingPager = ();
 
diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs
index 1bf9e89b..14685c30 100644
--- a/core/src/types/capability.rs
+++ b/core/src/types/capability.rs
@@ -86,6 +86,9 @@ pub struct Capability {
     /// If operator supports write with cache control natively, it will be 
true.
     pub write_with_cache_control: bool,
 
+    /// If operator supports append natively, it will be true.
+    pub append: bool,
+
     /// If operator supports create dir natively, it will be true.
     pub create_dir: bool,
 
diff --git a/core/src/types/ops.rs b/core/src/types/ops.rs
index aba48950..78545dda 100644
--- a/core/src/types/ops.rs
+++ b/core/src/types/ops.rs
@@ -397,6 +397,17 @@ impl OpWrite {
     }
 }
 
+/// Args for `append` operation.
+#[derive(Debug, Clone, Default)]
+pub struct OpAppend {}
+
+impl OpAppend {
+    /// Create a new `OpAppend`.
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
 /// Args for `copy` operation.
 #[derive(Debug, Clone, Default)]
 pub struct OpCopy {}


Reply via email to