This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new cbaf1141 feat(core): Add trait and public API for `append` (#2260)
cbaf1141 is described below
commit cbaf11411315aa5fea07acc1437569ee93c711ec
Author: Suyan <[email protected]>
AuthorDate: Tue May 16 21:55:42 2023 +0800
feat(core): Add trait and public API for `append` (#2260)
* feat(core): append
Signed-off-by: suyanhanx <[email protected]>
* fix unit
Signed-off-by: suyanhanx <[email protected]>
* forward to inner in layers
Signed-off-by: suyanhanx <[email protected]>
* fix
Signed-off-by: suyanhanx <[email protected]>
* fix
Signed-off-by: suyanhanx <[email protected]>
* fix
Signed-off-by: suyanhanx <[email protected]>
* fix fmt
Signed-off-by: suyanhanx <[email protected]>
* pick a new file
Signed-off-by: suyanhanx <[email protected]>
* fix header
Signed-off-by: suyanhanx <[email protected]>
* fix wrong comment
Signed-off-by: suyanhanx <[email protected]>
---------
Signed-off-by: suyanhanx <[email protected]>
---
bin/oay/src/bin/oay.rs | 4 +-
bindings/c/src/lib.rs | 4 +-
bindings/c/src/result.rs | 3 +-
core/src/layers/chaos.rs | 5 +
core/src/layers/complete.rs | 56 +++++++++++
core/src/layers/concurrent_limit.rs | 26 +++++
core/src/layers/error_context.rs | 42 ++++++++
core/src/layers/immutable_index.rs | 5 +
core/src/layers/logging.rs | 138 ++++++++++++++++++++++++++
core/src/layers/madsim.rs | 12 +++
core/src/layers/metrics.rs | 5 +
core/src/layers/minitrace.rs | 5 +
core/src/layers/oteltrace.rs | 5 +
core/src/layers/prometheus.rs | 5 +
core/src/layers/retry.rs | 65 ++++++++++++
core/src/layers/tracing.rs | 5 +
core/src/layers/type_eraser.rs | 8 ++
core/src/raw/accessor.rs | 26 +++++
core/src/raw/adapters/kv/backend.rs | 1 +
core/src/raw/adapters/typed_kv/api.rs | 3 +-
core/src/raw/adapters/typed_kv/backend.rs | 1 +
core/src/raw/layer.rs | 14 +++
core/src/raw/oio/append.rs | 108 ++++++++++++++++++++
core/src/raw/oio/into_reader/by_range.rs | 1 +
core/src/raw/oio/mod.rs | 5 +
core/src/raw/oio/to_flat_pager.rs | 1 +
core/src/raw/oio/write.rs | 6 +-
core/src/raw/operation.rs | 3 +
core/src/raw/rps.rs | 11 ++
core/src/services/azblob/backend.rs | 1 +
core/src/services/azdfs/backend.rs | 1 +
core/src/services/fs/backend.rs | 1 +
core/src/services/ftp/backend.rs | 1 +
core/src/services/gcs/backend.rs | 1 +
core/src/services/gdrive/backend.rs | 36 ++++---
core/src/services/gdrive/builder.rs | 6 +-
core/src/services/gdrive/core.rs | 13 ++-
core/src/services/ghac/backend.rs | 1 +
core/src/services/hdfs/backend.rs | 1 +
core/src/services/http/backend.rs | 1 +
core/src/services/ipfs/backend.rs | 1 +
core/src/services/ipmfs/backend.rs | 1 +
core/src/services/obs/backend.rs | 1 +
core/src/services/onedrive/backend.rs | 43 ++++++--
core/src/services/onedrive/graph_model.rs | 3 +-
core/src/services/onedrive/pager.rs | 27 +++--
core/src/services/onedrive/writer.rs | 8 +-
core/src/services/oss/backend.rs | 1 +
core/src/services/s3/backend.rs | 1 +
core/src/services/sftp/backend.rs | 3 +-
core/src/services/sftp/error.rs | 6 +-
core/src/services/sftp/writer.rs | 4 +-
core/src/services/supabase/backend.rs | 1 +
core/src/services/vercel_artifacts/backend.rs | 35 ++++---
core/src/services/wasabi/backend.rs | 1 +
core/src/services/webdav/backend.rs | 1 +
core/src/services/webhdfs/backend.rs | 1 +
core/src/types/capability.rs | 3 +
core/src/types/ops.rs | 11 ++
59 files changed, 716 insertions(+), 72 deletions(-)
diff --git a/bin/oay/src/bin/oay.rs b/bin/oay/src/bin/oay.rs
index b5f4813f..8be9c719 100644
--- a/bin/oay/src/bin/oay.rs
+++ b/bin/oay/src/bin/oay.rs
@@ -21,7 +21,9 @@ use oay::Config;
use opendal::services::Memory;
use opendal::Operator;
use std::sync::Arc;
-use tracing_subscriber::{fmt, prelude::*, EnvFilter};
+use tracing_subscriber::fmt;
+use tracing_subscriber::prelude::*;
+use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() -> Result<()> {
diff --git a/bindings/c/src/lib.rs b/bindings/c/src/lib.rs
index 649154ea..c2e8893e 100644
--- a/bindings/c/src/lib.rs
+++ b/bindings/c/src/lib.rs
@@ -28,7 +28,9 @@ use std::str::FromStr;
use ::opendal as od;
use error::opendal_code;
-use result::{opendal_result_is_exist, opendal_result_read,
opendal_result_stat};
+use result::opendal_result_is_exist;
+use result::opendal_result_read;
+use result::opendal_result_stat;
use types::opendal_metadata;
use crate::types::opendal_bytes;
diff --git a/bindings/c/src/result.rs b/bindings/c/src/result.rs
index 36591c30..74f0b9a5 100644
--- a/bindings/c/src/result.rs
+++ b/bindings/c/src/result.rs
@@ -21,7 +21,8 @@
//! we are defining all Result types here
use crate::error::opendal_code;
-use crate::types::{opendal_bytes, opendal_metadata};
+use crate::types::opendal_bytes;
+use crate::types::opendal_metadata;
/// The Rust-like Result type of opendal C binding, it contains
/// the data that the read operation returns and a error code
diff --git a/core/src/layers/chaos.rs b/core/src/layers/chaos.rs
index 86f4f14e..7f7eb4b0 100644
--- a/core/src/layers/chaos.rs
+++ b/core/src/layers/chaos.rs
@@ -107,6 +107,7 @@ impl<A: Accessor> LayeredAccessor for ChaosAccessor<A> {
type BlockingReader = ChaosReader<A::BlockingReader>;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
+ type Appender = A::Appender;
type Pager = A::Pager;
type BlockingPager = A::BlockingPager;
@@ -135,6 +136,10 @@ impl<A: Accessor> LayeredAccessor for ChaosAccessor<A> {
self.inner.blocking_write(path, args)
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ self.inner.append(path, args).await
+ }
+
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
self.inner.list(path, args).await
}
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index eec72d56..e4488c6f 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -327,6 +327,7 @@ impl<A: Accessor> LayeredAccessor for
CompleteReaderAccessor<A> {
type BlockingReader = CompleteReader<A, A::BlockingReader>;
type Writer = CompleteWriter<A::Writer>;
type BlockingWriter = CompleteWriter<A::BlockingWriter>;
+ type Appender = CompleteAppender<A::Appender>;
type Pager = CompletePager<A, A::Pager>;
type BlockingPager = CompletePager<A, A::BlockingPager>;
@@ -375,6 +376,13 @@ impl<A: Accessor> LayeredAccessor for
CompleteReaderAccessor<A> {
.map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ self.inner
+ .append(path, args)
+ .await
+ .map(|(rp, a)| (rp, CompleteAppender::new(a)))
+ }
+
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
self.complete_list(path, args).await
}
@@ -646,3 +654,51 @@ where
Ok(())
}
}
+
+pub struct CompleteAppender<A> {
+ inner: Option<A>,
+}
+
+impl<A> CompleteAppender<A> {
+ pub fn new(inner: A) -> CompleteAppender<A> {
+ CompleteAppender { inner: Some(inner) }
+ }
+}
+
+/// Check if the appender has been closed while debug_assertions enabled.
+/// This code will never be executed in release mode.
+#[cfg(debug_assertions)]
+impl<A> Drop for CompleteAppender<A> {
+ fn drop(&mut self) {
+ if self.inner.is_none() {
+ // Do we need to panic here?
+ log::warn!("appender has not been closed, must be a bug")
+ }
+ }
+}
+
+#[async_trait]
+impl<A> oio::Append for CompleteAppender<A>
+where
+ A: oio::Append,
+{
+ async fn append(&mut self, bs: Bytes) -> Result<()> {
+ let a = self
+ .inner
+ .as_mut()
+ .ok_or_else(|| Error::new(ErrorKind::Unexpected, "appender has
been closed"))?;
+
+ a.append(bs).await
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ let a = self
+ .inner
+ .as_mut()
+ .ok_or_else(|| Error::new(ErrorKind::Unexpected, "appender has
been closed"))?;
+
+ a.close().await?;
+ self.inner = None;
+ Ok(())
+ }
+}
diff --git a/core/src/layers/concurrent_limit.rs
b/core/src/layers/concurrent_limit.rs
index 07c4cfc4..5228eace 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -87,6 +87,7 @@ impl<A: Accessor> LayeredAccessor for
ConcurrentLimitAccessor<A> {
type BlockingReader = ConcurrentLimitWrapper<A::BlockingReader>;
type Writer = ConcurrentLimitWrapper<A::Writer>;
type BlockingWriter = ConcurrentLimitWrapper<A::BlockingWriter>;
+ type Appender = ConcurrentLimitWrapper<A::Appender>;
type Pager = ConcurrentLimitWrapper<A::Pager>;
type BlockingPager = ConcurrentLimitWrapper<A::BlockingPager>;
@@ -132,6 +133,20 @@ impl<A: Accessor> LayeredAccessor for
ConcurrentLimitAccessor<A> {
.map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit)))
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ let permit = self
+ .semaphore
+ .clone()
+ .acquire_owned()
+ .await
+ .expect("semaphore must be valid");
+
+ self.inner
+ .append(path, args)
+ .await
+ .map(|(rp, a)| (rp, ConcurrentLimitWrapper::new(a, permit)))
+ }
+
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let _permit = self
.semaphore
@@ -309,6 +324,17 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for
ConcurrentLimitWrapper<R> {
}
}
+#[async_trait]
+impl<R: oio::Append> oio::Append for ConcurrentLimitWrapper<R> {
+ async fn append(&mut self, bs: Bytes) -> Result<()> {
+ self.inner.append(bs).await
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ self.inner.close().await
+ }
+}
+
#[async_trait]
impl<R: oio::Page> oio::Page for ConcurrentLimitWrapper<R> {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 194b3168..a23b78dd 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -26,6 +26,7 @@ use bytes::Bytes;
use futures::TryFutureExt;
use crate::ops::*;
+use crate::raw::oio::AppendOperation;
use crate::raw::oio::PageOperation;
use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
@@ -71,6 +72,7 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A>
{
type BlockingReader = ErrorContextWrapper<A::BlockingReader>;
type Writer = ErrorContextWrapper<A::Writer>;
type BlockingWriter = ErrorContextWrapper<A::BlockingWriter>;
+ type Appender = ErrorContextWrapper<A::Appender>;
type Pager = ErrorContextWrapper<A::Pager>;
type BlockingPager = ErrorContextWrapper<A::BlockingPager>;
@@ -138,6 +140,27 @@ impl<A: Accessor> LayeredAccessor for
ErrorContextAccessor<A> {
.await
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ self.inner
+ .append(path, args)
+ .map_ok(|(rp, os)| {
+ (
+ rp,
+ ErrorContextWrapper {
+ scheme: self.meta.scheme(),
+ path: path.to_string(),
+ inner: os,
+ },
+ )
+ })
+ .map_err(|err| {
+ err.with_operation(Operation::Append)
+ .with_context("service", self.meta.scheme())
+ .with_context("path", path)
+ })
+ .await
+ }
+
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>
{
self.inner
.copy(from, to, args)
@@ -447,6 +470,25 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for
ErrorContextWrapper<T> {
}
}
+#[async_trait::async_trait]
+impl<T: oio::Append> oio::Append for ErrorContextWrapper<T> {
+ async fn append(&mut self, bs: Bytes) -> Result<()> {
+ self.inner.append(bs).await.map_err(|err| {
+ err.with_operation(AppendOperation::Append)
+ .with_context("service", self.scheme)
+ .with_context("path", &self.path)
+ })
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ self.inner.close().await.map_err(|err| {
+ err.with_operation(AppendOperation::Close)
+ .with_context("service", self.scheme)
+ .with_context("path", &self.path)
+ })
+ }
+}
+
#[async_trait::async_trait]
impl<T: oio::Page> oio::Page for ErrorContextWrapper<T> {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
diff --git a/core/src/layers/immutable_index.rs
b/core/src/layers/immutable_index.rs
index 01b4d8f0..70f3842d 100644
--- a/core/src/layers/immutable_index.rs
+++ b/core/src/layers/immutable_index.rs
@@ -139,6 +139,7 @@ impl<A: Accessor> LayeredAccessor for
ImmutableIndexAccessor<A> {
type BlockingReader = A::BlockingReader;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
+ type Appender = A::Appender;
type Pager = ImmutableDir;
type BlockingPager = ImmutableDir;
@@ -194,6 +195,10 @@ impl<A: Accessor> LayeredAccessor for
ImmutableIndexAccessor<A> {
self.inner.blocking_write(path, args)
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ self.inner.append(path, args).await
+ }
+
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::BlockingPager)> {
let mut path = path;
if path == "/" {
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index cbbc8ea1..2b900385 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -190,6 +190,7 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
type BlockingReader = LoggingReader<A::BlockingReader>;
type Writer = LoggingWriter<A::Writer>;
type BlockingWriter = LoggingWriter<A::BlockingWriter>;
+ type Appender = LoggingAppender<A::Appender>;
type Pager = LoggingPager<A::Pager>;
type BlockingPager = LoggingPager<A::BlockingPager>;
@@ -340,6 +341,51 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
})
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ debug!(
+ target: LOGGING_TARGET,
+ "service={} operation={} path={} -> started",
+ self.scheme,
+ Operation::Append,
+ path
+ );
+
+ self.inner
+ .append(path, args)
+ .await
+ .map(|(rp, a)| {
+ debug!(
+ target: LOGGING_TARGET,
+ "service={} operation={} path={} -> start appending",
+ self.scheme,
+ Operation::Append,
+ path,
+ );
+ let a = LoggingAppender::new(
+ self.scheme,
+ Operation::Append,
+ path,
+ a,
+ self.failure_level,
+ );
+ (rp, a)
+ })
+ .map_err(|err| {
+ if let Some(lvl) = self.err_level(&err) {
+ log!(
+ target: LOGGING_TARGET,
+ lvl,
+ "service={} operation={} path={} -> {}: {err:?}",
+ self.scheme,
+ Operation::Append,
+ path,
+ self.err_status(&err)
+ )
+ };
+ err
+ })
+ }
+
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>
{
debug!(
target: LOGGING_TARGET,
@@ -1424,6 +1470,98 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for
LoggingWriter<W> {
}
}
+pub struct LoggingAppender<A> {
+ scheme: Scheme,
+ op: Operation,
+ path: String,
+
+ failure_level: Option<Level>,
+
+ inner: A,
+}
+
+impl<A> LoggingAppender<A> {
+ fn new(
+ scheme: Scheme,
+ op: Operation,
+ path: &str,
+ appender: A,
+ failure_level: Option<Level>,
+ ) -> Self {
+ Self {
+ scheme,
+ op,
+ path: path.to_string(),
+
+ failure_level,
+
+ inner: appender,
+ }
+ }
+}
+
+#[async_trait]
+impl<A: oio::Append> oio::Append for LoggingAppender<A> {
+ async fn append(&mut self, bs: Bytes) -> Result<()> {
+ let len = bs.len();
+
+ match self.inner.append(bs).await {
+ Ok(_) => {
+ trace!(
+ target: LOGGING_TARGET,
+ "service={} operation={} path={} -> data append {}B",
+ self.scheme,
+ self.op,
+ self.path,
+ len
+ );
+ Ok(())
+ }
+ Err(err) => {
+ if let Some(lvl) = self.failure_level {
+ log!(
+ target: LOGGING_TARGET,
+ lvl,
+ "service={} operation={} path={} -> data append
failed: {err:?}",
+ self.scheme,
+ self.op,
+ self.path,
+ )
+ }
+ Err(err)
+ }
+ }
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ match self.inner.close().await {
+ Ok(_) => {
+ debug!(
+ target: LOGGING_TARGET,
+ "service={} operation={} path={} -> data appended
finished",
+ self.scheme,
+ self.op,
+ self.path,
+ );
+ Ok(())
+ }
+ Err(err) => {
+ if let Some(lvl) = self.failure_level {
+ log!(
+ target: LOGGING_TARGET,
+ lvl,
+ "service={} operation={} path={} -> data appender
close failed: {err:?}",
+ self.scheme,
+ self.op,
+ self.path,
+ )
+ }
+ Err(err)
+ }
+ }
+ }
+}
+
pub struct LoggingPager<P> {
scheme: Scheme,
path: String,
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index b9138d74..a70593e7 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -150,6 +150,7 @@ impl LayeredAccessor for MadsimAccessor {
type BlockingReader = ();
type Writer = MadsimWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = MadsimPager;
type BlockingPager = ();
@@ -221,6 +222,17 @@ impl LayeredAccessor for MadsimAccessor {
}
}
+ async fn append(
+ &self,
+ path: &str,
+ args: OpAppend,
+ ) -> crate::Result<(RpAppend, Self::Appender)> {
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "will not be supported in MadsimLayer",
+ ))
+ }
+
async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList,
Self::Pager)> {
Err(Error::new(
ErrorKind::Unsupported,
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index 4dd7b981..1a990b78 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -413,6 +413,7 @@ impl<A: Accessor> LayeredAccessor for MetricsAccessor<A> {
type BlockingReader = MetricWrapper<A::BlockingReader>;
type Writer = MetricWrapper<A::Writer>;
type BlockingWriter = MetricWrapper<A::BlockingWriter>;
+ type Appender = A::Appender;
type Pager = A::Pager;
type BlockingPager = A::BlockingPager;
@@ -510,6 +511,10 @@ impl<A: Accessor> LayeredAccessor for MetricsAccessor<A> {
.await
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ self.inner.append(path, args).await
+ }
+
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.handle.requests_total_stat.increment(1);
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 39a754af..d0a5939e 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -135,6 +135,7 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
type BlockingReader = MinitraceWrapper<A::BlockingReader>;
type Writer = MinitraceWrapper<A::Writer>;
type BlockingWriter = MinitraceWrapper<A::BlockingWriter>;
+ type Appender = A::Appender;
type Pager = MinitraceWrapper<A::Pager>;
type BlockingPager = MinitraceWrapper<A::BlockingPager>;
@@ -168,6 +169,10 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A>
{
.await
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ self.inner.append(path, args).await
+ }
+
#[trace("copy", enter_on_poll = true)]
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>
{
self.inner().copy(from, to, args).await
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 8f754f0d..4894ad9b 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -73,6 +73,7 @@ impl<A: Accessor> LayeredAccessor for OtelTraceAccessor<A> {
type BlockingReader = OtelTraceWrapper<A::BlockingReader>;
type Writer = OtelTraceWrapper<A::Writer>;
type BlockingWriter = OtelTraceWrapper<A::BlockingWriter>;
+ type Appender = A::Appender;
type Pager = OtelTraceWrapper<A::Pager>;
type BlockingPager = OtelTraceWrapper<A::BlockingPager>;
@@ -116,6 +117,10 @@ impl<A: Accessor> LayeredAccessor for OtelTraceAccessor<A>
{
.map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ self.inner.append(path, args).await
+ }
+
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>
{
let tracer = global::tracer("opendal");
let mut span = tracer.start("copy");
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 394248ae..8f6672b5 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -193,6 +193,7 @@ impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A>
{
type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
type Writer = PrometheusMetricWrapper<A::Writer>;
type BlockingWriter = PrometheusMetricWrapper<A::BlockingWriter>;
+ type Appender = A::Appender;
type Pager = A::Pager;
type BlockingPager = A::BlockingPager;
@@ -298,6 +299,10 @@ impl<A: Accessor> LayeredAccessor for
PrometheusAccessor<A> {
})
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ self.inner.append(path, args).await
+ }
+
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.stats
.requests_total
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 8ebd3838..8fcf615f 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -35,6 +35,7 @@ use futures::FutureExt;
use log::warn;
use crate::ops::*;
+use crate::raw::oio::AppendOperation;
use crate::raw::oio::PageOperation;
use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
@@ -161,6 +162,7 @@ impl<A: Accessor> LayeredAccessor for RetryAccessor<A> {
type BlockingReader = RetryWrapper<A::BlockingReader>;
type Writer = RetryWrapper<A::Writer>;
type BlockingWriter = RetryWrapper<A::BlockingWriter>;
+ type Appender = RetryWrapper<A::Appender>;
type Pager = RetryWrapper<A::Pager>;
type BlockingPager = RetryWrapper<A::BlockingPager>;
@@ -219,6 +221,23 @@ impl<A: Accessor> LayeredAccessor for RetryAccessor<A> {
.await
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ { || self.inner.append(path, args.clone()) }
+ .retry(&self.builder)
+ .when(|e| e.is_temporary())
+ .notify(|err, dur| {
+ warn!(
+ target: "opendal::service",
+ "operation={} -> retry after {}s: error={:?}",
+ Operation::Append, dur.as_secs_f64(), err)
+ })
+ .map(|v| {
+ v.map(|(rp, r)| (rp, RetryWrapper::new(r, path,
self.builder.clone())))
+ .map_err(|e| e.set_persistent())
+ })
+ .await
+ }
+
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
{ || self.inner.stat(path, args.clone()) }
.retry(&self.builder)
@@ -703,6 +722,51 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for
RetryWrapper<R> {
}
}
+#[async_trait]
+impl<A: oio::Append> oio::Append for RetryWrapper<A> {
+ async fn append(&mut self, bs: Bytes) -> Result<()> {
+ let mut backoff = self.builder.build();
+
+ loop {
+ match self.inner.append(bs.clone()).await {
+ Ok(v) => return Ok(v),
+ Err(e) if !e.is_temporary() => return Err(e),
+ Err(e) => match backoff.next() {
+ None => return Err(e),
+ Some(dur) => {
+ warn!(target: "opendal::service",
+ "operation={} path={} -> appender retry after
{}s: error={:?}",
+ AppendOperation::Append, self.path,
dur.as_secs_f64(), e);
+ tokio::time::sleep(dur).await;
+ continue;
+ }
+ },
+ }
+ }
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ let mut backoff = self.builder.build();
+
+ loop {
+ match self.inner.close().await {
+ Ok(v) => return Ok(v),
+ Err(e) if !e.is_temporary() => return Err(e),
+ Err(e) => match backoff.next() {
+ None => return Err(e),
+ Some(dur) => {
+ warn!(target: "opendal::service",
+ "operation={} path={} -> appender retry after
{}s: error={:?}",
+ AppendOperation::Close, self.path,
dur.as_secs_f64(), e);
+ tokio::time::sleep(dur).await;
+ continue;
+ }
+ },
+ }
+ }
+ }
+}
+
#[async_trait]
impl<P: oio::Page> oio::Page for RetryWrapper<P> {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
@@ -790,6 +854,7 @@ mod tests {
type BlockingReader = ();
type Writer = ();
type BlockingWriter = ();
+ type Appender = ();
type Pager = MockPager;
type BlockingPager = ();
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 873fef69..96f328a3 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -138,6 +138,7 @@ impl<A: Accessor> LayeredAccessor for TracingAccessor<A> {
type BlockingReader = TracingWrapper<A::BlockingReader>;
type Writer = TracingWrapper<A::Writer>;
type BlockingWriter = TracingWrapper<A::BlockingWriter>;
+ type Appender = A::Appender;
type Pager = TracingWrapper<A::Pager>;
type BlockingPager = TracingWrapper<A::BlockingPager>;
@@ -171,6 +172,10 @@ impl<A: Accessor> LayeredAccessor for TracingAccessor<A> {
.map(|(rp, r)| (rp, TracingWrapper::new(Span::current(), r)))
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ self.inner.append(path, args).await
+ }
+
#[tracing::instrument(level = "debug", skip(self))]
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>
{
self.inner().copy(from, to, args).await
diff --git a/core/src/layers/type_eraser.rs b/core/src/layers/type_eraser.rs
index e47124b7..fb236646 100644
--- a/core/src/layers/type_eraser.rs
+++ b/core/src/layers/type_eraser.rs
@@ -58,6 +58,7 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> {
type BlockingReader = oio::BlockingReader;
type Writer = oio::Writer;
type BlockingWriter = oio::BlockingWriter;
+ type Appender = oio::Appender;
type Pager = oio::Pager;
type BlockingPager = oio::BlockingPager;
@@ -91,6 +92,13 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> {
.map(|(rp, w)| (rp, Box::new(w) as oio::BlockingWriter))
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ self.inner
+ .append(path, args)
+ .await
+ .map(|(rp, a)| (rp, Box::new(a) as oio::Appender))
+ }
+
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
self.inner
.list(path, args)
diff --git a/core/src/raw/accessor.rs b/core/src/raw/accessor.rs
index 48e2ea64..8fb98d0c 100644
--- a/core/src/raw/accessor.rs
+++ b/core/src/raw/accessor.rs
@@ -71,6 +71,8 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static {
/// BlockingPager is the associated pager that could return in
/// `blocking_list` operation.
type BlockingPager: oio::BlockingPage;
+ /// Appender is the associated appender that could return in `append`
operation.
+ type Appender: oio::Append;
/// Invoke the `info` operation to get metadata of accessor.
///
@@ -137,6 +139,23 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static {
))
}
+ /// Invoke the `append` operation on the specified path, returns a
+ /// appended size if operate successful.
+ ///
+ /// Require [`Capability::append`]
+ ///
+ /// # Behavior
+ ///
+ /// - Input path MUST be file path, DON'T NEED to check mode.
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ let (_, _) = (path, args);
+
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "operation is not supported",
+ ))
+ }
+
/// Invoke the `copy` operation on the specified `from` path and `to` path.
///
/// Require [Capability::copy]
@@ -371,6 +390,7 @@ impl Accessor for () {
type BlockingReader = ();
type Writer = ();
type BlockingWriter = ();
+ type Appender = ();
type Pager = ();
type BlockingPager = ();
@@ -392,6 +412,7 @@ impl<T: Accessor + ?Sized> Accessor for Arc<T> {
type BlockingReader = T::BlockingReader;
type Writer = T::Writer;
type BlockingWriter = T::BlockingWriter;
+ type Appender = T::Appender;
type Pager = T::Pager;
type BlockingPager = T::BlockingPager;
@@ -410,6 +431,10 @@ impl<T: Accessor + ?Sized> Accessor for Arc<T> {
self.as_ref().write(path, args).await
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ self.as_ref().append(path, args).await
+ }
+
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>
{
self.as_ref().copy(from, to, args).await
}
@@ -472,6 +497,7 @@ pub type FusedAccessor = Arc<
BlockingReader = oio::BlockingReader,
Writer = oio::Writer,
BlockingWriter = oio::BlockingWriter,
+ Appender = oio::Appender,
Pager = oio::Pager,
BlockingPager = oio::BlockingPager,
>,
diff --git a/core/src/raw/adapters/kv/backend.rs
b/core/src/raw/adapters/kv/backend.rs
index 3798021a..ee8d32ac 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -63,6 +63,7 @@ impl<S: Adapter> Accessor for Backend<S> {
type BlockingReader = oio::Cursor;
type Writer = KvWriter<S>;
type BlockingWriter = KvWriter<S>;
+ type Appender = ();
type Pager = KvPager;
type BlockingPager = KvPager;
diff --git a/core/src/raw/adapters/typed_kv/api.rs
b/core/src/raw/adapters/typed_kv/api.rs
index 4f81589f..3217db57 100644
--- a/core/src/raw/adapters/typed_kv/api.rs
+++ b/core/src/raw/adapters/typed_kv/api.rs
@@ -15,7 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use std::{fmt::Debug, mem::size_of};
+use std::fmt::Debug;
+use std::mem::size_of;
use async_trait::async_trait;
use bytes::Bytes;
diff --git a/core/src/raw/adapters/typed_kv/backend.rs
b/core/src/raw/adapters/typed_kv/backend.rs
index 3bb3147a..e04cd202 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -59,6 +59,7 @@ impl<S: Adapter> Accessor for Backend<S> {
type BlockingReader = oio::Cursor;
type Writer = KvWriter<S>;
type BlockingWriter = KvWriter<S>;
+ type Appender = ();
type Pager = KvPager;
type BlockingPager = KvPager;
diff --git a/core/src/raw/layer.rs b/core/src/raw/layer.rs
index 7535aa2d..7c4f93f6 100644
--- a/core/src/raw/layer.rs
+++ b/core/src/raw/layer.rs
@@ -63,6 +63,7 @@ use crate::*;
/// type BlockingReader = A::BlockingReader;
/// type Writer = A::Writer;
/// type BlockingWriter = A::BlockingWriter;
+/// type Appender = A::Appender;
/// type Pager = A::Pager;
/// type BlockingPager = A::BlockingPager;
///
@@ -94,6 +95,10 @@ use crate::*;
/// self.inner.blocking_write(path, args)
/// }
///
+/// async fn append(&self, path: &str, args: OpAppend) ->
Result<(RpAppend, Self::Appender)> {
+/// self.inner.append(path, args).await
+/// }
+///
/// async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
/// self.inner.list(path, args).await
/// }
@@ -134,6 +139,7 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin +
'static {
type BlockingReader: oio::BlockingRead;
type Writer: oio::Write;
type BlockingWriter: oio::BlockingWrite;
+ type Appender: oio::Append;
type Pager: oio::Page;
type BlockingPager: oio::BlockingPage;
@@ -151,6 +157,8 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin +
'static {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)>;
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)>;
+
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>
{
self.inner().copy(from, to, args).await
}
@@ -210,6 +218,7 @@ impl<L: LayeredAccessor> Accessor for L {
type BlockingReader = L::BlockingReader;
type Writer = L::Writer;
type BlockingWriter = L::BlockingWriter;
+ type Appender = L::Appender;
type Pager = L::Pager;
type BlockingPager = L::BlockingPager;
@@ -229,6 +238,10 @@ impl<L: LayeredAccessor> Accessor for L {
(self as &L).write(path, args).await
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ (self as &L).append(path, args).await
+ }
+
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>
{
(self as &L).copy(from, to, args).await
}
@@ -323,6 +336,7 @@ mod tests {
type BlockingReader = ();
type Writer = ();
type BlockingWriter = ();
+ type Appender = ();
type Pager = ();
type BlockingPager = ();
diff --git a/core/src/raw/oio/append.rs b/core/src/raw/oio/append.rs
new file mode 100644
index 00000000..6b5d576d
--- /dev/null
+++ b/core/src/raw/oio/append.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Display;
+use std::fmt::Formatter;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+
+use crate::*;
+
+/// AppendOperation is the name for APIs of Append.
+#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
+#[non_exhaustive]
+pub enum AppendOperation {
+ /// Operation for [`Append::append`]
+ Append,
+ /// Operation for [`Append::close`]
+ Close,
+}
+
+impl AppendOperation {
+ /// Convert self into static str.
+ pub fn into_static(self) -> &'static str {
+ self.into()
+ }
+}
+
+impl Display for AppendOperation {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", self.into_static())
+ }
+}
+
+impl From<AppendOperation> for &'static str {
+ fn from(v: AppendOperation) -> &'static str {
+ use AppendOperation::*;
+
+ match v {
+ Append => "Append::append",
+ Close => "Append::close",
+ }
+ }
+}
+
+/// Appender is a type erased [`Append`]
+pub type Appender = Box<dyn Append>;
+
+/// Append is the trait that OpenDAL returns to callers.
+///
+/// # Notes
+///
+/// Users will call `append` multiple times.
+#[async_trait]
+pub trait Append: Unpin + Send + Sync {
+ /// Append data to the end of file.
+ ///
+ /// Users will call `append` multiple times.
+ /// Please make sure `append` is safe to re-enter.
+ async fn append(&mut self, bs: Bytes) -> Result<()>;
+
+ /// Seal the file to mark it as unmodifiable.
+ async fn close(&mut self) -> Result<()>;
+}
+
+#[async_trait]
+impl Append for () {
+ async fn append(&mut self, bs: Bytes) -> Result<()> {
+ let _ = bs;
+
+ unimplemented!("append is required to be implemented for oio::Append")
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "output appender doesn't support close",
+ ))
+ }
+}
+
+/// `Box<dyn Append>` won't implement `Append` automatically.
+///
+/// To make Appender work as expected, we must add this impl.
+#[async_trait]
+impl<T: Append + ?Sized> Append for Box<T> {
+ async fn append(&mut self, bs: Bytes) -> Result<()> {
+ (**self).append(bs).await
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ (**self).close().await
+ }
+}
diff --git a/core/src/raw/oio/into_reader/by_range.rs
b/core/src/raw/oio/into_reader/by_range.rs
index 71bb6906..5e37bc54 100644
--- a/core/src/raw/oio/into_reader/by_range.rs
+++ b/core/src/raw/oio/into_reader/by_range.rs
@@ -318,6 +318,7 @@ mod tests {
type BlockingReader = ();
type Writer = ();
type BlockingWriter = ();
+ type Appender = ();
type Pager = ();
type BlockingPager = ();
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index 56b48dda..ce15d4f1 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -41,6 +41,11 @@ pub use write::Write;
pub use write::WriteOperation;
pub use write::Writer;
+mod append;
+pub use append::Append;
+pub use append::AppendOperation;
+pub use append::Appender;
+
mod cursor;
pub use cursor::Cursor;
pub use cursor::VectorCursor;
diff --git a/core/src/raw/oio/to_flat_pager.rs
b/core/src/raw/oio/to_flat_pager.rs
index fdc9ffee..720ee3f8 100644
--- a/core/src/raw/oio/to_flat_pager.rs
+++ b/core/src/raw/oio/to_flat_pager.rs
@@ -249,6 +249,7 @@ mod tests {
type BlockingReader = ();
type Writer = ();
type BlockingWriter = ();
+ type Appender = ();
type Pager = ();
type BlockingPager = MockPager;
diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write.rs
index dade202c..15f7b63c 100644
--- a/core/src/raw/oio/write.rs
+++ b/core/src/raw/oio/write.rs
@@ -123,8 +123,9 @@ impl Write for () {
}
}
-/// `Box<dyn Write>` won't implement `Write` automatically. To make Writer
-/// work as expected, we must add this impl.
+/// `Box<dyn Write>` won't implement `Write` automatically.
+///
+/// To make Writer work as expected, we must add this impl.
#[async_trait]
impl<T: Write + ?Sized> Write for Box<T> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
@@ -168,6 +169,7 @@ impl BlockingWrite for () {
}
/// `Box<dyn BlockingWrite>` won't implement `BlockingWrite` automatically.
+///
/// To make BlockingWriter work as expected, we must add this impl.
impl<T: BlockingWrite + ?Sized> BlockingWrite for Box<T> {
fn write(&mut self, bs: Bytes) -> Result<()> {
diff --git a/core/src/raw/operation.rs b/core/src/raw/operation.rs
index a7b81c62..d0f61b47 100644
--- a/core/src/raw/operation.rs
+++ b/core/src/raw/operation.rs
@@ -31,6 +31,8 @@ pub enum Operation {
Read,
/// Operation for [`crate::raw::Accessor::write`]
Write,
+ /// Operation for [`crate::raw::Accessor::append`]
+ Append,
/// Operation for [`crate::raw::Accessor::copy`]
Copy,
/// Operation for [`crate::raw::Accessor::rename`]
@@ -83,6 +85,7 @@ impl From<Operation> for &'static str {
Operation::CreateDir => "create_dir",
Operation::Read => "read",
Operation::Write => "write",
+ Operation::Append => "append",
Operation::Copy => "copy",
Operation::Rename => "rename",
Operation::Stat => "stat",
diff --git a/core/src/raw/rps.rs b/core/src/raw/rps.rs
index bc45d145..7cd24611 100644
--- a/core/src/raw/rps.rs
+++ b/core/src/raw/rps.rs
@@ -195,6 +195,17 @@ impl RpWrite {
}
}
+/// Reply for `append` operation.
+#[derive(Debug, Clone, Default)]
+pub struct RpAppend {}
+
+impl RpAppend {
+ /// Create a new reply for `append`.
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
/// Reply for `copy` operation.
#[derive(Debug, Clone, Default)]
pub struct RpCopy {}
diff --git a/core/src/services/azblob/backend.rs
b/core/src/services/azblob/backend.rs
index ac6d2e0f..9712c4c9 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -449,6 +449,7 @@ impl Accessor for AzblobBackend {
type BlockingReader = ();
type Writer = AzblobWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = AzblobPager;
type BlockingPager = ();
diff --git a/core/src/services/azdfs/backend.rs
b/core/src/services/azdfs/backend.rs
index 57807205..d428a780 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -304,6 +304,7 @@ impl Accessor for AzdfsBackend {
type BlockingReader = ();
type Writer = AzdfsWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = AzdfsPager;
type BlockingPager = ();
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 6b5aaaea..282656d9 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -296,6 +296,7 @@ impl Accessor for FsBackend {
type BlockingReader = oio::into_blocking_reader::FdReader<std::fs::File>;
type Writer = FsWriter<tokio::fs::File>;
type BlockingWriter = FsWriter<std::fs::File>;
+ type Appender = ();
type Pager = Option<FsPager<tokio::fs::ReadDir>>;
type BlockingPager = Option<FsPager<std::fs::ReadDir>>;
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index d5ae8ec0..3efd60b0 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -314,6 +314,7 @@ impl Accessor for FtpBackend {
type BlockingReader = ();
type Writer = FtpWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = FtpPager;
type BlockingPager = ();
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 188b9713..dcc646eb 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -396,6 +396,7 @@ impl Accessor for GcsBackend {
type BlockingReader = ();
type Writer = GcsWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = GcsPager;
type BlockingPager = ();
diff --git a/core/src/services/gdrive/backend.rs
b/core/src/services/gdrive/backend.rs
index 03d8c83e..afbc1a07 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -18,19 +18,28 @@
use async_trait::async_trait;
use http::StatusCode;
-use std::{fmt::Debug, sync::Arc};
-
-use crate::{
- ops::{OpDelete, OpRead, OpWrite},
- raw::{
- parse_into_metadata, Accessor, AccessorInfo, HttpClient,
IncomingAsyncBody, RpDelete,
- RpRead, RpWrite,
- },
- types::Result,
- Capability, Error, ErrorKind,
-};
-
-use super::{core::GdriveCore, error::parse_error, writer::GdriveWriter};
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use crate::ops::OpDelete;
+use crate::ops::OpRead;
+use crate::ops::OpWrite;
+use crate::raw::parse_into_metadata;
+use crate::raw::Accessor;
+use crate::raw::AccessorInfo;
+use crate::raw::HttpClient;
+use crate::raw::IncomingAsyncBody;
+use crate::raw::RpDelete;
+use crate::raw::RpRead;
+use crate::raw::RpWrite;
+use crate::types::Result;
+use crate::Capability;
+use crate::Error;
+use crate::ErrorKind;
+
+use super::core::GdriveCore;
+use super::error::parse_error;
+use super::writer::GdriveWriter;
#[derive(Clone, Debug)]
pub struct GdriveBackend {
@@ -56,6 +65,7 @@ impl Accessor for GdriveBackend {
type BlockingReader = ();
type Writer = GdriveWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = ();
type BlockingPager = ();
diff --git a/core/src/services/gdrive/builder.rs
b/core/src/services/gdrive/builder.rs
index 7a536b1b..9a7a3b38 100644
--- a/core/src/services/gdrive/builder.rs
+++ b/core/src/services/gdrive/builder.rs
@@ -16,12 +16,14 @@
// under the License.
use std::collections::HashMap;
-use std::fmt::{Debug, Formatter};
+use std::fmt::Debug;
+use std::fmt::Formatter;
use log::debug;
use super::backend::GdriveBackend;
-use crate::raw::{normalize_root, HttpClient};
+use crate::raw::normalize_root;
+use crate::raw::HttpClient;
use crate::Scheme;
use crate::*;
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
index 6f66bf8b..02fc82cb 100644
--- a/core/src/services/gdrive/core.rs
+++ b/core/src/services/gdrive/core.rs
@@ -26,16 +26,19 @@ use crate::raw::HttpClient;
use crate::Error;
use crate::ErrorKind;
+use http::header;
use http::request::Builder;
+use http::Request;
+use http::Response;
use http::StatusCode;
-use http::{header, Request, Response};
use serde::Deserialize;
use tokio::sync::Mutex;
-use crate::{
- raw::{build_rooted_abs_path, new_request_build_error, AsyncBody,
IncomingAsyncBody},
- types::Result,
-};
+use crate::raw::build_rooted_abs_path;
+use crate::raw::new_request_build_error;
+use crate::raw::AsyncBody;
+use crate::raw::IncomingAsyncBody;
+use crate::types::Result;
use super::error::parse_error;
diff --git a/core/src/services/ghac/backend.rs
b/core/src/services/ghac/backend.rs
index cd264bb3..380dd9a2 100644
--- a/core/src/services/ghac/backend.rs
+++ b/core/src/services/ghac/backend.rs
@@ -296,6 +296,7 @@ impl Accessor for GhacBackend {
type BlockingReader = ();
type Writer = GhacWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = ();
type BlockingPager = ();
diff --git a/core/src/services/hdfs/backend.rs
b/core/src/services/hdfs/backend.rs
index 81d76699..bce999bd 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -235,6 +235,7 @@ impl Accessor for HdfsBackend {
type BlockingReader = oio::into_blocking_reader::FdReader<hdrs::File>;
type Writer = HdfsWriter<hdrs::AsyncFile>;
type BlockingWriter = HdfsWriter<hdrs::File>;
+ type Appender = ();
type Pager = Option<HdfsPager>;
type BlockingPager = Option<HdfsPager>;
diff --git a/core/src/services/http/backend.rs
b/core/src/services/http/backend.rs
index 3aa60878..8be0f16a 100644
--- a/core/src/services/http/backend.rs
+++ b/core/src/services/http/backend.rs
@@ -256,6 +256,7 @@ impl Accessor for HttpBackend {
type BlockingReader = ();
type Writer = ();
type BlockingWriter = ();
+ type Appender = ();
type Pager = ();
type BlockingPager = ();
diff --git a/core/src/services/ipfs/backend.rs
b/core/src/services/ipfs/backend.rs
index 78da66f9..2436c578 100644
--- a/core/src/services/ipfs/backend.rs
+++ b/core/src/services/ipfs/backend.rs
@@ -217,6 +217,7 @@ impl Accessor for IpfsBackend {
type BlockingReader = ();
type Writer = ();
type BlockingWriter = ();
+ type Appender = ();
type Pager = DirStream;
type BlockingPager = ();
diff --git a/core/src/services/ipmfs/backend.rs
b/core/src/services/ipmfs/backend.rs
index 28129f86..43c69f8e 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -84,6 +84,7 @@ impl Accessor for IpmfsBackend {
type BlockingReader = ();
type Writer = IpmfsWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = IpmfsPager;
type BlockingPager = ();
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 1ffa052e..39248a3b 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -301,6 +301,7 @@ impl Accessor for ObsBackend {
type BlockingReader = ();
type Writer = ObsWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = ObsPager;
type BlockingPager = ();
diff --git a/core/src/services/onedrive/backend.rs
b/core/src/services/onedrive/backend.rs
index 0bf36f32..fcc10854 100644
--- a/core/src/services/onedrive/backend.rs
+++ b/core/src/services/onedrive/backend.rs
@@ -32,19 +32,39 @@ use super::graph_model::OnedriveGetItemBody;
use super::pager::OnedrivePager;
use super::writer::OneDriveWriter;
use crate::ops::OpCreateDir;
+use crate::ops::OpDelete;
+use crate::ops::OpList;
+use crate::ops::OpRead;
+use crate::ops::OpStat;
+use crate::ops::OpWrite;
+use crate::raw::build_abs_path;
+use crate::raw::build_rooted_abs_path;
+use crate::raw::get_basename;
use crate::raw::get_parent;
+use crate::raw::new_json_deserialize_error;
+use crate::raw::new_json_serialize_error;
+use crate::raw::new_request_build_error;
+use crate::raw::parse_datetime_from_rfc3339;
+use crate::raw::parse_into_metadata;
+use crate::raw::parse_location;
+use crate::raw::percent_encode_path;
+use crate::raw::Accessor;
+use crate::raw::AccessorInfo;
+use crate::raw::AsyncBody;
+use crate::raw::HttpClient;
+use crate::raw::IncomingAsyncBody;
use crate::raw::RpCreateDir;
-use crate::{
- ops::{OpDelete, OpList, OpRead, OpStat, OpWrite},
- raw::{
- build_abs_path, build_rooted_abs_path, get_basename,
new_json_deserialize_error,
- new_json_serialize_error, new_request_build_error,
parse_datetime_from_rfc3339,
- parse_into_metadata, parse_location, percent_encode_path, Accessor,
AccessorInfo,
- AsyncBody, HttpClient, IncomingAsyncBody, RpDelete, RpList, RpRead,
RpStat, RpWrite,
- },
- types::Result,
- Capability, EntryMode, Error, ErrorKind, Metadata,
-};
+use crate::raw::RpDelete;
+use crate::raw::RpList;
+use crate::raw::RpRead;
+use crate::raw::RpStat;
+use crate::raw::RpWrite;
+use crate::types::Result;
+use crate::Capability;
+use crate::EntryMode;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Metadata;
#[derive(Clone)]
pub struct OnedriveBackend {
@@ -78,6 +98,7 @@ impl Accessor for OnedriveBackend {
type BlockingReader = ();
type Writer = OneDriveWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = OnedrivePager;
type BlockingPager = ();
diff --git a/core/src/services/onedrive/graph_model.rs
b/core/src/services/onedrive/graph_model.rs
index 59cff4f2..9d788e26 100644
--- a/core/src/services/onedrive/graph_model.rs
+++ b/core/src/services/onedrive/graph_model.rs
@@ -15,7 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use serde::{Deserialize, Serialize};
+use serde::Deserialize;
+use serde::Serialize;
use std::collections::HashMap;
diff --git a/core/src/services/onedrive/pager.rs
b/core/src/services/onedrive/pager.rs
index 8a29c7cd..681a17a0 100644
--- a/core/src/services/onedrive/pager.rs
+++ b/core/src/services/onedrive/pager.rs
@@ -15,20 +15,19 @@
// specific language governing permissions and limitations
// under the License.
-use crate::{
- raw::{
- build_rel_path, build_rooted_abs_path, new_json_deserialize_error,
- oio::{self},
- percent_encode_path, IncomingAsyncBody,
- },
- EntryMode, Metadata,
-};
-
-use super::{
- backend::OnedriveBackend,
- error::parse_error,
- graph_model::{GraphApiOnedriveListResponse, ItemType},
-};
+use crate::raw::build_rel_path;
+use crate::raw::build_rooted_abs_path;
+use crate::raw::new_json_deserialize_error;
+use crate::raw::oio::{self};
+use crate::raw::percent_encode_path;
+use crate::raw::IncomingAsyncBody;
+use crate::EntryMode;
+use crate::Metadata;
+
+use super::backend::OnedriveBackend;
+use super::error::parse_error;
+use super::graph_model::GraphApiOnedriveListResponse;
+use super::graph_model::ItemType;
use crate::Result;
use async_trait::async_trait;
use http::Response;
diff --git a/core/src/services/onedrive/writer.rs
b/core/src/services/onedrive/writer.rs
index 5779b4b6..c071f6ca 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -16,14 +16,14 @@
// under the License.
use async_trait::async_trait;
-use bytes::{Buf, Bytes};
+use bytes::Buf;
+use bytes::Bytes;
use http::StatusCode;
use super::backend::OnedriveBackend;
use super::error::parse_error;
-use super::graph_model::{
- OneDriveUploadSessionCreationRequestBody,
OneDriveUploadSessionCreationResponseBody,
-};
+use super::graph_model::OneDriveUploadSessionCreationRequestBody;
+use super::graph_model::OneDriveUploadSessionCreationResponseBody;
use crate::ops::OpWrite;
use crate::raw::*;
use crate::*;
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 097da6d2..713236f1 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -441,6 +441,7 @@ impl Accessor for OssBackend {
type BlockingReader = ();
type Writer = OssWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = OssPager;
type BlockingPager = ();
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 91ff14f1..51ecfccd 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -953,6 +953,7 @@ impl Accessor for S3Backend {
type BlockingReader = ();
type Writer = S3Writer;
type BlockingWriter = ();
+ type Appender = ();
type Pager = S3Pager;
type BlockingPager = ();
diff --git a/core/src/services/sftp/backend.rs
b/core/src/services/sftp/backend.rs
index bfdd9821..8f9b7be0 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -80,7 +80,7 @@ use crate::*;
///
/// ```no_run
/// use anyhow::Result;
-/// use opendal::services::Ftp;
+/// use opendal::services::Sftp;
/// use opendal::Object;
/// use opendal::Operator;
///
@@ -265,6 +265,7 @@ impl Accessor for SftpBackend {
type BlockingReader = ();
type Writer = SftpWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = Option<SftpPager>;
type BlockingPager = ();
diff --git a/core/src/services/sftp/error.rs b/core/src/services/sftp/error.rs
index 45178712..9c0124e7 100644
--- a/core/src/services/sftp/error.rs
+++ b/core/src/services/sftp/error.rs
@@ -16,9 +16,11 @@
// under the License.
use openssh::Error as SshError;
-use openssh_sftp_client::{error::SftpErrorKind, Error as SftpClientError};
+use openssh_sftp_client::error::SftpErrorKind;
+use openssh_sftp_client::Error as SftpClientError;
-use crate::{Error, ErrorKind};
+use crate::Error;
+use crate::ErrorKind;
#[derive(Debug)]
pub enum SftpError {
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index 48126ca8..445a3457 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -20,7 +20,9 @@ use bytes::Bytes;
use openssh_sftp_client::file::File;
use crate::raw::oio;
-use crate::{Error, ErrorKind, Result};
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
pub struct SftpWriter {
file: File,
diff --git a/core/src/services/supabase/backend.rs
b/core/src/services/supabase/backend.rs
index 69621a33..34f31521 100644
--- a/core/src/services/supabase/backend.rs
+++ b/core/src/services/supabase/backend.rs
@@ -209,6 +209,7 @@ impl Accessor for SupabaseBackend {
type BlockingReader = ();
type Writer = SupabaseWriter;
type BlockingWriter = ();
+ type Appender = ();
// todo: implement Pager to support list and scan
type Pager = ();
type BlockingPager = ();
diff --git a/core/src/services/vercel_artifacts/backend.rs
b/core/src/services/vercel_artifacts/backend.rs
index ea850303..d8804076 100644
--- a/core/src/services/vercel_artifacts/backend.rs
+++ b/core/src/services/vercel_artifacts/backend.rs
@@ -16,20 +16,30 @@
// under the License.
use async_trait::async_trait;
-use http::{header, Request, Response, StatusCode};
+use http::header;
+use http::Request;
+use http::Response;
+use http::StatusCode;
use std::fmt::Debug;
-use crate::{
- ops::{OpRead, OpWrite},
- raw::{
- new_request_build_error, parse_into_metadata, Accessor, AccessorInfo,
AsyncBody,
- HttpClient, IncomingAsyncBody, RpRead, RpWrite,
- },
- types::Result,
- Capability, Error, ErrorKind,
-};
-
-use super::{error::parse_error, writer::VercelArtifactsWriter};
+use crate::ops::OpRead;
+use crate::ops::OpWrite;
+use crate::raw::new_request_build_error;
+use crate::raw::parse_into_metadata;
+use crate::raw::Accessor;
+use crate::raw::AccessorInfo;
+use crate::raw::AsyncBody;
+use crate::raw::HttpClient;
+use crate::raw::IncomingAsyncBody;
+use crate::raw::RpRead;
+use crate::raw::RpWrite;
+use crate::types::Result;
+use crate::Capability;
+use crate::Error;
+use crate::ErrorKind;
+
+use super::error::parse_error;
+use super::writer::VercelArtifactsWriter;
#[derive(Clone)]
pub struct VercelArtifactsBackend {
@@ -51,6 +61,7 @@ impl Accessor for VercelArtifactsBackend {
type BlockingReader = ();
type Writer = VercelArtifactsWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = ();
type BlockingPager = ();
diff --git a/core/src/services/wasabi/backend.rs
b/core/src/services/wasabi/backend.rs
index fe2344c5..39e7c680 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -896,6 +896,7 @@ impl Accessor for WasabiBackend {
type BlockingReader = ();
type Writer = WasabiWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = WasabiPager;
type BlockingPager = ();
diff --git a/core/src/services/webdav/backend.rs
b/core/src/services/webdav/backend.rs
index 78b6463a..68b54019 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -262,6 +262,7 @@ impl Accessor for WebdavBackend {
type BlockingReader = ();
type Writer = WebdavWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = WebdavPager;
type BlockingPager = ();
diff --git a/core/src/services/webhdfs/backend.rs
b/core/src/services/webhdfs/backend.rs
index 10d5f066..01c6801c 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -463,6 +463,7 @@ impl Accessor for WebhdfsBackend {
type BlockingReader = ();
type Writer = WebhdfsWriter;
type BlockingWriter = ();
+ type Appender = ();
type Pager = WebhdfsPager;
type BlockingPager = ();
diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs
index 1bf9e89b..14685c30 100644
--- a/core/src/types/capability.rs
+++ b/core/src/types/capability.rs
@@ -86,6 +86,9 @@ pub struct Capability {
/// If operator supports write with cache control natively, it will be
true.
pub write_with_cache_control: bool,
+ /// If operator supports append natively, it will be true.
+ pub append: bool,
+
/// If operator supports create dir natively, it will be true.
pub create_dir: bool,
diff --git a/core/src/types/ops.rs b/core/src/types/ops.rs
index aba48950..78545dda 100644
--- a/core/src/types/ops.rs
+++ b/core/src/types/ops.rs
@@ -397,6 +397,17 @@ impl OpWrite {
}
}
+/// Args for `append` operation.
+#[derive(Debug, Clone, Default)]
+pub struct OpAppend {}
+
+impl OpAppend {
+ /// Create a new `OpAppend`.
+ pub fn new() -> Self {
+ Self::default()
+ }
+}
+
/// Args for `copy` operation.
#[derive(Debug, Clone, Default)]
pub struct OpCopy {}