This is an automated email from the ASF dual-hosted git repository. suyanhanx pushed a commit to branch append in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit aed4bb8193f921543e4577ade33ce13a3b03761f Author: suyanhanx <[email protected]> AuthorDate: Sat May 13 17:22:43 2023 +0800 feat(core): append Signed-off-by: suyanhanx <[email protected]> --- core/src/layers/complete.rs | 55 ++++++++++ core/src/layers/concurrent_limit.rs | 26 +++++ core/src/layers/error_context.rs | 42 ++++++++ core/src/layers/immutable_index.rs | 5 + core/src/layers/logging.rs | 138 ++++++++++++++++++++++++++ core/src/layers/retry.rs | 65 ++++++++++++ core/src/layers/type_eraser.rs | 8 ++ core/src/raw/accessor.rs | 26 +++++ core/src/raw/adapters/kv/backend.rs | 1 + core/src/raw/adapters/typed_kv/backend.rs | 1 + core/src/raw/layer.rs | 9 ++ core/src/raw/oio/mod.rs | 3 + core/src/raw/oio/write.rs | 91 ++++++++++++++++- core/src/raw/operation.rs | 3 + core/src/raw/rps.rs | 11 ++ core/src/services/azblob/backend.rs | 1 + core/src/services/azdfs/backend.rs | 1 + core/src/services/fs/backend.rs | 1 + core/src/services/ftp/backend.rs | 1 + core/src/services/gcs/backend.rs | 1 + core/src/services/gdrive/backend.rs | 1 + core/src/services/ghac/backend.rs | 1 + core/src/services/hdfs/backend.rs | 1 + core/src/services/http/backend.rs | 1 + core/src/services/ipfs/backend.rs | 1 + core/src/services/ipmfs/backend.rs | 1 + core/src/services/obs/backend.rs | 1 + core/src/services/onedrive/backend.rs | 1 + core/src/services/oss/backend.rs | 1 + core/src/services/s3/backend.rs | 1 + core/src/services/supabase/backend.rs | 1 + core/src/services/vercel_artifacts/backend.rs | 1 + core/src/services/wasabi/backend.rs | 1 + core/src/services/webdav/backend.rs | 1 + core/src/services/webhdfs/backend.rs | 1 + core/src/types/capability.rs | 3 + core/src/types/ops.rs | 11 ++ 37 files changed, 516 insertions(+), 2 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index eec72d56..cea2a117 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -327,6 +327,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> { type BlockingReader = CompleteReader<A, A::BlockingReader>; type Writer = CompleteWriter<A::Writer>; type BlockingWriter = CompleteWriter<A::BlockingWriter>; + type Appender = CompleteAppender<A::Appender>; type Pager = CompletePager<A, A::Pager>; type BlockingPager = CompletePager<A, A::BlockingPager>; @@ -375,6 +376,13 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> { .map(|(rp, w)| (rp, CompleteWriter::new(w, size))) } + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + self.inner + .append(path, args) + .await + .map(|(rp, a)| (rp, CompleteAppender::new(a))) + } + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { self.complete_list(path, args).await } @@ -646,3 +654,50 @@ where Ok(()) } } + +pub struct CompleteAppender<A> { + inner: Option<A>, +} + +impl<A> CompleteAppender<A> { + pub fn new(inner: A) -> CompleteAppender<A> { + CompleteAppender { + inner: Some(inner), + } + } +} + +/// Check if the appender has been closed while debug_assertions enabled. +/// This code will never be executed in release mode. +#[cfg(debug_assertions)] +impl<A> Drop for CompleteAppender<A> { + fn drop(&mut self) { + if !self.inner.is_some() { + // Do we need to panic here? + log::warn!("appender has not been closed, must be a bug") + } + } +} + +#[async_trait] +impl<A> oio::Append for CompleteAppender<A> +where A: oio::Append +{ + async fn append(&mut self, bs: Bytes) -> Result<()> { + let a = self.inner.as_mut().ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "appender has been closed or aborted") + })?; + + a.append(bs).await + } + + async fn close(&mut self) -> Result<()> { + let a = self.inner.as_mut().ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "appender has been closed or aborted") + })?; + + a.close().await?; + self.inner = None; + Ok(()) + } +} diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 07c4cfc4..5228eace 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -87,6 +87,7 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> { type BlockingReader = ConcurrentLimitWrapper<A::BlockingReader>; type Writer = ConcurrentLimitWrapper<A::Writer>; type BlockingWriter = ConcurrentLimitWrapper<A::BlockingWriter>; + type Appender = ConcurrentLimitWrapper<A::Appender>; type Pager = ConcurrentLimitWrapper<A::Pager>; type BlockingPager = ConcurrentLimitWrapper<A::BlockingPager>; @@ -132,6 +133,20 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> { .map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit))) } + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + let permit = self + .semaphore + .clone() + .acquire_owned() + .await + .expect("semaphore must be valid"); + + self.inner + .append(path, args) + .await + .map(|(rp, a)| (rp, ConcurrentLimitWrapper::new(a, permit))) + } + async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> { let _permit = self .semaphore @@ -309,6 +324,17 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> { } } +#[async_trait] +impl<R: oio::Append> oio::Append for ConcurrentLimitWrapper<R> { + async fn append(&mut self, bs: Bytes) -> Result<()> { + self.inner.append(bs).await + } + + async fn close(&mut self) -> Result<()> { + self.inner.close().await + } +} + #[async_trait] impl<R: oio::Page> oio::Page for ConcurrentLimitWrapper<R> { async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 194b3168..a23b78dd 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -26,6 +26,7 @@ use bytes::Bytes; use futures::TryFutureExt; use crate::ops::*; +use crate::raw::oio::AppendOperation; use crate::raw::oio::PageOperation; use crate::raw::oio::ReadOperation; use crate::raw::oio::WriteOperation; @@ -71,6 +72,7 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> { type BlockingReader = ErrorContextWrapper<A::BlockingReader>; type Writer = ErrorContextWrapper<A::Writer>; type BlockingWriter = ErrorContextWrapper<A::BlockingWriter>; + type Appender = ErrorContextWrapper<A::Appender>; type Pager = ErrorContextWrapper<A::Pager>; type BlockingPager = ErrorContextWrapper<A::BlockingPager>; @@ -138,6 +140,27 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> { .await } + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + self.inner + .append(path, args) + .map_ok(|(rp, os)| { + ( + rp, + ErrorContextWrapper { + scheme: self.meta.scheme(), + path: path.to_string(), + inner: os, + }, + ) + }) + .map_err(|err| { + err.with_operation(Operation::Append) + .with_context("service", self.meta.scheme()) + .with_context("path", path) + }) + .await + } + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> { self.inner .copy(from, to, args) @@ -447,6 +470,25 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> { } } +#[async_trait::async_trait] +impl<T: oio::Append> oio::Append for ErrorContextWrapper<T> { + async fn append(&mut self, bs: Bytes) -> Result<()> { + self.inner.append(bs).await.map_err(|err| { + err.with_operation(AppendOperation::Append) + .with_context("service", self.scheme) + .with_context("path", &self.path) + }) + } + + async fn close(&mut self) -> Result<()> { + self.inner.close().await.map_err(|err| { + err.with_operation(AppendOperation::Close) + .with_context("service", self.scheme) + .with_context("path", &self.path) + }) + } +} + #[async_trait::async_trait] impl<T: oio::Page> oio::Page for ErrorContextWrapper<T> { async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { diff --git a/core/src/layers/immutable_index.rs b/core/src/layers/immutable_index.rs index 01b4d8f0..70f3842d 100644 --- a/core/src/layers/immutable_index.rs +++ b/core/src/layers/immutable_index.rs @@ -139,6 +139,7 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> { type BlockingReader = A::BlockingReader; type Writer = A::Writer; type BlockingWriter = A::BlockingWriter; + type Appender = A::Appender; type Pager = ImmutableDir; type BlockingPager = ImmutableDir; @@ -194,6 +195,10 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> { self.inner.blocking_write(path, args) } + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + self.inner.append(path, args).await + } + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { let mut path = path; if path == "/" { diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index cbbc8ea1..0fbfdd4c 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -190,6 +190,7 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> { type BlockingReader = LoggingReader<A::BlockingReader>; type Writer = LoggingWriter<A::Writer>; type BlockingWriter = LoggingWriter<A::BlockingWriter>; + type Appender = LoggingAppender<A::Appender>; type Pager = LoggingPager<A::Pager>; type BlockingPager = LoggingPager<A::BlockingPager>; @@ -340,6 +341,51 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> { }) } + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + debug!( + target: LOGGING_TARGET, + "service={} operation={} path={} -> started", + self.scheme, + Operation::Append, + path + ); + + self.inner + .append(path, args) + .await + .map(|(rp, a)| { + debug!( + target: LOGGING_TARGET, + "service={} operation={} path={} -> start appending", + self.scheme, + Operation::Append, + path, + ); + let a = LoggingAppender::new( + self.scheme, + Operation::Append, + path, + a, + self.failure_level, + ); + (rp, a) + }) + .map_err(|err| { + if let Some(lvl) = self.err_level(&err) { + log!( + target: LOGGING_TARGET, + lvl, + "service={} operation={} path={} -> {}: {err:?}", + self.scheme, + Operation::Append, + path, + self.err_status(&err) + ) + }; + err + }) + } + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> { debug!( target: LOGGING_TARGET, @@ -1424,6 +1470,98 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> { } } +pub struct LoggingAppender<A> { + scheme: Scheme, + op: Operation, + path: String, + + failure_level: Option<Level>, + + inner: A, +} + +impl<A> LoggingAppender<A> { + fn new( + scheme: Scheme, + op: Operation, + path: &str, + appender: A, + failure_level: Option<Level>, + ) -> Self { + Self { + scheme, + op, + path: path.to_string(), + + failure_level, + + inner: appender, + } + } +} + +#[async_trait] +impl<A: oio::Append> oio::Append for LoggingAppender<A> { + async fn append(&mut self, bs: Bytes) -> Result<()> { + let len = bs.len(); + + match self.inner.append(bs).await { + Ok(_) => { + trace!( + target: LOGGING_TARGET, + "service={} operation={} path={} -> data append {}B", + self.scheme, + self.op, + self.path, + len + ); + Ok(()) + } + Err(err) => { + if let Some(lvl) = self.failure_level { + log!( + target: LOGGING_TARGET, + lvl, + "service={} operation={} path={} -> data append failed: {err:?}", + self.scheme, + self.op, + self.path, + ) + } + Err(err) + } + } + } + + async fn close(&mut self) -> Result<()> { + match self.inner.close().await { + Ok(_) => { + debug!( + target: LOGGING_TARGET, + "service={} operation={} path={} -> data appended finished", + self.scheme, + self.op, + self.path, + ); + Ok(()) + } + Err(err) => { + if let Some(lvl) = self.failure_level { + log!( + target: LOGGING_TARGET, + lvl, + "service={} operation={} path={} -> data appender close failed: {err:?}", + self.scheme, + self.op, + self.path, + ) + } + Err(err) + } + } + } +} + pub struct LoggingPager<P> { scheme: Scheme, path: String, diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 8ebd3838..8fcf615f 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -35,6 +35,7 @@ use futures::FutureExt; use log::warn; use crate::ops::*; +use crate::raw::oio::AppendOperation; use crate::raw::oio::PageOperation; use crate::raw::oio::ReadOperation; use crate::raw::oio::WriteOperation; @@ -161,6 +162,7 @@ impl<A: Accessor> LayeredAccessor for RetryAccessor<A> { type BlockingReader = RetryWrapper<A::BlockingReader>; type Writer = RetryWrapper<A::Writer>; type BlockingWriter = RetryWrapper<A::BlockingWriter>; + type Appender = RetryWrapper<A::Appender>; type Pager = RetryWrapper<A::Pager>; type BlockingPager = RetryWrapper<A::BlockingPager>; @@ -219,6 +221,23 @@ impl<A: Accessor> LayeredAccessor for RetryAccessor<A> { .await } + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + { || self.inner.append(path, args.clone()) } + .retry(&self.builder) + .when(|e| e.is_temporary()) + .notify(|err, dur| { + warn!( + target: "opendal::service", + "operation={} -> retry after {}s: error={:?}", + Operation::Append, dur.as_secs_f64(), err) + }) + .map(|v| { + v.map(|(rp, r)| (rp, RetryWrapper::new(r, path, self.builder.clone()))) + .map_err(|e| e.set_persistent()) + }) + .await + } + async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> { { || self.inner.stat(path, args.clone()) } .retry(&self.builder) @@ -703,6 +722,51 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for RetryWrapper<R> { } } +#[async_trait] +impl<A: oio::Append> oio::Append for RetryWrapper<A> { + async fn append(&mut self, bs: Bytes) -> Result<()> { + let mut backoff = self.builder.build(); + + loop { + match self.inner.append(bs.clone()).await { + Ok(v) => return Ok(v), + Err(e) if !e.is_temporary() => return Err(e), + Err(e) => match backoff.next() { + None => return Err(e), + Some(dur) => { + warn!(target: "opendal::service", + "operation={} path={} -> appender retry after {}s: error={:?}", + AppendOperation::Append, self.path, dur.as_secs_f64(), e); + tokio::time::sleep(dur).await; + continue; + } + }, + } + } + } + + async fn close(&mut self) -> Result<()> { + let mut backoff = self.builder.build(); + + loop { + match self.inner.close().await { + Ok(v) => return Ok(v), + Err(e) if !e.is_temporary() => return Err(e), + Err(e) => match backoff.next() { + None => return Err(e), + Some(dur) => { + warn!(target: "opendal::service", + "operation={} path={} -> appender retry after {}s: error={:?}", + AppendOperation::Close, self.path, dur.as_secs_f64(), e); + tokio::time::sleep(dur).await; + continue; + } + }, + } + } + } +} + #[async_trait] impl<P: oio::Page> oio::Page for RetryWrapper<P> { async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { @@ -790,6 +854,7 @@ mod tests { type BlockingReader = (); type Writer = (); type BlockingWriter = (); + type Appender = (); type Pager = MockPager; type BlockingPager = (); diff --git a/core/src/layers/type_eraser.rs b/core/src/layers/type_eraser.rs index e47124b7..fb236646 100644 --- a/core/src/layers/type_eraser.rs +++ b/core/src/layers/type_eraser.rs @@ -58,6 +58,7 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> { type BlockingReader = oio::BlockingReader; type Writer = oio::Writer; type BlockingWriter = oio::BlockingWriter; + type Appender = oio::Appender; type Pager = oio::Pager; type BlockingPager = oio::BlockingPager; @@ -91,6 +92,13 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> { .map(|(rp, w)| (rp, Box::new(w) as oio::BlockingWriter)) } + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + self.inner + .append(path, args) + .await + .map(|(rp, a)| (rp, Box::new(a) as oio::Appender)) + } + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { self.inner .list(path, args) diff --git a/core/src/raw/accessor.rs b/core/src/raw/accessor.rs index 48e2ea64..8fb98d0c 100644 --- a/core/src/raw/accessor.rs +++ b/core/src/raw/accessor.rs @@ -71,6 +71,8 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// BlockingPager is the associated pager that could return in /// `blocking_list` operation. type BlockingPager: oio::BlockingPage; + /// Appender is the associated appender that could return in `append` operation. + type Appender: oio::Append; /// Invoke the `info` operation to get metadata of accessor. /// @@ -137,6 +139,23 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { )) } + /// Invoke the `append` operation on the specified path, returns a + /// appended size if operate successful. + /// + /// Require [`Capability::append`] + /// + /// # Behavior + /// + /// - Input path MUST be file path, DON'T NEED to check mode. + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + let (_, _) = (path, args); + + Err(Error::new( + ErrorKind::Unsupported, + "operation is not supported", + )) + } + /// Invoke the `copy` operation on the specified `from` path and `to` path. /// /// Require [Capability::copy] @@ -371,6 +390,7 @@ impl Accessor for () { type BlockingReader = (); type Writer = (); type BlockingWriter = (); + type Appender = (); type Pager = (); type BlockingPager = (); @@ -392,6 +412,7 @@ impl<T: Accessor + ?Sized> Accessor for Arc<T> { type BlockingReader = T::BlockingReader; type Writer = T::Writer; type BlockingWriter = T::BlockingWriter; + type Appender = T::Appender; type Pager = T::Pager; type BlockingPager = T::BlockingPager; @@ -410,6 +431,10 @@ impl<T: Accessor + ?Sized> Accessor for Arc<T> { self.as_ref().write(path, args).await } + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + self.as_ref().append(path, args).await + } + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> { self.as_ref().copy(from, to, args).await } @@ -472,6 +497,7 @@ pub type FusedAccessor = Arc< BlockingReader = oio::BlockingReader, Writer = oio::Writer, BlockingWriter = oio::BlockingWriter, + Appender = oio::Appender, Pager = oio::Pager, BlockingPager = oio::BlockingPager, >, diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 3798021a..ee8d32ac 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -63,6 +63,7 @@ impl<S: Adapter> Accessor for Backend<S> { type BlockingReader = oio::Cursor; type Writer = KvWriter<S>; type BlockingWriter = KvWriter<S>; + type Appender = (); type Pager = KvPager; type BlockingPager = KvPager; diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 3bb3147a..e04cd202 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -59,6 +59,7 @@ impl<S: Adapter> Accessor for Backend<S> { type BlockingReader = oio::Cursor; type Writer = KvWriter<S>; type BlockingWriter = KvWriter<S>; + type Appender = (); type Pager = KvPager; type BlockingPager = KvPager; diff --git a/core/src/raw/layer.rs b/core/src/raw/layer.rs index 7535aa2d..a912c8ea 100644 --- a/core/src/raw/layer.rs +++ b/core/src/raw/layer.rs @@ -134,6 +134,7 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static { type BlockingReader: oio::BlockingRead; type Writer: oio::Write; type BlockingWriter: oio::BlockingWrite; + type Appender: oio::Append; type Pager: oio::Page; type BlockingPager: oio::BlockingPage; @@ -151,6 +152,8 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)>; + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)>; + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> { self.inner().copy(from, to, args).await } @@ -210,6 +213,7 @@ impl<L: LayeredAccessor> Accessor for L { type BlockingReader = L::BlockingReader; type Writer = L::Writer; type BlockingWriter = L::BlockingWriter; + type Appender = L::Appender; type Pager = L::Pager; type BlockingPager = L::BlockingPager; @@ -229,6 +233,10 @@ impl<L: LayeredAccessor> Accessor for L { (self as &L).write(path, args).await } + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + (self as &L).append(path, args).await + } + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> { (self as &L).copy(from, to, args).await } @@ -323,6 +331,7 @@ mod tests { type BlockingReader = (); type Writer = (); type BlockingWriter = (); + type Appender = (); type Pager = (); type BlockingPager = (); diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index 56b48dda..600783f5 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -40,6 +40,9 @@ pub use write::BlockingWriter; pub use write::Write; pub use write::WriteOperation; pub use write::Writer; +pub use write::Append; +pub use write::AppendOperation; +pub use write::Appender; mod cursor; pub use cursor::Cursor; diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write.rs index dade202c..49c0972f 100644 --- a/core/src/raw/oio/write.rs +++ b/core/src/raw/oio/write.rs @@ -123,8 +123,9 @@ impl Write for () { } } -/// `Box<dyn Write>` won't implement `Write` automatically. To make Writer -/// work as expected, we must add this impl. +/// `Box<dyn Write>` won't implement `Write` automatically. +/// +/// To make Writer work as expected, we must add this impl. #[async_trait] impl<T: Write + ?Sized> Write for Box<T> { async fn write(&mut self, bs: Bytes) -> Result<()> { @@ -168,6 +169,7 @@ impl BlockingWrite for () { } /// `Box<dyn BlockingWrite>` won't implement `BlockingWrite` automatically. +/// /// To make BlockingWriter work as expected, we must add this impl. impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> { fn write(&mut self, bs: Bytes) -> Result<()> { @@ -178,3 +180,88 @@ impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> { (**self).close() } } + +/// AppendOperation is the name for APIs of Append. +#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)] +#[non_exhaustive] +pub enum AppendOperation { + /// Operation for [`Append::append`] + Append, + /// Operation for [`Append::close`] + Close, +} + +impl AppendOperation { + /// Convert self into static str. + pub fn into_static(self) -> &'static str { + self.into() + } +} + +impl Display for AppendOperation { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.into_static()) + } +} + +impl From<AppendOperation> for &'static str { + fn from(v: AppendOperation) -> &'static str { + use AppendOperation::*; + + match v { + Append => "Append::append", + Close => "Append::close", + } + } +} + +/// Appender is a type erased [`Append`] +pub type Appender = Box<dyn Append>; + +/// Append is the trait that OpenDAL returns to callers. +/// +/// # Notes +/// +/// Users will call `append` multiple times. +#[async_trait] +pub trait Append: Unpin + Send + Sync { + /// Append data to the end of file. + /// + /// Users will call `append` multiple times. + /// Please make sure `append` is safe to re-enter. + async fn append(&mut self, bs: Bytes) -> Result<()>; + + /// Seal the file to mark it as unmodifiable. + async fn close(&mut self) -> Result<()>; +} + +#[async_trait] +impl Append for () { + async fn append(&mut self, bs: Bytes) -> Result<()> { + let _ = bs; + + unimplemented!("append is required to be implemented for oio::Append") + } + + async fn close(&mut self) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "output appender doesn't support close", + )) + } +} + +/// `Box<dyn Append>` won't implement `Append` automatically. +/// +/// To make Appender work as expected, we must add this impl. +#[async_trait] +impl<T: Append + ?Sized> Append for Box<T> { + async fn append(&mut self, bs: Bytes) -> Result<()> { + (**self).append(bs).await + } + + async fn close(&mut self) -> Result<()> { + (**self).close().await + } + +} diff --git a/core/src/raw/operation.rs b/core/src/raw/operation.rs index 0600ce26..105b75c9 100644 --- a/core/src/raw/operation.rs +++ b/core/src/raw/operation.rs @@ -31,6 +31,8 @@ pub enum Operation { Read, /// Operation for [`crate::raw::Accessor::write`] Write, + /// Operation for [`crate::raw::Accessor::append`] + Append, /// Operation for [`crate::raw::Accessor::copy`] Copy, /// Operation for [`crate::raw::Accessor::rename`] @@ -87,6 +89,7 @@ impl From<Operation> for &'static str { Operation::CreateDir => "create_dir", Operation::Read => "read", Operation::Write => "write", + Operation::Append => "append", Operation::Copy => "copy", Operation::Rename => "rename", Operation::Stat => "stat", diff --git a/core/src/raw/rps.rs b/core/src/raw/rps.rs index bc45d145..7cd24611 100644 --- a/core/src/raw/rps.rs +++ b/core/src/raw/rps.rs @@ -195,6 +195,17 @@ impl RpWrite { } } +/// Reply for `append` operation. +#[derive(Debug, Clone, Default)] +pub struct RpAppend {} + +impl RpAppend { + /// Create a new reply for `append`. + pub fn new() -> Self { + Self {} + } +} + /// Reply for `copy` operation. #[derive(Debug, Clone, Default)] pub struct RpCopy {} diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index ac6d2e0f..9712c4c9 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -449,6 +449,7 @@ impl Accessor for AzblobBackend { type BlockingReader = (); type Writer = AzblobWriter; type BlockingWriter = (); + type Appender = (); type Pager = AzblobPager; type BlockingPager = (); diff --git a/core/src/services/azdfs/backend.rs b/core/src/services/azdfs/backend.rs index 57807205..d428a780 100644 --- a/core/src/services/azdfs/backend.rs +++ b/core/src/services/azdfs/backend.rs @@ -304,6 +304,7 @@ impl Accessor for AzdfsBackend { type BlockingReader = (); type Writer = AzdfsWriter; type BlockingWriter = (); + type Appender = (); type Pager = AzdfsPager; type BlockingPager = (); diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 6b5aaaea..282656d9 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -296,6 +296,7 @@ impl Accessor for FsBackend { type BlockingReader = oio::into_blocking_reader::FdReader<std::fs::File>; type Writer = FsWriter<tokio::fs::File>; type BlockingWriter = FsWriter<std::fs::File>; + type Appender = (); type Pager = Option<FsPager<tokio::fs::ReadDir>>; type BlockingPager = Option<FsPager<std::fs::ReadDir>>; diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs index d5ae8ec0..3efd60b0 100644 --- a/core/src/services/ftp/backend.rs +++ b/core/src/services/ftp/backend.rs @@ -314,6 +314,7 @@ impl Accessor for FtpBackend { type BlockingReader = (); type Writer = FtpWriter; type BlockingWriter = (); + type Appender = (); type Pager = FtpPager; type BlockingPager = (); diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs index 188b9713..dcc646eb 100644 --- a/core/src/services/gcs/backend.rs +++ b/core/src/services/gcs/backend.rs @@ -396,6 +396,7 @@ impl Accessor for GcsBackend { type BlockingReader = (); type Writer = GcsWriter; type BlockingWriter = (); + type Appender = (); type Pager = GcsPager; type BlockingPager = (); diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index 03d8c83e..80242607 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -56,6 +56,7 @@ impl Accessor for GdriveBackend { type BlockingReader = (); type Writer = GdriveWriter; type BlockingWriter = (); + type Appender = (); type Pager = (); type BlockingPager = (); diff --git a/core/src/services/ghac/backend.rs b/core/src/services/ghac/backend.rs index cd264bb3..380dd9a2 100644 --- a/core/src/services/ghac/backend.rs +++ b/core/src/services/ghac/backend.rs @@ -296,6 +296,7 @@ impl Accessor for GhacBackend { type BlockingReader = (); type Writer = GhacWriter; type BlockingWriter = (); + type Appender = (); type Pager = (); type BlockingPager = (); diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index 81d76699..bce999bd 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -235,6 +235,7 @@ impl Accessor for HdfsBackend { type BlockingReader = oio::into_blocking_reader::FdReader<hdrs::File>; type Writer = HdfsWriter<hdrs::AsyncFile>; type BlockingWriter = HdfsWriter<hdrs::File>; + type Appender = (); type Pager = Option<HdfsPager>; type BlockingPager = Option<HdfsPager>; diff --git a/core/src/services/http/backend.rs b/core/src/services/http/backend.rs index 3aa60878..8be0f16a 100644 --- a/core/src/services/http/backend.rs +++ b/core/src/services/http/backend.rs @@ -256,6 +256,7 @@ impl Accessor for HttpBackend { type BlockingReader = (); type Writer = (); type BlockingWriter = (); + type Appender = (); type Pager = (); type BlockingPager = (); diff --git a/core/src/services/ipfs/backend.rs b/core/src/services/ipfs/backend.rs index 78da66f9..2436c578 100644 --- a/core/src/services/ipfs/backend.rs +++ b/core/src/services/ipfs/backend.rs @@ -217,6 +217,7 @@ impl Accessor for IpfsBackend { type BlockingReader = (); type Writer = (); type BlockingWriter = (); + type Appender = (); type Pager = DirStream; type BlockingPager = (); diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs index 28129f86..43c69f8e 100644 --- a/core/src/services/ipmfs/backend.rs +++ b/core/src/services/ipmfs/backend.rs @@ -84,6 +84,7 @@ impl Accessor for IpmfsBackend { type BlockingReader = (); type Writer = IpmfsWriter; type BlockingWriter = (); + type Appender = (); type Pager = IpmfsPager; type BlockingPager = (); diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index 29a745ca..3a9e1b84 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -301,6 +301,7 @@ impl Accessor for ObsBackend { type BlockingReader = (); type Writer = ObsWriter; type BlockingWriter = (); + type Appender = (); type Pager = ObsPager; type BlockingPager = (); diff --git a/core/src/services/onedrive/backend.rs b/core/src/services/onedrive/backend.rs index 0bf36f32..56c535ad 100644 --- a/core/src/services/onedrive/backend.rs +++ b/core/src/services/onedrive/backend.rs @@ -78,6 +78,7 @@ impl Accessor for OnedriveBackend { type BlockingReader = (); type Writer = OneDriveWriter; type BlockingWriter = (); + type Appender = (); type Pager = OnedrivePager; type BlockingPager = (); diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 097da6d2..713236f1 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -441,6 +441,7 @@ impl Accessor for OssBackend { type BlockingReader = (); type Writer = OssWriter; type BlockingWriter = (); + type Appender = (); type Pager = OssPager; type BlockingPager = (); diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 91ff14f1..51ecfccd 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -953,6 +953,7 @@ impl Accessor for S3Backend { type BlockingReader = (); type Writer = S3Writer; type BlockingWriter = (); + type Appender = (); type Pager = S3Pager; type BlockingPager = (); diff --git a/core/src/services/supabase/backend.rs b/core/src/services/supabase/backend.rs index 69621a33..34f31521 100644 --- a/core/src/services/supabase/backend.rs +++ b/core/src/services/supabase/backend.rs @@ -209,6 +209,7 @@ impl Accessor for SupabaseBackend { type BlockingReader = (); type Writer = SupabaseWriter; type BlockingWriter = (); + type Appender = (); // todo: implement Pager to support list and scan type Pager = (); type BlockingPager = (); diff --git a/core/src/services/vercel_artifacts/backend.rs b/core/src/services/vercel_artifacts/backend.rs index ea850303..c651af5e 100644 --- a/core/src/services/vercel_artifacts/backend.rs +++ b/core/src/services/vercel_artifacts/backend.rs @@ -51,6 +51,7 @@ impl Accessor for VercelArtifactsBackend { type BlockingReader = (); type Writer = VercelArtifactsWriter; type BlockingWriter = (); + type Appender = (); type Pager = (); type BlockingPager = (); diff --git a/core/src/services/wasabi/backend.rs b/core/src/services/wasabi/backend.rs index fe2344c5..39e7c680 100644 --- a/core/src/services/wasabi/backend.rs +++ b/core/src/services/wasabi/backend.rs @@ -896,6 +896,7 @@ impl Accessor for WasabiBackend { type BlockingReader = (); type Writer = WasabiWriter; type BlockingWriter = (); + type Appender = (); type Pager = WasabiPager; type BlockingPager = (); diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs index 78b6463a..68b54019 100644 --- a/core/src/services/webdav/backend.rs +++ b/core/src/services/webdav/backend.rs @@ -262,6 +262,7 @@ impl Accessor for WebdavBackend { type BlockingReader = (); type Writer = WebdavWriter; type BlockingWriter = (); + type Appender = (); type Pager = WebdavPager; type BlockingPager = (); diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index 10d5f066..01c6801c 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -463,6 +463,7 @@ impl Accessor for WebhdfsBackend { type BlockingReader = (); type Writer = WebhdfsWriter; type BlockingWriter = (); + type Appender = (); type Pager = WebhdfsPager; type BlockingPager = (); diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs index 1bf9e89b..14685c30 100644 --- a/core/src/types/capability.rs +++ b/core/src/types/capability.rs @@ -86,6 +86,9 @@ pub struct Capability { /// If operator supports write with cache control natively, it will be true. pub write_with_cache_control: bool, + /// If operator supports append natively, it will be true. + pub append: bool, + /// If operator supports create dir natively, it will be true. pub create_dir: bool, diff --git a/core/src/types/ops.rs b/core/src/types/ops.rs index aba48950..78545dda 100644 --- a/core/src/types/ops.rs +++ b/core/src/types/ops.rs @@ -397,6 +397,17 @@ impl OpWrite { } } +/// Args for `append` operation. +#[derive(Debug, Clone, Default)] +pub struct OpAppend {} + +impl OpAppend { + /// Create a new `OpAppend`. + pub fn new() -> Self { + Self::default() + } +} + /// Args for `copy` operation. #[derive(Debug, Clone, Default)] pub struct OpCopy {}
