This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 37ee020  refactor!: `copy` & `copy_if_not_exists` => `copy_opts` (#548)
37ee020 is described below

commit 37ee0209055b06657bd92fbe2ba7e6395b67203e
Author: Marco Neumann <[email protected]>
AuthorDate: Tue Nov 25 14:23:50 2025 +0100

    refactor!: `copy` & `copy_if_not_exists` => `copy_opts` (#548)
    
    Change the `ObjectStore` core trait to have a single, extensible copy
    operation. This helps #385 and #297.
    
    Also adds extensions similar to
    https://github.com/apache/arrow-rs/pull/7170
    and
    https://github.com/apache/arrow-rs/pull/7213 .
    
    Closes #116.
---
 src/aws/mod.rs          | 148 ++++++++++++++++++++++++++----------------------
 src/aws/precondition.rs |   8 +--
 src/azure/mod.rs        |  19 ++++---
 src/chunked.rs          |  12 ++--
 src/gcp/mod.rs          |  15 +++--
 src/http/mod.rs         |  20 ++++---
 src/lib.rs              | 124 +++++++++++++++++++++++++++++++---------
 src/limit.rs            |  15 ++---
 src/local.rs            | 129 ++++++++++++++++++++++-------------------
 src/memory.rs           |  34 ++++++-----
 src/prefix.rs           |  12 +---
 src/throttle.rs         |  12 +---
 tests/get_range_file.rs |   6 +-
 13 files changed, 324 insertions(+), 230 deletions(-)

diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index bcd429d..3e658af 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -44,9 +44,9 @@ use crate::multipart::{MultipartStore, PartId};
 use crate::signer::Signer;
 use crate::util::STRICT_ENCODE_SET;
 use crate::{
-    Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, 
ObjectMeta,
-    ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, PutPayload, 
PutResult, Result,
-    UploadPart,
+    CopyMode, CopyOptions, Error, GetOptions, GetResult, ListResult, 
MultipartId, MultipartUpload,
+    ObjectMeta, ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, 
PutPayload, PutResult,
+    Result, UploadPart,
 };
 
 static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
@@ -305,77 +305,89 @@ impl ObjectStore for AmazonS3 {
         self.client.list_with_delimiter(prefix).await
     }
 
-    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client
-            .copy_request(from, to)
-            .idempotent(true)
-            .send()
-            .await?;
-        Ok(())
-    }
-
-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        let (k, v, status) = match &self.client.config.copy_if_not_exists {
-            Some(S3CopyIfNotExists::Header(k, v)) => (k, v, 
StatusCode::PRECONDITION_FAILED),
-            Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, 
*status),
-            Some(S3CopyIfNotExists::Multipart) => {
-                let upload_id = self
-                    .client
-                    .create_multipart(to, PutMultipartOptions::default())
+    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> 
Result<()> {
+        let CopyOptions {
+            mode,
+            extensions: _,
+        } = options;
+
+        match mode {
+            CopyMode::Overwrite => {
+                self.client
+                    .copy_request(from, to)
+                    .idempotent(true)
+                    .send()
                     .await?;
+                Ok(())
+            }
+            CopyMode::Create => {
+                let (k, v, status) = match 
&self.client.config.copy_if_not_exists {
+                    Some(S3CopyIfNotExists::Header(k, v)) => {
+                        (k, v, StatusCode::PRECONDITION_FAILED)
+                    }
+                    Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => 
(k, v, *status),
+                    Some(S3CopyIfNotExists::Multipart) => {
+                        let upload_id = self
+                            .client
+                            .create_multipart(to, 
PutMultipartOptions::default())
+                            .await?;
+
+                        let res = async {
+                            let part_id = self
+                                .client
+                                .put_part(to, &upload_id, 0, 
PutPartPayload::Copy(from))
+                                .await?;
+                            match self
+                                .client
+                                .complete_multipart(
+                                    to,
+                                    &upload_id,
+                                    vec![part_id],
+                                    CompleteMultipartMode::Create,
+                                )
+                                .await
+                            {
+                                Err(e @ Error::Precondition { .. }) => 
Err(Error::AlreadyExists {
+                                    path: to.to_string(),
+                                    source: Box::new(e),
+                                }),
+                                Ok(_) => Ok(()),
+                                Err(e) => Err(e),
+                            }
+                        }
+                        .await;
+
+                        // If the multipart upload failed, make a best effort 
attempt to
+                        // clean it up. It's the caller's responsibility to 
add a
+                        // lifecycle rule if guaranteed cleanup is required, 
as we
+                        // cannot protect against an ill-timed process crash.
+                        if res.is_err() {
+                            let _ = self.client.abort_multipart(to, 
&upload_id).await;
+                        }
 
-                let res = async {
-                    let part_id = self
-                        .client
-                        .put_part(to, &upload_id, 0, 
PutPartPayload::Copy(from))
-                        .await?;
-                    match self
-                        .client
-                        .complete_multipart(
-                            to,
-                            &upload_id,
-                            vec![part_id],
-                            CompleteMultipartMode::Create,
-                        )
-                        .await
+                        return res;
+                    }
+                    None => {
+                        return Err(Error::NotSupported {
+                            source: "S3 does not support 
copy-if-not-exists".to_string().into(),
+                        });
+                    }
+                };
+
+                let req = self.client.copy_request(from, to);
+                match req.header(k, v).send().await {
+                    Err(RequestError::Retry { source, path })
+                        if source.status() == Some(status) =>
                     {
-                        Err(e @ Error::Precondition { .. }) => 
Err(Error::AlreadyExists {
-                            path: to.to_string(),
-                            source: Box::new(e),
-                        }),
-                        Ok(_) => Ok(()),
-                        Err(e) => Err(e),
+                        Err(Error::AlreadyExists {
+                            source: Box::new(source),
+                            path,
+                        })
                     }
+                    Err(e) => Err(e.into()),
+                    Ok(_) => Ok(()),
                 }
-                .await;
-
-                // If the multipart upload failed, make a best effort attempt 
to
-                // clean it up. It's the caller's responsibility to add a
-                // lifecycle rule if guaranteed cleanup is required, as we
-                // cannot protect against an ill-timed process crash.
-                if res.is_err() {
-                    let _ = self.client.abort_multipart(to, &upload_id).await;
-                }
-
-                return res;
-            }
-            None => {
-                return Err(Error::NotSupported {
-                    source: "S3 does not support 
copy-if-not-exists".to_string().into(),
-                });
-            }
-        };
-
-        let req = self.client.copy_request(from, to);
-        match req.header(k, v).send().await {
-            Err(RequestError::Retry { source, path }) if source.status() == 
Some(status) => {
-                Err(Error::AlreadyExists {
-                    source: Box::new(source),
-                    path,
-                })
             }
-            Err(e) => Err(e.into()),
-            Ok(_) => Ok(()),
         }
     }
 }
diff --git a/src/aws/precondition.rs b/src/aws/precondition.rs
index 52ecb9f..b4ae938 100644
--- a/src/aws/precondition.rs
+++ b/src/aws/precondition.rs
@@ -19,9 +19,9 @@ use crate::config::Parse;
 
 use itertools::Itertools;
 
-/// Configure how to provide [`ObjectStore::copy_if_not_exists`] for 
[`AmazonS3`].
+/// Configure how to provide [`CopyMode::Create`] for [`AmazonS3`].
 ///
-/// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists
+/// [`CopyMode::Create`]: crate::CopyMode::Create
 /// [`AmazonS3`]: super::AmazonS3
 #[derive(Debug, Clone, PartialEq, Eq)]
 #[non_exhaustive]
@@ -29,7 +29,7 @@ pub enum S3CopyIfNotExists {
     /// Some S3-compatible stores, such as Cloudflare R2, support copy if not 
exists
     /// semantics through custom headers.
     ///
-    /// If set, [`ObjectStore::copy_if_not_exists`] will perform a normal copy 
operation
+    /// If set, [`CopyMode::Create`] will perform a normal copy operation
     /// with the provided header pair, and expect the store to fail with `412 
Precondition Failed`
     /// if the destination file already exists.
     ///
@@ -38,7 +38,7 @@ pub enum S3CopyIfNotExists {
     /// For example `header: cf-copy-destination-if-none-match: *`, would set
     /// the header `cf-copy-destination-if-none-match` to `*`
     ///
-    /// [`ObjectStore::copy_if_not_exists`]: 
crate::ObjectStore::copy_if_not_exists
+    /// [`CopyMode::Create`]: crate::CopyMode::Create
     Header(String, String),
     /// The same as [`S3CopyIfNotExists::Header`] but allows custom status 
code checking, for object stores that return values
     /// other than 412.
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index 3f5c723..d22ffcf 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -23,8 +23,9 @@
 //!
 //! Unused blocks will automatically be dropped after 7 days.
 use crate::{
-    GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, 
ObjectMeta, ObjectStore,
-    PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart,
+    CopyMode, CopyOptions, GetOptions, GetResult, ListResult, MultipartId, 
MultipartUpload,
+    ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, 
PutResult, Result,
+    UploadPart,
     multipart::{MultipartStore, PartId},
     path::Path,
     signer::Signer,
@@ -151,12 +152,16 @@ impl ObjectStore for MicrosoftAzure {
         self.client.list_with_delimiter(prefix).await
     }
 
-    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client.copy_request(from, to, true).await
-    }
+    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> 
Result<()> {
+        let CopyOptions {
+            mode,
+            extensions: _,
+        } = options;
 
-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client.copy_request(from, to, false).await
+        match mode {
+            CopyMode::Overwrite => self.client.copy_request(from, to, 
true).await,
+            CopyMode::Create => self.client.copy_request(from, to, 
false).await,
+        }
     }
 }
 
diff --git a/src/chunked.rs b/src/chunked.rs
index 843781c..53dbf1e 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -28,8 +28,8 @@ use futures::stream::BoxStream;
 
 use crate::path::Path;
 use crate::{
-    GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, 
ObjectMeta, ObjectStore,
-    PutMultipartOptions, PutOptions, PutResult,
+    CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, 
MultipartUpload, ObjectMeta,
+    ObjectStore, PutMultipartOptions, PutOptions, PutResult,
 };
 use crate::{PutPayload, Result};
 
@@ -166,18 +166,14 @@ impl ObjectStore for ChunkedStore {
         self.inner.list_with_delimiter(prefix).await
     }
 
-    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        self.inner.copy(from, to).await
+    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> 
Result<()> {
+        self.inner.copy_opts(from, to, options).await
     }
 
     async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
         self.inner.rename(from, to).await
     }
 
-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        self.inner.copy_if_not_exists(from, to).await
-    }
-
     async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> 
{
         self.inner.rename_if_not_exists(from, to).await
     }
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index befe6ae..270b89a 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -40,6 +40,7 @@ use std::time::Duration;
 use crate::client::CredentialProvider;
 use crate::gcp::credential::GCSAuthorizer;
 use crate::signer::Signer;
+use crate::{CopyMode, CopyOptions};
 use crate::{
     GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, 
ObjectMeta, ObjectStore,
     PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, 
UploadPart, multipart::PartId,
@@ -218,12 +219,16 @@ impl ObjectStore for GoogleCloudStorage {
         self.client.list_with_delimiter(prefix).await
     }
 
-    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client.copy_request(from, to, false).await
-    }
+    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> 
Result<()> {
+        let CopyOptions {
+            mode,
+            extensions: _,
+        } = options;

-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client.copy_request(from, to, true).await
+        match mode { // GCP's flag requests create-only semantics when `true`, unlike Azure/HTTP
+            CopyMode::Overwrite => self.client.copy_request(from, to, 
false).await,
+            CopyMode::Create => self.client.copy_request(from, to, 
true).await,
+        }
     }
 }
 
diff --git a/src/http/mod.rs b/src/http/mod.rs
index cd6311a..673419c 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -45,9 +45,9 @@ use crate::client::{HttpConnector, http_connector};
 use crate::http::client::Client;
 use crate::path::Path;
 use crate::{
-    ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, 
MultipartUpload, ObjectMeta,
-    ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, 
PutResult, Result,
-    RetryConfig,
+    ClientConfigKey, ClientOptions, CopyMode, CopyOptions, GetOptions, 
GetResult, ListResult,
+    MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, 
PutOptions, PutPayload,
+    PutResult, Result, RetryConfig,
 };
 
 mod client;
@@ -210,12 +210,16 @@ impl ObjectStore for HttpStore {
         })
     }
 
-    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client.copy(from, to, true).await
-    }
+    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> 
Result<()> {
+        let CopyOptions {
+            mode,
+            extensions: _,
+        } = options;
 
-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client.copy(from, to, false).await
+        match mode {
+            CopyMode::Overwrite => self.client.copy(from, to, true).await,
+            CopyMode::Create => self.client.copy(from, to, false).await,
+        }
     }
 }
 
diff --git a/src/lib.rs b/src/lib.rs
index 31c5352..9b59627 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -890,7 +890,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     /// # use futures::stream::{BoxStream, StreamExt};
     /// # use object_store::path::Path;
     /// # use object_store::{
-    /// #     GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, 
ObjectStore,
+    /// #     CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, 
ObjectMeta, ObjectStore,
     /// #     PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
     /// # };
     /// # use std::fmt;
@@ -969,11 +969,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     /// #         todo!()
     /// #     }
     /// #
-    /// #     async fn copy(&self, _: &Path, _: &Path) -> Result<()> {
-    /// #         todo!()
-    /// #     }
-    /// #
-    /// #     async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> 
Result<()> {
+    /// #     async fn copy_opts(&self, _: &Path, _: &Path, _: CopyOptions) -> 
Result<()> {
     /// #         todo!()
     /// #     }
     /// # }
@@ -1033,9 +1029,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
Result<ListResult>;
 
     /// Copy an object from one path to another in the same object store.
-    ///
-    /// If there exists an object at the destination, it will be overwritten.
-    async fn copy(&self, from: &Path, to: &Path) -> Result<()>;
+    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> 
Result<()>;
 
     /// Move an object from one path to another in the same object store.
     ///
@@ -1048,15 +1042,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
         self.delete(from).await
     }
 
-    /// Copy an object from one path to another, only if destination is empty.
-    ///
-    /// Will return an error if the destination already has an object.
-    ///
-    /// Performs an atomic operation if the underlying object storage supports 
it.
-    /// If atomic operations are not supported by the underlying object 
storage (like S3)
-    /// it will return an error.
-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>;
-
     /// Move an object from one path to another in the same object store.
     ///
     /// Will return an error if the destination already has an object.
@@ -1127,18 +1112,14 @@ macro_rules! as_ref_impl {
                 self.as_ref().list_with_delimiter(prefix).await
             }
 
-            async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-                self.as_ref().copy(from, to).await
+            async fn copy_opts(&self, from: &Path, to: &Path, options: 
CopyOptions) -> Result<()> {
+                self.as_ref().copy_opts(from, to, options).await
             }
 
             async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
                 self.as_ref().rename(from, to).await
             }
 
-            async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> 
Result<()> {
-                self.as_ref().copy_if_not_exists(from, to).await
-            }
-
             async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> 
Result<()> {
                 self.as_ref().rename_if_not_exists(from, to).await
             }
@@ -1243,6 +1224,20 @@ pub trait ObjectStoreExt: ObjectStore {
 
     /// Return the metadata for the specified location
     fn head(&self, location: &Path) -> impl Future<Output = 
Result<ObjectMeta>>;
+
+    /// Copy an object from one path to another in the same object store.
+    ///
+    /// If there exists an object at the destination, it will be overwritten.
+    fn copy(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
+
+    /// Copy an object from one path to another, only if destination is empty.
+    ///
+    /// Will return an error if the destination already has an object.
+    ///
+    /// Performs an atomic operation if the underlying object storage supports 
it.
+    /// If atomic operations are not supported by the underlying object 
storage (like S3)
+    /// it will return an error.
+    fn copy_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output 
= Result<()>>;
 }
 
 impl<T> ObjectStoreExt for T
@@ -1272,6 +1267,16 @@ where
         let options = GetOptions::new().with_head(true);
         Ok(self.get_opts(location, options).await?.meta)
     }
+
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        let options = CopyOptions::new().with_mode(CopyMode::Overwrite);
+        self.copy_opts(from, to, options).await
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        let options = CopyOptions::new().with_mode(CopyMode::Create);
+        self.copy_opts(from, to, options).await
+    }
 }
 
 /// Result of a list call that includes objects, prefixes (directories) and a
@@ -1633,7 +1638,7 @@ pub struct PutOptions {
     ///
     /// These extensions are ignored entirely by backends offered through this 
crate.
     ///
-    /// They are also eclused from [`PartialEq`] and [`Eq`].
+    /// They are also excluded from [`PartialEq`] and [`Eq`].
     pub extensions: Extensions,
 }
 
@@ -1705,7 +1710,7 @@ pub struct PutMultipartOptions {
     ///
     /// These extensions are ignored entirely by backends offered through this 
crate.
     ///
-    /// They are also eclused from [`PartialEq`] and [`Eq`].
+    /// They are also excluded from [`PartialEq`] and [`Eq`].
     pub extensions: Extensions,
 }
 
@@ -1756,6 +1761,73 @@ pub struct PutResult {
     pub version: Option<String>,
 }
 
+/// Configure preconditions for the copy operation
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum CopyMode {
+    /// Perform an atomic copy, overwriting any object present at the destination path
+    #[default]
+    Overwrite,
+    /// Perform an atomic copy, returning [`Error::AlreadyExists`] if an
+    /// object already exists at the destination path
+    Create,
+}
+
+/// Options for a copy request
+#[derive(Debug, Clone, Default)]
+pub struct CopyOptions {
+    /// Configure the [`CopyMode`] for this operation
+    pub mode: CopyMode,
+    /// Implementation-specific extensions. Intended for use by 
[`ObjectStore`] implementations
+    /// that need to pass context-specific information (like tracing spans) 
via trait methods.
+    ///
+    /// These extensions are ignored entirely by backends offered through this 
crate.
+    ///
+    /// They are also excluded from [`PartialEq`] and [`Eq`].
+    pub extensions: Extensions,
+}
+
+impl CopyOptions {
+    /// Create a new [`CopyOptions`]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Sets the `mode`.
+    ///
+    /// See [`CopyOptions::mode`].
+    #[must_use]
+    pub fn with_mode(mut self, mode: CopyMode) -> Self {
+        self.mode = mode;
+        self
+    }
+
+    /// Sets the `extensions`.
+    ///
+    /// See [`CopyOptions::extensions`].
+    #[must_use]
+    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
+        self.extensions = extensions;
+        self
+    }
+}
+
+impl PartialEq<Self> for CopyOptions {
+    fn eq(&self, other: &Self) -> bool {
+        let Self {
+            mode,
+            extensions: _,
+        } = self;
+        let Self {
+            mode: mode_other,
+            extensions: _,
+        } = other;
+
+        mode == mode_other
+    }
+}
+
+impl Eq for CopyOptions {}
+
 /// A specialized `Result` for object store-related errors
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
diff --git a/src/limit.rs b/src/limit.rs
index 9efe879..7e1e2b4 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -18,9 +18,9 @@
 //! An object store that limits the maximum concurrency of the wrapped 
implementation
 
 use crate::{
-    BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, 
MultipartUpload, ObjectMeta,
-    ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, PutResult, 
Result, StreamExt,
-    UploadPart,
+    BoxStream, CopyOptions, GetOptions, GetResult, GetResultPayload, 
ListResult, MultipartUpload,
+    ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, 
PutPayload, PutResult, Result,
+    StreamExt, UploadPart,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -151,9 +151,9 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         self.inner.list_with_delimiter(prefix).await
     }
 
-    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> 
Result<()> {
         let _permit = self.semaphore.acquire().await.unwrap();
-        self.inner.copy(from, to).await
+        self.inner.copy_opts(from, to, options).await
     }
 
     async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
@@ -161,11 +161,6 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         self.inner.rename(from, to).await
     }
 
-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        let _permit = self.semaphore.acquire().await.unwrap();
-        self.inner.copy_if_not_exists(from, to).await
-    }
-
     async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> 
{
         let _permit = self.semaphore.acquire().await.unwrap();
         self.inner.rename_if_not_exists(from, to).await
diff --git a/src/local.rs b/src/local.rs
index 0bd0c88..ebe9527 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -40,6 +40,7 @@ use crate::{
     path::{Path, absolute_path_to_url},
     util::InvalidGetRange,
 };
+use crate::{CopyMode, CopyOptions};
 
 /// A specialized `Error` for filesystem object store-related errors
 #[derive(Debug, thiserror::Error)]
@@ -189,7 +190,7 @@ impl From<Error> for super::Error {
 ///
 /// # Cross-Filesystem Copy
 ///
-/// [`LocalFileSystem::copy`] is implemented using [`std::fs::hard_link`], and 
therefore
+/// [`LocalFileSystem::copy_opts`] is implemented using 
[`std::fs::hard_link`], and therefore
 /// does not support copying across filesystem boundaries.
 ///
 #[derive(Debug)]
@@ -533,38 +534,80 @@ impl ObjectStore for LocalFileSystem {
         .await
     }
 
-    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> 
Result<()> {
+        let CopyOptions {
+            mode,
+            extensions: _,
+        } = options;
+
         let from = self.path_to_filesystem(from)?;
         let to = self.path_to_filesystem(to)?;
-        let mut id = 0;
-        // In order to make this atomic we:
-        //
-        // - hard link to a hidden temporary file
-        // - atomically rename this temporary file into place
-        //
-        // This is necessary because hard_link returns an error if the 
destination already exists
-        maybe_spawn_blocking(move || {
-            loop {
-                let staged = staged_upload_path(&to, &id.to_string());
-                match std::fs::hard_link(&from, &staged) {
-                    Ok(_) => {
-                        return std::fs::rename(&staged, &to).map_err(|source| {
-                            let _ = std::fs::remove_file(&staged); // Attempt 
to clean up
-                            Error::UnableToCopyFile { from, to, source }.into()
-                        });
+
+        match mode {
+            CopyMode::Overwrite => {
+                let mut id = 0;
+                // In order to make this atomic we:
+                //
+                // - hard link to a hidden temporary file
+                // - atomically rename this temporary file into place
+                //
+                // This is necessary because hard_link returns an error if the 
destination already exists
+                maybe_spawn_blocking(move || {
+                    loop {
+                        let staged = staged_upload_path(&to, &id.to_string());
+                        match std::fs::hard_link(&from, &staged) {
+                            Ok(_) => {
+                                return std::fs::rename(&staged, 
&to).map_err(|source| {
+                                    let _ = std::fs::remove_file(&staged); // 
Attempt to clean up
+                                    Error::UnableToCopyFile { from, to, source 
}.into()
+                                });
+                            }
+                            Err(source) => match source.kind() {
+                                ErrorKind::AlreadyExists => id += 1,
+                                ErrorKind::NotFound => match from.exists() {
+                                    true => create_parent_dirs(&to, source)?,
+                                    false => {
+                                        return Err(Error::NotFound { path: 
from, source }.into());
+                                    }
+                                },
+                                _ => {
+                                    return Err(Error::UnableToCopyFile { from, 
to, source }.into());
+                                }
+                            },
+                        }
                     }
-                    Err(source) => match source.kind() {
-                        ErrorKind::AlreadyExists => id += 1,
-                        ErrorKind::NotFound => match from.exists() {
-                            true => create_parent_dirs(&to, source)?,
-                            false => return Err(Error::NotFound { path: from, 
source }.into()),
-                        },
-                        _ => return Err(Error::UnableToCopyFile { from, to, 
source }.into()),
-                    },
-                }
+                })
+                .await
             }
-        })
-        .await
+            CopyMode::Create => {
+                maybe_spawn_blocking(move || {
+                    loop {
+                        match std::fs::hard_link(&from, &to) {
+                            Ok(_) => return Ok(()),
+                            Err(source) => match source.kind() {
+                                ErrorKind::AlreadyExists => {
+                                    return Err(Error::AlreadyExists {
+                                        path: to.to_str().unwrap().to_string(),
+                                        source,
+                                    }
+                                    .into());
+                                }
+                                ErrorKind::NotFound => match from.exists() {
+                                    true => create_parent_dirs(&to, source)?,
+                                    false => {
+                                        return Err(Error::NotFound { path: 
from, source }.into());
+                                    }
+                                },
+                                _ => {
+                                    return Err(Error::UnableToCopyFile { from, 
to, source }.into());
+                                }
+                            },
+                        }
+                    }
+                })
+                .await
+            }
+        }
     }
 
     async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
@@ -586,34 +629,6 @@ impl ObjectStore for LocalFileSystem {
         })
         .await
     }
-
-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        let from = self.path_to_filesystem(from)?;
-        let to = self.path_to_filesystem(to)?;
-
-        maybe_spawn_blocking(move || {
-            loop {
-                match std::fs::hard_link(&from, &to) {
-                    Ok(_) => return Ok(()),
-                    Err(source) => match source.kind() {
-                        ErrorKind::AlreadyExists => {
-                            return Err(Error::AlreadyExists {
-                                path: to.to_str().unwrap().to_string(),
-                                source,
-                            }
-                            .into());
-                        }
-                        ErrorKind::NotFound => match from.exists() {
-                            true => create_parent_dirs(&to, source)?,
-                            false => return Err(Error::NotFound { path: from, 
source }.into()),
-                        },
-                        _ => return Err(Error::UnableToCopyFile { from, to, 
source }.into()),
-                    },
-                }
-            }
-        })
-        .await
-    }
 }
 
 impl LocalFileSystem {
diff --git a/src/memory.rs b/src/memory.rs
index 9cc084f..08e41c2 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -33,7 +33,7 @@ use crate::{
     ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, 
PutResult, Result,
     UpdateVersion, UploadPart, path::Path,
 };
-use crate::{GetOptions, PutPayload};
+use crate::{CopyMode, CopyOptions, GetOptions, PutPayload};
 
 /// A specialized `Error` for in-memory object store-related errors
 #[derive(Debug, thiserror::Error)]
@@ -391,24 +391,30 @@ impl ObjectStore for InMemory {
         })
     }
 
-    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        let entry = self.entry(from)?;
-        self.storage
-            .write()
-            .insert(to, entry.data, entry.attributes);
-        Ok(())
-    }
+    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> 
Result<()> {
+        let CopyOptions {
+            mode,
+            extensions: _,
+        } = options;
 
-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
         let entry = self.entry(from)?;
         let mut storage = self.storage.write();
-        if storage.map.contains_key(to) {
-            return Err(Error::AlreadyExists {
-                path: to.to_string(),
+
+        match mode {
+            CopyMode::Overwrite => {
+                storage.insert(to, entry.data, entry.attributes);
+            }
+            CopyMode::Create => {
+                if storage.map.contains_key(to) {
+                    return Err(Error::AlreadyExists {
+                        path: to.to_string(),
+                    }
+                    .into());
+                }
+                storage.insert(to, entry.data, entry.attributes);
             }
-            .into());
         }
-        storage.insert(to, entry.data, entry.attributes);
+
         Ok(())
     }
 }
diff --git a/src/prefix.rs b/src/prefix.rs
index 392cfac..c37f55d 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -22,7 +22,7 @@ use std::ops::Range;
 
 use crate::path::Path;
 use crate::{
-    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, 
ObjectStore,
+    CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, 
ObjectMeta, ObjectStore,
     PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
 };
 
@@ -182,10 +182,10 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
             })
     }
 
-    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> 
Result<()> {
         let full_from = self.full_path(from);
         let full_to = self.full_path(to);
-        self.inner.copy(&full_from, &full_to).await
+        self.inner.copy_opts(&full_from, &full_to, options).await
     }
 
     async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
@@ -194,12 +194,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         self.inner.rename(&full_from, &full_to).await
     }
 
-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        let full_from = self.full_path(from);
-        let full_to = self.full_path(to);
-        self.inner.copy_if_not_exists(&full_from, &full_to).await
-    }
-
     async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> 
{
         let full_from = self.full_path(from);
         let full_to = self.full_path(to);
diff --git a/src/throttle.rs b/src/throttle.rs
index e5fade5..bd5795e 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -21,7 +21,7 @@ use std::ops::Range;
 use std::{convert::TryInto, sync::Arc};
 
 use crate::multipart::{MultipartStore, PartId};
-use crate::{GetOptions, UploadPart};
+use crate::{CopyOptions, GetOptions, UploadPart};
 use crate::{
     GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, 
ObjectMeta, ObjectStore,
     PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, path::Path,
@@ -255,10 +255,10 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
         }
     }
 
-    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> 
Result<()> {
         sleep(self.config().wait_put_per_call).await;
 
-        self.inner.copy(from, to).await
+        self.inner.copy_opts(from, to, options).await
     }
 
     async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
@@ -267,12 +267,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
         self.inner.rename(from, to).await
     }
 
-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        sleep(self.config().wait_put_per_call).await;
-
-        self.inner.copy_if_not_exists(from, to).await
-    }
-
     async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> 
{
         sleep(self.config().wait_put_per_call).await;
 
diff --git a/tests/get_range_file.rs b/tests/get_range_file.rs
index d39d972..df0a414 100644
--- a/tests/get_range_file.rs
+++ b/tests/get_range_file.rs
@@ -77,11 +77,7 @@ impl ObjectStore for MyStore {
         todo!()
     }
 
-    async fn copy(&self, _: &Path, _: &Path) -> Result<()> {
-        todo!()
-    }
-
-    async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> Result<()> {
+    async fn copy_opts(&self, _: &Path, _: &Path, _: CopyOptions) -> 
Result<()> {
         todo!()
     }
 }


Reply via email to