midnattsol commented on issue #5277:
URL: https://github.com/apache/arrow-rs/issues/5277#issuecomment-2507693426

   Hello, I've been reading through what is needed here and how it could be implemented. I have a question @tustvold: do you think the function could look like this, for example?
   
   ```
   pub async fn download(
       store: &dyn ObjectStore,
       location: &Path,
       opts: GetOptions,
       file: &mut std::fs::File,
       transfer_opts: Option<&DownloadTransferConfig>,
   ) -> Result<()> 
   ```
   
   The idea of **transfer_opts** (I can change the name to anything else) is to implement something similar to this:
   ```
   // Deriving Copy (two plain usize fields) lets `download` copy a borrowed default out cheaply.
   #[derive(Debug, Clone, Copy)]
   pub struct DownloadTransferConfig {
       /// The maximum number of concurrent chunks to download
       pub max_concurrent_chunks: usize,
       /// The maximum number of chunks to buffer in the channel between download and write
       pub chunk_queue_size: usize,
   }
   
   impl Default for DownloadTransferConfig {
       fn default() -> Self {
           Self {
               max_concurrent_chunks: 1,
               chunk_queue_size: 2,
           }
       }
   }
   ```
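   
   For context, this is roughly how I imagine a caller would use the proposed `download` function with a non-default config. This is only an illustrative sketch: `InMemory`, `Path` and `GetOptions` are existing `object_store` types, but the file paths and numbers here are made up.
   ```
   // Illustrative call site only; the store, file paths and numbers are made up.
   // Assumes this runs inside an async context where `?` can be used.
   let store = object_store::memory::InMemory::new();
   let location = object_store::path::Path::from("data/file.parquet");
   let mut file = std::fs::File::create("/tmp/file.parquet")?;

   let transfer_opts = DownloadTransferConfig {
       max_concurrent_chunks: 4, // poll up to 4 chunk futures at once
       chunk_queue_size: 8,      // buffer at most 8 chunks between network and disk
   };

   download(
       &store,
       &location,
       GetOptions::default(),
       &mut file,
       Some(&transfer_opts),
   )
   .await?;
   ```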
   
   So in the download function, it would then be possible to do something like this:
   ```
   pub async fn download(
       store: &dyn ObjectStore,
       location: &Path,
       opts: GetOptions,
       file: &mut std::fs::File,
       transfer_opts: Option<&DownloadTransferConfig>,
   ) -> Result<()> {
       // assumes `std::io::Write`, `futures::StreamExt`, `bytes::Bytes` and `snafu::ResultExt` are in scope
       let result = store.get_opts(location, opts).await?;
       // DownloadTransferConfig derives Copy, so the default can be copied out of the borrow
       let transfer_opts = *transfer_opts.unwrap_or(&DownloadTransferConfig::default());
       let (sender, mut receiver) =
           tokio::sync::mpsc::channel::<Bytes>(transfer_opts.chunk_queue_size);
   
       match result.payload {
           GetResultPayload::Stream(stream) => {
               let sender_task = tokio::spawn(async move {
                   let mut buffered_stream = stream
                       .map(|chunk| async move {
                           let chunk = chunk.map_err(crate::Error::from)?;
                           Ok::<Bytes, crate::Error>(chunk)
                       })
                       .buffered(transfer_opts.max_concurrent_chunks);
   
                   while let Some(chunk) = buffered_stream.next().await {
                       let chunk = chunk?;
                       if let Err(e) = sender.send(chunk).await {
                           eprintln!("Error al enviar el chunk: {:?}", e);
                           break;
                       }
                   }
                   drop(sender);
                   Ok::<(), crate::Error>(())
               });
   
               while let Some(chunk) = receiver.recv().await {
                   file.write_all(&chunk).context(UnableToWriteFileSnafu)?;
               }
   
               sender_task.await.context(UnableToJoinTaskSnafu)??;
               Ok(())
           }
   
           GetResultPayload::File(mut source_file, _path) => {
               std::io::copy(&mut source_file, file).context(UnableToWriteFileSnafu)?;
               Ok(())
           }
       }
   }
   ```
   
   I'm still running tests to find the best approach, but I was wondering whether making the buffering concurrency and the channel size configurable like this would fit the expectations of this issue.
   
   Regards

