This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 844b8519bf Implement `copy_if_not_exist` for `AmazonS3` using DynamoDB (#4880) (#4918)
844b8519bf is described below

commit 844b8519bf4d49ec807d49db9fc49a9423c6dcb9
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Dec 26 13:06:13 2023 +0000

    Implement `copy_if_not_exist` for `AmazonS3` using DynamoDB (#4880) (#4918)
    
    * Implement DynamoDBLock (#4880)
    
    * Cleanup error handling
    
    * Clippy
    
    * Localstack support
    
    * Clippy
    
    * Handle integration test concurrency
    
    * More docs
    
    * Disable request timeout
    
    * Fix merge conflicts
    
    * Reduce test concurrency
    
    * Increase timeouts
---
 .github/workflows/object_store.yml   |   2 +
 object_store/src/aws/builder.rs      |  24 +-
 object_store/src/aws/client.rs       | 179 ++++-------
 object_store/src/aws/dynamo.rs       | 567 +++++++++++++++++++++++++++++++++++
 object_store/src/aws/mod.rs          |  59 +++-
 object_store/src/aws/precondition.rs |  17 ++
 6 files changed, 702 insertions(+), 146 deletions(-)

diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml
index ecffa29b06..313d158090 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -112,6 +112,7 @@ jobs:
       AWS_SECRET_ACCESS_KEY: test
       AWS_ENDPOINT: http://localhost:4566
       AWS_ALLOW_HTTP: true
+      AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000
       HTTP_URL: "http://localhost:8080"
       GOOGLE_BUCKET: test-bucket
       GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json"
@@ -136,6 +137,7 @@ jobs:
           docker run -d -p 4566:4566 localstack/localstack:3.0.1
           docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2
           aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
+          aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=key,KeyType=HASH --attribute-definitions AttributeName=key,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
 
       - name: Configure Azurite (Azure emulation)
         # the magical connection string is from
diff --git a/object_store/src/aws/builder.rs b/object_store/src/aws/builder.rs
index 596ff99b0e..5f7f1c945a 100644
--- a/object_store/src/aws/builder.rs
+++ b/object_store/src/aws/builder.rs
@@ -821,27 +821,23 @@ impl AmazonS3Builder {
             )) as _
         };
 
-        let endpoint: String;
-        let bucket_endpoint: String;
-
         // If `endpoint` is provided then its assumed to be consistent with
         // `virtual_hosted_style_request`. i.e. if `virtual_hosted_style_request` is true then
         // `endpoint` should have bucket name included.
-        if self.virtual_hosted_style_request.get()? {
-            endpoint = self
-                .endpoint
-                .unwrap_or_else(|| format!("https://{bucket}.s3.{region}.amazonaws.com"));
-            bucket_endpoint = endpoint.clone();
+        let bucket_endpoint = if self.virtual_hosted_style_request.get()? {
+            self.endpoint
+                .clone()
+                .unwrap_or_else(|| format!("https://{bucket}.s3.{region}.amazonaws.com"))
         } else {
-            endpoint = self
-                .endpoint
-                .unwrap_or_else(|| format!("https://s3.{region}.amazonaws.com"));
-            bucket_endpoint = format!("{endpoint}/{bucket}");
-        }
+            match &self.endpoint {
+                None => format!("https://s3.{region}.amazonaws.com/{bucket}"),
+                Some(endpoint) => format!("{endpoint}/{bucket}"),
+            }
+        };
 
         let config = S3Config {
             region,
-            endpoint,
+            endpoint: self.endpoint,
             bucket,
             bucket_endpoint,
             credentials,
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index ecbe556c6d..45d97ead6d 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -21,7 +21,7 @@ use crate::aws::{
     AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET,
 };
 use crate::client::get::GetClient;
-use crate::client::header::HeaderConfig;
+use crate::client::header::{get_etag, HeaderConfig};
 use crate::client::header::{get_put_result, get_version};
 use crate::client::list::ListClient;
 use crate::client::retry::RetryExt;
@@ -39,6 +39,7 @@ use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::{Buf, Bytes};
+use hyper::http;
 use hyper::http::HeaderName;
 use itertools::Itertools;
 use percent_encoding::{utf8_percent_encode, PercentEncode};
@@ -57,30 +58,12 @@ const VERSION_HEADER: &str = "x-amz-version-id";
 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
 pub(crate) enum Error {
-    #[snafu(display("Error performing get request {}: {}", path, source))]
-    GetRequest {
-        source: crate::client::retry::Error,
-        path: String,
-    },
-
     #[snafu(display("Error fetching get response body {}: {}", path, source))]
     GetResponseBody {
         source: reqwest::Error,
         path: String,
     },
 
-    #[snafu(display("Error performing put request {}: {}", path, source))]
-    PutRequest {
-        source: crate::client::retry::Error,
-        path: String,
-    },
-
-    #[snafu(display("Error performing delete request {}: {}", path, source))]
-    DeleteRequest {
-        source: crate::client::retry::Error,
-        path: String,
-    },
-
     #[snafu(display("Error performing DeleteObjects request: {}", source))]
     DeleteObjectsRequest { source: crate::client::retry::Error },
 
@@ -104,12 +87,6 @@ pub(crate) enum Error {
         source: Box<dyn std::error::Error + Send + Sync + 'static>,
     },
 
-    #[snafu(display("Error performing copy request {}: {}", path, source))]
-    CopyRequest {
-        source: crate::client::retry::Error,
-        path: String,
-    },
-
     #[snafu(display("Error performing list request: {}", source))]
     ListRequest { source: crate::client::retry::Error },
 
@@ -142,15 +119,9 @@ pub(crate) enum Error {
 
 impl From<Error> for crate::Error {
     fn from(err: Error) -> Self {
-        match err {
-            Error::GetRequest { source, path }
-            | Error::DeleteRequest { source, path }
-            | Error::CopyRequest { source, path }
-            | Error::PutRequest { source, path } => source.error(STORE, path),
-            _ => Self::Generic {
-                store: STORE,
-                source: Box::new(err),
-            },
+        Self::Generic {
+            store: STORE,
+            source: Box::new(err),
         }
     }
 }
@@ -196,7 +167,7 @@ impl From<DeleteError> for Error {
 #[derive(Debug)]
 pub struct S3Config {
     pub region: String,
-    pub endpoint: String,
+    pub endpoint: Option<String>,
     pub bucket: String,
     pub bucket_endpoint: String,
     pub credentials: AwsCredentialProvider,
@@ -215,7 +186,7 @@ impl S3Config {
         format!("{}/{}", self.bucket_endpoint, encode_path(path))
     }
 
-    async fn get_credential(&self) -> Result<Option<Arc<AwsCredential>>> {
+    pub(crate) async fn get_credential(&self) -> Result<Option<Arc<AwsCredential>>> {
         Ok(match self.skip_signature {
             false => Some(self.credentials.get_credential().await?),
             true => None,
@@ -223,30 +194,52 @@ impl S3Config {
     }
 }
 
-/// A builder for a put request allowing customisation of the headers and query string
-pub(crate) struct PutRequest<'a> {
+#[derive(Debug, Snafu)]
+pub enum RequestError {
+    #[snafu(context(false))]
+    Generic { source: crate::Error },
+    Retry {
+        source: crate::client::retry::Error,
+        path: String,
+    },
+}
+
+impl From<RequestError> for crate::Error {
+    fn from(value: RequestError) -> Self {
+        match value {
+            RequestError::Generic { source } => source,
+            RequestError::Retry { source, path } => source.error(STORE, path),
+        }
+    }
+}
+
+/// A builder for a request allowing customisation of the headers and query string
+pub(crate) struct Request<'a> {
     path: &'a Path,
     config: &'a S3Config,
     builder: RequestBuilder,
     payload_sha256: Option<Vec<u8>>,
 }
 
-impl<'a> PutRequest<'a> {
+impl<'a> Request<'a> {
     pub fn query<T: Serialize + ?Sized + Sync>(self, query: &T) -> Self {
         let builder = self.builder.query(query);
         Self { builder, ..self }
     }
 
-    pub fn header(self, k: &HeaderName, v: &str) -> Self {
+    pub fn header<K>(self, k: K, v: &str) -> Self
+    where
+        HeaderName: TryFrom<K>,
+        <HeaderName as TryFrom<K>>::Error: Into<http::Error>,
+    {
         let builder = self.builder.header(k, v);
         Self { builder, ..self }
     }
 
-    pub async fn send(self) -> Result<PutResult> {
+    pub async fn send(self) -> Result<Response, RequestError> {
         let credential = self.config.get_credential().await?;
-
-        let response = self
-            .builder
+        let path = self.path.as_ref();
+        self.builder
             .with_aws_sigv4(
                 credential.as_deref(),
                 &self.config.region,
@@ -256,18 +249,19 @@ impl<'a> PutRequest<'a> {
             )
             .send_retry(&self.config.retry_config)
             .await
-            .context(PutRequestSnafu {
-                path: self.path.as_ref(),
-            })?;
+            .context(RetrySnafu { path })
+    }
 
+    pub async fn do_put(self) -> Result<PutResult> {
+        let response = self.send().await?;
         Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
     }
 }
 
 #[derive(Debug)]
 pub(crate) struct S3Client {
-    config: S3Config,
-    client: ReqwestClient,
+    pub config: S3Config,
+    pub client: ReqwestClient,
 }
 
 impl S3Client {
@@ -276,20 +270,15 @@ impl S3Client {
         Ok(Self { config, client })
     }
 
-    /// Returns the config
-    pub fn config(&self) -> &S3Config {
-        &self.config
-    }
-
     /// Make an S3 PUT request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
     ///
     /// Returns the ETag
-    pub fn put_request<'a>(&'a self, path: &'a Path, bytes: Bytes) -> PutRequest<'a> {
+    pub fn put_request<'a>(&'a self, path: &'a Path, bytes: Bytes) -> Request<'a> {
         let url = self.config.path_url(path);
         let mut builder = self.client.request(Method::PUT, url);
         let mut payload_sha256 = None;
 
-        if let Some(checksum) = self.config().checksum {
+        if let Some(checksum) = self.config.checksum {
             let digest = checksum.digest(&bytes);
             builder = builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest));
             if checksum == Checksum::SHA256 {
@@ -302,11 +291,11 @@ impl S3Client {
             false => builder.body(bytes),
         };
 
-        if let Some(value) = self.config().client_options.get_content_type(path) {
+        if let Some(value) = self.config.client_options.get_content_type(path) {
             builder = builder.header(CONTENT_TYPE, value);
         }
 
-        PutRequest {
+        Request {
             path,
             builder,
             payload_sha256,
@@ -335,9 +324,7 @@ impl S3Client {
             )
             .send_retry(&self.config.retry_config)
             .await
-            .context(DeleteRequestSnafu {
-                path: path.as_ref(),
-            })?;
+            .map_err(|e| e.error(STORE, path.to_string()))?;
 
         Ok(())
     }
@@ -400,7 +387,7 @@ impl S3Client {
 
         // Compute checksum - S3 *requires* this for DeleteObjects requests, so we default to
         // their algorithm if the user hasn't specified one.
-        let checksum = self.config().checksum.unwrap_or(Checksum::SHA256);
+        let checksum = self.config.checksum.unwrap_or(Checksum::SHA256);
         let digest = checksum.digest(&body);
         builder = builder.header(checksum.header_name(), BASE64_STANDARD.encode(&digest));
         let payload_sha256 = if checksum == Checksum::SHA256 {
@@ -451,60 +438,21 @@ impl S3Client {
     }
 
     /// Make an S3 Copy request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
-    pub async fn copy_request(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
-        let credential = self.config.get_credential().await?;
+    pub fn copy_request<'a>(&'a self, from: &'a Path, to: &Path) -> Request<'a> {
         let url = self.config.path_url(to);
         let source = format!("{}/{}", self.config.bucket, encode_path(from));
 
-        let mut builder = self
+        let builder = self
             .client
             .request(Method::PUT, url)
             .header("x-amz-copy-source", source);
 
-        if !overwrite {
-            match &self.config.copy_if_not_exists {
-                Some(S3CopyIfNotExists::Header(k, v)) => {
-                    builder = builder.header(k, v);
-                }
-                Some(S3CopyIfNotExists::HeaderWithStatus(k, v, _)) => {
-                    builder = builder.header(k, v);
-                }
-                None => {
-                    return Err(crate::Error::NotSupported {
-                        source: "S3 does not support copy-if-not-exists".to_string().into(),
-                    })
-                }
-            }
+        Request {
+            builder,
+            path: from,
+            config: &self.config,
+            payload_sha256: None,
         }
-
-        let precondition_failure = match &self.config.copy_if_not_exists {
-            Some(S3CopyIfNotExists::HeaderWithStatus(_, _, code)) => *code,
-            _ => reqwest::StatusCode::PRECONDITION_FAILED,
-        };
-
-        builder
-            .with_aws_sigv4(
-                credential.as_deref(),
-                &self.config.region,
-                "s3",
-                self.config.sign_payload,
-                None,
-            )
-            .send_retry(&self.config.retry_config)
-            .await
-            .map_err(|source| match source.status() {
-                Some(error) if error == precondition_failure => crate::Error::AlreadyExists {
-                    source: Box::new(source),
-                    path: to.to_string(),
-                },
-                _ => Error::CopyRequest {
-                    source,
-                    path: from.to_string(),
-                }
-                .into(),
-            })?;
-
-        Ok(())
     }
 
     pub async fn create_multipart(&self, location: &Path) -> Result<MultipartId> {
@@ -543,15 +491,14 @@ impl S3Client {
     ) -> Result<PartId> {
         let part = (part_idx + 1).to_string();
 
-        let result = self
+        let response = self
             .put_request(path, data)
             .query(&[("partNumber", &part), ("uploadId", upload_id)])
             .send()
             .await?;
 
-        Ok(PartId {
-            content_id: result.e_tag.unwrap(),
-        })
+        let content_id = get_etag(response.headers()).context(MetadataSnafu)?;
+        Ok(PartId { content_id })
     }
 
     pub async fn complete_multipart(
@@ -614,9 +561,7 @@ impl S3Client {
             )
             .send_retry(&self.config.retry_config)
             .await
-            .context(GetRequestSnafu {
-                path: path.as_ref(),
-            })?;
+            .map_err(|e| e.error(STORE, path.to_string()))?;
         Ok(response)
     }
 }
@@ -657,9 +602,7 @@ impl GetClient for S3Client {
             )
             .send_retry(&self.config.retry_config)
             .await
-            .context(GetRequestSnafu {
-                path: path.as_ref(),
-            })?;
+            .map_err(|e| e.error(STORE, path.to_string()))?;
 
         Ok(response)
     }
diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs
new file mode 100644
index 0000000000..ce1500bf40
--- /dev/null
+++ b/object_store/src/aws/dynamo.rs
@@ -0,0 +1,567 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A DynamoDB based lock system
+
+use std::collections::HashMap;
+use std::time::{Duration, Instant};
+
+use chrono::Utc;
+use reqwest::{Response, StatusCode};
+use serde::ser::SerializeMap;
+use serde::{Deserialize, Serialize, Serializer};
+
+use crate::aws::client::S3Client;
+use crate::aws::credential::CredentialExt;
+use crate::aws::AwsCredential;
+use crate::client::get::GetClientExt;
+use crate::client::retry::Error as RetryError;
+use crate::client::retry::RetryExt;
+use crate::path::Path;
+use crate::{Error, GetOptions, Result};
+
+/// The exception returned by DynamoDB on conflict
+const CONFLICT: &str = "ConditionalCheckFailedException";
+
+const STORE: &str = "DynamoDB";
+
+/// A DynamoDB-based commit protocol, used to provide conditional write support for S3
+///
+/// ## Limitations
+///
+/// Only conditional operations, e.g. `copy_if_not_exists` will be synchronized, and can
+/// therefore race with non-conditional operations, e.g. `put`, `copy`, `delete`, or
+/// conditional operations performed by writers not configured to synchronize with DynamoDB.
+///
+/// Workloads making use of this mechanism **must** ensure:
+///
+/// * Conditional and non-conditional operations are not performed on the same paths
+/// * Conditional operations are only performed via similarly configured clients
+///
+/// Additionally as the locking mechanism relies on timeouts to detect stale locks,
+/// performance will be poor for systems that frequently delete and then create
+/// objects at the same path, instead being optimised for systems that primarily create
+/// files with paths never used before, or perform conditional updates to existing files
+///
+/// ## Commit Protocol
+///
+/// The DynamoDB schema is as follows:
+///
+/// * A string hash key named `"key"`
+/// * A numeric [TTL] attribute named `"ttl"`
+/// * A numeric attribute named `"generation"`
+/// * A numeric attribute named `"timeout"`
+///
+/// To perform a conditional operation on an object with a given `path` and `etag` (if exists),
+/// the commit protocol is as follows:
+///
+/// 1. Perform HEAD request on `path` and error on precondition mismatch
+/// 2. Create record in DynamoDB with key `{path}#{etag}` with the configured timeout
+///     1. On Success: Perform operation with the configured timeout
+///     2. On Conflict:
+///         1. Periodically re-perform HEAD request on `path` and error on precondition mismatch
+///         2. If `timeout * max_skew_rate` passed, replace the record incrementing the `"generation"`
+///             1. On Success: GOTO 2.1
+///             2. On Conflict: GOTO 2.2
+///
+/// Provided no writer modifies an object with a given `path` and `etag` without first adding a
+/// corresponding record to DynamoDB, we are guaranteed that only one writer will ever commit.
+///
+/// This is inspired by the [DynamoDB Lock Client] but simplified for the more limited
+/// requirements of synchronizing object storage. The major changes are:
+///
+/// * Uses a monotonic generation count instead of a UUID rvn, as this is:
+///     * Cheaper to generate, serialize and compare
+///     * Cannot collide
+///     * More human readable / interpretable
+/// * Relies on [TTL] to eventually clean up old locks
+///
+/// It also draws inspiration from the DeltaLake [S3 Multi-Cluster] commit protocol, but
+/// generalised to not make assumptions about the workload and not rely on first writing
+/// to a temporary path.
+///
+/// [TTL]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html
+/// [DynamoDB Lock Client]: https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/
+/// [S3 Multi-Cluster]: https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct DynamoCommit {
+    table_name: String,
+    /// The number of milliseconds a lease is valid for
+    timeout: u64,
+    /// The maximum clock skew rate tolerated by the system
+    max_clock_skew_rate: u32,
+    /// The length of time a record will be retained in DynamoDB before being cleaned up
+    ///
+    /// This is purely an optimisation to avoid indefinite growth of the DynamoDB table
+    /// and does not impact how long clients may wait to acquire a lock
+    ttl: Duration,
+    /// The backoff duration before retesting a condition
+    test_interval: Duration,
+}
+
+impl DynamoCommit {
+    /// Create a new [`DynamoCommit`] with a given table name
+    pub fn new(table_name: String) -> Self {
+        Self {
+            table_name,
+            timeout: 20_000,
+            max_clock_skew_rate: 3,
+            ttl: Duration::from_secs(60 * 60),
+            test_interval: Duration::from_millis(100),
+        }
+    }
+
+    /// Overrides the lock timeout.
+    ///
+    /// A longer lock timeout reduces the probability of spurious commit failures and multi-writer
+    /// races, but will increase the time that writers must wait to reclaim a lock lost. The
+    /// default value of 20 seconds should be appropriate for must use-cases.
+    pub fn with_timeout(mut self, millis: u64) -> Self {
+        self.timeout = millis;
+        self
+    }
+
+    /// The maximum clock skew rate tolerated by the system.
+    ///
+    /// An environment in which the clock on the fastest node ticks twice as fast as the slowest
+    /// node, would have a clock skew rate of 2. The default value of 3 should be appropriate
+    /// for most environments.
+    pub fn with_max_clock_skew_rate(mut self, rate: u32) -> Self {
+        self.max_clock_skew_rate = rate;
+        self
+    }
+
+    /// The length of time a record should be retained in DynamoDB before being cleaned up
+    ///
+    /// This should be significantly larger than the configured lock timeout, with the default
+    /// value of 1 hour appropriate for most use-cases.
+    pub fn with_ttl(mut self, ttl: Duration) -> Self {
+        self.ttl = ttl;
+        self
+    }
+
+    /// Returns the name of the DynamoDB table.
+    pub(crate) fn table_name(&self) -> &str {
+        &self.table_name
+    }
+
+    pub(crate) async fn copy_if_not_exists(
+        &self,
+        client: &S3Client,
+        from: &Path,
+        to: &Path,
+    ) -> Result<()> {
+        check_not_exists(client, to).await?;
+
+        let mut previous_lease = None;
+
+        loop {
+            let existing = previous_lease.as_ref();
+            match self.try_lock(client, to.as_ref(), existing).await? {
+                TryLockResult::Ok(lease) => {
+                    let fut = client.copy_request(from, to).send();
+                    let expiry = lease.acquire + lease.timeout;
+                    return match tokio::time::timeout_at(expiry.into(), fut).await {
+                        Ok(Ok(_)) => Ok(()),
+                        Ok(Err(e)) => Err(e.into()),
+                        Err(_) => Err(Error::Generic {
+                            store: "DynamoDB",
+                            source: format!(
+                                "Failed to perform copy operation in {} milliseconds",
+                                self.timeout
+                            )
+                            .into(),
+                        }),
+                    };
+                }
+                TryLockResult::Conflict(conflict) => {
+                    let mut interval = tokio::time::interval(self.test_interval);
+                    let expiry = conflict.timeout * self.max_clock_skew_rate;
+                    loop {
+                        interval.tick().await;
+                        check_not_exists(client, to).await?;
+                        if conflict.acquire.elapsed() > expiry {
+                            previous_lease = Some(conflict);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /// Retrieve a lock, returning an error if it doesn't exist
+    async fn get_lock(&self, s3: &S3Client, key: &str) -> Result<Lease> {
+        let key_attributes = [("key", AttributeValue::String(key))];
+        let req = GetItem {
+            table_name: &self.table_name,
+            key: Map(&key_attributes),
+        };
+        let credential = s3.config.get_credential().await?;
+
+        let resp = self
+            .request(s3, credential.as_deref(), "DynamoDB_20120810.GetItem", req)
+            .await
+            .map_err(|e| e.error(STORE, key.to_string()))?;
+
+        let body = resp.bytes().await.map_err(|e| Error::Generic {
+            store: STORE,
+            source: Box::new(e),
+        })?;
+
+        let response: GetItemResponse<'_> =
+            serde_json::from_slice(body.as_ref()).map_err(|e| Error::Generic {
+                store: STORE,
+                source: Box::new(e),
+            })?;
+
+        extract_lease(&response.item).ok_or_else(|| Error::NotFound {
+            path: key.into(),
+            source: "DynamoDB GetItem returned no items".to_string().into(),
+        })
+    }
+
+    /// Attempt to acquire a lock, reclaiming an existing lease if provided
+    async fn try_lock(
+        &self,
+        s3: &S3Client,
+        key: &str,
+        existing: Option<&Lease>,
+    ) -> Result<TryLockResult> {
+        let attributes;
+        let (next_gen, condition_expression, expression_attribute_values) = match existing {
+            None => (0_u64, "attribute_not_exists(#pk)", Map(&[])),
+            Some(existing) => {
+                attributes = [(":g", AttributeValue::Number(existing.generation))];
+                (
+                    existing.generation.checked_add(1).unwrap(),
+                    "attribute_exists(#pk) AND generation = :g",
+                    Map(attributes.as_slice()),
+                )
+            }
+        };
+
+        let ttl = (Utc::now() + self.ttl).timestamp();
+        let items = [
+            ("key", AttributeValue::String(key)),
+            ("generation", AttributeValue::Number(next_gen)),
+            ("timeout", AttributeValue::Number(self.timeout)),
+            ("ttl", AttributeValue::Number(ttl as _)),
+        ];
+        let names = [("#pk", "key")];
+
+        let req = PutItem {
+            table_name: &self.table_name,
+            condition_expression,
+            expression_attribute_values,
+            expression_attribute_names: Map(&names),
+            item: Map(&items),
+            return_values: None,
+            return_values_on_condition_check_failure: Some(ReturnValues::AllOld),
+        };
+
+        let credential = s3.config.get_credential().await?;
+
+        let acquire = Instant::now();
+        match self
+            .request(s3, credential.as_deref(), "DynamoDB_20120810.PutItem", req)
+            .await
+        {
+            Ok(_) => Ok(TryLockResult::Ok(Lease {
+                acquire,
+                generation: next_gen,
+                timeout: Duration::from_millis(self.timeout),
+            })),
+            Err(e) => match parse_error_response(&e) {
+                Some(e) if e.error.ends_with(CONFLICT) => match extract_lease(&e.item) {
+                    Some(lease) => Ok(TryLockResult::Conflict(lease)),
+                    // ReturnValuesOnConditionCheckFailure is a relatively recent addition
+                    // to DynamoDB and is not supported by dynamodb-local, which is used
+                    // by localstack. In such cases the conflict error will not contain
+                    // the conflicting item, and we must instead perform a get request
+                    //
+                    // There is a potential race here if the conflicting record is removed
+                    // before we retrieve it. We could retry the transaction in such a scenario,
+                    // but as this only occurs for emulators, we simply abort with a
+                    // not found error
+                    //
+                    // <https://aws.amazon.com/about-aws/whats-new/2023/06/amazon-dynamodb-cost-failed-conditional-writes/>
+                    // <https://repost.aws/questions/QUNfADrK4RT6WHe61RzTK8aw/dynamodblocal-support-for-returnvaluesonconditioncheckfailure-for-single-write-operations>
+                    // <https://github.com/localstack/localstack/issues/9040>
+                    None => Ok(TryLockResult::Conflict(self.get_lock(s3, key).await?)),
+                },
+                _ => Err(Error::Generic {
+                    store: STORE,
+                    source: Box::new(e),
+                }),
+            },
+        }
+    }
+
+    async fn request<R: Serialize + Send + Sync>(
+        &self,
+        s3: &S3Client,
+        cred: Option<&AwsCredential>,
+        target: &str,
+        req: R,
+    ) -> Result<Response, RetryError> {
+        let region = &s3.config.region;
+
+        let builder = match &s3.config.endpoint {
+            Some(e) => s3.client.post(e),
+            None => {
+                let url = format!("https://dynamodb.{region}.amazonaws.com");
+                s3.client.post(url)
+            }
+        };
+
+        builder
+            .timeout(Duration::from_millis(self.timeout))
+            .json(&req)
+            .header("X-Amz-Target", target)
+            .with_aws_sigv4(cred, region, "dynamodb", true, None)
+            .send_retry(&s3.config.retry_config)
+            .await
+    }
+}
+
+#[derive(Debug)]
+enum TryLockResult {
+    /// Successfully acquired a lease
+    Ok(Lease),
+    /// An existing lease was found
+    Conflict(Lease),
+}
+
+/// Returns an [`Error::AlreadyExists`] if `path` exists
+async fn check_not_exists(client: &S3Client, path: &Path) -> Result<()> {
+    let options = GetOptions {
+        head: true,
+        ..Default::default()
+    };
+    match client.get_opts(path, options).await {
+        Ok(_) => Err(Error::AlreadyExists {
+            path: path.to_string(),
+            source: "Already Exists".to_string().into(),
+        }),
+        Err(Error::NotFound { .. }) => Ok(()),
+        Err(e) => Err(e),
+    }
+}
+
+/// Parses the error response if any
+fn parse_error_response(e: &RetryError) -> Option<ErrorResponse<'_>> {
+    match e {
+        RetryError::Client {
+            status: StatusCode::BAD_REQUEST,
+            body: Some(b),
+        } => serde_json::from_str(b).ok(),
+        _ => None,
+    }
+}
+
+/// Extracts a lease from `item`, returning `None` on error
+///
+/// Both the `generation` and `timeout` attributes must be present and numeric.
+/// `timeout` is interpreted as milliseconds. The returned lease's `acquire`
+/// instant is the moment of extraction, not when the lease was originally written.
+fn extract_lease(item: &HashMap<&str, AttributeValue<'_>>) -> Option<Lease> {
+    match (item.get("generation"), item.get("timeout")) {
+        (Some(AttributeValue::Number(generation)), Some(AttributeValue::Number(timeout))) => {
+            Some(Lease {
+                acquire: Instant::now(),
+                generation: *generation,
+                timeout: Duration::from_millis(*timeout),
+            })
+        }
+        // Missing attribute, or an attribute of the wrong type
+        _ => None,
+    }
+}
+
+/// A lock lease
+///
+/// NOTE(review): `acquire` is a process-local timestamp (`extract_lease` sets it to
+/// `Instant::now()`), so expiry is presumably measured from when this process observed
+/// the lease rather than when it was written. Confirm against the locking logic.
+#[derive(Debug, Clone)]
+struct Lease {
+    /// Local instant at which this lease was acquired or observed
+    acquire: Instant,
+    /// Generation counter of the lease; reclaiming a lease increments it
+    generation: u64,
+    /// Lease duration, read from the item's `timeout` attribute in milliseconds
+    timeout: Duration,
+}
+
+/// A DynamoDB [PutItem] payload
+///
+/// Fields are serialized with PascalCase names (e.g. `TableName`, `ConditionExpression`).
+/// Empty [`Map`] fields serialize as none rather than as an empty object.
+///
+/// [PutItem]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html
+#[derive(Serialize)]
+#[serde(rename_all = "PascalCase")]
+struct PutItem<'a> {
+    /// The table name
+    table_name: &'a str,
+
+    /// A condition that must be satisfied in order for a conditional PutItem operation to succeed.
+    condition_expression: &'a str,
+
+    /// One or more substitution tokens for attribute names in an expression
+    expression_attribute_names: Map<'a, &'a str, &'a str>,
+
+    /// One or more values that can be substituted in an expression
+    expression_attribute_values: Map<'a, &'a str, AttributeValue<'a>>,
+
+    /// A map of attribute name/value pairs, one for each attribute
+    item: Map<'a, &'a str, AttributeValue<'a>>,
+
+    /// Use ReturnValues if you want to get the item attributes as they appeared
+    /// before they were updated with the PutItem request.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    return_values: Option<ReturnValues>,
+
+    /// An optional parameter that returns the item attributes for a PutItem operation
+    /// that failed a condition check.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    return_values_on_condition_check_failure: Option<ReturnValues>,
+}
+
+/// A DynamoDB [GetItem] payload
+///
+/// Fields are serialized with PascalCase names (`TableName`, `Key`).
+///
+/// [GetItem]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html
+#[derive(Serialize)]
+#[serde(rename_all = "PascalCase")]
+struct GetItem<'a> {
+    /// The table name
+    table_name: &'a str,
+    /// The primary key
+    key: Map<'a, &'a str, AttributeValue<'a>>,
+}
+
+/// The response body of a DynamoDB GetItem request
+///
+/// `item` is left empty when the response has no `Item` key.
+#[derive(Deserialize)]
+struct GetItemResponse<'a> {
+    /// Attributes of the returned item, borrowed from the response body
+    #[serde(borrow, default, rename = "Item")]
+    item: HashMap<&'a str, AttributeValue<'a>>,
+}
+
+/// A DynamoDB JSON error response body
+#[derive(Deserialize)]
+struct ErrorResponse<'a> {
+    /// The error type, read from the `__type` field
+    #[serde(rename = "__type")]
+    error: &'a str,
+
+    /// The item returned with the error, empty if the `Item` key is absent
+    ///
+    /// Presumably populated on conditional-check failures when
+    /// `ReturnValuesOnConditionCheckFailure` is requested. Confirm against callers.
+    #[serde(borrow, default, rename = "Item")]
+    item: HashMap<&'a str, AttributeValue<'a>>,
+}
+
+/// Values accepted by the DynamoDB `ReturnValues` family of request parameters
+///
+/// Serialized in SCREAMING_SNAKE_CASE, so `AllOld` is sent as `"ALL_OLD"`.
+#[derive(Serialize)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+enum ReturnValues {
+    /// Return the item attributes as they appeared before the operation
+    AllOld,
+}
+
+/// A collection of key value pairs
+///
+/// This provides cheap, ordered serialization of maps
+///
+/// Entries are written in slice order straight from the borrowed slice, with no
+/// intermediate map. An empty collection serializes as none (JSON `null`) rather than `{}`.
+struct Map<'a, K, V>(&'a [(K, V)]);
+
+impl<'a, K: Serialize, V: Serialize> Serialize for Map<'a, K, V> {
+    /// Serializes the entries in slice order, or as none when there are no entries
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let entries = self.0;
+        match entries.len() {
+            // An empty collection becomes none (JSON `null`) instead of `{}`
+            0 => serializer.serialize_none(),
+            len => {
+                let mut map = serializer.serialize_map(Some(len))?;
+                entries
+                    .iter()
+                    .try_for_each(|(k, v)| map.serialize_entry(k, v))?;
+                map.end()
+            }
+        }
+    }
+}
+
+/// A DynamoDB [AttributeValue]
+///
+/// Only the string and number types are modelled. Values use serde's externally
+/// tagged enum encoding, e.g. `{"S":"foo"}` or `{"N":"23"}`.
+///
+/// [AttributeValue]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeValue.html
+#[derive(Debug, Serialize, Deserialize)]
+enum AttributeValue<'a> {
+    /// A string attribute, tagged `S`, borrowed from the input when deserializing
+    #[serde(rename = "S")]
+    String(&'a str),
+    /// A number attribute, tagged `N`, whose value is transmitted as a JSON string
+    #[serde(rename = "N", with = "number")]
+    Number(u64),
+}
+
+/// Numbers are serialized as strings
+///
+/// serde `with` helpers for [`AttributeValue::Number`]. A `u64` is written as its
+/// decimal string and read back by parsing such a string.
+mod number {
+    use serde::{Deserialize, Deserializer, Serializer};
+
+    /// Writes `v` as a decimal string
+    pub fn serialize<S: Serializer>(v: &u64, s: S) -> Result<S::Ok, S::Error> {
+        s.serialize_str(&v.to_string())
+    }
+
+    /// Parses a decimal string into a `u64`
+    ///
+    /// The string is deserialized as a borrowed `&str`, so the input must support
+    /// borrowing (e.g. `serde_json::from_str` on a value without escape sequences).
+    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<u64, D::Error> {
+        let v: &str = Deserialize::deserialize(d)?;
+        v.parse().map_err(serde::de::Error::custom)
+    }
+}
+
+/// Re-export integration_test to be called by s3_test
+#[cfg(test)]
+pub(crate) use tests::integration_test;
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+    use crate::aws::AmazonS3;
+    use crate::ObjectStore;
+
+    /// Checks the DynamoDB JSON encoding round-trip of each [`AttributeValue`] variant
+    #[test]
+    fn test_attribute_serde() {
+        let serde = serde_json::to_string(&AttributeValue::Number(23)).unwrap();
+        assert_eq!(serde, "{\"N\":\"23\"}");
+        let back: AttributeValue<'_> = serde_json::from_str(&serde).unwrap();
+        assert!(matches!(back, AttributeValue::Number(23)));
+
+        let encoded = serde_json::to_string(&AttributeValue::String("foo")).unwrap();
+        assert_eq!(encoded, "{\"S\":\"foo\"}");
+        let decoded: AttributeValue<'_> = serde_json::from_str(&encoded).unwrap();
+        assert!(matches!(decoded, AttributeValue::String("foo")));
+    }
+
+    /// An integration test for DynamoDB
+    ///
+    /// This is a function called by s3_test to avoid test concurrency issues
+    ///
+    /// Verifies that a held lease blocks `try_lock`, and that `copy_if_not_exists`
+    /// can still reclaim it, which increments the lease generation.
+    pub async fn integration_test(integration: &AmazonS3, d: &DynamoCommit) {
+        let client = integration.client.as_ref();
+
+        let src = Path::from("dynamo_path_src");
+        integration.put(&src, "asd".into()).await.unwrap();
+
+        let dst = Path::from("dynamo_path");
+        let _ = integration.delete(&dst).await; // Delete if present
+
+        // Create a lock if not already exists
+        let existing = match d.try_lock(client, dst.as_ref(), None).await.unwrap() {
+            TryLockResult::Conflict(l) => l,
+            TryLockResult::Ok(l) => l,
+        };
+
+        // Should not be able to acquire a lock again
+        let r = d.try_lock(client, dst.as_ref(), None).await;
+        assert!(matches!(r, Ok(TryLockResult::Conflict(_))));
+
+        // But should still be able to reclaim lock and perform copy
+        d.copy_if_not_exists(client, &src, &dst).await.unwrap();
+
+        match d.try_lock(client, dst.as_ref(), None).await.unwrap() {
+            TryLockResult::Conflict(new) => {
+                // Should have incremented generation to do so
+                assert_eq!(new.generation, existing.generation + 1);
+            }
+            _ => panic!("Should conflict"),
+        }
+    }
+}
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 0985263459..75b43d448b 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -36,12 +36,12 @@ use bytes::Bytes;
 use futures::stream::BoxStream;
 use futures::{StreamExt, TryStreamExt};
 use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
-use reqwest::Method;
+use reqwest::{Method, StatusCode};
 use std::{sync::Arc, time::Duration};
 use tokio::io::AsyncWrite;
 use url::Url;
 
-use crate::aws::client::S3Client;
+use crate::aws::client::{RequestError, S3Client};
 use crate::client::get::GetClientExt;
 use crate::client::list::ListClientExt;
 use crate::client::CredentialProvider;
@@ -58,11 +58,13 @@ mod builder;
 mod checksum;
 mod client;
 mod credential;
+mod dynamo;
 mod precondition;
 mod resolve;
 
 pub use builder::{AmazonS3Builder, AmazonS3ConfigKey};
 pub use checksum::Checksum;
+pub use dynamo::DynamoCommit;
 pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};
 pub use resolve::resolve_bucket_region;
 
@@ -93,19 +95,19 @@ pub struct AmazonS3 {
 
 impl std::fmt::Display for AmazonS3 {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "AmazonS3({})", self.client.config().bucket)
+        // Renders as `AmazonS3(<bucket>)` using the configured bucket
+        write!(f, "AmazonS3({})", self.client.config.bucket)
     }
 }
 
 impl AmazonS3 {
     /// Returns the [`AwsCredentialProvider`] used by [`AmazonS3`]
     pub fn credentials(&self) -> &AwsCredentialProvider {
-        &self.client.config().credentials
+        &self.client.config.credentials
     }
 
     /// Create a full URL to the resource specified by `path` with this 
instance's configuration.
     fn path_url(&self, path: &Path) -> String {
-        self.client.config().path_url(path)
+        self.client.config.path_url(path)
     }
 }
 
@@ -145,7 +147,7 @@ impl Signer for AmazonS3 {
     /// ```
     async fn signed_url(&self, method: Method, path: &Path, expires_in: 
Duration) -> Result<Url> {
         let credential = self.credentials().get_credential().await?;
-        let authorizer = AwsAuthorizer::new(&credential, "s3", 
&self.client.config().region);
+        let authorizer = AwsAuthorizer::new(&credential, "s3", 
&self.client.config.region);
 
         let path_url = self.path_url(path);
         let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {
@@ -164,15 +166,15 @@ impl ObjectStore for AmazonS3 {
     async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
         let mut request = self.client.put_request(location, bytes);
         let tags = opts.tags.encoded();
-        if !tags.is_empty() && !self.client.config().disable_tagging {
+        if !tags.is_empty() && !self.client.config.disable_tagging {
             request = request.header(&TAGS_HEADER, tags);
         }
 
-        match (opts.mode, &self.client.config().conditional_put) {
-            (PutMode::Overwrite, _) => request.send().await,
+        match (opts.mode, &self.client.config.conditional_put) {
+            (PutMode::Overwrite, _) => request.do_put().await,
             (PutMode::Create | PutMode::Update(_), None) => 
Err(Error::NotImplemented),
             (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
-                match request.header(&IF_NONE_MATCH, "*").send().await {
+                match request.header(&IF_NONE_MATCH, "*").do_put().await {
                     // Technically If-None-Match should return NotModified but 
some stores,
                     // such as R2, instead return PreconditionFailed
                     // 
https://developers.cloudflare.com/r2/api/s3/extensions/#conditional-operations-in-putobject
@@ -190,7 +192,7 @@ impl ObjectStore for AmazonS3 {
                     store: STORE,
                     source: "ETag required for conditional 
put".to_string().into(),
                 })?;
-                request.header(&IF_MATCH, etag.as_str()).send().await
+                request.header(&IF_MATCH, etag.as_str()).do_put().await
             }
         }
     }
@@ -261,11 +263,35 @@ impl ObjectStore for AmazonS3 {
     }
 
     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client.copy_request(from, to, true).await
+        // Unconditional copy: no precondition header is attached, unlike copy_if_not_exists
+        self.client.copy_request(from, to).send().await?;
+        Ok(())
     }
 
     async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client.copy_request(from, to, false).await
+        let (k, v, status) = match &self.client.config.copy_if_not_exists {
+            Some(S3CopyIfNotExists::Header(k, v)) => (k, v, 
StatusCode::PRECONDITION_FAILED),
+            Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, 
*status),
+            Some(S3CopyIfNotExists::Dynamo(lock)) => {
+                return lock.copy_if_not_exists(&self.client, from, to).await
+            }
+            None => {
+                return Err(Error::NotSupported {
+                    source: "S3 does not support 
copy-if-not-exists".to_string().into(),
+                })
+            }
+        };
+
+        let req = self.client.copy_request(from, to);
+        match req.header(k, v).send().await {
+            Err(RequestError::Retry { source, path }) if source.status() == 
Some(status) => {
+                Err(Error::AlreadyExists {
+                    source: Box::new(source),
+                    path,
+                })
+            }
+            Err(e) => Err(e.into()),
+            Ok(_) => Ok(()),
+        }
     }
 }
 
@@ -335,7 +361,7 @@ mod tests {
         let config = AmazonS3Builder::from_env();
 
         let integration = config.build().unwrap();
-        let config = integration.client.config();
+        let config = &integration.client.config;
         let test_not_exists = config.copy_if_not_exists.is_some();
         let test_conditional_put = config.conditional_put.is_some();
 
@@ -368,6 +394,11 @@ mod tests {
         let builder = 
AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
         let integration = builder.build().unwrap();
         put_get_delete_list_opts(&integration).await;
+
+        match &integration.client.config.copy_if_not_exists {
+            Some(S3CopyIfNotExists::Dynamo(d)) => 
dynamo::integration_test(&integration, d).await,
+            _ => eprintln!("Skipping dynamo integration test - dynamo not 
configured"),
+        };
     }
 
     #[tokio::test]
diff --git a/object_store/src/aws/precondition.rs 
b/object_store/src/aws/precondition.rs
index ada5f3b83f..83d45db82c 100644
--- a/object_store/src/aws/precondition.rs
+++ b/object_store/src/aws/precondition.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::aws::dynamo::DynamoCommit;
 use crate::config::Parse;
 
 use itertools::Itertools;
@@ -45,6 +46,15 @@ pub enum S3CopyIfNotExists {
     ///
     /// Encoded as `header-with-status:<HEADER_NAME>:<HEADER_VALUE>:<STATUS>` 
ignoring whitespace
     HeaderWithStatus(String, String, reqwest::StatusCode),
+    /// The name of a DynamoDB table to use for coordination
+    ///
+    /// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
+    /// ignoring whitespace. The default timeout is used if not specified
+    ///
+    /// See [`DynamoCommit`] for more information
+    ///
+    /// This will use the same region, credentials and endpoint as configured for S3
+    Dynamo(DynamoCommit),
 }
 
 impl std::fmt::Display for S3CopyIfNotExists {
@@ -54,6 +64,7 @@ impl std::fmt::Display for S3CopyIfNotExists {
             Self::HeaderWithStatus(k, v, code) => {
                 write!(f, "header-with-status: {k}: {v}: {}", code.as_u16())
             }
+            Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
         }
     }
 }
@@ -77,6 +88,12 @@ impl S3CopyIfNotExists {
                     code,
                 ))
             }
+            "dynamo" => Some(Self::Dynamo(match value.split_once(':') {
+                Some((table_name, timeout)) => 
DynamoCommit::new(table_name.trim().to_string())
+                    .with_timeout(timeout.parse().ok()?),
+                None => DynamoCommit::new(value.trim().to_string()),
+            })),
+
             _ => None,
         }
     }


Reply via email to