alamb commented on code in PR #5835:
URL: https://github.com/apache/arrow-rs/pull/5835#discussion_r1632401465
##########
object_store/src/buffered.rs:
##########
@@ -400,6 +400,125 @@ impl AsyncWrite for BufWriter {
}
}
+/// A buffered multipart uploader.
+///
+/// This uploader adaptively uses [`ObjectStore::put`] or
+/// [`ObjectStore::put_multipart`] depending on the amount of data that has
+/// been written.
+///
+/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
+/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
+/// streamed using [`ObjectStore::put_multipart`]
+///
+/// # TODO
+///
+/// Add attributes and tags support.
+pub struct BufUploader {
+ store: Arc<dyn ObjectStore>,
+ path: Path,
+
+ chunk_size: usize,
+ max_concurrency: usize,
+
+ buffer: PutPayloadMut,
+ write_multipart: Option<WriteMultipart>,
+}
+
+impl std::fmt::Debug for BufUploader {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("BufUploader")
+ .field("chunk_size", &self.chunk_size)
+ .finish()
+ }
+}
+
+impl BufUploader {
+ /// Create a new [`BufUploader`] from the provided [`ObjectStore`] and [`Path`]
+ pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
+ Self::with_chunk_size(store, path, 5 * 1024 * 1024)
+ }
+
+ /// Create a new [`BufUploader`] from the provided [`ObjectStore`], [`Path`] and `capacity`
Review Comment:
I also don't understand why `chunk_size` is set via a special
constructor (`with_chunk_size`) while `max_concurrency` is set with a
builder-style API that updates an existing object (`with_max_concurrency`).
Perhaps we can use a builder-style API for both:
```rust
/// Specify the upload chunk size
///
/// Defaults to 5MB
pub fn with_chunk_size(self, chunk_size: usize) -> Self {
Self {
chunk_size,
..self
}
}
```
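With both as builder methods, a call site could then read like this (a sketch;
it assumes both settings are only meaningful before the first write):
```rust
let uploader = BufUploader::new(store, path)
    .with_chunk_size(8 * 1024 * 1024)
    .with_max_concurrency(4);
```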
If there is a reason for the inconsistency, could you please add a comment
explaining the rationale? (e.g. maybe changing the chunk size after creation is
not valid 🤔)
##########
object_store/src/buffered.rs:
##########
@@ -400,6 +400,125 @@ impl AsyncWrite for BufWriter {
}
}
+/// A buffered multipart uploader.
+///
+/// This uploader adaptively uses [`ObjectStore::put`] or
+/// [`ObjectStore::put_multipart`] depending on the amount of data that has
+/// been written.
+///
+/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
+/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
+/// streamed using [`ObjectStore::put_multipart`]
+///
+/// # TODO
Review Comment:
I recommend:
1. Filing a ticket to track adding support for tags and attributes
2. Updating this to say "Notes: Attribute and tag support are not yet
implemented"
That way, if you are not able to submit the follow-on PR, at least the feature
gap is documented and someone else may be able to help in the future.
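For example, the doc section could read (a sketch of the wording, not a
required form):
```rust
/// # Notes
///
/// Attribute and tag support are not yet implemented.
```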
##########
object_store/src/buffered.rs:
##########
@@ -400,6 +400,125 @@ impl AsyncWrite for BufWriter {
}
}
+/// A buffered multipart uploader.
+///
+/// This uploader adaptively uses [`ObjectStore::put`] or
+/// [`ObjectStore::put_multipart`] depending on the amount of data that has
+/// been written.
+///
+/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
+/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
+/// streamed using [`ObjectStore::put_multipart`]
+///
+/// # TODO
+///
+/// Add attributes and tags support.
+pub struct BufUploader {
+ store: Arc<dyn ObjectStore>,
+ path: Path,
+
+ chunk_size: usize,
+ max_concurrency: usize,
+
+ buffer: PutPayloadMut,
+ write_multipart: Option<WriteMultipart>,
+}
+
+impl std::fmt::Debug for BufUploader {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("BufUploader")
+ .field("chunk_size", &self.chunk_size)
+ .finish()
+ }
+}
+
+impl BufUploader {
+ /// Create a new [`BufUploader`] from the provided [`ObjectStore`] and [`Path`]
+ pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
+ Self::with_chunk_size(store, path, 5 * 1024 * 1024)
+ }
+
+ /// Create a new [`BufUploader`] from the provided [`ObjectStore`], [`Path`] and `capacity`
Review Comment:
nit:
```suggestion
/// Create a new [`BufUploader`] from the provided [`ObjectStore`], [`Path`] and `chunk_size`
```
Also, it seems strange to me that this function is called `with_chunk_size`
when it is creating a new instance. Perhaps calling it `new_with_chunk_size`
would be more consistent.
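For illustration, a renamed constructor might look like the following (a
sketch only; the field defaults here are guesses, not the PR's actual code):
```rust
/// Create a new [`BufUploader`] from the provided [`ObjectStore`], [`Path`] and `chunk_size`
pub fn new_with_chunk_size(store: Arc<dyn ObjectStore>, path: Path, chunk_size: usize) -> Self {
    Self {
        store,
        path,
        chunk_size,
        // Hypothetical default; the real value lives in the PR's `with_chunk_size`
        max_concurrency: 8,
        buffer: PutPayloadMut::new(),
        write_multipart: None,
    }
}
```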
##########
object_store/src/buffered.rs:
##########
@@ -400,6 +400,125 @@ impl AsyncWrite for BufWriter {
}
}
+/// A buffered multipart uploader.
+///
+/// This uploader adaptively uses [`ObjectStore::put`] or
+/// [`ObjectStore::put_multipart`] depending on the amount of data that has
+/// been written.
+///
+/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
+/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
+/// streamed using [`ObjectStore::put_multipart`]
+///
+/// # TODO
+///
+/// Add attributes and tags support.
+pub struct BufUploader {
+ store: Arc<dyn ObjectStore>,
+ path: Path,
+
+ chunk_size: usize,
+ max_concurrency: usize,
+
+ buffer: PutPayloadMut,
+ write_multipart: Option<WriteMultipart>,
+}
+
+impl std::fmt::Debug for BufUploader {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Review Comment:
It seems like it would be good to include as many of the other relevant fields
as possible (can't we print `store`, `path`, `max_concurrency`, etc.?).
Maybe a comment explaining why this is a manual impl would help too (is it
because printing the buffer would be too large?).
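For example, something like this (a sketch; it assumes the manual impl exists
only to skip the potentially large `buffer` and `write_multipart` fields):
```rust
impl std::fmt::Debug for BufUploader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BufUploader")
            .field("store", &self.store)
            .field("path", &self.path)
            .field("chunk_size", &self.chunk_size)
            .field("max_concurrency", &self.max_concurrency)
            // `buffer` and `write_multipart` skipped: they may hold large payloads
            .finish_non_exhaustive()
    }
}
```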
##########
object_store/src/buffered.rs:
##########
@@ -547,4 +666,44 @@ mod tests {
assert_eq!(response.meta.size, 40);
assert_eq!(response.attributes, attributes);
}
+
+ #[tokio::test]
+ async fn test_buf_uploader() {
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+ let path = Path::from("file.txt");
+
+ // Test put
+ let mut writer = BufUploader::with_chunk_size(Arc::clone(&store), path.clone(), 30);
+ writer.put(Bytes::from(vec![0; 20])).await.unwrap();
+ writer.put(Bytes::from(vec![0; 5])).await.unwrap();
+ writer.finish().await.unwrap();
+ let response = store
+ .get_opts(
+ &path,
+ GetOptions {
+ head: true,
+ ..Default::default()
+ },
+ )
+ .await
+ .unwrap();
+ assert_eq!(response.meta.size, 25);
Review Comment:
I recommend verifying the written bytes too if possible -- maybe we could
upload something like `(0..5)` and `(5..20)` to make sure we aren't
accidentally uploading the same bytes twice
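For example (a sketch; `get` and `bytes` are the existing
`ObjectStore`/`GetResult` APIs):
```rust
// Upload two distinct byte ranges, then verify the concatenated result
writer.put(Bytes::from_iter(0..5u8)).await.unwrap();
writer.put(Bytes::from_iter(5..20u8)).await.unwrap();
writer.finish().await.unwrap();

let data = store.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(data, Bytes::from_iter(0..20u8));
```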
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]