alamb commented on code in PR #5500: URL: https://github.com/apache/arrow-rs/pull/5500#discussion_r1525136739
########## object_store/src/upload.rs: ########## @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{PutResult, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::BoxFuture; +use tokio::task::JoinSet; + +/// An upload part request +pub type UploadPart = BoxFuture<'static, Result<()>>; + +#[async_trait] +pub trait Upload: Send + std::fmt::Debug { Review Comment: For other reviewers, this is the key new API What is the correct description of this trait? Something like ```suggestion /// Represents an in-progress multi-part upload /// /// (Is this right? Can we actually abort a multi-part upload? ) /// Cancel behavior: On drop, the multi-part upload is aborted pub trait Upload: Send + std::fmt::Debug { ``` ########## object_store/src/upload.rs: ########## @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{PutResult, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::BoxFuture; +use tokio::task::JoinSet; + +/// An upload part request +pub type UploadPart = BoxFuture<'static, Result<()>>; + +#[async_trait] +pub trait Upload: Send + std::fmt::Debug { + /// Upload the next part + /// + /// Returns a stream + /// + /// Most stores require that all parts excluding the last are at least 5 MiB, and some + /// further require that all parts excluding the last be the same size, e.g. [R2]. + /// Clients wanting to maximise compatibility should therefore perform writes in + /// fixed size blocks larger than 5 MiB. 
+ /// + /// Implementations may invoke this method multiple times and then await on the + /// returned futures in parallel + /// + /// ```no_run + /// # use futures::StreamExt; + /// # use object_store::Upload; + /// # + /// # async fn test() { + /// # + /// let mut upload: Box<&dyn Upload> = todo!(); + /// let mut p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into()); + /// let mut p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into()); + /// + /// let (u1, u2) = futures::future::join(p1.next(), p2.next()).await; + /// u1.unwrap().unwrap(); + /// u2.unwrap().unwrap(); + /// let result = upload.complete().await.unwrap(); + /// # } + /// ``` + /// + /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations + fn put_part(&mut self, data: Bytes) -> UploadPart; + + /// Complete the multipart upload + async fn complete(&mut self) -> Result<PutResult>; + + /// Abort the multipart upload + /// + /// It is implementation defined behaviour if called concurrently with [`UploadPart::execute`] + async fn abort(&mut self) -> Result<()>; +} + +/// A synchronous write API for uploading data in parallel +/// +/// Makes use of [`JoinSet`] under the hood to multiplex upload tasks, +/// avoiding issues caused by sharing a single tokio's cooperative task +/// budget across multiple IO operations. +/// +/// The design also takes inspiration from [`Sink`] with [`ChunkedUpload::wait_for_capacity`] +/// allowing back pressure on producers, prior to buffering the next part. 
However, unlike +/// [`Sink`] this back pressure is optional, allowing integration with synchronous producers +/// +/// [`Sink`]: futures::sink::Sink +#[derive(Debug)] +pub struct ChunkedUpload { + upload: Box<dyn Upload>, + + buffer: Vec<u8>, + + tasks: JoinSet<Result<()>>, +} + +impl ChunkedUpload { + /// Create a new [`ChunkedUpload`] + pub fn new(upload: Box<dyn Upload>) -> Self { + Self::new_with_capacity(upload, 5 * 1024 * 1024) + } + + /// Create a new [`ChunkedUpload`] that will upload in fixed `capacity` sized chunks + pub fn new_with_capacity(upload: Box<dyn Upload>, capacity: usize) -> Self { + Self { + upload, + buffer: Vec::with_capacity(capacity), + tasks: Default::default(), + } + } + + /// Wait until there are `max_concurrency` or fewer requests in-flight + pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> { + while self.tasks.len() > max_concurrency { + self.tasks.join_next().await.unwrap()??; + } + Ok(()) + } + + /// Write data to this [`ChunkedUpload`] + /// + /// Back pressure can optionally be applied to producers by calling + /// [`Self::wait_for_capacity`] prior to calling this method + pub fn write(&mut self, mut buf: &[u8]) { + while !buf.is_empty() { + let capacity = self.buffer.capacity(); + let remaining = capacity - self.buffer.len(); + let to_read = buf.len().min(remaining); + self.buffer.extend_from_slice(&buf[..to_read]); + if to_read == remaining { + let part = std::mem::replace(&mut self.buffer, Vec::with_capacity(capacity)); + self.put_part(part.into()) + } + buf = &buf[to_read..] 
+ } + } + + fn put_part(&mut self, part: Bytes) { + self.tasks.spawn(self.upload.put_part(part)); + } + + /// Abort this upload + pub async fn abort(mut self) -> Result<()> { + self.tasks.shutdown().await; + self.upload.abort().await + } + + /// Flush final chunk, and await completion of all in-flight requests + pub async fn finish(mut self) -> Result<PutResult> { Review Comment: I believe this `PutResult` is what @ashtuchkin is asking for in https://github.com/apache/arrow-rs/issues/5443 ########## object_store/src/upload.rs: ########## @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{PutResult, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::BoxFuture; +use tokio::task::JoinSet; + +/// An upload part request +pub type UploadPart = BoxFuture<'static, Result<()>>; + +#[async_trait] +pub trait Upload: Send + std::fmt::Debug { + /// Upload the next part + /// + /// Returns a stream + /// + /// Most stores require that all parts excluding the last are at least 5 MiB, and some + /// further require that all parts excluding the last be the same size, e.g. [R2]. 
+ /// Clients wanting to maximise compatibility should therefore perform writes in + /// fixed size blocks larger than 5 MiB. + /// + /// Implementations may invoke this method multiple times and then await on the + /// returned futures in parallel + /// + /// ```no_run + /// # use futures::StreamExt; + /// # use object_store::Upload; + /// # + /// # async fn test() { + /// # + /// let mut upload: Box<&dyn Upload> = todo!(); + /// let mut p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into()); Review Comment: I didn't quite follow the discussion about `mut` borrows below, but this example seems to demonstrate it is possible to upload parts in parallel, which is a key use case ########## object_store/src/limit.rs: ########## @@ -81,18 +81,12 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> { - let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); - let (id, write) = self.inner.put_multipart(location).await?; - Ok((id, Box::new(PermitWrapper::new(write, permit)))) - } - - async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> { - let _permit = self.semaphore.acquire().await.unwrap(); - self.inner.abort_multipart(location, multipart_id).await + async fn upload(&self, location: &Path) -> Result<Box<dyn Upload>> { Review Comment: I know you prefer shorter names, but here I recommend calling this `multipart_upload` and `MultipartUpload` to align with the term used by the cloud providers. 
Upload is pretty generic https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-upload-object.html https://cloud.google.com/storage/docs/xml-api/post-object-complete Or maybe keep the existing `put_multipart` name (though I realize reusing the same name may be more confusing on upgrade) ########## object_store/src/upload.rs: ########## @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{PutResult, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::BoxFuture; +use tokio::task::JoinSet; + +/// An upload part request +pub type UploadPart = BoxFuture<'static, Result<()>>; + +#[async_trait] +pub trait Upload: Send + std::fmt::Debug { + /// Upload the next part + /// + /// Returns a stream + /// + /// Most stores require that all parts excluding the last are at least 5 MiB, and some + /// further require that all parts excluding the last be the same size, e.g. [R2]. + /// Clients wanting to maximise compatibility should therefore perform writes in + /// fixed size blocks larger than 5 MiB. 
+ /// + /// Implementations may invoke this method multiple times and then await on the + /// returned futures in parallel + /// + /// ```no_run + /// # use futures::StreamExt; + /// # use object_store::Upload; + /// # + /// # async fn test() { + /// # + /// let mut upload: Box<&dyn Upload> = todo!(); + /// let mut p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into()); + /// let mut p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into()); + /// + /// let (u1, u2) = futures::future::join(p1.next(), p2.next()).await; + /// u1.unwrap().unwrap(); + /// u2.unwrap().unwrap(); + /// let result = upload.complete().await.unwrap(); + /// # } + /// ``` + /// + /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations + fn put_part(&mut self, data: Bytes) -> UploadPart; + + /// Complete the multipart upload + async fn complete(&mut self) -> Result<PutResult>; Review Comment: Is there any usecase for the PartId needed by the client (e.g. is it something that S3 exposes that someone might want access to?) ########## object_store/src/upload.rs: ########## @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::{PutResult, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::BoxFuture; +use tokio::task::JoinSet; + +/// An upload part request +pub type UploadPart = BoxFuture<'static, Result<()>>; + +#[async_trait] +pub trait Upload: Send + std::fmt::Debug { + /// Upload the next part + /// + /// Returns a stream + /// + /// Most stores require that all parts excluding the last are at least 5 MiB, and some + /// further require that all parts excluding the last be the same size, e.g. [R2]. + /// Clients wanting to maximise compatibility should therefore perform writes in + /// fixed size blocks larger than 5 MiB. + /// + /// Implementations may invoke this method multiple times and then await on the + /// returned futures in parallel + /// + /// ```no_run + /// # use futures::StreamExt; + /// # use object_store::Upload; + /// # + /// # async fn test() { + /// # + /// let mut upload: Box<&dyn Upload> = todo!(); + /// let mut p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into()); + /// let mut p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into()); + /// + /// let (u1, u2) = futures::future::join(p1.next(), p2.next()).await; + /// u1.unwrap().unwrap(); + /// u2.unwrap().unwrap(); + /// let result = upload.complete().await.unwrap(); + /// # } + /// ``` + /// + /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations + fn put_part(&mut self, data: Bytes) -> UploadPart; Review Comment: This API requires having the entire upload in a single contiguous `Bytes` buffer right? it doesn't seem to allow for incremental streaming writes, the way the current `AsyncWrite` does. 
Maybe we could support both a `put_part` like this (all data at once) as well as a `put_part_stream` (that takes data as a stream of `Bytes`) 🤔 Even if the intention is that most people will use the `ChunkedUpload` API I think it is still worth considering non-contiguous writes ########## object_store/src/upload.rs: ########## @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{PutResult, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::BoxFuture; +use tokio::task::JoinSet; + +/// An upload part request +pub type UploadPart = BoxFuture<'static, Result<()>>; + +#[async_trait] +pub trait Upload: Send + std::fmt::Debug { + /// Upload the next part + /// + /// Returns a stream + /// + /// Most stores require that all parts excluding the last are at least 5 MiB, and some + /// further require that all parts excluding the last be the same size, e.g. [R2]. + /// Clients wanting to maximise compatibility should therefore perform writes in + /// fixed size blocks larger than 5 MiB. 
+ /// + /// Implementations may invoke this method multiple times and then await on the + /// returned futures in parallel + /// + /// ```no_run + /// # use futures::StreamExt; + /// # use object_store::Upload; + /// # + /// # async fn test() { + /// # + /// let mut upload: Box<&dyn Upload> = todo!(); + /// let mut p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into()); + /// let mut p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into()); + /// + /// let (u1, u2) = futures::future::join(p1.next(), p2.next()).await; + /// u1.unwrap().unwrap(); + /// u2.unwrap().unwrap(); + /// let result = upload.complete().await.unwrap(); + /// # } + /// ``` + /// + /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations + fn put_part(&mut self, data: Bytes) -> UploadPart; + + /// Complete the multipart upload + async fn complete(&mut self) -> Result<PutResult>; + + /// Abort the multipart upload + /// + /// It is implementation defined behaviour if called concurrently with [`UploadPart::execute`] + async fn abort(&mut self) -> Result<()>; +} + +/// A synchronous write API for uploading data in parallel +/// +/// Makes use of [`JoinSet`] under the hood to multiplex upload tasks, +/// avoiding issues caused by sharing a single tokio's cooperative task +/// budget across multiple IO operations. +/// +/// The design also takes inspiration from [`Sink`] with [`ChunkedUpload::wait_for_capacity`] +/// allowing back pressure on producers, prior to buffering the next part. 
However, unlike +/// [`Sink`] this back pressure is optional, allowing integration with synchronous producers +/// +/// [`Sink`]: futures::sink::Sink +#[derive(Debug)] +pub struct ChunkedUpload { + upload: Box<dyn Upload>, + + buffer: Vec<u8>, + + tasks: JoinSet<Result<()>>, +} + +impl ChunkedUpload { + /// Create a new [`ChunkedUpload`] + pub fn new(upload: Box<dyn Upload>) -> Self { + Self::new_with_capacity(upload, 5 * 1024 * 1024) + } + + /// Create a new [`ChunkedUpload`] that will upload in fixed `capacity` sized chunks + pub fn new_with_capacity(upload: Box<dyn Upload>, capacity: usize) -> Self { + Self { + upload, + buffer: Vec::with_capacity(capacity), + tasks: Default::default(), + } + } + + /// Wait until there are `max_concurrency` or fewer requests in-flight + pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> { + while self.tasks.len() > max_concurrency { + self.tasks.join_next().await.unwrap()??; + } + Ok(()) + } + + /// Write data to this [`ChunkedUpload`] + /// + /// Back pressure can optionally be applied to producers by calling + /// [`Self::wait_for_capacity`] prior to calling this method + pub fn write(&mut self, mut buf: &[u8]) { Review Comment: It seems like this API basically requires copying the data. I wonder if there is some way to allow users to pass in an owned buffer already, like ```rust pub fn write(&mut self, buf: impl Into<Buffer>) { ... } ``` And then internally slicing up the Buffer to ensure the correct sizes I realize that `put` currently requires a single contiguous buffer (per part), so maybe this copy isn't that big a problem. However it seems a pity that we require the copy 🤔 ```rust async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> { self.put_opts(location, bytes, PutOptions::default()).await } ``` ########## object_store/src/upload.rs: ########## @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{PutResult, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::BoxFuture; +use tokio::task::JoinSet; + +/// An upload part request +pub type UploadPart = BoxFuture<'static, Result<()>>; Review Comment: Is there a `PutResult` that is returned on each part upload? As in should this be ```suggestion pub type UploadPart = BoxFuture<'static, Result<PutResult>>; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
