alamb commented on code in PR #5835:
URL: https://github.com/apache/arrow-rs/pull/5835#discussion_r1632401465


##########
object_store/src/buffered.rs:
##########
@@ -400,6 +400,125 @@ impl AsyncWrite for BufWriter {
     }
 }
 
+/// A buffered multipart uploader.
+///
+/// This uploader adaptively uses [`ObjectStore::put`] or
+/// [`ObjectStore::put_multipart`] depending on the amount of data that has
+/// been written.
+///
+/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
+/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
+/// streamed using [`ObjectStore::put_multipart`]
+///
+/// # TODO
+///
+/// Add attributes and tags support.
+pub struct BufUploader {
+    store: Arc<dyn ObjectStore>,
+    path: Path,
+
+    chunk_size: usize,
+    max_concurrency: usize,
+
+    buffer: PutPayloadMut,
+    write_multipart: Option<WriteMultipart>,
+}
+
+impl std::fmt::Debug for BufUploader {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("BufUploader")
+            .field("chunk_size", &self.chunk_size)
+            .finish()
+    }
+}
+
+impl BufUploader {
+    /// Create a new [`BufUploader`] from the provided [`ObjectStore`] and [`Path`]
+    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
+        Self::with_chunk_size(store, path, 5 * 1024 * 1024)
+    }
+
+    /// Create a new [`BufUploader`] from the provided [`ObjectStore`], [`Path`] and `capacity`

Review Comment:
   I also don't understand why `chunk_size` is set via a special constructor (`with_chunk_size`) but `max_concurrency` is set with a builder-style API that updates an existing object (`with_max_concurrency`).
   
   Perhaps we can use a builder-style API for both:
   
   ```rust
       /// Specify the upload chunk size
       ///
       /// Defaults to 5MB
       pub fn with_chunk_size(self, chunk_size: usize) -> Self {
           Self {
               chunk_size,
               ..self
           }
       }
   ```
   
   If there is a reason for the inconsistency, could you please add a comment explaining the rationale? (e.g. maybe changing the chunk size after creation is not valid 🤔)
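
A minimal sketch of the consistent builder style suggested above. `UploaderConfig` is a hypothetical stand-in reduced to the two tunables (the real `BufUploader` also holds the store, path and buffer), and the concurrency default of 8 is an assumption made only for illustration; the 5 MiB chunk size matches the value passed in the PR's `new`.

```rust
/// Hypothetical stand-in for `BufUploader`, reduced to its two tunables.
#[derive(Debug)]
pub struct UploaderConfig {
    chunk_size: usize,
    max_concurrency: usize,
}

impl UploaderConfig {
    /// Create a config with defaults. The 5 MiB chunk size mirrors the PR's
    /// `new`; the concurrency default of 8 is assumed for illustration.
    pub fn new() -> Self {
        Self {
            chunk_size: 5 * 1024 * 1024,
            max_concurrency: 8,
        }
    }

    /// Override the upload chunk size, consuming and returning `self`.
    pub fn with_chunk_size(self, chunk_size: usize) -> Self {
        Self { chunk_size, ..self }
    }

    /// Override the maximum number of concurrent part uploads.
    pub fn with_max_concurrency(self, max_concurrency: usize) -> Self {
        Self {
            max_concurrency,
            ..self
        }
    }

    /// Currently configured chunk size in bytes.
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }

    /// Currently configured concurrency limit.
    pub fn max_concurrency(&self) -> usize {
        self.max_concurrency
    }
}

impl Default for UploaderConfig {
    fn default() -> Self {
        Self::new()
    }
}
```

With this shape, both settings chain the same way, e.g. `UploaderConfig::new().with_chunk_size(30).with_max_concurrency(2)`, and `new` remains the only constructor.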



##########
object_store/src/buffered.rs:
##########
@@ -400,6 +400,125 @@ impl AsyncWrite for BufWriter {
     }
 }
 
+/// A buffered multipart uploader.
+///
+/// This uploader adaptively uses [`ObjectStore::put`] or
+/// [`ObjectStore::put_multipart`] depending on the amount of data that has
+/// been written.
+///
+/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
+/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
+/// streamed using [`ObjectStore::put_multipart`]
+///
+/// # TODO

Review Comment:
   I recommend:
   1. File a ticket to track adding support for tags and attributes
   2. Update this to say "Notes: Attribute and tag support are not yet implemented"
   
   That way, if you are not able to submit the follow-on PR, at least the feature gap is documented and someone else may be able to help in the future.



##########
object_store/src/buffered.rs:
##########
@@ -400,6 +400,125 @@ impl AsyncWrite for BufWriter {
     }
 }
 
+/// A buffered multipart uploader.
+///
+/// This uploader adaptively uses [`ObjectStore::put`] or
+/// [`ObjectStore::put_multipart`] depending on the amount of data that has
+/// been written.
+///
+/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
+/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
+/// streamed using [`ObjectStore::put_multipart`]
+///
+/// # TODO
+///
+/// Add attributes and tags support.
+pub struct BufUploader {
+    store: Arc<dyn ObjectStore>,
+    path: Path,
+
+    chunk_size: usize,
+    max_concurrency: usize,
+
+    buffer: PutPayloadMut,
+    write_multipart: Option<WriteMultipart>,
+}
+
+impl std::fmt::Debug for BufUploader {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("BufUploader")
+            .field("chunk_size", &self.chunk_size)
+            .finish()
+    }
+}
+
+impl BufUploader {
+    /// Create a new [`BufUploader`] from the provided [`ObjectStore`] and [`Path`]
+    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
+        Self::with_chunk_size(store, path, 5 * 1024 * 1024)
+    }
+
+    /// Create a new [`BufUploader`] from the provided [`ObjectStore`], [`Path`] and `capacity`

Review Comment:
   nit:
   
   ```suggestion
       /// Create a new [`BufUploader`] from the provided [`ObjectStore`], [`Path`] and `chunk_size`
   ```
   
   Also, it seems strange to me that this function is called `with_chunk_size` when it is creating a new instance. Perhaps calling it `new_with_chunk_size` would be more consistent.
   



##########
object_store/src/buffered.rs:
##########
@@ -400,6 +400,125 @@ impl AsyncWrite for BufWriter {
     }
 }
 
+/// A buffered multipart uploader.
+///
+/// This uploader adaptively uses [`ObjectStore::put`] or
+/// [`ObjectStore::put_multipart`] depending on the amount of data that has
+/// been written.
+///
+/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
+/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
+/// streamed using [`ObjectStore::put_multipart`]
+///
+/// # TODO
+///
+/// Add attributes and tags support.
+pub struct BufUploader {
+    store: Arc<dyn ObjectStore>,
+    path: Path,
+
+    chunk_size: usize,
+    max_concurrency: usize,
+
+    buffer: PutPayloadMut,
+    write_multipart: Option<WriteMultipart>,
+}
+
+impl std::fmt::Debug for BufUploader {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {

Review Comment:
   It seems like it would be good to print as many other relevant fields as possible (can't we print `store`, `path`, `max_concurrency`, etc.?)
   
   Maybe a comment explaining why this is a manual impl would help too (is it because the buffer is too large to print?)
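
One way the manual `Debug` impl could look, as a sketch only. `Uploader` is a hypothetical stand-in for `BufUploader` that uses a `String` path and a `Vec<u8>` buffer so it needs only the standard library; skipping the buffer contents and printing its length is an assumed rationale, not something the PR confirms.

```rust
use std::fmt;

/// Hypothetical stand-in for `BufUploader`; `path` is a plain `String`
/// and `buffer` a `Vec<u8>` so the sketch needs only the standard library.
pub struct Uploader {
    pub path: String,
    pub chunk_size: usize,
    pub max_concurrency: usize,
    pub buffer: Vec<u8>,
}

// Manual impl (assumed rationale): the buffer may hold megabytes of data,
// so only its length is printed instead of its contents.
impl fmt::Debug for Uploader {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Uploader")
            .field("path", &self.path)
            .field("chunk_size", &self.chunk_size)
            .field("max_concurrency", &self.max_concurrency)
            .field("buffered_bytes", &self.buffer.len())
            // Signals in the output that some fields are intentionally omitted.
            .finish_non_exhaustive()
    }
}
```

`finish_non_exhaustive` is part of the standard `DebugStruct` API, so a reader of the output can tell that fields were left out on purpose.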



##########
object_store/src/buffered.rs:
##########
@@ -547,4 +666,44 @@ mod tests {
         assert_eq!(response.meta.size, 40);
         assert_eq!(response.attributes, attributes);
     }
+
+    #[tokio::test]
+    async fn test_buf_uploader() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let path = Path::from("file.txt");
+
+        // Test put
+        let mut writer = BufUploader::with_chunk_size(Arc::clone(&store), path.clone(), 30);
+        writer.put(Bytes::from(vec![0; 20])).await.unwrap();
+        writer.put(Bytes::from(vec![0; 5])).await.unwrap();
+        writer.finish().await.unwrap();
+        let response = store
+            .get_opts(
+                &path,
+                GetOptions {
+                    head: true,
+                    ..Default::default()
+                },
+            )
+            .await
+            .unwrap();
+        assert_eq!(response.meta.size, 25);

Review Comment:
   I recommend verifying the written bytes too if possible -- maybe we could 
upload something like `(0..5)` and `(5..20)` to make sure we aren't 
accidentally uploading the same bytes twice
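
The idea of checking content rather than only size can be sketched without the real store. `ToyUploader` below is a hypothetical, synchronous, in-memory stand-in (not the PR's `BufUploader`, and not an `object_store` API): it buffers up to `chunk_size` bytes and moves full chunks into a list of parts. Writing the distinct ranges `0..5` and `5..20` means any duplicated or reordered chunk changes the final bytes.

```rust
/// Hypothetical in-memory stand-in for `BufUploader`: buffers up to
/// `chunk_size` bytes and moves each full chunk into `parts`.
pub struct ToyUploader {
    chunk_size: usize,
    buffer: Vec<u8>,
    parts: Vec<Vec<u8>>,
}

impl ToyUploader {
    pub fn new(chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be non-zero");
        Self {
            chunk_size,
            buffer: Vec::new(),
            parts: Vec::new(),
        }
    }

    /// Append bytes, emitting a part whenever the buffer reaches `chunk_size`.
    pub fn put(&mut self, mut bytes: &[u8]) {
        while !bytes.is_empty() {
            let room = self.chunk_size - self.buffer.len();
            let take = room.min(bytes.len());
            self.buffer.extend_from_slice(&bytes[..take]);
            bytes = &bytes[take..];
            if self.buffer.len() == self.chunk_size {
                self.parts.push(std::mem::take(&mut self.buffer));
            }
        }
    }

    /// Flush the remainder and return the object as it would be stored.
    pub fn finish(mut self) -> Vec<u8> {
        if !self.buffer.is_empty() {
            self.parts.push(std::mem::take(&mut self.buffer));
        }
        self.parts.concat()
    }
}

/// Upload `0..5` then `5..20` and return the stored bytes.
pub fn upload_distinct_ranges(chunk_size: usize) -> Vec<u8> {
    let mut writer = ToyUploader::new(chunk_size);
    writer.put(&(0u8..5).collect::<Vec<_>>());
    writer.put(&(5u8..20).collect::<Vec<_>>());
    writer.finish()
}
```

Comparing the result against `(0u8..20).collect::<Vec<u8>>()` for a chunk size both above and below the payload size exercises the single-put and multipart paths of the toy model; the real test would read the object back from the store and make the same comparison.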



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
