alamb commented on code in PR #2936:
URL: https://github.com/apache/arrow-datafusion/pull/2936#discussion_r923625775
##########
datafusion/core/src/physical_plan/file_format/chunked_store.rs:
##########
@@ -0,0 +1,122 @@
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use futures::StreamExt;
+use object_store::path::Path;
+use object_store::Result;
+use object_store::{GetResult, ListResult, ObjectMeta, ObjectStore};
+use std::fmt::{Debug, Display, Formatter};
+use std::ops::Range;
+use std::sync::Arc;
+
+/// Wraps a [`ObjectStore`] and makes its get response return chunks
+///
+/// TODO: Upstream into object_store_rs
+#[derive(Debug)]
+pub struct ChunkedStore {
+ inner: Arc<dyn ObjectStore>,
+ chunk_size: usize,
+}
+
+impl ChunkedStore {
+ pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
+ Self { inner, chunk_size }
+ }
+}
+
+impl Display for ChunkedStore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "ChunkedStore({})", self.inner)
+ }
+}
+
+#[async_trait]
+impl ObjectStore for ChunkedStore {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ self.inner.put(location, bytes).await
+ }
+
+ async fn get(&self, location: &Path) -> Result<GetResult> {
+ let bytes = self.inner.get(location).await?.bytes().await?;
+ let mut offset = 0;
+ let chunk_size = self.chunk_size;
+
+ Ok(GetResult::Stream(
+ futures::stream::iter(std::iter::from_fn(move || {
+ let remaining = bytes.len() - offset;
+ if remaining == 0 {
+ return None;
+ }
+ let to_read = remaining.min(chunk_size);
+ let next_offset = offset + to_read;
+ let slice = bytes.slice(offset..next_offset);
+ offset = next_offset;
+ Some(Ok(slice))
+ }))
+ .boxed(),
+ ))
+ }
+
+ async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+ self.inner.get_range(location, range).await
+ }
+
+ async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+ self.inner.head(location).await
+ }
+
+ async fn delete(&self, location: &Path) -> Result<()> {
+ self.inner.delete(location).await
+ }
+
+ async fn list(
+ &self,
+ prefix: Option<&Path>,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ self.inner.list(prefix).await
+ }
+
+ async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+ self.inner.list_with_delimiter(prefix).await
+ }
+
+ async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+ self.inner.copy(from, to).await
+ }
+
+ async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+ self.inner.copy_if_not_exists(from, to).await
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use object_store::memory::InMemory;
+
+ #[tokio::test]
+ async fn test_chunked() {
+ let location = Path::parse("test").unwrap();
+ let store = Arc::new(InMemory::new());
+ store
+ .put(&location, Bytes::from(vec![0; 1001]))
+ .await
+ .unwrap();
+
+ for chunk_size in [10, 20, 31] {
+ let store = ChunkedStore::new(store.clone(), chunk_size);
+ let mut s = match store.get(&location).await.unwrap() {
+ GetResult::Stream(s) => s,
+ _ => unreachable!(),
+ };
+
+ let mut remaining = 1001;
+ while let Some(next) = s.next().await {
+ let size = next.unwrap().len();
+ let expected = remaining.min(chunk_size);
+ assert_eq!(size, expected);
+ remaining -= expected;
+ }
+ }
Review Comment:
I recommend also adding `assert_eq!(remaining, 0)` after the `while` loop to
ensure that nothing is lost
##########
datafusion/core/src/physical_plan/file_format/csv.rs:
##########
@@ -198,12 +198,15 @@ impl FormatReader for CsvOpener {
Box::pin(async move {
match store.get(&file.location).await? {
GetResult::File(file, _) => {
- Ok(futures::stream::iter(config.open(file)).boxed())
+ Ok(futures::stream::iter(config.open(file, true)).boxed())
Review Comment:
Is `first-chunk` a bug fix?
##########
datafusion/core/src/physical_plan/file_format/json.rs:
##########
@@ -440,4 +443,38 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_chunked() {
+ let mut ctx = SessionContext::new();
+
+ for chunk_size in [10, 20, 30, 40] {
+ ctx.runtime_env().register_object_store(
+ "file",
+ "",
+ Arc::new(ChunkedStore::new(
Review Comment:
very nice 👌
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]