tustvold commented on code in PR #5281:
URL: https://github.com/apache/arrow-rs/pull/5281#discussion_r1449030846
##########
object_store/src/local.rs:
##########
@@ -1082,6 +1097,58 @@ fn convert_walkdir_result(
}
}
+
+/// Upload a local [`File`] to a remote object store
+///
+/// Files of at most 50 MiB are read fully into memory, starting at the
+/// file's current position, and written with a single
+/// [`ObjectStore::put_opts`] call that honours `opts`. Larger files are
+/// streamed in 5 MiB chunks through [`ObjectStore::put_multipart`]; `opts` is
+/// not applied on that path.
+///
+/// # Errors
+///
+/// Returns an error if the file metadata or contents cannot be read, or if
+/// the store rejects the upload. If the multipart path fails, the upload is
+/// aborted (best effort) instead of being completed with partial data.
+pub async fn upload(
+    store: &dyn ObjectStore,
+    location: &Path,
+    opts: PutOptions,
+    file: &mut std::fs::File,
+) -> Result<()> {
+    // Files larger than this are uploaded in parts rather than buffered whole.
+    const MULTIPART_THRESHOLD: u64 = 50 * 1024 * 1024;
+    // Size of each chunk read from disk and handed to the multipart writer.
+    const CHUNK_SIZE: usize = 5 * 1024 * 1024;
+
+    // Maps an I/O error raised by the multipart writer into a store error.
+    fn multipart_error(source: std::io::Error) -> crate::Error {
+        crate::Error::Generic {
+            store: "upload",
+            source: Box::new(source),
+        }
+    }
+
+    let metadata = file.metadata().map_err(|e| Error::FileMetadata {
+        source: e.into(),
+    })?;
+    let file_size = metadata.len();
+
+    if file_size <= MULTIPART_THRESHOLD {
+        // `file_size` is bounded by the threshold, so this cast cannot truncate.
+        let mut buffer = Vec::with_capacity(file_size as usize);
+        file.read_to_end(&mut buffer)
+            .map_err(|e| Error::UnableToReadBytesFromFile { source: e })?;
+        store.put_opts(location, Bytes::from(buffer), opts).await?;
+        return Ok(());
+    }
+
+    let (id, mut writer) = store.put_multipart(location).await?;
+    let copied = async {
+        let mut buffer = vec![0u8; CHUNK_SIZE];
+        loop {
+            // Propagate read errors: treating them as EOF would commit a
+            // truncated object once the writer is shut down below.
+            let size = file
+                .read(&mut buffer)
+                .map_err(|e| Error::UnableToReadBytesFromFile { source: e })?;
+            if size == 0 {
+                break;
+            }
+            writer
+                .write_all(&buffer[..size])
+                .await
+                .map_err(multipart_error)?;
+        }
+        writer.flush().await.map_err(multipart_error)?;
+        // Shutting down the writer is what completes the multipart upload.
+        writer.shutdown().await.map_err(multipart_error)?;
+        Ok::<_, crate::Error>(())
+    }
+    .await;
+
+    if let Err(e) = copied {
+        // Best-effort cleanup of parts already uploaded; the original error is
+        // more useful to the caller than any failure to abort.
+        let _ = store.abort_multipart(location, &id).await;
+        return Err(e);
+    }
+    Ok(())
+}
+
+
+/// Upload a local [`File`] to a remote object store
+pub async fn download(store: &dyn ObjectStore, location: &Path, opts:
GetOptions, file: &mut File) -> Result<()> {
+ let get_result = store.get_opts(location, opts).await?;
+ let mut stream = get_result.into_stream();
+
+ while let Some(bytes_result) = stream.next().await {
+ let bytes = bytes_result?;
+ file.write_all(&bytes).map_err(|e| Error::UnableToWriteBytesToFile{
+ source: e.into()
+ })?;
+ }
Review Comment:
Sorry, I've had a crazy few days. Something like this should work (not at all
tested):
```
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index e985ff070c..0969731f32 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -155,6 +155,11 @@ pub(crate) enum Error {
InvalidPath {
path: String,
},
+
+ #[snafu(display("Unable to download data to file: {}", source))]
+ Download {
+ source: io::Error,
+ },
}
impl From<Error> for super::Error {
@@ -1093,6 +1098,37 @@ fn convert_walkdir_result(
}
}
+/// Download the object at `location` into a local [`File`].
+///
+/// An async forwarder streams the object from `store` into a bounded channel
+/// (capacity 2). A separate task started with `maybe_spawn_blocking` drains
+/// the channel and performs the blocking file writes, so disk I/O is kept off
+/// the async task. Bytes are written through a `try_clone` of `file`.
+///
+/// # Errors
+///
+/// Returns the store's error if the request or the byte stream fails, or a
+/// `Download` error if `file` cannot be cloned or written to.
+pub async fn download(
+    store: &dyn ObjectStore,
+    location: &Path,
+    opts: GetOptions,
+    file: &mut File,
+) -> Result<()> {
+    let (sender, mut receiver) = tokio::sync::mpsc::channel(2);
+    let mut stream = store.get_opts(location, opts).await?.into_stream();
+    let forwarder = async move {
+        while let Some(bytes) = stream.next().await.transpose()? {
+            // The receiver is only gone if the writer stopped early (e.g. on a
+            // write error); that error is surfaced by `try_join` below.
+            if sender.send(bytes).await.is_err() {
+                break;
+            }
+        }
+        Ok::<_, crate::Error>(())
+    };
+
+    let mut captured = file.try_clone().context(DownloadSnafu)?;
+    // The closure itself must do the blocking work: `blocking_recv` panics in
+    // an async context, and a future returned from here would never be polled.
+    // NOTE(review): presumably `maybe_spawn_blocking` runs the closure inline
+    // when no Tokio runtime is present, which would block on a forwarder that
+    // is not being polled -- confirm callers always run inside Tokio.
+    let writer = maybe_spawn_blocking(move || {
+        while let Some(bytes) = receiver.blocking_recv() {
+            captured.write_all(&bytes).context(DownloadSnafu)?;
+        }
+        Ok(())
+    });
+
+    futures::future::try_join(forwarder, writer).await?;
+    Ok(())
+}
+
#[cfg(test)]
mod tests {
use super::*;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]