This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 8f8615382 feat(types): synchronous blocking operator and operator's
API (#2924)
8f8615382 is described below
commit 8f8615382a3c18da8bcdeac5195fd5e59b479039
Author: oowl <[email protected]>
AuthorDate: Thu Aug 24 20:50:43 2023 +0800
feat(types): synchronous blocking operator and operator's API (#2924)
---
core/src/types/operator/blocking_operator.rs | 49 ++++++--
core/src/types/operator/operator_functions.rs | 159 +++++++++++++++++++++++++-
core/src/types/operator/operator_futures.rs | 6 +
3 files changed, 202 insertions(+), 12 deletions(-)
diff --git a/core/src/types/operator/blocking_operator.rs
b/core/src/types/operator/blocking_operator.rs
index 13ea1ec56..2c373e378 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -593,19 +593,48 @@ impl BlockingOperator {
/// # }
/// ```
pub fn writer(&self, path: &str) -> Result<BlockingWriter> {
+ self.writer_with(path).call()
+ }
+
+ /// Create a new writer with extra options
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// # use anyhow::Result;
+ /// use opendal::BlockingOperator;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
+ /// # fn test(op: BlockingOperator) -> Result<()> {
+ /// let mut w = op.writer_with("path/to/file").call()?;
+ /// w.write(vec![0; 4096])?;
+ /// w.write(vec![1; 4096])?;
+ /// w.close()?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn writer_with(&self, path: &str) -> FunctionWriter {
let path = normalize_path(path);
- if !validate_path(&path, EntryMode::FILE) {
- return Err(
- Error::new(ErrorKind::IsADirectory, "write path is a
directory")
- .with_operation("BlockingOperator::writer")
- .with_context("service",
self.info().scheme().into_static())
- .with_context("path", &path),
- );
- }
+ FunctionWriter(OperatorFunction::new(
+ self.inner().clone(),
+ path,
+ OpWrite::default(),
+ |inner, path, args| {
+ let path = normalize_path(&path);
- let op = OpWrite::default();
- BlockingWriter::create(self.inner().clone(), &path, op)
+ if !validate_path(&path, EntryMode::FILE) {
+ return Err(
+ Error::new(ErrorKind::IsADirectory, "write path is a
directory")
+ .with_operation("BlockingOperator::writer_with")
+ .with_context("service",
inner.info().scheme().into_static())
+ .with_context("path", &path),
+ );
+ }
+
+ BlockingWriter::create(inner.clone(), &path, args)
+ },
+ ))
}
/// Delete given path.
diff --git a/core/src/types/operator/operator_functions.rs
b/core/src/types/operator/operator_functions.rs
index ba865d9dd..49a0e7e85 100644
--- a/core/src/types/operator/operator_functions.rs
+++ b/core/src/types/operator/operator_functions.rs
@@ -76,7 +76,35 @@ pub struct FunctionWrite(
);
impl FunctionWrite {
- /// Set the content length for this operation.
+ /// Set the append mode of op.
+ ///
+ /// If the append mode is set, the data will be appended to the end of the
file.
+ ///
+ /// # Notes
+ ///
+ /// Service could return `Unsupported` if the underlying storage does not
support append.
+ pub fn append(mut self, v: bool) -> Self {
+ self.0 = self.0.map_args(|(args, bs)| (args.with_append(v), bs));
+ self
+ }
+
+ /// Set the buffer size of op.
+ ///
+ /// If buffer size is set, the data will be buffered by the underlying
writer.
+ ///
+ /// ## NOTE
+ ///
+ /// Services could have their own minimum buffer size while performing write
operations like
+ /// multipart uploads. So the buffer size may be larger than the given
buffer size.
+ pub fn buffer_size(mut self, v: usize) -> Self {
+ self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs));
+ self
+ }
+
+ /// Set the content length of op.
+ ///
+ /// If the content length is not set, the content length will be
+ /// calculated automatically by buffering part of data.
pub fn content_length(mut self, v: u64) -> Self {
self.0 = self
.0
@@ -84,7 +112,7 @@ impl FunctionWrite {
self
}
- /// Set the content type for this operation.
+ /// Set the content type of option
pub fn content_type(mut self, v: &str) -> Self {
self.0 = self
.0
@@ -92,6 +120,22 @@ impl FunctionWrite {
self
}
+ /// Set the content disposition of option
+ pub fn content_disposition(mut self, v: &str) -> Self {
+ self.0 = self
+ .0
+ .map_args(|(args, bs)| (args.with_content_disposition(v), bs));
+ self
+ }
+
+ /// Set the cache control of option
+ pub fn cache_control(mut self, v: &str) -> Self {
+ self.0 = self
+ .0
+ .map_args(|(args, bs)| (args.with_cache_control(v), bs));
+ self
+ }
+
/// Call the function to consume all the input and generate a
/// result.
pub fn call(self) -> Result<()> {
@@ -99,6 +143,75 @@ impl FunctionWrite {
}
}
+/// Function that is generated by [`BlockingOperator::writer_with`].
+///
+/// Users can add more options by public functions provided by this struct.
+pub struct FunctionWriter(
+ /// The args for FunctionWriter are only the `OpWrite` options; unlike
+ /// FunctionWrite, no bytes are moved into this function.
+ pub(crate) OperatorFunction<OpWrite, BlockingWriter>,
+);
+
+impl FunctionWriter {
+ /// Set the append mode of op.
+ ///
+ /// If the append mode is set, the data will be appended to the end of the
file.
+ ///
+ /// # Notes
+ ///
+ /// Service could return `Unsupported` if the underlying storage does not
support append.
+ pub fn append(mut self, v: bool) -> Self {
+ self.0 = self.0.map_args(|args| args.with_append(v));
+ self
+ }
+
+ /// Set the buffer size of op.
+ ///
+ /// If buffer size is set, the data will be buffered by the underlying
writer.
+ ///
+ /// ## NOTE
+ ///
+ /// Services could have their own minimum buffer size while performing write
operations like
+ /// multipart uploads. So the buffer size may be larger than the given
buffer size.
+ pub fn buffer_size(mut self, v: usize) -> Self {
+ self.0 = self.0.map_args(|args| args.with_buffer_size(v));
+ self
+ }
+
+ /// Set the content length of op.
+ ///
+ /// If the content length is not set, the content length will be
+ /// calculated automatically by buffering part of data.
+ pub fn content_length(mut self, v: u64) -> Self {
+ self.0 = self.0.map_args(|args| args.with_content_length(v));
+ self
+ }
+
+ /// Set the content type of option
+ pub fn content_type(mut self, v: &str) -> Self {
+ self.0 = self.0.map_args(|args| args.with_content_type(v));
+ self
+ }
+
+ /// Set the content disposition of option
+ pub fn content_disposition(mut self, v: &str) -> Self {
+ self.0 = self.0.map_args(|args| args.with_content_disposition(v));
+ self
+ }
+
+ /// Set the cache control of option
+ pub fn cache_control(mut self, v: &str) -> Self {
+ self.0 = self.0.map_args(|args| args.with_cache_control(v));
+ self
+ }
+
+ /// Call the function to consume all the input and generate a
+ /// result.
+ pub fn call(self) -> Result<BlockingWriter> {
+ self.0.call()
+ }
+}
+
/// Function that is generated by [`BlockingOperator::delete_with`].
///
/// Users can add more options by public functions provided by this struct.
@@ -223,6 +336,48 @@ impl FunctionReader {
self
}
+ /// Sets the content-disposition header that should be sent back by the
remote read operation.
+ pub fn override_content_disposition(mut self, content_disposition: &str)
-> Self {
+ self.0 = self
+ .0
+ .map_args(|args|
args.with_override_content_disposition(content_disposition));
+ self
+ }
+
+ /// Sets the cache-control header that should be sent back by the remote
read operation.
+ pub fn override_cache_control(mut self, cache_control: &str) -> Self {
+ self.0 = self
+ .0
+ .map_args(|args| args.with_override_cache_control(cache_control));
+ self
+ }
+
+ /// Sets the content-type header that should be sent back by the remote
read operation.
+ pub fn override_content_type(mut self, content_type: &str) -> Self {
+ self.0 = self
+ .0
+ .map_args(|args| args.with_override_content_type(content_type));
+ self
+ }
+
+ /// Set the If-Match for this operation.
+ pub fn if_match(mut self, v: &str) -> Self {
+ self.0 = self.0.map_args(|args| args.with_if_match(v));
+ self
+ }
+
+ /// Set the If-None-Match for this operation.
+ pub fn if_none_match(mut self, v: &str) -> Self {
+ self.0 = self.0.map_args(|args| args.with_if_none_match(v));
+ self
+ }
+
+ /// Set the version for this operation.
+ pub fn version(mut self, v: &str) -> Self {
+ self.0 = self.0.map_args(|args| args.with_version(v));
+ self
+ }
+
/// Call the function to consume all the input and generate a
/// result.
pub fn call(self) -> Result<BlockingReader> {
diff --git a/core/src/types/operator/operator_futures.rs
b/core/src/types/operator/operator_futures.rs
index 10a9c061c..7f2df677a 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -546,6 +546,12 @@ impl Future for FutureDelete {
pub struct FutureList(pub(crate) OperatorFuture<OpList, Vec<Entry>>);
impl FutureList {
+ /// Change the limit of this list operation.
+ pub fn limit(mut self, v: usize) -> Self {
+ self.0 = self.0.map_args(|args| args.with_limit(v));
+ self
+ }
+
/// Change the start_after of this list operation.
pub fn start_after(mut self, v: &str) -> Self {
self.0 = self.0.map_args(|args| args.with_start_after(v));