This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 96c4c0b1687 Replace AsyncWrite with Upload trait and rename MultiPartStore to MultipartStore (#5458) (#5500)
96c4c0b1687 is described below
commit 96c4c0b1687867a4809ed4d3391982d4f5f7273e
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Mar 20 12:26:54 2024 +1300
Replace AsyncWrite with Upload trait and rename MultiPartStore to MultipartStore (#5458) (#5500)
* Replace AsyncWrite with Upload trait (#5458)
* Make BufWriter abortable
* Flesh out cloud implementations
* Review feedback
* Misc tweaks and fixes
* Format
* Replace multi-part with multipart
* More docs
* Clippy
* Rename to MultipartUpload
* Rename ChunkedUpload to WriteMultipart
* Doc tweaks
* Apply suggestions from code review
Co-authored-by: Andrew Lamb <[email protected]>
* Docs
* Format
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
object_store/src/aws/mod.rs | 104 +++++-----
object_store/src/azure/mod.rs | 80 ++++----
object_store/src/buffered.rs | 84 ++++----
object_store/src/chunked.rs | 16 +-
object_store/src/client/mod.rs | 3 +
object_store/src/client/parts.rs | 48 +++++
object_store/src/gcp/client.rs | 2 +-
object_store/src/gcp/mod.rs | 106 ++++++-----
object_store/src/http/mod.rs | 14 +-
object_store/src/lib.rs | 127 ++++---------
object_store/src/limit.rs | 79 ++++----
object_store/src/local.rs | 358 ++++++++++++-----------------------
object_store/src/memory.rs | 95 ++++------
object_store/src/multipart.rs | 243 +-----------------------
object_store/src/prefix.rs | 14 +-
object_store/src/throttle.rs | 16 +-
object_store/src/upload.rs | 175 +++++++++++++++++
object_store/tests/get_range_file.rs | 10 +-
18 files changed, 691 insertions(+), 883 deletions(-)
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index b11f4513b6d..b33771de9a8 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -17,17 +17,14 @@
//! An object store implementation for S3
//!
-//! ## Multi-part uploads
+//! ## Multipart uploads
//!
-//! Multi-part uploads can be initiated with the [ObjectStore::put_multipart] method.
-//! Data passed to the writer is automatically buffered to meet the minimum size
-//! requirements for a part. Multiple parts are uploaded concurrently.
+//! Multipart uploads can be initiated with the [ObjectStore::put_multipart] method.
//!
//! If the writer fails for any reason, you may have parts uploaded to AWS but not
-//! used that you may be charged for. Use the [ObjectStore::abort_multipart] method
-//! to abort the upload and drop those unneeded parts. In addition, you may wish to
-//! consider implementing [automatic cleanup] of unused parts that are older than one
-//! week.
+//! used that you will be charged for. [`MultipartUpload::abort`] may be invoked to drop
+//! these unneeded parts, however, it is recommended that you consider implementing
+//! [automatic cleanup] of unused parts that are older than some threshold.
//!
//! [automatic cleanup]:
https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/
@@ -38,18 +35,17 @@ use futures::{StreamExt, TryStreamExt};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::{Method, StatusCode};
use std::{sync::Arc, time::Duration};
-use tokio::io::AsyncWrite;
use url::Url;
use crate::aws::client::{RequestError, S3Client};
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
-use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart};
+use crate::multipart::{MultipartStore, PartId};
use crate::signer::Signer;
use crate::{
- Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Path, PutMode,
- PutOptions, PutResult, Result,
+ Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
ObjectMeta,
+ ObjectStore, Path, PutMode, PutOptions, PutResult, Result, UploadPart,
};
static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
@@ -85,6 +81,7 @@ const STORE: &str = "S3";
/// [`CredentialProvider`] for [`AmazonS3`]
pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential =
AwsCredential>>;
+use crate::client::parts::Parts;
pub use credential::{AwsAuthorizer, AwsCredential};
/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
@@ -211,25 +208,18 @@ impl ObjectStore for AmazonS3 {
}
}
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- let id = self.client.create_multipart(location).await?;
-
- let upload = S3MultiPartUpload {
- location: location.clone(),
- upload_id: id.clone(),
- client: Arc::clone(&self.client),
- };
-
- Ok((id, Box::new(WriteMultiPart::new(upload, 8))))
- }
-
- async fn abort_multipart(&self, location: &Path, multipart_id:
&MultipartId) -> Result<()> {
- self.client
- .delete_request(location, &[("uploadId", multipart_id)])
- .await
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
+ let upload_id = self.client.create_multipart(location).await?;
+
+ Ok(Box::new(S3MultiPartUpload {
+ part_idx: 0,
+ state: Arc::new(UploadState {
+ client: Arc::clone(&self.client),
+ location: location.clone(),
+ upload_id: upload_id.clone(),
+ parts: Default::default(),
+ }),
+ }))
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
@@ -319,30 +309,55 @@ impl ObjectStore for AmazonS3 {
}
}
+#[derive(Debug)]
struct S3MultiPartUpload {
+ part_idx: usize,
+ state: Arc<UploadState>,
+}
+
+#[derive(Debug)]
+struct UploadState {
+ parts: Parts,
location: Path,
upload_id: String,
client: Arc<S3Client>,
}
#[async_trait]
-impl PutPart for S3MultiPartUpload {
- async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
- self.client
- .put_part(&self.location, &self.upload_id, part_idx, buf.into())
+impl MultipartUpload for S3MultiPartUpload {
+ fn put_part(&mut self, data: Bytes) -> UploadPart {
+ let idx = self.part_idx;
+ self.part_idx += 1;
+ let state = Arc::clone(&self.state);
+ Box::pin(async move {
+ let part = state
+ .client
+ .put_part(&state.location, &state.upload_id, idx, data)
+ .await?;
+ state.parts.put(idx, part);
+ Ok(())
+ })
+ }
+
+ async fn complete(&mut self) -> Result<PutResult> {
+ let parts = self.state.parts.finish(self.part_idx)?;
+
+ self.state
+ .client
+ .complete_multipart(&self.state.location, &self.state.upload_id,
parts)
.await
}
- async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
- self.client
- .complete_multipart(&self.location, &self.upload_id,
completed_parts)
- .await?;
- Ok(())
+ async fn abort(&mut self) -> Result<()> {
+ self.state
+ .client
+ .delete_request(&self.state.location, &[("uploadId",
&self.state.upload_id)])
+ .await
}
}
#[async_trait]
-impl MultiPartStore for AmazonS3 {
+impl MultipartStore for AmazonS3 {
async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
self.client.create_multipart(path).await
}
@@ -377,7 +392,6 @@ mod tests {
use crate::{client::get::GetClient, tests::*};
use bytes::Bytes;
use hyper::HeaderMap;
- use tokio::io::AsyncWriteExt;
const NON_EXISTENT_NAME: &str = "nonexistentname";
@@ -542,9 +556,9 @@ mod tests {
store.put(&locations[0], data.clone()).await.unwrap();
store.copy(&locations[0], &locations[1]).await.unwrap();
- let (_, mut writer) =
store.put_multipart(&locations[2]).await.unwrap();
- writer.write_all(&data).await.unwrap();
- writer.shutdown().await.unwrap();
+ let mut upload = store.put_multipart(&locations[2]).await.unwrap();
+ upload.put_part(data.clone()).await.unwrap();
+ upload.complete().await.unwrap();
for location in &locations {
let res = store
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 712b7a36c56..5d3a405ccc9 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -19,19 +19,15 @@
//!
//! ## Streaming uploads
//!
-//! [ObjectStore::put_multipart] will upload data in blocks and write a blob
from those
-//! blocks. Data is buffered internally to make blocks of at least 5MB and
blocks
-//! are uploaded concurrently.
+//! [ObjectStore::put_multipart] will upload data in blocks and write a blob
from those blocks.
//!
-//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't
provide
-//! a way to drop old blocks. Instead unused blocks are automatically cleaned
up
-//! after 7 days.
+//! Unused blocks will automatically be dropped after 7 days.
use crate::{
- multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart},
+ multipart::{MultipartStore, PartId},
path::Path,
signer::Signer,
- GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutOptions, PutResult,
- Result,
+ GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
ObjectMeta, ObjectStore,
+ PutOptions, PutResult, Result, UploadPart,
};
use async_trait::async_trait;
use bytes::Bytes;
@@ -40,7 +36,6 @@ use reqwest::Method;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
-use tokio::io::AsyncWrite;
use url::Url;
use crate::client::get::GetClientExt;
@@ -54,6 +49,8 @@ mod credential;
/// [`CredentialProvider`] for [`MicrosoftAzure`]
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential =
AzureCredential>>;
+use crate::azure::client::AzureClient;
+use crate::client::parts::Parts;
pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
pub use credential::AzureCredential;
@@ -94,21 +91,15 @@ impl ObjectStore for MicrosoftAzure {
self.client.put_blob(location, bytes, opts).await
}
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- let inner = AzureMultiPartUpload {
- client: Arc::clone(&self.client),
- location: location.to_owned(),
- };
- Ok((String::new(), Box::new(WriteMultiPart::new(inner, 8))))
- }
-
- async fn abort_multipart(&self, _location: &Path, _multipart_id:
&MultipartId) -> Result<()> {
- // There is no way to drop blocks that have been uploaded. Instead,
they simply
- // expire in 7 days.
- Ok(())
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
+ Ok(Box::new(AzureMultiPartUpload {
+ part_idx: 0,
+ state: Arc::new(UploadState {
+ client: Arc::clone(&self.client),
+ location: location.clone(),
+ parts: Default::default(),
+ }),
+ }))
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
@@ -197,26 +188,49 @@ impl Signer for MicrosoftAzure {
/// put_multipart_part -> PUT block
/// complete -> PUT block list
/// abort -> No equivalent; blocks are simply dropped after 7 days
-#[derive(Debug, Clone)]
+#[derive(Debug)]
struct AzureMultiPartUpload {
- client: Arc<client::AzureClient>,
+ part_idx: usize,
+ state: Arc<UploadState>,
+}
+
+#[derive(Debug)]
+struct UploadState {
location: Path,
+ parts: Parts,
+ client: Arc<AzureClient>,
}
#[async_trait]
-impl PutPart for AzureMultiPartUpload {
- async fn put_part(&self, buf: Vec<u8>, idx: usize) -> Result<PartId> {
- self.client.put_block(&self.location, idx, buf.into()).await
+impl MultipartUpload for AzureMultiPartUpload {
+ fn put_part(&mut self, data: Bytes) -> UploadPart {
+ let idx = self.part_idx;
+ self.part_idx += 1;
+ let state = Arc::clone(&self.state);
+ Box::pin(async move {
+ let part = state.client.put_block(&state.location, idx,
data).await?;
+ state.parts.put(idx, part);
+ Ok(())
+ })
+ }
+
+ async fn complete(&mut self) -> Result<PutResult> {
+ let parts = self.state.parts.finish(self.part_idx)?;
+
+ self.state
+ .client
+ .put_block_list(&self.state.location, parts)
+ .await
}
- async fn complete(&self, parts: Vec<PartId>) -> Result<()> {
- self.client.put_block_list(&self.location, parts).await?;
+ async fn abort(&mut self) -> Result<()> {
+ // Nothing to do
Ok(())
}
}
#[async_trait]
-impl MultiPartStore for MicrosoftAzure {
+impl MultipartStore for MicrosoftAzure {
async fn create_multipart(&self, _: &Path) -> Result<MultipartId> {
Ok(String::new())
}
diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs
index 9299e1147bc..39f8eafbef7 100644
--- a/object_store/src/buffered.rs
+++ b/object_store/src/buffered.rs
@@ -18,7 +18,7 @@
//! Utilities for performing tokio-style buffered IO
use crate::path::Path;
-use crate::{MultipartId, ObjectMeta, ObjectStore};
+use crate::{ObjectMeta, ObjectStore, WriteMultipart};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
@@ -27,7 +27,7 @@ use std::io::{Error, ErrorKind, SeekFrom};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, AsyncWriteExt,
ReadBuf};
+use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
/// The default buffer size used by [`BufReader`]
pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
@@ -217,7 +217,6 @@ impl AsyncBufRead for BufReader {
pub struct BufWriter {
capacity: usize,
state: BufWriterState,
- multipart_id: Option<MultipartId>,
store: Arc<dyn ObjectStore>,
}
@@ -225,22 +224,19 @@ impl std::fmt::Debug for BufWriter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BufWriter")
.field("capacity", &self.capacity)
- .field("multipart_id", &self.multipart_id)
.finish()
}
}
-type MultipartResult = (MultipartId, Box<dyn AsyncWrite + Send + Unpin>);
-
enum BufWriterState {
/// Buffer up to capacity bytes
Buffer(Path, Vec<u8>),
/// [`ObjectStore::put_multipart`]
- Prepare(BoxFuture<'static, std::io::Result<MultipartResult>>),
+ Prepare(BoxFuture<'static, std::io::Result<WriteMultipart>>),
/// Write to a multipart upload
- Write(Box<dyn AsyncWrite + Send + Unpin>),
+ Write(Option<WriteMultipart>),
/// [`ObjectStore::put`]
- Put(BoxFuture<'static, std::io::Result<()>>),
+ Flush(BoxFuture<'static, std::io::Result<()>>),
}
impl BufWriter {
@@ -255,14 +251,20 @@ impl BufWriter {
capacity,
store,
state: BufWriterState::Buffer(path, Vec::new()),
- multipart_id: None,
}
}
- /// Returns the [`MultipartId`] of the multipart upload created by this
- /// writer, if any.
- pub fn multipart_id(&self) -> Option<&MultipartId> {
- self.multipart_id.as_ref()
+ /// Abort this writer, cleaning up any partially uploaded state
+ ///
+ /// # Panic
+ ///
+ /// Panics if this writer has already been shutdown or aborted
+ pub async fn abort(&mut self) -> crate::Result<()> {
+ match &mut self.state {
+ BufWriterState::Buffer(_, _) | BufWriterState::Prepare(_) =>
Ok(()),
+ BufWriterState::Flush(_) => panic!("Already shut down"),
+ BufWriterState::Write(x) => x.take().unwrap().abort().await,
+ }
}
}
@@ -275,12 +277,15 @@ impl AsyncWrite for BufWriter {
let cap = self.capacity;
loop {
return match &mut self.state {
- BufWriterState::Write(write) => Pin::new(write).poll_write(cx,
buf),
- BufWriterState::Put(_) => panic!("Already shut down"),
+ BufWriterState::Write(Some(write)) => {
+ write.write(buf);
+ Poll::Ready(Ok(buf.len()))
+ }
+ BufWriterState::Write(None) | BufWriterState::Flush(_) => {
+ panic!("Already shut down")
+ }
BufWriterState::Prepare(f) => {
- let (id, w) = ready!(f.poll_unpin(cx)?);
- self.state = BufWriterState::Write(w);
- self.multipart_id = Some(id);
+ self.state =
BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
continue;
}
BufWriterState::Buffer(path, b) => {
@@ -289,9 +294,10 @@ impl AsyncWrite for BufWriter {
let path = std::mem::take(path);
let store = Arc::clone(&self.store);
self.state = BufWriterState::Prepare(Box::pin(async
move {
- let (id, mut writer) =
store.put_multipart(&path).await?;
- writer.write_all(&buffer).await?;
- Ok((id, writer))
+ let upload = store.put_multipart(&path).await?;
+ let mut chunked = WriteMultipart::new(upload);
+ chunked.write(&buffer);
+ Ok(chunked)
}));
continue;
}
@@ -305,13 +311,10 @@ impl AsyncWrite for BufWriter {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Result<(), Error>> {
loop {
return match &mut self.state {
- BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
- BufWriterState::Write(write) => Pin::new(write).poll_flush(cx),
- BufWriterState::Put(_) => panic!("Already shut down"),
+ BufWriterState::Write(_) | BufWriterState::Buffer(_, _) =>
Poll::Ready(Ok(())),
+ BufWriterState::Flush(_) => panic!("Already shut down"),
BufWriterState::Prepare(f) => {
- let (id, w) = ready!(f.poll_unpin(cx)?);
- self.state = BufWriterState::Write(w);
- self.multipart_id = Some(id);
+ self.state =
BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
continue;
}
};
@@ -322,21 +325,28 @@ impl AsyncWrite for BufWriter {
loop {
match &mut self.state {
BufWriterState::Prepare(f) => {
- let (id, w) = ready!(f.poll_unpin(cx)?);
- self.state = BufWriterState::Write(w);
- self.multipart_id = Some(id);
+ self.state =
BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
}
BufWriterState::Buffer(p, b) => {
let buf = std::mem::take(b);
let path = std::mem::take(p);
let store = Arc::clone(&self.store);
- self.state = BufWriterState::Put(Box::pin(async move {
+ self.state = BufWriterState::Flush(Box::pin(async move {
store.put(&path, buf.into()).await?;
Ok(())
}));
}
- BufWriterState::Put(f) => return f.poll_unpin(cx),
- BufWriterState::Write(w) => return
Pin::new(w).poll_shutdown(cx),
+ BufWriterState::Flush(f) => return f.poll_unpin(cx),
+ BufWriterState::Write(x) => {
+ let upload = x.take().unwrap();
+ self.state = BufWriterState::Flush(
+ async move {
+ upload.finish().await?;
+ Ok(())
+ }
+ .boxed(),
+ )
+ }
}
}
}
@@ -357,7 +367,7 @@ mod tests {
use super::*;
use crate::memory::InMemory;
use crate::path::Path;
- use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt};
+ use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt,
AsyncWriteExt};
#[tokio::test]
async fn test_buf_reader() {
@@ -448,9 +458,7 @@ mod tests {
writer.write_all(&[0; 20]).await.unwrap();
writer.flush().await.unwrap();
writer.write_all(&[0; 5]).await.unwrap();
- assert!(writer.multipart_id().is_none());
writer.shutdown().await.unwrap();
- assert!(writer.multipart_id().is_none());
assert_eq!(store.head(&path).await.unwrap().size, 25);
// Test multipart
@@ -458,9 +466,7 @@ mod tests {
writer.write_all(&[0; 20]).await.unwrap();
writer.flush().await.unwrap();
writer.write_all(&[0; 20]).await.unwrap();
- assert!(writer.multipart_id().is_some());
writer.shutdown().await.unwrap();
- assert!(writer.multipart_id().is_some());
assert_eq!(store.head(&path).await.unwrap().size, 40);
}
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index d33556f4b12..6db7f4b35e2 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -25,14 +25,13 @@ use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::BoxStream;
use futures::StreamExt;
-use tokio::io::AsyncWrite;
use crate::path::Path;
+use crate::Result;
use crate::{
- GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta,
ObjectStore, PutOptions,
- PutResult,
+ GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
ObjectMeta, ObjectStore,
+ PutOptions, PutResult,
};
-use crate::{MultipartId, Result};
/// Wraps a [`ObjectStore`] and makes its get response return chunks
/// in a controllable manner.
@@ -67,17 +66,10 @@ impl ObjectStore for ChunkedStore {
self.inner.put_opts(location, bytes, opts).await
}
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
self.inner.put_multipart(location).await
}
- async fn abort_multipart(&self, location: &Path, multipart_id:
&MultipartId) -> Result<()> {
- self.inner.abort_multipart(location, multipart_id).await
- }
-
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
let r = self.inner.get_opts(location, options).await?;
let stream = match r.payload {
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 252e9fdcadf..7728f38954f 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -40,6 +40,9 @@ pub mod header;
#[cfg(any(feature = "aws", feature = "gcp"))]
pub mod s3;
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+pub mod parts;
+
use async_trait::async_trait;
use std::collections::HashMap;
use std::str::FromStr;
diff --git a/object_store/src/client/parts.rs b/object_store/src/client/parts.rs
new file mode 100644
index 00000000000..9fc301edcf8
--- /dev/null
+++ b/object_store/src/client/parts.rs
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::multipart::PartId;
+use parking_lot::Mutex;
+
+/// An interior mutable collection of upload parts and their corresponding
part index
+#[derive(Debug, Default)]
+pub(crate) struct Parts(Mutex<Vec<(usize, PartId)>>);
+
+impl Parts {
+ /// Record the [`PartId`] for a given index
+ ///
+ /// Note: calling this method multiple times with the same `part_idx`
+ /// will result in multiple [`PartId`] in the final output
+ pub(crate) fn put(&self, part_idx: usize, id: PartId) {
+ self.0.lock().push((part_idx, id))
+ }
+
+ /// Produce the final list of [`PartId`] ordered by `part_idx`
+ ///
+ /// `expected` is the number of parts expected in the final result
+ pub(crate) fn finish(&self, expected: usize) -> crate::Result<Vec<PartId>>
{
+ let mut parts = self.0.lock();
+ if parts.len() != expected {
+ return Err(crate::Error::Generic {
+ store: "Parts",
+ source: "Missing part".to_string().into(),
+ });
+ }
+ parts.sort_unstable_by_key(|(idx, _)| *idx);
+ Ok(parts.drain(..).map(|(_, v)| v).collect())
+ }
+}
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index e4b0f9af7d1..def53beefe7 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -272,7 +272,7 @@ impl GoogleCloudStorageClient {
})
}
- /// Initiate a multi-part upload
<https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
+ /// Initiate a multipart upload
<https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
pub async fn multipart_initiate(&self, path: &Path) -> Result<MultipartId>
{
let credential = self.get_credential().await?;
let url = self.object_url(path);
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 8633abbfb4d..2058d1f8055 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -17,18 +17,14 @@
//! An object store implementation for Google Cloud Storage
//!
-//! ## Multi-part uploads
+//! ## Multipart uploads
//!
-//! [Multi-part
uploads](https://cloud.google.com/storage/docs/multipart-uploads)
-//! can be initiated with the [ObjectStore::put_multipart] method.
-//! Data passed to the writer is automatically buffered to meet the minimum
size
-//! requirements for a part. Multiple parts are uploaded concurrently.
-//!
-//! If the writer fails for any reason, you may have parts uploaded to GCS but
not
-//! used that you may be charged for. Use the [ObjectStore::abort_multipart]
method
-//! to abort the upload and drop those unneeded parts. In addition, you may
wish to
-//! consider implementing automatic clean up of unused parts that are older
than one
-//! week.
+//! [Multipart
uploads](https://cloud.google.com/storage/docs/multipart-uploads)
+//! can be initiated with the [ObjectStore::put_multipart] method. If neither
+//! [`MultipartUpload::complete`] nor [`MultipartUpload::abort`] is invoked,
you may
+//! have parts uploaded to GCS but not used, that you will be charged for. It
is recommended
+//! you configure a [lifecycle rule] to abort incomplete multipart uploads
after a certain
+//! period of time to avoid being charged for storing partial uploads.
//!
//! ## Using HTTP/2
//!
@@ -36,24 +32,24 @@
//! because it allows much higher throughput in our benchmarks (see
//! [#5194](https://github.com/apache/arrow-rs/issues/5194)). HTTP/2 can be
//! enabled by setting [crate::ClientConfigKey::Http1Only] to false.
+//!
+//! [lifecycle rule]: https://cloud.google.com/storage/docs/lifecycle#abort-mpu
use std::sync::Arc;
use crate::client::CredentialProvider;
use crate::{
- multipart::{PartId, PutPart, WriteMultiPart},
- path::Path,
- GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutOptions, PutResult,
- Result,
+ multipart::PartId, path::Path, GetOptions, GetResult, ListResult,
MultipartId, MultipartUpload,
+ ObjectMeta, ObjectStore, PutOptions, PutResult, Result, UploadPart,
};
use async_trait::async_trait;
use bytes::Bytes;
use client::GoogleCloudStorageClient;
use futures::stream::BoxStream;
-use tokio::io::AsyncWrite;
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
-use crate::multipart::MultiPartStore;
+use crate::client::parts::Parts;
+use crate::multipart::MultipartStore;
pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey};
pub use credential::GcpCredential;
@@ -89,27 +85,50 @@ impl GoogleCloudStorage {
}
}
+#[derive(Debug)]
struct GCSMultipartUpload {
+ state: Arc<UploadState>,
+ part_idx: usize,
+}
+
+#[derive(Debug)]
+struct UploadState {
client: Arc<GoogleCloudStorageClient>,
path: Path,
multipart_id: MultipartId,
+ parts: Parts,
}
#[async_trait]
-impl PutPart for GCSMultipartUpload {
- /// Upload an object part
<https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
- async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
- self.client
- .put_part(&self.path, &self.multipart_id, part_idx, buf.into())
+impl MultipartUpload for GCSMultipartUpload {
+ fn put_part(&mut self, data: Bytes) -> UploadPart {
+ let idx = self.part_idx;
+ self.part_idx += 1;
+ let state = Arc::clone(&self.state);
+ Box::pin(async move {
+ let part = state
+ .client
+ .put_part(&state.path, &state.multipart_id, idx, data)
+ .await?;
+ state.parts.put(idx, part);
+ Ok(())
+ })
+ }
+
+ async fn complete(&mut self) -> Result<PutResult> {
+ let parts = self.state.parts.finish(self.part_idx)?;
+
+ self.state
+ .client
+ .multipart_complete(&self.state.path, &self.state.multipart_id,
parts)
.await
}
- /// Complete a multipart upload
<https://cloud.google.com/storage/docs/xml-api/post-object-complete>
- async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
- self.client
- .multipart_complete(&self.path, &self.multipart_id,
completed_parts)
- .await?;
- Ok(())
+ async fn abort(&mut self) -> Result<()> {
+ self.state
+ .client
+ .multipart_cleanup(&self.state.path, &self.state.multipart_id)
+ .await
}
}
@@ -119,27 +138,18 @@ impl ObjectStore for GoogleCloudStorage {
self.client.put(location, bytes, opts).await
}
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
let upload_id = self.client.multipart_initiate(location).await?;
- let inner = GCSMultipartUpload {
- client: Arc::clone(&self.client),
- path: location.clone(),
- multipart_id: upload_id.clone(),
- };
-
- Ok((upload_id, Box::new(WriteMultiPart::new(inner, 8))))
- }
-
- async fn abort_multipart(&self, location: &Path, multipart_id:
&MultipartId) -> Result<()> {
- self.client
- .multipart_cleanup(location, multipart_id)
- .await?;
-
- Ok(())
+ Ok(Box::new(GCSMultipartUpload {
+ part_idx: 0,
+ state: Arc::new(UploadState {
+ client: Arc::clone(&self.client),
+ path: location.clone(),
+ multipart_id: upload_id.clone(),
+ parts: Default::default(),
+ }),
+ }))
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
@@ -176,7 +186,7 @@ impl ObjectStore for GoogleCloudStorage {
}
#[async_trait]
-impl MultiPartStore for GoogleCloudStorage {
+impl MultipartStore for GoogleCloudStorage {
async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
self.client.multipart_initiate(path).await
}
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index f1d11db4762..626337df27f 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -37,7 +37,6 @@ use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt, Snafu};
-use tokio::io::AsyncWrite;
use url::Url;
use crate::client::get::GetClientExt;
@@ -45,7 +44,7 @@ use crate::client::header::get_etag;
use crate::http::client::Client;
use crate::path::Path;
use crate::{
- ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult,
MultipartId, ObjectMeta,
+ ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult,
MultipartUpload, ObjectMeta,
ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig,
};
@@ -115,15 +114,8 @@ impl ObjectStore for HttpStore {
})
}
- async fn put_multipart(
- &self,
- _location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- Err(super::Error::NotImplemented)
- }
-
- async fn abort_multipart(&self, _location: &Path, _multipart_id:
&MultipartId) -> Result<()> {
- Err(super::Error::NotImplemented)
+ async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn
MultipartUpload>> {
+ Err(crate::Error::NotImplemented)
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 4960f3ba390..e02675d88ab 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -269,12 +269,11 @@
//!
//! # Multipart Upload
//!
-//! Use the [`ObjectStore::put_multipart`] method to atomically write a large
amount of data,
-//! with implementations automatically handling parallel, chunked upload where
appropriate.
+//! Use the [`ObjectStore::put_multipart`] method to atomically write a large
amount of data
//!
//! ```
//! # use object_store::local::LocalFileSystem;
-//! # use object_store::ObjectStore;
+//! # use object_store::{ObjectStore, WriteMultipart};
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
@@ -286,12 +285,10 @@
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/large_file");
-//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
-//!
-//! let bytes = Bytes::from_static(b"hello");
-//! writer.write_all(&bytes).await.unwrap();
-//! writer.flush().await.unwrap();
-//! writer.shutdown().await.unwrap();
+//! let upload = object_store.put_multipart(&path).await.unwrap();
+//! let mut write = WriteMultipart::new(upload);
+//! write.write(b"hello");
+//! write.finish().await.unwrap();
//! # }
//! ```
//!
@@ -501,9 +498,11 @@ pub use tags::TagSet;
pub mod multipart;
mod parse;
+mod upload;
mod util;
pub use parse::{parse_url, parse_url_opts};
+pub use upload::*;
pub use util::GetRange;
use crate::path::Path;
@@ -520,12 +519,11 @@ use std::fmt::{Debug, Formatter};
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use std::sync::Arc;
-use tokio::io::AsyncWrite;
/// An alias for a dynamically dispatched object store implementation.
pub type DynObjectStore = dyn ObjectStore;
-/// Id type for multi-part uploads.
+/// Id type for multipart uploads.
pub type MultipartId = String;
/// Universal API to multiple object store services.
@@ -543,48 +541,11 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// Save the provided bytes to the specified location with the given
options
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions)
-> Result<PutResult>;
- /// Get a multi-part upload that allows writing data in chunks.
- ///
- /// Most cloud-based uploads will buffer and upload parts in parallel.
- ///
- /// To complete the upload, [AsyncWrite::poll_shutdown] must be called
- /// to completion. This operation is guaranteed to be atomic, it will
either
- /// make all the written data available at `location`, or fail. No clients
- /// should be able to observe a partially written object.
- ///
- /// For some object stores (S3, GCS, and local in particular), if the
- /// writer fails or panics, you must call [ObjectStore::abort_multipart]
- /// to clean up partially written data.
- ///
- /// <div class="warning">
- /// It is recommended applications wait for any in-flight requests to
complete by calling `flush`, if
- /// there may be a significant gap in time (> ~30s) before the next write.
- /// These gaps can include times where the function returns control to the
- /// caller while keeping the writer open. If `flush` is not called, futures
- /// for in-flight requests may be left unpolled long enough for the
requests
- /// to time out, causing the write to fail.
- /// </div>
- ///
- /// For applications requiring fine-grained control of multipart uploads
- /// see [`MultiPartStore`], although note that this interface cannot be
- /// supported by all [`ObjectStore`] backends.
- ///
- /// For applications looking to implement this interface for a custom
- /// multipart API, see [`WriteMultiPart`] which handles the complexities
- /// of performing parallel uploads of fixed size parts.
- ///
- /// [`WriteMultiPart`]: multipart::WriteMultiPart
- /// [`MultiPartStore`]: multipart::MultiPartStore
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)>;
-
- /// Cleanup an aborted upload.
+    /// Perform a multipart upload.
///
- /// See documentation for individual stores for exact behavior, as
capabilities
- /// vary by object store.
- async fn abort_multipart(&self, location: &Path, multipart_id:
&MultipartId) -> Result<()>;
+    /// Clients should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
+    /// typically require multiple separate requests. See [`MultipartUpload`] for more information.
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>>;
/// Return the bytes that are stored at the specified location.
async fn get(&self, location: &Path) -> Result<GetResult> {
@@ -769,21 +730,10 @@ macro_rules! as_ref_impl {
self.as_ref().put_opts(location, bytes, opts).await
}
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
self.as_ref().put_multipart(location).await
}
- async fn abort_multipart(
- &self,
- location: &Path,
- multipart_id: &MultipartId,
- ) -> Result<()> {
- self.as_ref().abort_multipart(location, multipart_id).await
- }
-
async fn get(&self, location: &Path) -> Result<GetResult> {
self.as_ref().get(location).await
}
@@ -1246,14 +1196,12 @@ mod test_util {
#[cfg(test)]
mod tests {
use super::*;
- use crate::multipart::MultiPartStore;
+ use crate::multipart::MultipartStore;
use crate::test_util::flatten_list_stream;
use chrono::TimeZone;
use futures::stream::FuturesUnordered;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
- use std::future::Future;
- use tokio::io::AsyncWriteExt;
pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
put_get_delete_list_opts(storage).await
@@ -1928,12 +1876,11 @@ mod tests {
let location = Path::from("test_dir/test_upload_file.txt");
// Can write to storage
- let data = get_chunks(5_000, 10);
+ let data = get_chunks(5 * 1024 * 1024, 3);
let bytes_expected = data.concat();
- let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
- for chunk in &data {
- writer.write_all(chunk).await.unwrap();
- }
+ let mut upload = storage.put_multipart(&location).await.unwrap();
+ let uploads = data.into_iter().map(|x| upload.put_part(x));
+ futures::future::try_join_all(uploads).await.unwrap();
// Object should not yet exist in store
let meta_res = storage.head(&location).await;
@@ -1949,7 +1896,8 @@ mod tests {
let result = storage.list_with_delimiter(None).await.unwrap();
assert_eq!(&result.objects, &[]);
- writer.shutdown().await.unwrap();
+ upload.complete().await.unwrap();
+
let bytes_written =
storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(bytes_expected, bytes_written);
@@ -1957,22 +1905,19 @@ mod tests {
// Sizes chosen to ensure we write three parts
let data = get_chunks(3_200_000, 7);
let bytes_expected = data.concat();
- let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
+ let upload = storage.put_multipart(&location).await.unwrap();
+ let mut writer = WriteMultipart::new(upload);
for chunk in &data {
- writer.write_all(chunk).await.unwrap();
+ writer.write(chunk)
}
- writer.shutdown().await.unwrap();
+ writer.finish().await.unwrap();
let bytes_written =
storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(bytes_expected, bytes_written);
// We can abort an empty write
let location = Path::from("test_dir/test_abort_upload.txt");
- let (upload_id, writer) =
storage.put_multipart(&location).await.unwrap();
- drop(writer);
- storage
- .abort_multipart(&location, &upload_id)
- .await
- .unwrap();
+ let mut upload = storage.put_multipart(&location).await.unwrap();
+ upload.abort().await.unwrap();
let get_res = storage.get(&location).await;
assert!(get_res.is_err());
assert!(matches!(
@@ -1981,17 +1926,13 @@ mod tests {
));
// We can abort an in-progress write
- let (upload_id, mut writer) =
storage.put_multipart(&location).await.unwrap();
- if let Some(chunk) = data.first() {
- writer.write_all(chunk).await.unwrap();
- let _ = writer.write(chunk).await.unwrap();
- }
- drop(writer);
-
- storage
- .abort_multipart(&location, &upload_id)
+ let mut upload = storage.put_multipart(&location).await.unwrap();
+ upload
+ .put_part(data.first().unwrap().clone())
.await
.unwrap();
+
+ upload.abort().await.unwrap();
let get_res = storage.get(&location).await;
assert!(get_res.is_err());
assert!(matches!(
@@ -2186,7 +2127,7 @@ mod tests {
storage.delete(&path2).await.unwrap();
}
- pub(crate) async fn multipart(storage: &dyn ObjectStore, multipart: &dyn
MultiPartStore) {
+ pub(crate) async fn multipart(storage: &dyn ObjectStore, multipart: &dyn
MultipartStore) {
let path = Path::from("test_multipart");
let chunk_size = 5 * 1024 * 1024;
@@ -2253,7 +2194,7 @@ mod tests {
pub(crate) async fn tagging<F, Fut>(storage: &dyn ObjectStore, validate:
bool, get_tags: F)
where
F: Fn(Path) -> Fut + Send + Sync,
- Fut: Future<Output = Result<reqwest::Response>> + Send,
+ Fut: std::future::Future<Output = Result<reqwest::Response>> + Send,
{
use bytes::Buf;
use serde::Deserialize;
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index d1363d9a4d4..e5f6841638e 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -18,18 +18,16 @@
//! An object store that limits the maximum concurrency of the wrapped
implementation
use crate::{
- BoxStream, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartId, ObjectMeta,
- ObjectStore, Path, PutOptions, PutResult, Result, StreamExt,
+ BoxStream, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartUpload, ObjectMeta,
+ ObjectStore, Path, PutOptions, PutResult, Result, StreamExt, UploadPart,
};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{FutureExt, Stream};
-use std::io::{Error, IoSlice};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use tokio::io::AsyncWrite;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
/// Store wrapper that wraps an inner store and limits the maximum number of
concurrent
@@ -81,18 +79,12 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.put_opts(location, bytes, opts).await
}
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- let permit =
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
- let (id, write) = self.inner.put_multipart(location).await?;
- Ok((id, Box::new(PermitWrapper::new(write, permit))))
- }
-
- async fn abort_multipart(&self, location: &Path, multipart_id:
&MultipartId) -> Result<()> {
- let _permit = self.semaphore.acquire().await.unwrap();
- self.inner.abort_multipart(location, multipart_id).await
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
+ let upload = self.inner.put_multipart(location).await?;
+ Ok(Box::new(LimitUpload {
+ semaphore: Arc::clone(&self.semaphore),
+ upload,
+ }))
}
async fn get(&self, location: &Path) -> Result<GetResult> {
let permit =
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
@@ -221,39 +213,42 @@ impl<T: Stream + Unpin> Stream for PermitWrapper<T> {
}
}
-impl<T: AsyncWrite + Unpin> AsyncWrite for PermitWrapper<T> {
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<std::result::Result<usize, Error>> {
- Pin::new(&mut self.inner).poll_write(cx, buf)
- }
+/// A [`MultipartUpload`] wrapper that limits the maximum number of concurrent requests
+#[derive(Debug)]
+pub struct LimitUpload {
+    /// The wrapped upload that requests are forwarded to
+    upload: Box<dyn MultipartUpload>,
+    /// Permits acquired around each part upload, `complete` and `abort`
+    semaphore: Arc<Semaphore>,
+}
- fn poll_flush(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<std::result::Result<(), Error>> {
- Pin::new(&mut self.inner).poll_flush(cx)
+impl LimitUpload {
+ /// Create a new [`LimitUpload`] limiting `upload` to `max_concurrency`
concurrent requests
+ pub fn new(upload: Box<dyn MultipartUpload>, max_concurrency: usize) ->
Self {
+ Self {
+ upload,
+ semaphore: Arc::new(Semaphore::new(max_concurrency)),
+ }
}
+}
- fn poll_shutdown(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<std::result::Result<(), Error>> {
- Pin::new(&mut self.inner).poll_shutdown(cx)
+#[async_trait]
+impl MultipartUpload for LimitUpload {
+ fn put_part(&mut self, data: Bytes) -> UploadPart {
+ let upload = self.upload.put_part(data);
+ let s = Arc::clone(&self.semaphore);
+ Box::pin(async move {
+ let _permit = s.acquire().await.unwrap();
+ upload.await
+ })
}
- fn poll_write_vectored(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- bufs: &[IoSlice<'_>],
- ) -> Poll<std::result::Result<usize, Error>> {
- Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
+ async fn complete(&mut self) -> Result<PutResult> {
+ let _permit = self.semaphore.acquire().await.unwrap();
+ self.upload.complete().await
}
- fn is_write_vectored(&self) -> bool {
- self.inner.is_write_vectored()
+ async fn abort(&mut self) -> Result<()> {
+ let _permit = self.semaphore.acquire().await.unwrap();
+ self.upload.abort().await
}
}
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index d631771778d..a7eb4661f68 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -16,34 +16,32 @@
// under the License.
//! An object store implementation for a local filesystem
-use crate::{
- maybe_spawn_blocking,
- path::{absolute_path_to_url, Path},
- util::InvalidGetRange,
- GetOptions, GetResult, GetResultPayload, ListResult, MultipartId,
ObjectMeta, ObjectStore,
- PutMode, PutOptions, PutResult, Result,
-};
-use async_trait::async_trait;
-use bytes::Bytes;
-use chrono::{DateTime, Utc};
-use futures::future::BoxFuture;
-use futures::ready;
-use futures::{stream::BoxStream, StreamExt};
-use futures::{FutureExt, TryStreamExt};
-use snafu::{ensure, ResultExt, Snafu};
use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions};
use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::ops::Range;
-use std::pin::Pin;
use std::sync::Arc;
-use std::task::Poll;
use std::time::SystemTime;
use std::{collections::BTreeSet, convert::TryFrom, io};
use std::{collections::VecDeque, path::PathBuf};
-use tokio::io::AsyncWrite;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use futures::{stream::BoxStream, StreamExt};
+use futures::{FutureExt, TryStreamExt};
+use parking_lot::Mutex;
+use snafu::{ensure, OptionExt, ResultExt, Snafu};
use url::Url;
use walkdir::{DirEntry, WalkDir};
+use crate::{
+ maybe_spawn_blocking,
+ path::{absolute_path_to_url, Path},
+ util::InvalidGetRange,
+ GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
ObjectMeta, ObjectStore,
+ PutMode, PutOptions, PutResult, Result, UploadPart,
+};
+
/// A specialized `Error` for filesystem object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
@@ -155,6 +153,9 @@ pub(crate) enum Error {
InvalidPath {
path: String,
},
+
+ #[snafu(display("Upload aborted"))]
+ Aborted,
}
impl From<Error> for super::Error {
@@ -342,8 +343,7 @@ impl ObjectStore for LocalFileSystem {
let path = self.path_to_filesystem(location)?;
maybe_spawn_blocking(move || {
- let (mut file, suffix) = new_staged_upload(&path)?;
- let staging_path = staged_upload_path(&path, &suffix);
+ let (mut file, staging_path) = new_staged_upload(&path)?;
let mut e_tag = None;
let err = match file.write_all(&bytes) {
@@ -395,31 +395,10 @@ impl ObjectStore for LocalFileSystem {
.await
}
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- let dest = self.path_to_filesystem(location)?;
-
- let (file, suffix) = new_staged_upload(&dest)?;
- Ok((
- suffix.clone(),
- Box::new(LocalUpload::new(dest, suffix, Arc::new(file))),
- ))
- }
-
- async fn abort_multipart(&self, location: &Path, multipart_id:
&MultipartId) -> Result<()> {
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
let dest = self.path_to_filesystem(location)?;
- let path: PathBuf = staged_upload_path(&dest, multipart_id);
-
- maybe_spawn_blocking(move || match std::fs::remove_file(&path) {
- Ok(_) => Ok(()),
- Err(source) => match source.kind() {
- ErrorKind::NotFound => Ok(()), // Already deleted
- _ => Err(Error::UnableToDeleteFile { path, source }.into()),
- },
- })
- .await
+ let (file, src) = new_staged_upload(&dest)?;
+ Ok(Box::new(LocalUpload::new(src, dest, file)))
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
@@ -677,17 +656,17 @@ fn create_parent_dirs(path: &std::path::Path, source:
io::Error) -> Result<()> {
Ok(())
}
-/// Generates a unique file path `{base}#{suffix}`, returning the opened
`File` and `suffix`
+/// Generates a unique file path `{base}#{suffix}`, returning the opened
`File` and `path`
///
/// Creates any directories if necessary
-fn new_staged_upload(base: &std::path::Path) -> Result<(File, String)> {
+fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
let mut multipart_id = 1;
loop {
let suffix = multipart_id.to_string();
let path = staged_upload_path(base, &suffix);
let mut options = OpenOptions::new();
match options.read(true).write(true).create_new(true).open(&path) {
- Ok(f) => return Ok((f, suffix)),
+ Ok(f) => return Ok((f, path)),
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => multipart_id += 1,
ErrorKind::NotFound => create_parent_dirs(&path, source)?,
@@ -705,194 +684,91 @@ fn staged_upload_path(dest: &std::path::Path, suffix:
&str) -> PathBuf {
staging_path.into()
}
-enum LocalUploadState {
- /// Upload is ready to send new data
- Idle(Arc<File>),
- /// In the middle of a write
- Writing(Arc<File>, BoxFuture<'static, Result<usize, io::Error>>),
- /// In the middle of syncing data and closing file.
- ///
- /// Future will contain last reference to file, so it will call drop on
completion.
- ShuttingDown(BoxFuture<'static, Result<(), io::Error>>),
- /// File is being moved from it's temporary location to the final location
- Committing(BoxFuture<'static, Result<(), io::Error>>),
- /// Upload is complete
- Complete,
+#[derive(Debug)]
+struct LocalUpload {
+ /// The upload state
+ state: Arc<UploadState>,
+ /// The location of the temporary file
+ src: Option<PathBuf>,
+ /// The next offset to write into the file
+ offset: u64,
}
-struct LocalUpload {
- inner_state: LocalUploadState,
+#[derive(Debug)]
+struct UploadState {
dest: PathBuf,
- multipart_id: MultipartId,
+ file: Mutex<Option<File>>,
}
impl LocalUpload {
- pub fn new(dest: PathBuf, multipart_id: MultipartId, file: Arc<File>) ->
Self {
+ pub fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
Self {
- inner_state: LocalUploadState::Idle(file),
- dest,
- multipart_id,
+ state: Arc::new(UploadState {
+ dest,
+ file: Mutex::new(Some(file)),
+ }),
+ src: Some(src),
+ offset: 0,
}
}
}
-impl AsyncWrite for LocalUpload {
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- buf: &[u8],
- ) -> Poll<Result<usize, io::Error>> {
- let invalid_state = |condition: &str| -> Poll<Result<usize,
io::Error>> {
- Poll::Ready(Err(io::Error::new(
- ErrorKind::InvalidInput,
- format!("Tried to write to file {condition}."),
- )))
- };
+#[async_trait]
+impl MultipartUpload for LocalUpload {
+ fn put_part(&mut self, data: Bytes) -> UploadPart {
+ let offset = self.offset;
+ self.offset += data.len() as u64;
- if let Ok(runtime) = tokio::runtime::Handle::try_current() {
- let mut data: Vec<u8> = buf.to_vec();
- let data_len = data.len();
-
- loop {
- match &mut self.inner_state {
- LocalUploadState::Idle(file) => {
- let file = Arc::clone(file);
- let file2 = Arc::clone(&file);
- let data: Vec<u8> = std::mem::take(&mut data);
- self.inner_state = LocalUploadState::Writing(
- file,
- Box::pin(
- runtime
- .spawn_blocking(move ||
(&*file2).write_all(&data))
- .map(move |res| match res {
- Err(err) =>
Err(io::Error::new(ErrorKind::Other, err)),
- Ok(res) => res.map(move |_| data_len),
- }),
- ),
- );
- }
- LocalUploadState::Writing(file, inner_write) => {
- let res = ready!(inner_write.poll_unpin(cx));
- self.inner_state =
LocalUploadState::Idle(Arc::clone(file));
- return Poll::Ready(res);
- }
- LocalUploadState::ShuttingDown(_) => {
- return invalid_state("when writer is shutting down");
- }
- LocalUploadState::Committing(_) => {
- return invalid_state("when writer is committing data");
- }
- LocalUploadState::Complete => {
- return invalid_state("when writer is complete");
- }
- }
- }
- } else if let LocalUploadState::Idle(file) = &self.inner_state {
- let file = Arc::clone(file);
- (&*file).write_all(buf)?;
- Poll::Ready(Ok(buf.len()))
- } else {
- // If we are running on this thread, then only possible states are
Idle and Complete.
- invalid_state("when writer is already complete.")
- }
+        let s = Arc::clone(&self.state);
+        // Report errors against the staging file actually being written, falling
+        // back to the destination only if the upload has already finished
+        let path = self.src.clone().unwrap_or_else(|| s.dest.clone());
+        maybe_spawn_blocking(move || {
+            let mut f = s.file.lock();
+            // `None` means the file was taken by `complete`/`abort`
+            let file = f.as_mut().context(AbortedSnafu)?;
+            // Parts may be polled out of order, so write at the reserved offset
+            file.seek(SeekFrom::Start(offset))
+                .context(SeekSnafu { path: &path })?;
+            file.write_all(&data).context(UnableToCopyDataToFileSnafu)?;
+            Ok(())
+        })
+        .boxed()
}
- fn poll_flush(
- self: Pin<&mut Self>,
- _cx: &mut std::task::Context<'_>,
- ) -> Poll<Result<(), io::Error>> {
- Poll::Ready(Ok(()))
+    async fn complete(&mut self) -> Result<PutResult> {
+        // Clone rather than take `src`: if anything below fails, `Drop` can
+        // still remove the staging file instead of leaking it
+        let src = self.src.clone().context(AbortedSnafu)?;
+        let s = Arc::clone(&self.state);
+        let result = maybe_spawn_blocking(move || {
+            // Taking the file under the lock ensures there are no in-flight
+            // writes and makes any later `put_part` fail with `Aborted`
+            let f = s.file.lock().take().context(AbortedSnafu)?;
+            // Persist the data before the rename makes it visible at `dest`,
+            // as the previous AsyncWrite implementation did in `poll_shutdown`
+            f.sync_all().context(UnableToCopyDataToFileSnafu)?;
+            // Read metadata while the file still lives at `src`, so an error
+            // reports a path that actually exists
+            let metadata = f.metadata().map_err(|e| Error::Metadata {
+                source: e.into(),
+                path: src.to_string_lossy().to_string(),
+            })?;
+            std::fs::rename(&src, &s.dest).context(UnableToRenameFileSnafu)?;
+
+            Ok(PutResult {
+                e_tag: Some(get_etag(&metadata)),
+                version: None,
+            })
+        })
+        .await?;
+
+        // The staging file now lives at `dest`; clear `src` so `Drop` does not
+        // delete a same-named staging file created by a later upload
+        self.src = None;
+        Ok(result)
}
- fn poll_shutdown(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> Poll<Result<(), io::Error>> {
- if let Ok(runtime) = tokio::runtime::Handle::try_current() {
- loop {
- match &mut self.inner_state {
- LocalUploadState::Idle(file) => {
- // We are moving file into the future, and it will be
dropped on it's completion, closing the file.
- let file = Arc::clone(file);
- self.inner_state =
LocalUploadState::ShuttingDown(Box::pin(
- runtime
- .spawn_blocking(move || (*file).sync_all())
- .map(move |res| match res {
- Err(err) =>
Err(io::Error::new(io::ErrorKind::Other, err)),
- Ok(res) => res,
- }),
- ));
- }
- LocalUploadState::ShuttingDown(fut) => match
fut.poll_unpin(cx) {
- Poll::Ready(res) => {
- res?;
- let staging_path = staged_upload_path(&self.dest,
&self.multipart_id);
- let dest = self.dest.clone();
- self.inner_state =
LocalUploadState::Committing(Box::pin(
- runtime
- .spawn_blocking(move ||
std::fs::rename(&staging_path, &dest))
- .map(move |res| match res {
- Err(err) =>
Err(io::Error::new(io::ErrorKind::Other, err)),
- Ok(res) => res,
- }),
- ));
- }
- Poll::Pending => {
- return Poll::Pending;
- }
- },
- LocalUploadState::Writing(_, _) => {
- return Poll::Ready(Err(io::Error::new(
- io::ErrorKind::InvalidInput,
- "Tried to commit a file where a write is in
progress.",
- )));
- }
- LocalUploadState::Committing(fut) => {
- let res = ready!(fut.poll_unpin(cx));
- self.inner_state = LocalUploadState::Complete;
- return Poll::Ready(res);
- }
- LocalUploadState::Complete => {
- return Poll::Ready(Err(io::Error::new(
- io::ErrorKind::Other,
- "Already complete",
- )))
- }
- }
- }
- } else {
- let staging_path = staged_upload_path(&self.dest,
&self.multipart_id);
- match &mut self.inner_state {
- LocalUploadState::Idle(file) => {
- let file = Arc::clone(file);
- self.inner_state = LocalUploadState::Complete;
- file.sync_all()?;
- drop(file);
- std::fs::rename(staging_path, &self.dest)?;
- Poll::Ready(Ok(()))
- }
- _ => {
- // If we are running on this thread, then only possible
states are Idle and Complete.
- Poll::Ready(Err(io::Error::new(ErrorKind::Other, "Already
complete")))
- }
- }
- }
+    async fn abort(&mut self) -> Result<()> {
+        let src = self.src.take().context(AbortedSnafu)?;
+        let s = Arc::clone(&self.state);
+        maybe_spawn_blocking(move || {
+            // Close the file so any subsequent `put_part` fails with `Aborted`
+            // instead of silently writing into an unlinked file
+            drop(s.file.lock().take());
+            match std::fs::remove_file(&src) {
+                Ok(()) => Ok(()),
+                // Already deleted: treat as success, matching the behaviour of
+                // the removed `abort_multipart`
+                Err(source) if source.kind() == ErrorKind::NotFound => Ok(()),
+                Err(source) => Err(Error::UnableToDeleteFile { path: src, source }.into()),
+            }
+        })
+        .await
}
}
impl Drop for LocalUpload {
fn drop(&mut self) {
- match self.inner_state {
- LocalUploadState::Complete => (),
- _ => {
- self.inner_state = LocalUploadState::Complete;
- let path = staged_upload_path(&self.dest, &self.multipart_id);
- // Try to cleanup intermediate file ignoring any error
- match tokio::runtime::Handle::try_current() {
- Ok(r) => drop(r.spawn_blocking(move ||
std::fs::remove_file(path))),
- Err(_) => drop(std::fs::remove_file(path)),
- };
- }
+ if let Some(src) = self.src.take() {
+ // Try to clean up intermediate file ignoring any error
+ match tokio::runtime::Handle::try_current() {
+ Ok(r) => drop(r.spawn_blocking(move ||
std::fs::remove_file(src))),
+ Err(_) => drop(std::fs::remove_file(src)),
+ };
}
}
}
@@ -1095,12 +971,13 @@ fn convert_walkdir_result(
#[cfg(test)]
mod tests {
- use super::*;
- use crate::test_util::flatten_list_stream;
- use crate::tests::*;
use futures::TryStreamExt;
use tempfile::{NamedTempFile, TempDir};
- use tokio::io::AsyncWriteExt;
+
+ use crate::test_util::flatten_list_stream;
+ use crate::tests::*;
+
+ use super::*;
#[tokio::test]
async fn file_test() {
@@ -1125,7 +1002,18 @@ mod tests {
put_get_delete_list(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
- stream_get(&integration).await;
+
+ // Can't use stream_get test as WriteMultipart uses a tokio JoinSet
+ let p = Path::from("manual_upload");
+ let mut upload = integration.put_multipart(&p).await.unwrap();
+ upload.put_part(Bytes::from_static(b"123")).await.unwrap();
+ upload.put_part(Bytes::from_static(b"45678")).await.unwrap();
+ let r = upload.complete().await.unwrap();
+
+ let get = integration.get(&p).await.unwrap();
+ assert_eq!(get.meta.e_tag.as_ref().unwrap(),
r.e_tag.as_ref().unwrap());
+ let actual = get.bytes().await.unwrap();
+ assert_eq!(actual.as_ref(), b"12345678");
});
}
@@ -1422,12 +1310,11 @@ mod tests {
let location = Path::from("some_file");
let data = Bytes::from("arbitrary data");
- let (multipart_id, mut writer) =
integration.put_multipart(&location).await.unwrap();
- writer.write_all(&data).await.unwrap();
+ let mut u1 = integration.put_multipart(&location).await.unwrap();
+ u1.put_part(data.clone()).await.unwrap();
- let (multipart_id_2, mut writer_2) =
integration.put_multipart(&location).await.unwrap();
- assert_ne!(multipart_id, multipart_id_2);
- writer_2.write_all(&data).await.unwrap();
+ let mut u2 = integration.put_multipart(&location).await.unwrap();
+ u2.put_part(data).await.unwrap();
let list = flatten_list_stream(&integration, None).await.unwrap();
assert_eq!(list.len(), 0);
@@ -1520,11 +1407,13 @@ mod tests {
#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod not_wasm_tests {
- use crate::local::LocalFileSystem;
- use crate::{ObjectStore, Path};
use std::time::Duration;
+
+ use bytes::Bytes;
use tempfile::TempDir;
- use tokio::io::AsyncWriteExt;
+
+ use crate::local::LocalFileSystem;
+ use crate::{ObjectStore, Path};
#[tokio::test]
async fn test_cleanup_intermediate_files() {
@@ -1532,12 +1421,13 @@ mod not_wasm_tests {
let integration =
LocalFileSystem::new_with_prefix(root.path()).unwrap();
let location = Path::from("some_file");
- let (_, mut writer) =
integration.put_multipart(&location).await.unwrap();
- writer.write_all(b"hello").await.unwrap();
+ let data = Bytes::from_static(b"hello");
+ let mut upload = integration.put_multipart(&location).await.unwrap();
+ upload.put_part(data).await.unwrap();
let file_count = std::fs::read_dir(root.path()).unwrap().count();
assert_eq!(file_count, 1);
- drop(writer);
+ drop(upload);
tokio::time::sleep(Duration::from_millis(1)).await;
@@ -1549,13 +1439,15 @@ mod not_wasm_tests {
#[cfg(target_family = "unix")]
#[cfg(test)]
mod unix_test {
- use crate::local::LocalFileSystem;
- use crate::{ObjectStore, Path};
+ use std::fs::OpenOptions;
+
use nix::sys::stat;
use nix::unistd;
- use std::fs::OpenOptions;
use tempfile::TempDir;
+ use crate::local::LocalFileSystem;
+ use crate::{ObjectStore, Path};
+
#[tokio::test]
async fn test_fifo() {
let filename = "some_file";
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 41ee1091a3b..6c960d4f24f 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -16,27 +16,24 @@
// under the License.
//! An in-memory object store implementation
-use crate::multipart::{MultiPartStore, PartId};
-use crate::util::InvalidGetRange;
-use crate::{
- path::Path, GetRange, GetResult, GetResultPayload, ListResult, ObjectMeta,
ObjectStore,
- PutMode, PutOptions, PutResult, Result, UpdateVersion,
-};
-use crate::{GetOptions, MultipartId};
+use std::collections::{BTreeMap, BTreeSet, HashMap};
+use std::ops::Range;
+use std::sync::Arc;
+
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt};
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt, Snafu};
-use std::collections::BTreeSet;
-use std::collections::{BTreeMap, HashMap};
-use std::io;
-use std::ops::Range;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::Poll;
-use tokio::io::AsyncWrite;
+
+use crate::multipart::{MultipartStore, PartId};
+use crate::util::InvalidGetRange;
+use crate::GetOptions;
+use crate::{
+ path::Path, GetRange, GetResult, GetResultPayload, ListResult,
MultipartId, MultipartUpload,
+ ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result,
UpdateVersion, UploadPart,
+};
/// A specialized `Error` for in-memory object store-related errors
#[derive(Debug, Snafu)]
@@ -213,23 +210,12 @@ impl ObjectStore for InMemory {
})
}
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- Ok((
- String::new(),
- Box::new(InMemoryUpload {
- location: location.clone(),
- data: Vec::new(),
- storage: Arc::clone(&self.storage),
- }),
- ))
- }
-
- async fn abort_multipart(&self, _location: &Path, _multipart_id:
&MultipartId) -> Result<()> {
- // Nothing to clean up
- Ok(())
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
+ Ok(Box::new(InMemoryUpload {
+ location: location.clone(),
+ parts: vec![],
+ storage: Arc::clone(&self.storage),
+ }))
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
@@ -391,7 +377,7 @@ impl ObjectStore for InMemory {
}
#[async_trait]
-impl MultiPartStore for InMemory {
+impl MultipartStore for InMemory {
async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
let mut storage = self.storage.write();
let etag = storage.next_etag;
@@ -482,45 +468,42 @@ impl InMemory {
}
}
+#[derive(Debug)]
struct InMemoryUpload {
location: Path,
- data: Vec<u8>,
+ parts: Vec<Bytes>,
storage: Arc<RwLock<Storage>>,
}
-impl AsyncWrite for InMemoryUpload {
- fn poll_write(
- mut self: Pin<&mut Self>,
- _cx: &mut std::task::Context<'_>,
- buf: &[u8],
- ) -> Poll<Result<usize, io::Error>> {
- self.data.extend_from_slice(buf);
- Poll::Ready(Ok(buf.len()))
+#[async_trait]
+impl MultipartUpload for InMemoryUpload {
+ fn put_part(&mut self, data: Bytes) -> UploadPart {
+ self.parts.push(data);
+ Box::pin(futures::future::ready(Ok(())))
}
- fn poll_flush(
- self: Pin<&mut Self>,
- _cx: &mut std::task::Context<'_>,
- ) -> Poll<Result<(), io::Error>> {
- Poll::Ready(Ok(()))
+ async fn complete(&mut self) -> Result<PutResult> {
+ let cap = self.parts.iter().map(|x| x.len()).sum();
+ let mut buf = Vec::with_capacity(cap);
+ self.parts.iter().for_each(|x| buf.extend_from_slice(x));
+ let etag = self.storage.write().insert(&self.location, buf.into());
+ Ok(PutResult {
+ e_tag: Some(etag.to_string()),
+ version: None,
+ })
}
- fn poll_shutdown(
- mut self: Pin<&mut Self>,
- _cx: &mut std::task::Context<'_>,
- ) -> Poll<Result<(), io::Error>> {
- let data = Bytes::from(std::mem::take(&mut self.data));
- self.storage.write().insert(&self.location, data);
- Poll::Ready(Ok(()))
+ async fn abort(&mut self) -> Result<()> {
+ Ok(())
}
}
#[cfg(test)]
mod tests {
- use super::*;
-
use crate::tests::*;
+ use super::*;
+
#[tokio::test]
async fn in_memory_test() {
let integration = InMemory::new();
diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs
index 1dcd5a6f496..26cce393624 100644
--- a/object_store/src/multipart.rs
+++ b/object_store/src/multipart.rs
@@ -17,34 +17,16 @@
//! Cloud Multipart Upload
//!
-//! This crate provides an asynchronous interface for multipart file uploads
to cloud storage services.
-//! It's designed to offer efficient, non-blocking operations,
+//! This module provides an asynchronous interface for multipart file uploads to
+//! cloud storage services. It's designed to offer efficient, non-blocking operations,
//! especially useful when dealing with large files or high-throughput systems.
use async_trait::async_trait;
use bytes::Bytes;
-use futures::{stream::FuturesUnordered, Future, StreamExt};
-use std::{io, pin::Pin, sync::Arc, task::Poll};
-use tokio::io::AsyncWrite;
use crate::path::Path;
use crate::{MultipartId, PutResult, Result};
-type BoxedTryFuture<T> = Pin<Box<dyn Future<Output = Result<T, io::Error>> +
Send>>;
-
-/// A trait used in combination with [`WriteMultiPart`] to implement
-/// [`AsyncWrite`] on top of an API for multipart upload
-#[async_trait]
-pub trait PutPart: Send + Sync + 'static {
- /// Upload a single part
- async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId>;
-
- /// Complete the upload with the provided parts
- ///
- /// `completed_parts` is in order of part number
- async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()>;
-}
-
/// Represents a part of a file that has been successfully uploaded in a
multipart upload process.
#[derive(Debug, Clone)]
pub struct PartId {
@@ -52,222 +34,6 @@ pub struct PartId {
pub content_id: String,
}
-/// Wrapper around a [`PutPart`] that implements [`AsyncWrite`]
-///
-/// Data will be uploaded in fixed size chunks of 10 MiB in parallel,
-/// up to the configured maximum concurrency
-pub struct WriteMultiPart<T: PutPart> {
- inner: Arc<T>,
- /// A list of completed parts, in sequential order.
- completed_parts: Vec<Option<PartId>>,
- /// Part upload tasks currently running
- tasks: FuturesUnordered<BoxedTryFuture<(usize, PartId)>>,
- /// Maximum number of upload tasks to run concurrently
- max_concurrency: usize,
- /// Buffer that will be sent in next upload.
- current_buffer: Vec<u8>,
- /// Size of each part.
- ///
- /// While S3 and Minio support variable part sizes, R2 requires they all be
- /// exactly the same size.
- part_size: usize,
- /// Index of current part
- current_part_idx: usize,
- /// The completion task
- completion_task: Option<BoxedTryFuture<()>>,
-}
-
-impl<T: PutPart> WriteMultiPart<T> {
- /// Create a new multipart upload with the implementation and the given
maximum concurrency
- pub fn new(inner: T, max_concurrency: usize) -> Self {
- Self {
- inner: Arc::new(inner),
- completed_parts: Vec::new(),
- tasks: FuturesUnordered::new(),
- max_concurrency,
- current_buffer: Vec::new(),
- // TODO: Should self vary by provider?
- // TODO: Should we automatically increase then when part index
gets large?
-
- // Minimum size of 5 MiB
- //
https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
- // https://cloud.google.com/storage/quotas#requests
- part_size: 10 * 1024 * 1024,
- current_part_idx: 0,
- completion_task: None,
- }
- }
-
- // Add data to the current buffer, returning the number of bytes added
- fn add_to_buffer(mut self: Pin<&mut Self>, buf: &[u8], offset: usize) ->
usize {
- let remaining_capacity = self.part_size - self.current_buffer.len();
- let to_copy = std::cmp::min(remaining_capacity, buf.len() - offset);
- self.current_buffer
- .extend_from_slice(&buf[offset..offset + to_copy]);
- to_copy
- }
-
- /// Poll current tasks
- fn poll_tasks(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> Result<(), io::Error> {
- if self.tasks.is_empty() {
- return Ok(());
- }
- while let Poll::Ready(Some(res)) = self.tasks.poll_next_unpin(cx) {
- let (part_idx, part) = res?;
- let total_parts = self.completed_parts.len();
- self.completed_parts
- .resize(std::cmp::max(part_idx + 1, total_parts), None);
- self.completed_parts[part_idx] = Some(part);
- }
- Ok(())
- }
-
- // The `poll_flush` function will only flush the in-progress tasks.
- // The `final_flush` method called during `poll_shutdown` will flush
- // the `current_buffer` along with in-progress tasks.
- // Please see https://github.com/apache/arrow-rs/issues/3390 for more
details.
- fn final_flush(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> Poll<Result<(), io::Error>> {
- // Poll current tasks
- self.as_mut().poll_tasks(cx)?;
-
- // If current_buffer is not empty, see if it can be submitted
- if !self.current_buffer.is_empty() && self.tasks.len() <
self.max_concurrency {
- let out_buffer: Vec<u8> = std::mem::take(&mut self.current_buffer);
- let inner = Arc::clone(&self.inner);
- let part_idx = self.current_part_idx;
- self.tasks.push(Box::pin(async move {
- let upload_part = inner.put_part(out_buffer, part_idx).await?;
- Ok((part_idx, upload_part))
- }));
- }
-
- self.as_mut().poll_tasks(cx)?;
-
- // If tasks and current_buffer are empty, return Ready
- if self.tasks.is_empty() && self.current_buffer.is_empty() {
- Poll::Ready(Ok(()))
- } else {
- Poll::Pending
- }
- }
-}
-
-impl<T: PutPart> AsyncWrite for WriteMultiPart<T> {
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- buf: &[u8],
- ) -> Poll<Result<usize, io::Error>> {
- // Poll current tasks
- self.as_mut().poll_tasks(cx)?;
-
- let mut offset = 0;
-
- loop {
- // Fill up current buffer
- offset += self.as_mut().add_to_buffer(buf, offset);
-
- // If we don't have a full buffer or we have too many tasks, break
- if self.current_buffer.len() < self.part_size
- || self.tasks.len() >= self.max_concurrency
- {
- break;
- }
-
- let new_buffer = Vec::with_capacity(self.part_size);
- let out_buffer = std::mem::replace(&mut self.current_buffer,
new_buffer);
- let inner = Arc::clone(&self.inner);
- let part_idx = self.current_part_idx;
- self.tasks.push(Box::pin(async move {
- let upload_part = inner.put_part(out_buffer, part_idx).await?;
- Ok((part_idx, upload_part))
- }));
- self.current_part_idx += 1;
-
- // We need to poll immediately after adding to setup waker
- self.as_mut().poll_tasks(cx)?;
- }
-
- // If offset is zero, then we didn't write anything because we didn't
- // have capacity for more tasks and our buffer is full.
- if offset == 0 && !buf.is_empty() {
- Poll::Pending
- } else {
- Poll::Ready(Ok(offset))
- }
- }
-
- fn poll_flush(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> Poll<Result<(), io::Error>> {
- // Poll current tasks
- self.as_mut().poll_tasks(cx)?;
-
- // If tasks is empty, return Ready
- if self.tasks.is_empty() {
- Poll::Ready(Ok(()))
- } else {
- Poll::Pending
- }
- }
-
- fn poll_shutdown(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> Poll<Result<(), io::Error>> {
- // First, poll flush
- match self.as_mut().final_flush(cx) {
- Poll::Pending => return Poll::Pending,
- Poll::Ready(res) => res?,
- };
-
- // If shutdown task is not set, set it
- let parts = std::mem::take(&mut self.completed_parts);
- let parts = parts
- .into_iter()
- .enumerate()
- .map(|(idx, part)| {
- part.ok_or_else(|| {
- io::Error::new(
- io::ErrorKind::Other,
- format!("Missing information for upload part {idx}"),
- )
- })
- })
- .collect::<Result<_, _>>()?;
-
- let inner = Arc::clone(&self.inner);
- let completion_task = self.completion_task.get_or_insert_with(|| {
- Box::pin(async move {
- inner.complete(parts).await?;
- Ok(())
- })
- });
-
- Pin::new(completion_task).poll(cx)
- }
-}
-
-impl<T: PutPart> std::fmt::Debug for WriteMultiPart<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("WriteMultiPart")
- .field("completed_parts", &self.completed_parts)
- .field("tasks", &self.tasks)
- .field("max_concurrency", &self.max_concurrency)
- .field("current_buffer", &self.current_buffer)
- .field("part_size", &self.part_size)
- .field("current_part_idx", &self.current_part_idx)
- .finish()
- }
-}
-
/// A low-level interface for interacting with multipart upload APIs
///
/// Most use-cases should prefer [`ObjectStore::put_multipart`] as this is
supported by more
@@ -277,7 +43,7 @@ impl<T: PutPart> std::fmt::Debug for WriteMultiPart<T> {
/// [`ObjectStore::put_multipart`]: crate::ObjectStore::put_multipart
/// [`LocalFileSystem`]: crate::local::LocalFileSystem
#[async_trait]
-pub trait MultiPartStore: Send + Sync + 'static {
+pub trait MultipartStore: Send + Sync + 'static {
/// Creates a new multipart upload, returning the [`MultipartId`]
async fn create_multipart(&self, path: &Path) -> Result<MultipartId>;
@@ -288,10 +54,11 @@ pub trait MultiPartStore: Send + Sync + 'static {
///
/// Most stores require that all parts excluding the last are at least 5
MiB, and some
/// further require that all parts excluding the last be the same size,
e.g. [R2].
- /// [`WriteMultiPart`] performs writes in fixed size blocks of 10 MiB, and
clients wanting
+ /// [`WriteMultipart`] performs writes in fixed size blocks of 5 MiB by default, and clients wanting
/// to maximise compatibility should look to do likewise.
///
/// [R2]:
https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
+ /// [`WriteMultipart`]: crate::upload::WriteMultipart
async fn put_part(
&self,
path: &Path,
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 38f9b07bbd0..053f71a2d06 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -19,12 +19,11 @@
use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use std::ops::Range;
-use tokio::io::AsyncWrite;
use crate::path::Path;
use crate::{
- GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutOptions, PutResult,
- Result,
+ GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutOptions,
+ PutResult, Result,
};
#[doc(hidden)]
@@ -91,18 +90,11 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.put_opts(&full_path, bytes, opts).await
}
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
let full_path = self.full_path(location);
self.inner.put_multipart(&full_path).await
}
- async fn abort_multipart(&self, location: &Path, multipart_id:
&MultipartId) -> Result<()> {
- let full_path = self.full_path(location);
- self.inner.abort_multipart(&full_path, multipart_id).await
- }
async fn get(&self, location: &Path) -> Result<GetResult> {
let full_path = self.full_path(location);
self.inner.get(&full_path).await
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index 252256a4599..5ca1eedbf73 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -20,16 +20,15 @@ use parking_lot::Mutex;
use std::ops::Range;
use std::{convert::TryInto, sync::Arc};
+use crate::GetOptions;
use crate::{
- path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta,
ObjectStore, PutOptions,
- PutResult, Result,
+ path::Path, GetResult, GetResultPayload, ListResult, MultipartUpload,
ObjectMeta, ObjectStore,
+ PutOptions, PutResult, Result,
};
-use crate::{GetOptions, MultipartId};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::BoxStream, FutureExt, StreamExt};
use std::time::Duration;
-use tokio::io::AsyncWrite;
/// Configuration settings for throttled store
#[derive(Debug, Default, Clone, Copy)]
@@ -158,14 +157,7 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
self.inner.put_opts(location, bytes, opts).await
}
- async fn put_multipart(
- &self,
- _location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- Err(super::Error::NotImplemented)
- }
-
- async fn abort_multipart(&self, _location: &Path, _multipart_id:
&MultipartId) -> Result<()> {
+ async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn
MultipartUpload>> {
Err(super::Error::NotImplemented)
}
diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs
new file mode 100644
index 00000000000..6f8bfa8a5f7
--- /dev/null
+++ b/object_store/src/upload.rs
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{PutResult, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use tokio::task::JoinSet;
+
+/// An upload part request
+///
+/// This is the future returned by [`MultipartUpload::put_part`]. Because it is `'static`
+/// (and, being a [`BoxFuture`], `Send`), it does not borrow the [`MultipartUpload`] that
+/// created it and can be polled or spawned independently of it.
+pub type UploadPart = BoxFuture<'static, Result<()>>;
+
+/// A trait allowing writing an object in fixed size chunks
+///
+/// Consecutive chunks of data can be written by calling [`MultipartUpload::put_part`] and polling
+/// the returned futures to completion. Multiple futures returned by [`MultipartUpload::put_part`]
+/// may be polled in parallel, allowing for concurrent uploads.
+///
+/// Once all part uploads have been polled to completion, the upload can be completed by
+/// calling [`MultipartUpload::complete`]. This will make the entire uploaded object visible
+/// as an atomic operation. It is implementation defined behaviour if
+/// [`MultipartUpload::complete`] is called before all [`UploadPart`] have been polled to
+/// completion.
+#[async_trait]
+pub trait MultipartUpload: Send + std::fmt::Debug {
+    /// Upload the next part
+    ///
+    /// Most stores require that all parts excluding the last are at least 5 MiB, and some
+    /// further require that all parts excluding the last be the same size, e.g. [R2].
+    /// Clients wanting to maximise compatibility should therefore perform writes in
+    /// fixed size blocks of at least 5 MiB.
+    ///
+    /// The returned [`UploadPart`] is `'static` and does not borrow `self`. Callers may
+    /// therefore invoke this method multiple times and then await the returned futures
+    /// in parallel:
+    ///
+    /// ```no_run
+    /// # use object_store::MultipartUpload;
+    /// #
+    /// # async fn test() {
+    /// #
+    /// let mut upload: Box<dyn MultipartUpload> = todo!();
+    /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
+    /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
+    /// futures::future::try_join(p1, p2).await.unwrap();
+    /// upload.complete().await.unwrap();
+    /// # }
+    /// ```
+    ///
+    /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations
+    fn put_part(&mut self, data: Bytes) -> UploadPart;
+
+    /// Complete the multipart upload
+    ///
+    /// It is implementation defined behaviour if this method is called before polling
+    /// all [`UploadPart`] returned by [`MultipartUpload::put_part`] to completion.
+    /// Additionally, it is implementation defined behaviour to call
+    /// [`MultipartUpload::complete`] on an already completed or aborted [`MultipartUpload`].
+    async fn complete(&mut self) -> Result<PutResult>;
+
+    /// Abort the multipart upload
+    ///
+    /// If a [`MultipartUpload`] is dropped without calling [`MultipartUpload::complete`],
+    /// some object stores will automatically clean up any previously uploaded parts.
+    /// However, some stores, such as S3 and GCS, cannot perform cleanup on drop.
+    /// As such [`MultipartUpload::abort`] can be invoked to perform this cleanup.
+    ///
+    /// It will not be possible to call `abort` in all failure scenarios, for example
+    /// non-graceful shutdown of the calling application. It is therefore recommended
+    /// object stores are configured with lifecycle rules to automatically clean up
+    /// unused parts older than some threshold. See [crate::aws] and [crate::gcp]
+    /// for more information.
+    ///
+    /// It is implementation defined behaviour to call [`MultipartUpload::abort`]
+    /// on an already completed or aborted [`MultipartUpload`].
+    async fn abort(&mut self) -> Result<()>;
+}
+
+/// A synchronous write API for uploading data in parallel in fixed size chunks
+///
+/// Uses multiple tokio tasks in a [`JoinSet`] to multiplex upload tasks in
parallel
+///
+/// The design also takes inspiration from [`Sink`] with
[`WriteMultipart::wait_for_capacity`]
+/// allowing back pressure on producers, prior to buffering the next part.
However, unlike
+/// [`Sink`] this back pressure is optional, allowing integration with
synchronous producers
+///
+/// [`Sink`]: futures::sink::Sink
+#[derive(Debug)]
+pub struct WriteMultipart {
+ upload: Box<dyn MultipartUpload>,
+
+ buffer: Vec<u8>,
+
+ tasks: JoinSet<Result<()>>,
+}
+
+impl WriteMultipart {
+ /// Create a new [`WriteMultipart`] that will upload using 5MB chunks
+ pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
+ Self::new_with_capacity(upload, 5 * 1024 * 1024)
+ }
+
+ /// Create a new [`WriteMultipart`] that will upload in fixed `capacity`
sized chunks
+ pub fn new_with_capacity(upload: Box<dyn MultipartUpload>, capacity:
usize) -> Self {
+ Self {
+ upload,
+ buffer: Vec::with_capacity(capacity),
+ tasks: Default::default(),
+ }
+ }
+
+ /// Wait until there are `max_concurrency` or fewer requests in-flight
+ pub async fn wait_for_capacity(&mut self, max_concurrency: usize) ->
Result<()> {
+ while self.tasks.len() > max_concurrency {
+ self.tasks.join_next().await.unwrap()??;
+ }
+ Ok(())
+ }
+
+ /// Write data to this [`WriteMultipart`]
+ ///
+ /// Note this method is synchronous (not `async`) and will immediately
start new uploads
+ /// as soon as the internal `capacity` is hit, regardless of
+ /// how many outstanding uploads are already in progress.
+ ///
+ /// Back pressure can optionally be applied to producers by calling
+ /// [`Self::wait_for_capacity`] prior to calling this method
+ pub fn write(&mut self, mut buf: &[u8]) {
+ while !buf.is_empty() {
+ let capacity = self.buffer.capacity();
+ let remaining = capacity - self.buffer.len();
+ let to_read = buf.len().min(remaining);
+ self.buffer.extend_from_slice(&buf[..to_read]);
+ if to_read == remaining {
+ let part = std::mem::replace(&mut self.buffer,
Vec::with_capacity(capacity));
+ self.put_part(part.into())
+ }
+ buf = &buf[to_read..]
+ }
+ }
+
+ fn put_part(&mut self, part: Bytes) {
+ self.tasks.spawn(self.upload.put_part(part));
+ }
+
+ /// Abort this upload, attempting to clean up any successfully uploaded
parts
+ pub async fn abort(mut self) -> Result<()> {
+ self.tasks.shutdown().await;
+ self.upload.abort().await
+ }
+
+ /// Flush final chunk, and await completion of all in-flight requests
+ pub async fn finish(mut self) -> Result<PutResult> {
+ if !self.buffer.is_empty() {
+ let part = std::mem::take(&mut self.buffer);
+ self.put_part(part.into())
+ }
+
+ self.wait_for_capacity(0).await?;
+ self.upload.complete().await
+ }
+}
diff --git a/object_store/tests/get_range_file.rs
b/object_store/tests/get_range_file.rs
index f73d78578f0..309a86d8fe9 100644
--- a/object_store/tests/get_range_file.rs
+++ b/object_store/tests/get_range_file.rs
@@ -25,7 +25,6 @@ use object_store::path::Path;
use object_store::*;
use std::fmt::Formatter;
use tempfile::tempdir;
-use tokio::io::AsyncWrite;
#[derive(Debug)]
struct MyStore(LocalFileSystem);
@@ -42,14 +41,7 @@ impl ObjectStore for MyStore {
self.0.put_opts(path, data, opts).await
}
- async fn put_multipart(
- &self,
- _: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- todo!()
- }
-
- async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> {
+ async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn
MultipartUpload>> {
todo!()
}