This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch feat/copy-if-not-exists in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 6c441d542d4a64d28d3d0b9ea80c01fbc3a4e1e4 Author: Xuanwo <git...@xuanwo.io> AuthorDate: Tue Jul 29 18:35:03 2025 +0800 feat: Add if-not-exists support for copy Signed-off-by: Xuanwo <git...@xuanwo.io> --- core/src/raw/ops.rs | 18 ++++- core/src/services/s3/backend.rs | 6 +- core/src/services/s3/core.rs | 21 +++-- core/src/types/capability.rs | 2 + core/src/types/operator/operator.rs | 121 ++++++++++++++++++++++++++++ core/src/types/operator/operator_futures.rs | 30 +++++++ core/src/types/options.rs | 20 +++++ core/tests/behavior/async_copy.rs | 75 +++++++++++++++++ 8 files changed, 285 insertions(+), 8 deletions(-) diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index fc95611bc..7a6760ab2 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -864,13 +864,29 @@ impl From<options::WriteOptions> for (OpWrite, OpWriter) { /// Args for `copy` operation. #[derive(Debug, Clone, Default)] -pub struct OpCopy {} +pub struct OpCopy { + if_not_exists: bool, +} impl OpCopy { /// Create a new `OpCopy`. pub fn new() -> Self { Self::default() } + + /// Set whether the copy must fail if the destination already exists. + /// + /// When set to true, the copy operation will only proceed if the destination + /// doesn't already exist (see `Capability::copy_with_if_not_exists`). + pub fn with_if_not_exists(mut self, if_not_exists: bool) -> Self { + self.if_not_exists = if_not_exists; + self + } + + /// Whether the copy must fail if the destination already exists. + pub fn if_not_exists(&self) -> bool { + self.if_not_exists + } } /// Args for `rename` operation. 
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index a36ff1a41..dba0b4d4f 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -964,6 +964,7 @@ impl Builder for S3Builder { delete_with_version: self.config.enable_versioning, copy: true, + copy_with_if_not_exists: true, list: true, list_with_limit: true, @@ -1115,8 +1116,9 @@ impl Access for S3Backend { Ok((RpList::default(), l)) } - async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> { - let resp = self.core.s3_copy_object(from, to).await?; + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> { + let if_not_exists = args.if_not_exists(); + let resp = self.core.s3_copy_object(from, to, args).await?; let status = resp.status(); diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index d23b8cadc..8554f184a 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -652,7 +652,12 @@ impl S3Core { self.send(req).await } - pub async fn s3_copy_object(&self, from: &str, to: &str) -> Result<Response<Buffer>> { + pub async fn s3_copy_object( + &self, + from: &str, + to: &str, + args: OpCopy, + ) -> Result<Response<Buffer>> { let from = build_abs_path(&self.root, from); let to = build_abs_path(&self.root, to); @@ -661,6 +666,12 @@ impl S3Core { let mut req = Request::put(&target); + // Fail if the destination already exists. `If-None-Match: *` applies to the + // destination; `x-amz-copy-source-if-none-match` would test the source instead. + if args.if_not_exists() { + req = req.header(http::header::IF_NONE_MATCH, "*"); + } + // Set SSE headers.
req = self.insert_sse_headers(req, true); @@ -703,12 +714,12 @@ impl S3Core { // Set request payer header if enabled. req = self.insert_request_payer_header(req); - let mut req = req + let req = req // Inject operation to the request. .extension(Operation::Copy) - .header(constants::X_AMZ_COPY_SOURCE, &source) - .body(Buffer::new()) - .map_err(new_request_build_error)?; + .header(constants::X_AMZ_COPY_SOURCE, &source); + + let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?; self.sign(&mut req).await?; diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs index 2cadeaa0d..4d582b6e8 100644 --- a/core/src/types/capability.rs +++ b/core/src/types/capability.rs @@ -148,6 +148,8 @@ pub struct Capability { /// Indicates if copy operations are supported. pub copy: bool, + /// Indicates if copy supports `if_not_exists` (fail when the target already exists). + pub copy_with_if_not_exists: bool, /// Indicates if rename operations are supported. pub rename: bool, diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 19f3211f4..90f1bd8b2 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -1073,6 +1073,127 @@ impl Operator { Ok(()) } + /// Copy a file from `from` to `to` with additional options. + /// + /// # Notes + /// + /// - `from` and `to` must both be files. + /// - If `from` and `to` are the same, an `IsSameFile` error will occur. + /// - Without `if_not_exists`, `copy` is idempotent for the same `from` and `to`. + /// + /// # Options + /// + /// Visit [`options::CopyOptions`] for all available options.
+ /// + /// # Examples + /// + /// Copy a file only if the destination doesn't exist: + /// + /// ``` + /// # use opendal::Result; + /// # use opendal::Operator; + /// + /// # async fn test(op: Operator) -> Result<()> { + /// op.copy_with("path/to/file", "path/to/file2") + /// .if_not_exists(true) + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub fn copy_with(&self, from: &str, to: &str) -> FutureCopy<impl Future<Output = Result<()>>> { + let from = normalize_path(from); + let to = normalize_path(to); + + OperatorFuture::new( + self.inner().clone(), + from, + (options::CopyOptions::default(), to), + Self::copy_inner, + ) + } + + /// Copy a file from `from` to `to` using a prebuilt [`options::CopyOptions`]. + /// + /// # Notes + /// + /// - `from` and `to` must both be files. + /// - If `from` and `to` are the same, an `IsSameFile` error will occur. + /// - Without `if_not_exists`, `copy` is idempotent for the same `from` and `to`. + /// + /// # Options + /// + /// Check [`options::CopyOptions`] for all available options.
+ /// + /// # Examples + /// + /// Copy a file only if the destination doesn't exist: + /// + /// ``` + /// # use opendal::Result; + /// # use opendal::Operator; + /// # use opendal::options::CopyOptions; + /// + /// # async fn test(op: Operator) -> Result<()> { + /// let mut opts = CopyOptions::default(); + /// opts.if_not_exists = true; + /// op.copy_options("path/to/file", "path/to/file2", opts).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn copy_options( + &self, + from: &str, + to: &str, + opts: impl Into<options::CopyOptions>, + ) -> Result<()> { + let from = normalize_path(from); + let to = normalize_path(to); + let opts = opts.into(); + + Self::copy_inner(self.inner().clone(), from, (opts, to)).await + } + + async fn copy_inner( + acc: Accessor, + from: String, + (opts, to): (options::CopyOptions, String), + ) -> Result<()> { + if !validate_path(&from, EntryMode::FILE) { + return Err( + Error::new(ErrorKind::IsADirectory, "from path is a directory") + .with_operation("Operator::copy") + .with_context("service", acc.info().scheme()) + .with_context("from", from), + ); + } + + if !validate_path(&to, EntryMode::FILE) { + return Err( + Error::new(ErrorKind::IsADirectory, "to path is a directory") + .with_operation("Operator::copy") + .with_context("service", acc.info().scheme()) + .with_context("to", to), + ); + } + + if from == to { + return Err( + Error::new(ErrorKind::IsSameFile, "from and to paths are same") + .with_operation("Operator::copy") + .with_context("service", acc.info().scheme()) + .with_context("from", &from) + .with_context("to", &to), + ); + } + + let mut op = OpCopy::new(); + if opts.if_not_exists { + op = op.with_if_not_exists(true); + } + + acc.copy(&from, &to, op).await.map(|_| ()) + } + /// Rename a file from `from` to `to`.
/// /// # Notes diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index ef263fd24..1fcb46e25 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -1398,3 +1398,33 @@ impl<F: Future<Output = Result<Lister>>> FutureLister<F> { self } } + +/// Future generated by [`Operator::copy_with`]. +/// +/// Set extra copy options with the methods on this type before awaiting it. +pub type FutureCopy<F> = OperatorFuture<(options::CopyOptions, String), (), F>; + +impl<F: Future<Output = Result<()>>> FutureCopy<F> { + /// Make the copy succeed only if the target does not already exist. + /// + /// Refer to [`options::CopyOptions::if_not_exists`] for more details. + /// + /// ### Example + /// + /// ``` + /// # use opendal::Result; + /// # use opendal::Operator; + /// + /// # async fn test(op: Operator) -> Result<()> { + /// let _ = op + /// .copy_with("source/path", "target/path") + /// .if_not_exists(true) + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub fn if_not_exists(mut self, v: bool) -> Self { + self.args.0.if_not_exists = v; + self + } +} diff --git a/core/src/types/options.rs b/core/src/types/options.rs index c8d5e78b9..d63be89bb 100644 --- a/core/src/types/options.rs +++ b/core/src/types/options.rs @@ -512,3 +512,23 @@ pub struct WriteOptions { /// - Better utilize network bandwidth pub chunk: Option<usize>, } + +/// Options for copy operations. +#[derive(Debug, Clone, Default, Eq, PartialEq)] +pub struct CopyOptions { + /// Make the copy succeed only if the target does not already exist. + /// + /// ### Capability + /// + /// Check [`Capability::copy_with_if_not_exists`] before using this feature.
+ /// + /// ### Behavior + /// + /// - If supported, the copy only succeeds when the target path does not exist + /// - If the target already exists, the copy fails with `ErrorKind::ConditionNotMatch` + /// - If not supported, the flag may be ignored and an existing target overwritten + /// + /// Use this to make copies create new objects only, never overwrite existing ones, + /// e.g. to implement "copy if not exists" logic. + pub if_not_exists: bool, +} diff --git a/core/tests/behavior/async_copy.rs b/core/tests/behavior/async_copy.rs index 56a372fe7..58e6dfa84 100644 --- a/core/tests/behavior/async_copy.rs +++ b/core/tests/behavior/async_copy.rs @@ -37,6 +37,14 @@ pub fn tests(op: &Operator, tests: &mut Vec<Trial>) { test_copy_overwrite )) } + + if cap.read && cap.write && cap.copy && cap.copy_with_if_not_exists { + tests.extend(async_trials!( + op, + test_copy_with_if_not_exists_to_new_file, + test_copy_with_if_not_exists_to_existing_file + )) + } } /// Copy a file with ascii name and test contents. @@ -229,3 +237,70 @@ pub async fn test_copy_overwrite(op: Operator) -> Result<()> { op.delete(&target_path).await.expect("delete must succeed"); Ok(()) } + +/// Copy with if_not_exists to a target that does not exist yet must succeed.
+pub async fn test_copy_with_if_not_exists_to_new_file(op: Operator) -> Result<()> { + let source_path = uuid::Uuid::new_v4().to_string(); + let (source_content, _) = gen_bytes(op.info().full_capability()); + + op.write(&source_path, source_content.clone()).await?; + + let target_path = uuid::Uuid::new_v4().to_string(); + + // Copy with if_not_exists to a non-existing file should succeed + op.copy_with(&source_path, &target_path) + .if_not_exists(true) + .await?; + + let target_content = op + .read(&target_path) + .await + .expect("read must succeed") + .to_bytes(); + assert_eq!( + format!("{:x}", Sha256::digest(target_content)), + format!("{:x}", Sha256::digest(&source_content)), + ); + + op.delete(&source_path).await.expect("delete must succeed"); + op.delete(&target_path).await.expect("delete must succeed"); + Ok(()) +} + +/// Copy with if_not_exists onto an existing file must fail with `ConditionNotMatch` and leave the target unchanged. +pub async fn test_copy_with_if_not_exists_to_existing_file(op: Operator) -> Result<()> { + let source_path = uuid::Uuid::new_v4().to_string(); + let (source_content, _) = gen_bytes(op.info().full_capability()); + + op.write(&source_path, source_content.clone()).await?; + + let target_path = uuid::Uuid::new_v4().to_string(); + let (target_content, _) = gen_bytes(op.info().full_capability()); + assert_ne!(source_content, target_content); + + // Write to target file first + op.write(&target_path, target_content.clone()).await?; + + // Copy with if_not_exists to an existing file should fail + let err = op + .copy_with(&source_path, &target_path) + .if_not_exists(true) + .await + .expect_err("copy must fail"); + assert_eq!(err.kind(), ErrorKind::ConditionNotMatch); + + // Verify target file content is unchanged + let current_content = op + .read(&target_path) + .await + .expect("read must succeed") + .to_bytes(); + assert_eq!( + format!("{:x}", Sha256::digest(current_content)), + format!("{:x}", Sha256::digest(&target_content)), + ); + + op.delete(&source_path).await.expect("delete must
succeed"); + op.delete(&target_path).await.expect("delete must succeed"); + Ok(()) +}