This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 37ee020 refactor!: `copy` & `copy_if_not_exists` => `copy_opts` (#548)
37ee020 is described below
commit 37ee0209055b06657bd92fbe2ba7e6395b67203e
Author: Marco Neumann <[email protected]>
AuthorDate: Tue Nov 25 14:23:50 2025 +0100
refactor!: `copy` & `copy_if_not_exists` => `copy_opts` (#548)
Change the `ObjectStore` core trait to have a single, extensible copy
operation. This helps #385 and #297.
Also adds extensions similar to
https://github.com/apache/arrow-rs/pull/7170
and
https://github.com/apache/arrow-rs/pull/7213 .
Closes #116.
---
src/aws/mod.rs | 148 ++++++++++++++++++++++++++----------------------
src/aws/precondition.rs | 8 +--
src/azure/mod.rs | 19 ++++---
src/chunked.rs | 12 ++--
src/gcp/mod.rs | 15 +++--
src/http/mod.rs | 20 ++++---
src/lib.rs | 124 +++++++++++++++++++++++++++++++---------
src/limit.rs | 15 ++---
src/local.rs | 129 ++++++++++++++++++++++-------------------
src/memory.rs | 34 ++++++-----
src/prefix.rs | 12 +---
src/throttle.rs | 12 +---
tests/get_range_file.rs | 6 +-
13 files changed, 324 insertions(+), 230 deletions(-)
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index bcd429d..3e658af 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -44,9 +44,9 @@ use crate::multipart::{MultipartStore, PartId};
use crate::signer::Signer;
use crate::util::STRICT_ENCODE_SET;
use crate::{
- Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
ObjectMeta,
- ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, PutPayload,
PutResult, Result,
- UploadPart,
+ CopyMode, CopyOptions, Error, GetOptions, GetResult, ListResult,
MultipartId, MultipartUpload,
+ ObjectMeta, ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions,
PutPayload, PutResult,
+ Result, UploadPart,
};
static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
@@ -305,77 +305,89 @@ impl ObjectStore for AmazonS3 {
self.client.list_with_delimiter(prefix).await
}
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
- self.client
- .copy_request(from, to)
- .idempotent(true)
- .send()
- .await?;
- Ok(())
- }
-
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- let (k, v, status) = match &self.client.config.copy_if_not_exists {
- Some(S3CopyIfNotExists::Header(k, v)) => (k, v,
StatusCode::PRECONDITION_FAILED),
- Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v,
*status),
- Some(S3CopyIfNotExists::Multipart) => {
- let upload_id = self
- .client
- .create_multipart(to, PutMultipartOptions::default())
+ async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) ->
Result<()> {
+ let CopyOptions {
+ mode,
+ extensions: _,
+ } = options;
+
+ match mode {
+ CopyMode::Overwrite => {
+ self.client
+ .copy_request(from, to)
+ .idempotent(true)
+ .send()
.await?;
+ Ok(())
+ }
+ CopyMode::Create => {
+ let (k, v, status) = match
&self.client.config.copy_if_not_exists {
+ Some(S3CopyIfNotExists::Header(k, v)) => {
+ (k, v, StatusCode::PRECONDITION_FAILED)
+ }
+ Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) =>
(k, v, *status),
+ Some(S3CopyIfNotExists::Multipart) => {
+ let upload_id = self
+ .client
+ .create_multipart(to,
PutMultipartOptions::default())
+ .await?;
+
+ let res = async {
+ let part_id = self
+ .client
+ .put_part(to, &upload_id, 0,
PutPartPayload::Copy(from))
+ .await?;
+ match self
+ .client
+ .complete_multipart(
+ to,
+ &upload_id,
+ vec![part_id],
+ CompleteMultipartMode::Create,
+ )
+ .await
+ {
+ Err(e @ Error::Precondition { .. }) =>
Err(Error::AlreadyExists {
+ path: to.to_string(),
+ source: Box::new(e),
+ }),
+ Ok(_) => Ok(()),
+ Err(e) => Err(e),
+ }
+ }
+ .await;
+
+ // If the multipart upload failed, make a best effort
attempt to
+ // clean it up. It's the caller's responsibility to
add a
+ // lifecycle rule if guaranteed cleanup is required,
as we
+ // cannot protect against an ill-timed process crash.
+ if res.is_err() {
+ let _ = self.client.abort_multipart(to,
&upload_id).await;
+ }
- let res = async {
- let part_id = self
- .client
- .put_part(to, &upload_id, 0,
PutPartPayload::Copy(from))
- .await?;
- match self
- .client
- .complete_multipart(
- to,
- &upload_id,
- vec![part_id],
- CompleteMultipartMode::Create,
- )
- .await
+ return res;
+ }
+ None => {
+ return Err(Error::NotSupported {
+ source: "S3 does not support
copy-if-not-exists".to_string().into(),
+ });
+ }
+ };
+
+ let req = self.client.copy_request(from, to);
+ match req.header(k, v).send().await {
+ Err(RequestError::Retry { source, path })
+ if source.status() == Some(status) =>
{
- Err(e @ Error::Precondition { .. }) =>
Err(Error::AlreadyExists {
- path: to.to_string(),
- source: Box::new(e),
- }),
- Ok(_) => Ok(()),
- Err(e) => Err(e),
+ Err(Error::AlreadyExists {
+ source: Box::new(source),
+ path,
+ })
}
+ Err(e) => Err(e.into()),
+ Ok(_) => Ok(()),
}
- .await;
-
- // If the multipart upload failed, make a best effort attempt
to
- // clean it up. It's the caller's responsibility to add a
- // lifecycle rule if guaranteed cleanup is required, as we
- // cannot protect against an ill-timed process crash.
- if res.is_err() {
- let _ = self.client.abort_multipart(to, &upload_id).await;
- }
-
- return res;
- }
- None => {
- return Err(Error::NotSupported {
- source: "S3 does not support
copy-if-not-exists".to_string().into(),
- });
- }
- };
-
- let req = self.client.copy_request(from, to);
- match req.header(k, v).send().await {
- Err(RequestError::Retry { source, path }) if source.status() ==
Some(status) => {
- Err(Error::AlreadyExists {
- source: Box::new(source),
- path,
- })
}
- Err(e) => Err(e.into()),
- Ok(_) => Ok(()),
}
}
}
diff --git a/src/aws/precondition.rs b/src/aws/precondition.rs
index 52ecb9f..b4ae938 100644
--- a/src/aws/precondition.rs
+++ b/src/aws/precondition.rs
@@ -19,9 +19,9 @@ use crate::config::Parse;
use itertools::Itertools;
-/// Configure how to provide [`ObjectStore::copy_if_not_exists`] for
[`AmazonS3`].
+/// Configure how to provide [`CopyMode::Create`] for [`AmazonS3`].
///
-/// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists
+/// [`CopyMode::Create`]: crate::CopyMode::Create
/// [`AmazonS3`]: super::AmazonS3
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
@@ -29,7 +29,7 @@ pub enum S3CopyIfNotExists {
/// Some S3-compatible stores, such as Cloudflare R2, support copy if not
exists
/// semantics through custom headers.
///
- /// If set, [`ObjectStore::copy_if_not_exists`] will perform a normal copy
operation
+ /// If set, copies using [`CopyMode::Create`] will perform a normal copy operation
/// with the provided header pair, and expect the store to fail with `412
Precondition Failed`
/// if the destination file already exists.
///
@@ -38,7 +38,7 @@ pub enum S3CopyIfNotExists {
/// For example `header: cf-copy-destination-if-none-match: *`, would set
/// the header `cf-copy-destination-if-none-match` to `*`
///
- /// [`ObjectStore::copy_if_not_exists`]:
crate::ObjectStore::copy_if_not_exists
+ /// [`CopyMode::Create`]: crate::CopyMode::Create
Header(String, String),
/// The same as [`S3CopyIfNotExists::Header`] but allows custom status
code checking, for object stores that return values
/// other than 412.
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index 3f5c723..d22ffcf 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -23,8 +23,9 @@
//!
//! Unused blocks will automatically be dropped after 7 days.
use crate::{
- GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
ObjectMeta, ObjectStore,
- PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart,
+ CopyMode, CopyOptions, GetOptions, GetResult, ListResult, MultipartId,
MultipartUpload,
+ ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload,
PutResult, Result,
+ UploadPart,
multipart::{MultipartStore, PartId},
path::Path,
signer::Signer,
@@ -151,12 +152,16 @@ impl ObjectStore for MicrosoftAzure {
self.client.list_with_delimiter(prefix).await
}
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
- self.client.copy_request(from, to, true).await
- }
+ async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) ->
Result<()> {
+ let CopyOptions {
+ mode,
+ extensions: _,
+ } = options;
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- self.client.copy_request(from, to, false).await
+ match mode {
+ CopyMode::Overwrite => self.client.copy_request(from, to,
true).await,
+ CopyMode::Create => self.client.copy_request(from, to,
false).await,
+ }
}
}
diff --git a/src/chunked.rs b/src/chunked.rs
index 843781c..53dbf1e 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -28,8 +28,8 @@ use futures::stream::BoxStream;
use crate::path::Path;
use crate::{
- GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
ObjectMeta, ObjectStore,
- PutMultipartOptions, PutOptions, PutResult,
+ CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartUpload, ObjectMeta,
+ ObjectStore, PutMultipartOptions, PutOptions, PutResult,
};
use crate::{PutPayload, Result};
@@ -166,18 +166,14 @@ impl ObjectStore for ChunkedStore {
self.inner.list_with_delimiter(prefix).await
}
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
- self.inner.copy(from, to).await
+ async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) ->
Result<()> {
+ self.inner.copy_opts(from, to, options).await
}
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
self.inner.rename(from, to).await
}
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- self.inner.copy_if_not_exists(from, to).await
- }
-
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>
{
self.inner.rename_if_not_exists(from, to).await
}
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index befe6ae..270b89a 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -40,6 +40,7 @@ use std::time::Duration;
use crate::client::CredentialProvider;
use crate::gcp::credential::GCSAuthorizer;
use crate::signer::Signer;
+use crate::{CopyMode, CopyOptions};
use crate::{
GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
UploadPart, multipart::PartId,
@@ -218,12 +219,16 @@ impl ObjectStore for GoogleCloudStorage {
self.client.list_with_delimiter(prefix).await
}
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
- self.client.copy_request(from, to, false).await
- }
+ async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) ->
Result<()> {
+ let CopyOptions {
+ mode,
+ extensions: _,
+ } = options;
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- self.client.copy_request(from, to, true).await
+ match mode {
+ CopyMode::Overwrite => self.client.copy_request(from, to, false).await,
+ CopyMode::Create => self.client.copy_request(from, to, true).await,
+ }
}
}
diff --git a/src/http/mod.rs b/src/http/mod.rs
index cd6311a..673419c 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -45,9 +45,9 @@ use crate::client::{HttpConnector, http_connector};
use crate::http::client::Client;
use crate::path::Path;
use crate::{
- ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult,
MultipartUpload, ObjectMeta,
- ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload,
PutResult, Result,
- RetryConfig,
+ ClientConfigKey, ClientOptions, CopyMode, CopyOptions, GetOptions,
GetResult, ListResult,
+ MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOptions,
PutOptions, PutPayload,
+ PutResult, Result, RetryConfig,
};
mod client;
@@ -210,12 +210,16 @@ impl ObjectStore for HttpStore {
})
}
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
- self.client.copy(from, to, true).await
- }
+ async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) ->
Result<()> {
+ let CopyOptions {
+ mode,
+ extensions: _,
+ } = options;
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- self.client.copy(from, to, false).await
+ match mode {
+ CopyMode::Overwrite => self.client.copy(from, to, true).await,
+ CopyMode::Create => self.client.copy(from, to, false).await,
+ }
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 31c5352..9b59627 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -890,7 +890,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// # use futures::stream::{BoxStream, StreamExt};
/// # use object_store::path::Path;
/// # use object_store::{
- /// # GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore,
+ /// # CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload,
ObjectMeta, ObjectStore,
/// # PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
/// # };
/// # use std::fmt;
@@ -969,11 +969,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// # todo!()
/// # }
/// #
- /// # async fn copy(&self, _: &Path, _: &Path) -> Result<()> {
- /// # todo!()
- /// # }
- /// #
- /// # async fn copy_if_not_exists(&self, _: &Path, _: &Path) ->
Result<()> {
+ /// # async fn copy_opts(&self, _: &Path, _: &Path, _: CopyOptions) ->
Result<()> {
/// # todo!()
/// # }
/// # }
@@ -1033,9 +1029,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult>;
/// Copy an object from one path to another in the same object store.
- ///
- /// If there exists an object at the destination, it will be overwritten.
- async fn copy(&self, from: &Path, to: &Path) -> Result<()>;
+ async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) ->
Result<()>;
/// Move an object from one path to another in the same object store.
///
@@ -1048,15 +1042,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
self.delete(from).await
}
- /// Copy an object from one path to another, only if destination is empty.
- ///
- /// Will return an error if the destination already has an object.
- ///
- /// Performs an atomic operation if the underlying object storage supports
it.
- /// If atomic operations are not supported by the underlying object
storage (like S3)
- /// it will return an error.
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>;
-
/// Move an object from one path to another in the same object store.
///
/// Will return an error if the destination already has an object.
@@ -1127,18 +1112,14 @@ macro_rules! as_ref_impl {
self.as_ref().list_with_delimiter(prefix).await
}
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
- self.as_ref().copy(from, to).await
+ async fn copy_opts(&self, from: &Path, to: &Path, options:
CopyOptions) -> Result<()> {
+ self.as_ref().copy_opts(from, to, options).await
}
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().rename(from, to).await
}
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) ->
Result<()> {
- self.as_ref().copy_if_not_exists(from, to).await
- }
-
async fn rename_if_not_exists(&self, from: &Path, to: &Path) ->
Result<()> {
self.as_ref().rename_if_not_exists(from, to).await
}
@@ -1243,6 +1224,20 @@ pub trait ObjectStoreExt: ObjectStore {
/// Return the metadata for the specified location
fn head(&self, location: &Path) -> impl Future<Output =
Result<ObjectMeta>>;
+
+ /// Copy an object from one path to another in the same object store.
+ ///
+ /// If there exists an object at the destination, it will be overwritten.
+ fn copy(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
+
+ /// Copy an object from one path to another, only if destination is empty.
+ ///
+ /// Will return an error if the destination already has an object.
+ ///
+ /// Performs an atomic operation if the underlying object storage supports
it.
+ /// If atomic operations are not supported by the underlying object
storage (like S3)
+ /// it will return an error.
+ fn copy_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output
= Result<()>>;
}
impl<T> ObjectStoreExt for T
@@ -1272,6 +1267,16 @@ where
let options = GetOptions::new().with_head(true);
Ok(self.get_opts(location, options).await?.meta)
}
+
+ async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+ let options = CopyOptions::new().with_mode(CopyMode::Overwrite);
+ self.copy_opts(from, to, options).await
+ }
+
+ async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+ let options = CopyOptions::new().with_mode(CopyMode::Create);
+ self.copy_opts(from, to, options).await
+ }
}
/// Result of a list call that includes objects, prefixes (directories) and a
@@ -1633,7 +1638,7 @@ pub struct PutOptions {
///
/// These extensions are ignored entirely by backends offered through this
crate.
///
- /// They are also eclused from [`PartialEq`] and [`Eq`].
+ /// They are also excluded from [`PartialEq`] and [`Eq`].
pub extensions: Extensions,
}
@@ -1705,7 +1710,7 @@ pub struct PutMultipartOptions {
///
/// These extensions are ignored entirely by backends offered through this
crate.
///
- /// They are also eclused from [`PartialEq`] and [`Eq`].
+ /// They are also excluded from [`PartialEq`] and [`Eq`].
pub extensions: Extensions,
}
@@ -1756,6 +1761,73 @@ pub struct PutResult {
pub version: Option<String>,
}
+/// Configure preconditions for the copy operation
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum CopyMode {
+ /// Perform an atomic copy operation, overwriting any object present at
the provided path
+ #[default]
+ Overwrite,
+ /// Perform an atomic copy operation, returning [`Error::AlreadyExists`]
if an
+ /// object already exists at the provided path
+ Create,
+}
+
+/// Options for a copy request
+#[derive(Debug, Clone, Default)]
+pub struct CopyOptions {
+ /// Configure the [`CopyMode`] for this operation
+ pub mode: CopyMode,
+ /// Implementation-specific extensions. Intended for use by
[`ObjectStore`] implementations
+ /// that need to pass context-specific information (like tracing spans)
via trait methods.
+ ///
+ /// These extensions are ignored entirely by backends offered through this
crate.
+ ///
+ /// They are also excluded from [`PartialEq`] and [`Eq`].
+ pub extensions: Extensions,
+}
+
+impl CopyOptions {
+ /// Create a new [`CopyOptions`]
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Sets the `mode`.
+ ///
+ /// See [`CopyOptions::mode`].
+ #[must_use]
+ pub fn with_mode(mut self, mode: CopyMode) -> Self {
+ self.mode = mode;
+ self
+ }
+
+ /// Sets the `extensions`.
+ ///
+ /// See [`CopyOptions::extensions`].
+ #[must_use]
+ pub fn with_extensions(mut self, extensions: Extensions) -> Self {
+ self.extensions = extensions;
+ self
+ }
+}
+
+impl PartialEq<Self> for CopyOptions {
+ fn eq(&self, other: &Self) -> bool {
+ let Self {
+ mode,
+ extensions: _,
+ } = self;
+ let Self {
+ mode: mode_other,
+ extensions: _,
+ } = other;
+
+ mode == mode_other
+ }
+}
+
+impl Eq for CopyOptions {}
+
/// A specialized `Result` for object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
diff --git a/src/limit.rs b/src/limit.rs
index 9efe879..7e1e2b4 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -18,9 +18,9 @@
//! An object store that limits the maximum concurrency of the wrapped
implementation
use crate::{
- BoxStream, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartUpload, ObjectMeta,
- ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, PutResult,
Result, StreamExt,
- UploadPart,
+ BoxStream, CopyOptions, GetOptions, GetResult, GetResultPayload,
ListResult, MultipartUpload,
+ ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions,
PutPayload, PutResult, Result,
+ StreamExt, UploadPart,
};
use async_trait::async_trait;
use bytes::Bytes;
@@ -151,9 +151,9 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.list_with_delimiter(prefix).await
}
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+ async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) ->
Result<()> {
let _permit = self.semaphore.acquire().await.unwrap();
- self.inner.copy(from, to).await
+ self.inner.copy_opts(from, to, options).await
}
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
@@ -161,11 +161,6 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.rename(from, to).await
}
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- let _permit = self.semaphore.acquire().await.unwrap();
- self.inner.copy_if_not_exists(from, to).await
- }
-
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>
{
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.rename_if_not_exists(from, to).await
diff --git a/src/local.rs b/src/local.rs
index 0bd0c88..ebe9527 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -40,6 +40,7 @@ use crate::{
path::{Path, absolute_path_to_url},
util::InvalidGetRange,
};
+use crate::{CopyMode, CopyOptions};
/// A specialized `Error` for filesystem object store-related errors
#[derive(Debug, thiserror::Error)]
@@ -189,7 +190,7 @@ impl From<Error> for super::Error {
///
/// # Cross-Filesystem Copy
///
-/// [`LocalFileSystem::copy`] is implemented using [`std::fs::hard_link`], and
therefore
+/// [`LocalFileSystem::copy_opts`] is implemented using
[`std::fs::hard_link`], and therefore
/// does not support copying across filesystem boundaries.
///
#[derive(Debug)]
@@ -533,38 +534,80 @@ impl ObjectStore for LocalFileSystem {
.await
}
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+ async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) ->
Result<()> {
+ let CopyOptions {
+ mode,
+ extensions: _,
+ } = options;
+
let from = self.path_to_filesystem(from)?;
let to = self.path_to_filesystem(to)?;
- let mut id = 0;
- // In order to make this atomic we:
- //
- // - hard link to a hidden temporary file
- // - atomically rename this temporary file into place
- //
- // This is necessary because hard_link returns an error if the
destination already exists
- maybe_spawn_blocking(move || {
- loop {
- let staged = staged_upload_path(&to, &id.to_string());
- match std::fs::hard_link(&from, &staged) {
- Ok(_) => {
- return std::fs::rename(&staged, &to).map_err(|source| {
- let _ = std::fs::remove_file(&staged); // Attempt
to clean up
- Error::UnableToCopyFile { from, to, source }.into()
- });
+
+ match mode {
+ CopyMode::Overwrite => {
+ let mut id = 0;
+ // In order to make this atomic we:
+ //
+ // - hard link to a hidden temporary file
+ // - atomically rename this temporary file into place
+ //
+ // This is necessary because hard_link returns an error if the
destination already exists
+ maybe_spawn_blocking(move || {
+ loop {
+ let staged = staged_upload_path(&to, &id.to_string());
+ match std::fs::hard_link(&from, &staged) {
+ Ok(_) => {
+ return std::fs::rename(&staged,
&to).map_err(|source| {
+ let _ = std::fs::remove_file(&staged); //
Attempt to clean up
+ Error::UnableToCopyFile { from, to, source
}.into()
+ });
+ }
+ Err(source) => match source.kind() {
+ ErrorKind::AlreadyExists => id += 1,
+ ErrorKind::NotFound => match from.exists() {
+ true => create_parent_dirs(&to, source)?,
+ false => {
+ return Err(Error::NotFound { path:
from, source }.into());
+ }
+ },
+ _ => {
+ return Err(Error::UnableToCopyFile { from,
to, source }.into());
+ }
+ },
+ }
}
- Err(source) => match source.kind() {
- ErrorKind::AlreadyExists => id += 1,
- ErrorKind::NotFound => match from.exists() {
- true => create_parent_dirs(&to, source)?,
- false => return Err(Error::NotFound { path: from,
source }.into()),
- },
- _ => return Err(Error::UnableToCopyFile { from, to,
source }.into()),
- },
- }
+ })
+ .await
}
- })
- .await
+ CopyMode::Create => {
+ maybe_spawn_blocking(move || {
+ loop {
+ match std::fs::hard_link(&from, &to) {
+ Ok(_) => return Ok(()),
+ Err(source) => match source.kind() {
+ ErrorKind::AlreadyExists => {
+ return Err(Error::AlreadyExists {
+ path: to.to_str().unwrap().to_string(),
+ source,
+ }
+ .into());
+ }
+ ErrorKind::NotFound => match from.exists() {
+ true => create_parent_dirs(&to, source)?,
+ false => {
+ return Err(Error::NotFound { path:
from, source }.into());
+ }
+ },
+ _ => {
+ return Err(Error::UnableToCopyFile { from,
to, source }.into());
+ }
+ },
+ }
+ }
+ })
+ .await
+ }
+ }
}
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
@@ -586,34 +629,6 @@ impl ObjectStore for LocalFileSystem {
})
.await
}
-
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- let from = self.path_to_filesystem(from)?;
- let to = self.path_to_filesystem(to)?;
-
- maybe_spawn_blocking(move || {
- loop {
- match std::fs::hard_link(&from, &to) {
- Ok(_) => return Ok(()),
- Err(source) => match source.kind() {
- ErrorKind::AlreadyExists => {
- return Err(Error::AlreadyExists {
- path: to.to_str().unwrap().to_string(),
- source,
- }
- .into());
- }
- ErrorKind::NotFound => match from.exists() {
- true => create_parent_dirs(&to, source)?,
- false => return Err(Error::NotFound { path: from,
source }.into()),
- },
- _ => return Err(Error::UnableToCopyFile { from, to,
source }.into()),
- },
- }
- }
- })
- .await
- }
}
impl LocalFileSystem {
diff --git a/src/memory.rs b/src/memory.rs
index 9cc084f..08e41c2 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -33,7 +33,7 @@ use crate::{
ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions,
PutResult, Result,
UpdateVersion, UploadPart, path::Path,
};
-use crate::{GetOptions, PutPayload};
+use crate::{CopyMode, CopyOptions, GetOptions, PutPayload};
/// A specialized `Error` for in-memory object store-related errors
#[derive(Debug, thiserror::Error)]
@@ -391,24 +391,30 @@ impl ObjectStore for InMemory {
})
}
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
- let entry = self.entry(from)?;
- self.storage
- .write()
- .insert(to, entry.data, entry.attributes);
- Ok(())
- }
+ async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) ->
Result<()> {
+ let CopyOptions {
+ mode,
+ extensions: _,
+ } = options;
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let entry = self.entry(from)?;
let mut storage = self.storage.write();
- if storage.map.contains_key(to) {
- return Err(Error::AlreadyExists {
- path: to.to_string(),
+
+ match mode {
+ CopyMode::Overwrite => {
+ storage.insert(to, entry.data, entry.attributes);
+ }
+ CopyMode::Create => {
+ if storage.map.contains_key(to) {
+ return Err(Error::AlreadyExists {
+ path: to.to_string(),
+ }
+ .into());
+ }
+ storage.insert(to, entry.data, entry.attributes);
}
- .into());
}
- storage.insert(to, entry.data, entry.attributes);
+
Ok(())
}
}
diff --git a/src/prefix.rs b/src/prefix.rs
index 392cfac..c37f55d 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -22,7 +22,7 @@ use std::ops::Range;
use crate::path::Path;
use crate::{
- GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore,
+ CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload,
ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
};
@@ -182,10 +182,10 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
})
}
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+ async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) ->
Result<()> {
let full_from = self.full_path(from);
let full_to = self.full_path(to);
- self.inner.copy(&full_from, &full_to).await
+ self.inner.copy_opts(&full_from, &full_to, options).await
}
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
@@ -194,12 +194,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.rename(&full_from, &full_to).await
}
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- let full_from = self.full_path(from);
- let full_to = self.full_path(to);
- self.inner.copy_if_not_exists(&full_from, &full_to).await
- }
-
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>
{
let full_from = self.full_path(from);
let full_to = self.full_path(to);
diff --git a/src/throttle.rs b/src/throttle.rs
index e5fade5..bd5795e 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -21,7 +21,7 @@ use std::ops::Range;
use std::{convert::TryInto, sync::Arc};
use crate::multipart::{MultipartStore, PartId};
-use crate::{GetOptions, UploadPart};
+use crate::{CopyOptions, GetOptions, UploadPart};
use crate::{
GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload,
ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, path::Path,
@@ -255,10 +255,10 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
}
}
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+ async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) ->
Result<()> {
sleep(self.config().wait_put_per_call).await;
- self.inner.copy(from, to).await
+ self.inner.copy_opts(from, to, options).await
}
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
@@ -267,12 +267,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
self.inner.rename(from, to).await
}
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- sleep(self.config().wait_put_per_call).await;
-
- self.inner.copy_if_not_exists(from, to).await
- }
-
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>
{
sleep(self.config().wait_put_per_call).await;
diff --git a/tests/get_range_file.rs b/tests/get_range_file.rs
index d39d972..df0a414 100644
--- a/tests/get_range_file.rs
+++ b/tests/get_range_file.rs
@@ -77,11 +77,7 @@ impl ObjectStore for MyStore {
todo!()
}
- async fn copy(&self, _: &Path, _: &Path) -> Result<()> {
- todo!()
- }
-
- async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> Result<()> {
+ async fn copy_opts(&self, _: &Path, _: &Path, _: CopyOptions) ->
Result<()> {
todo!()
}
}