This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d5f02439 Upstream newline_delimited_stream and ChunkedStore from DataFusion (#3341)
6d5f02439 is described below

commit 6d5f02439de9af7a944e010bf1bd8a65955515c2
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Dec 16 13:54:24 2022 +0000

    Upstream newline_delimited_stream and ChunkedStore from DataFusion (#3341)
    
    * Upstream newline_delimited_stream and ChunkedStore from DataFusion
    
    * Clippy
---
 object_store/src/chunked.rs   | 247 ++++++++++++++++++++++++++++++++++++++
 object_store/src/delimited.rs | 270 ++++++++++++++++++++++++++++++++++++++++++
 object_store/src/lib.rs       |   3 +
 3 files changed, 520 insertions(+)

diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
new file mode 100644
index 000000000..76865ef96
--- /dev/null
+++ b/object_store/src/chunked.rs
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A [`ChunkedStore`] that can be used to test streaming behaviour
+
+use std::fmt::{Debug, Display, Formatter};
+use std::io::{BufReader, Read};
+use std::ops::Range;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::{BufMut, Bytes, BytesMut};
+use futures::stream::BoxStream;
+use futures::StreamExt;
+use tokio::io::AsyncWrite;
+
+use crate::path::Path;
+use crate::util::maybe_spawn_blocking;
+use crate::{GetResult, ListResult, ObjectMeta, ObjectStore};
+use crate::{MultipartId, Result};
+
+/// Wraps an [`ObjectStore`] and makes its get response return chunks
+/// in a controllable manner.
+///
+/// A `ChunkedStore` makes the memory consumption and performance of
+/// the wrapped [`ObjectStore`] worse. It is intended for use within
+/// tests, to control the chunks in the produced output streams. For
+/// example, it is used to verify the delimiting logic in
+/// `newline_delimited_stream`.
+///
+/// Only `get` is altered: every other [`ObjectStore`] operation is forwarded
+/// unchanged to the wrapped store.
+#[derive(Debug)]
+pub struct ChunkedStore {
+    /// The wrapped store that every request is forwarded to
+    inner: Arc<dyn ObjectStore>,
+    /// The target number of bytes in each chunk yielded by `get`; the final
+    /// chunk of an object may be shorter
+    chunk_size: usize,
+}
+
+impl ChunkedStore {
+    /// Creates a new [`ChunkedStore`] wrapping `inner`, whose `get` streams yield
+    /// chunks of `chunk_size` bytes (the final chunk may be shorter)
+    ///
+    /// # Panics
+    ///
+    /// Panics if `chunk_size` is `0`. A zero-sized chunk can never make progress:
+    /// file-backed objects would produce an empty stream, and stream-backed
+    /// objects would produce empty chunks forever.
+    pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
+        assert!(chunk_size > 0, "ChunkedStore chunk_size must be non-zero");
+        Self { inner, chunk_size }
+    }
+}
+
+impl Display for ChunkedStore {
+    /// Renders as `ChunkedStore(<inner>)`, where `<inner>` is the wrapped store's
+    /// own `Display` output
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let inner = &self.inner;
+        write!(f, "ChunkedStore({inner})")
+    }
+}
+
+#[async_trait]
+impl ObjectStore for ChunkedStore {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        self.inner.put(location, bytes).await
+    }
+
+    async fn put_multipart(
+        &self,
+        location: &Path,
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        self.inner.put_multipart(location).await
+    }
+
+    async fn abort_multipart(
+        &self,
+        location: &Path,
+        multipart_id: &MultipartId,
+    ) -> Result<()> {
+        self.inner.abort_multipart(location, multipart_id).await
+    }
+
+    /// Fetches `location` from the wrapped store and re-chunks the payload.
+    ///
+    /// Both [`GetResult::File`] and [`GetResult::Stream`] responses are returned as
+    /// a [`GetResult::Stream`]. For a non-zero `chunk_size`, every yielded chunk
+    /// holds exactly `chunk_size` bytes except the last, which may be shorter, and
+    /// no empty chunks are yielded.
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        match self.inner.get(location).await? {
+            GetResult::File(std_file, ..) => {
+                let reader = BufReader::new(std_file);
+                let chunk_size = self.chunk_size;
+                Ok(GetResult::Stream(
+                    futures::stream::try_unfold(reader, move |mut reader| async move {
+                        // NOTE(review): `maybe_spawn_blocking` is presumably used to keep
+                        // the blocking file read off the async executor — confirm
+                        let (r, out, reader) = maybe_spawn_blocking(move || {
+                            let mut out = Vec::with_capacity(chunk_size);
+                            let r = (&mut reader)
+                                .take(chunk_size as u64)
+                                .read_to_end(&mut out)
+                                .map_err(|err| crate::Error::Generic {
+                                    store: "ChunkedStore",
+                                    source: Box::new(err),
+                                })?;
+                            Ok((r, out, reader))
+                        })
+                        .await?;
+
+                        // A zero-length read means the end of the file was reached
+                        match r {
+                            0 => Ok(None),
+                            _ => Ok(Some((out.into(), reader))),
+                        }
+                    })
+                    .boxed(),
+                ))
+            }
+            GetResult::Stream(stream) => {
+                let buffer = BytesMut::new();
+                Ok(GetResult::Stream(
+                    futures::stream::unfold(
+                        (stream, buffer, false, self.chunk_size),
+                        |(mut stream, mut buffer, exhausted, chunk_size)| async move {
+                            // The final, possibly short, chunk was already yielded
+                            if exhausted {
+                                return None;
+                            }
+                            // Keep accumulating bytes until we reach capacity as long as
+                            // the stream can provide them:
+                            while buffer.len() < chunk_size {
+                                match stream.next().await {
+                                    // Nothing left to flush: do not yield a trailing empty
+                                    // chunk when the length is a multiple of `chunk_size`
+                                    None if buffer.is_empty() => return None,
+                                    None => {
+                                        let slice = buffer.split_off(0).freeze();
+                                        return Some((
+                                            Ok(slice),
+                                            (stream, buffer, true, chunk_size),
+                                        ));
+                                    }
+                                    Some(Ok(bytes)) => {
+                                        buffer.put(bytes);
+                                    }
+                                    Some(Err(e)) => {
+                                        return Some((
+                                            Err(crate::Error::Generic {
+                                                store: "ChunkedStore",
+                                                source: Box::new(e),
+                                            }),
+                                            (stream, buffer, exhausted, chunk_size),
+                                        ))
+                                    }
+                                };
+                            }
+                            // Return the chunked values as the next value in the stream
+                            let slice = buffer.split_to(chunk_size).freeze();
+                            Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
+                        },
+                    )
+                    .boxed(),
+                ))
+            }
+        }
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        self.inner.get_range(location, range).await
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        self.inner.head(location).await
+    }
+
+    async fn delete(&self, location: &Path) -> Result<()> {
+        self.inner.delete(location).await
+    }
+
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        self.inner.list(prefix).await
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        self.inner.copy(from, to).await
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        self.inner.copy_if_not_exists(from, to).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::StreamExt;
+
+    use crate::local::LocalFileSystem;
+    use crate::memory::InMemory;
+    use crate::path::Path;
+    use crate::tests::*;
+
+    use super::*;
+
+    /// Reads one object through stores of several chunk sizes, checking every
+    /// chunk is `chunk_size` bytes except the last, which holds the remainder
+    #[tokio::test]
+    async fn test_chunked_basic() {
+        const LEN: usize = 1001;
+
+        let location = Path::parse("test").unwrap();
+        let inner: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        inner.put(&location, Bytes::from(vec![0; LEN])).await.unwrap();
+
+        for chunk_size in [10, 20, 31] {
+            let chunked = ChunkedStore::new(Arc::clone(&inner), chunk_size);
+            let mut stream = match chunked.get(&location).await.unwrap() {
+                GetResult::Stream(stream) => stream,
+                _ => unreachable!(),
+            };
+
+            let mut remaining = LEN;
+            while let Some(chunk) = stream.next().await {
+                let expected = chunk_size.min(remaining);
+                assert_eq!(chunk.unwrap().len(), expected);
+                remaining -= expected;
+            }
+            assert_eq!(remaining, 0);
+        }
+    }
+
+    /// Runs the shared store conformance suite against a `ChunkedStore` wrapping
+    /// both an in-memory store and a local filesystem store
+    #[tokio::test]
+    async fn test_chunked() {
+        let temporary = tempfile::tempdir().unwrap();
+        let local = LocalFileSystem::new_with_prefix(temporary.path()).unwrap();
+        let integrations: [Arc<dyn ObjectStore>; 2] =
+            [Arc::new(InMemory::new()), Arc::new(local)];
+
+        for integration in &integrations {
+            let store = ChunkedStore::new(Arc::clone(integration), 100);
+
+            put_get_delete_list(&store).await;
+            list_uses_directories_correctly(&store).await;
+            list_with_delimiter(&store).await;
+            rename_and_copy(&store).await;
+            copy_if_not_exists(&store).await;
+            stream_get(&store).await;
+        }
+    }
+}
diff --git a/object_store/src/delimited.rs b/object_store/src/delimited.rs
new file mode 100644
index 000000000..132148651
--- /dev/null
+++ b/object_store/src/delimited.rs
@@ -0,0 +1,270 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utility for streaming newline delimited files from object storage
+
+use std::collections::VecDeque;
+
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use snafu::{ensure, Snafu};
+
+use super::Result;
+
+/// Errors reported when the end of input is reached part way through a record
+#[derive(Debug, Snafu)]
+enum Error {
+    /// The input ended while inside a `"` quoted string
+    #[snafu(display("encountered unterminated string"))]
+    UnterminatedString,
+
+    /// The input ended immediately after a `\` escape character
+    #[snafu(display("encountered trailing escape character"))]
+    TrailingEscape,
+}
+
+impl From<Error> for super::Error {
+    /// Boxes a delimiting error into the crate's `Generic` error variant,
+    /// attributing it to the `LineDelimiter` store
+    fn from(source: Error) -> Self {
+        let store = "LineDelimiter";
+        Self::Generic {
+            store,
+            source: Box::new(source),
+        }
+    }
+}
+
+/// Byte value of `\n`, which ends a record unless quoted or escaped
+const NEWLINE: u8 = b'\n';
+
+/// Byte value of `"`, which opens and closes a quoted string; newlines inside
+/// a quoted string do not end a record
+const QUOTE: u8 = b'"';
+
+/// Byte value of `\`, which causes the byte that follows it to be taken literally
+const ESCAPE: u8 = b'\\';
+
+/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
+/// of [`Bytes`] containing a whole number of new line delimited records
+///
+/// Newlines inside `"` quoted strings, or directly following an unescaped `\`, do
+/// not delimit records. Input is supplied with `push`, the end of input is
+/// signalled with `finish`, and delimited chunks are taken via [`Iterator::next`].
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Delimited chunks of [`Bytes`] waiting to be returned by the iterator; each
+    /// holds whole records, except that the chunk pushed by `finish` may lack a
+    /// trailing newline
+    complete: VecDeque<Bytes>,
+    /// Remainder bytes that form the next record
+    remainder: Vec<u8>,
+    /// True if the last byte scanned was an unescaped escape character
+    is_escape: bool,
+    /// True if currently processing a quoted string
+    is_quote: bool,
+}
+
+impl LineDelimiter {
+    /// Creates a new, empty [`LineDelimiter`]
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Adds the next set of [`Bytes`]
+    ///
+    /// Records completed by `val` become available from the iterator, while any
+    /// bytes after the last record boundary are buffered in `remainder` until a
+    /// later call to `push` or `finish` completes them
+    fn push(&mut self, val: impl Into<Bytes>) {
+        let val: Bytes = val.into();
+
+        // Lazily scan `val`, carrying the escape/quote state across calls, and
+        // yield the offset just past each newline that terminates a record
+        let is_escape = &mut self.is_escape;
+        let is_quote = &mut self.is_quote;
+        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
+            if *is_escape {
+                // The previous byte was an escape, so this byte is taken literally
+                *is_escape = false;
+                None
+            } else if *v == ESCAPE {
+                *is_escape = true;
+                None
+            } else if *v == QUOTE {
+                *is_quote = !*is_quote;
+                None
+            } else if *is_quote {
+                // Newlines inside a quoted string do not end a record
+                None
+            } else {
+                (*v == NEWLINE).then_some(idx + 1)
+            }
+        });
+
+        // A partial record buffered by an earlier call is completed by the bytes up
+        // to the first record end in `val`, which must be copied into `remainder`
+        let start_offset = match self.remainder.is_empty() {
+            true => 0,
+            false => match record_ends.next() {
+                Some(idx) => {
+                    self.remainder.extend_from_slice(&val[0..idx]);
+                    self.complete
+                        .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
+                    idx
+                }
+                None => {
+                    // `val` holds no record end and only extends the partial record;
+                    // `next` has already scanned all of `val`, so the escape/quote
+                    // state is up to date
+                    self.remainder.extend_from_slice(&val);
+                    return;
+                }
+            },
+        };
+        // The whole records from `start_offset` to the last record end are returned
+        // as a slice of `val`. Calling `last` also drives the scan to the end of
+        // `val`, keeping the escape/quote state up to date
+        let end_offset = record_ends.last().unwrap_or(start_offset);
+        if start_offset != end_offset {
+            self.complete.push_back(val.slice(start_offset..end_offset));
+        }
+
+        // Buffer the trailing partial record, if any
+        if end_offset != val.len() {
+            self.remainder.extend_from_slice(&val[end_offset..])
+        }
+    }
+
+    /// Marks the end of the stream, delimiting any remaining bytes
+    ///
+    /// Returns `true` if there is no remaining data to be read
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the buffered bytes end inside a quoted string
+    /// (`UnterminatedString`) or immediately after an escape character
+    /// (`TrailingEscape`)
+    fn finish(&mut self) -> Result<bool> {
+        if !self.remainder.is_empty() {
+            ensure!(!self.is_quote, UnterminatedStringSnafu);
+            ensure!(!self.is_escape, TrailingEscapeSnafu);
+
+            self.complete
+                .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
+        }
+        Ok(self.complete.is_empty())
+    }
+}
+
+impl Iterator for LineDelimiter {
+    type Item = Bytes;
+
+    /// Yields the oldest delimited chunk that has not yet been returned, if any
+    fn next(&mut self) -> Option<Bytes> {
+        let LineDelimiter { complete, .. } = self;
+        complete.pop_front()
+    }
+}
+
+/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
+/// yielded [`Bytes`] contains a whole number of new line delimited records
+/// accounting for `\` style escapes and `"` quotes
+///
+/// Errors from `s` are forwarded as they occur. Once `s` is exhausted, any
+/// remaining bytes are yielded as a final chunk that need not end in a newline;
+/// if `LineDelimiter::finish` rejects them, its error is yielded instead.
+pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
+where
+    S: Stream<Item = Result<Bytes>> + Unpin,
+{
+    let initial = (s, LineDelimiter::new(), false);
+
+    futures::stream::unfold(initial, |(mut input, mut lines, mut done)| async move {
+        loop {
+            // Hand out already delimited chunks before pulling more input
+            if let Some(chunk) = lines.next() {
+                return Some((Ok(chunk), (input, lines, done)));
+            }
+            if done {
+                return None;
+            }
+
+            let err = match input.next().await {
+                Some(Ok(bytes)) => {
+                    lines.push(bytes);
+                    continue;
+                }
+                Some(Err(e)) => e,
+                None => {
+                    // Flush whatever is left over once the input runs dry
+                    done = true;
+                    match lines.finish() {
+                        Ok(true) => return None,
+                        Ok(false) => continue,
+                        Err(e) => e,
+                    }
+                }
+            };
+            return Some((Err(err), (input, lines, done)));
+        }
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::stream::{BoxStream, TryStreamExt};
+
+    use super::*;
+
+    /// Chunks fed to the stream-based tests; records deliberately span chunks
+    const INPUT: [&str; 3] = ["hello\nworld\nbin", "go\ncup", "cakes"];
+
+    /// The chunks `newline_delimited_stream` should produce from `INPUT`
+    fn expected_output() -> Vec<Bytes> {
+        vec![
+            Bytes::from("hello\nworld\n"),
+            Bytes::from("bingo\n"),
+            Bytes::from("cupcakes"),
+        ]
+    }
+
+    #[test]
+    fn test_delimiter() {
+        let mut delimiter = LineDelimiter::new();
+        for chunk in ["hello\nworld", "\n\n"] {
+            delimiter.push(chunk);
+        }
+
+        for expected in ["hello\n", "world\n", "\n"] {
+            assert_eq!(delimiter.next(), Some(Bytes::from(expected)));
+        }
+        assert_eq!(delimiter.next(), None);
+    }
+
+    #[test]
+    fn test_delimiter_escaped() {
+        let mut delimiter = LineDelimiter::new();
+        let chunks = ["", "fo\\\n\"foo", "bo\n\"bar\n", "\"he", "llo\"\n"];
+        chunks.into_iter().for_each(|chunk| delimiter.push(chunk));
+
+        assert_eq!(delimiter.next(), Some(Bytes::from("fo\\\n\"foobo\n\"bar\n")));
+        assert_eq!(delimiter.next(), Some(Bytes::from("\"hello\"\n")));
+        assert_eq!(delimiter.next(), None);
+
+        // The delimiter stays usable once its buffered output has been drained
+        delimiter.push("\"foo\nbar\",\"fiz\\\"inner\\\"\"\nhello");
+        assert!(!delimiter.finish().unwrap());
+
+        assert_eq!(
+            delimiter.next(),
+            Some(Bytes::from("\"foo\nbar\",\"fiz\\\"inner\\\"\"\n"))
+        );
+        assert_eq!(delimiter.next(), Some(Bytes::from("hello")));
+        assert!(delimiter.finish().unwrap());
+        assert_eq!(delimiter.next(), None);
+    }
+
+    #[tokio::test]
+    async fn test_delimiter_stream() {
+        let input_stream =
+            futures::stream::iter(INPUT.into_iter().map(|s| Ok(Bytes::from(s))));
+
+        let results: Vec<_> = newline_delimited_stream(input_stream)
+            .try_collect()
+            .await
+            .unwrap();
+        assert_eq!(results, expected_output())
+    }
+
+    #[tokio::test]
+    async fn test_delimiter_unfold_stream() {
+        let input_stream: BoxStream<'static, Result<Bytes>> =
+            futures::stream::unfold(VecDeque::from(INPUT), |mut input| async move {
+                input
+                    .pop_front()
+                    .map(|chunk| (Ok(Bytes::from(chunk)), input))
+            })
+            .boxed();
+
+        let results: Vec<_> = newline_delimited_stream(input_stream)
+            .try_collect()
+            .await
+            .unwrap();
+        assert_eq!(results, expected_output())
+    }
+}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 0cd56612e..85e8737b7 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -163,6 +163,9 @@ compile_error!("Features 'gcp', 'aws', 'azure' are not supported on wasm.");
 pub mod aws;
 #[cfg(feature = "azure")]
 pub mod azure;
+#[cfg(not(target_arch = "wasm32"))]
+pub mod chunked;
+pub mod delimited;
 #[cfg(feature = "gcp")]
 pub mod gcp;
 pub mod limit;

Reply via email to