This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 6d5f02439 Upstream newline_delimited_stream and ChunkedStore from
DataFusion (#3341)
6d5f02439 is described below
commit 6d5f02439de9af7a944e010bf1bd8a65955515c2
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Dec 16 13:54:24 2022 +0000
Upstream newline_delimited_stream and ChunkedStore from DataFusion (#3341)
* Upstream newline_delimited_stream and ChunkedStore from DataFusion
* Clippy
---
object_store/src/chunked.rs | 247 ++++++++++++++++++++++++++++++++++++++
object_store/src/delimited.rs | 270 ++++++++++++++++++++++++++++++++++++++++++
object_store/src/lib.rs | 3 +
3 files changed, 520 insertions(+)
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
new file mode 100644
index 000000000..76865ef96
--- /dev/null
+++ b/object_store/src/chunked.rs
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A [`ChunkedStore`] that can be used to test streaming behaviour
+
+use std::fmt::{Debug, Display, Formatter};
+use std::io::{BufReader, Read};
+use std::ops::Range;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::{BufMut, Bytes, BytesMut};
+use futures::stream::BoxStream;
+use futures::StreamExt;
+use tokio::io::AsyncWrite;
+
+use crate::path::Path;
+use crate::util::maybe_spawn_blocking;
+use crate::{GetResult, ListResult, ObjectMeta, ObjectStore};
+use crate::{MultipartId, Result};
+
+/// Wraps an [`ObjectStore`] and makes its `get` response return chunks
+/// in a controllable manner.
+///
+/// A `ChunkedStore` makes the memory consumption and performance of
+/// the wrapped [`ObjectStore`] worse. It is intended for use within
+/// tests, to control the chunks in the produced output streams. For
+/// example, it is used to verify the delimiting logic in
+/// `newline_delimited_stream`.
+#[derive(Debug)]
+pub struct ChunkedStore {
+ inner: Arc<dyn ObjectStore>, // wrapped store; every trait method forwards to it
+ chunk_size: usize, // maximum size in bytes of each chunk streamed by `get`
+}
+
+impl ChunkedStore {
+ /// Creates a new [`ChunkedStore`] wrapping `inner`, with the specified `chunk_size` in bytes
+ pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
+ Self { inner, chunk_size }
+ }
+}
+
+impl Display for ChunkedStore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_fmt(format_args!("ChunkedStore({})", self.inner)) // wraps the inner `Display`
+    }
+}
+
+#[async_trait]
+impl ObjectStore for ChunkedStore {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        self.inner.put(location, bytes).await
+    }
+
+    async fn put_multipart(
+        &self,
+        location: &Path,
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        self.inner.put_multipart(location).await
+    }
+
+    async fn abort_multipart(
+        &self,
+        location: &Path,
+        multipart_id: &MultipartId,
+    ) -> Result<()> {
+        self.inner.abort_multipart(location, multipart_id).await
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        match self.inner.get(location).await? {
+            GetResult::File(std_file, ..) => {
+                let reader = BufReader::new(std_file); // file result: re-read it in pieces
+                let chunk_size = self.chunk_size;
+                Ok(GetResult::Stream(
+                    futures::stream::try_unfold(reader, move |mut reader| async move {
+                        let (r, out, reader) = maybe_spawn_blocking(move || {
+                            let mut out = Vec::with_capacity(chunk_size);
+                            let r = (&mut reader)
+                                .take(chunk_size as u64) // read at most one chunk
+                                .read_to_end(&mut out)
+                                .map_err(|err| crate::Error::Generic {
+                                    store: "ChunkedStore",
+                                    source: Box::new(err),
+                                })?;
+                            Ok((r, out, reader))
+                        })
+                        .await?;
+
+                        match r {
+                            0 => Ok(None), // nothing read: end of file, end the stream
+                            _ => Ok(Some((out.into(), reader))),
+                        }
+                    })
+                    .boxed(),
+                ))
+            }
+            GetResult::Stream(stream) => {
+                let buffer = BytesMut::new();
+                Ok(GetResult::Stream(
+                    futures::stream::unfold(
+                        (stream, buffer, false, self.chunk_size),
+                        |(mut stream, mut buffer, mut exhausted, chunk_size)| async move {
+                            // Keep accumulating bytes until we reach capacity as long as
+                            // the stream can provide them:
+                            if exhausted {
+                                return None;
+                            }
+                            while buffer.len() < chunk_size {
+                                match stream.next().await {
+                                    None => {
+                                        exhausted = true; // next poll ends the stream
+                                        let slice = buffer.split_off(0).freeze();
+                                        return Some((
+                                            Ok(slice),
+                                            (stream, buffer, exhausted, chunk_size),
+                                        ));
+                                    }
+                                    Some(Ok(bytes)) => {
+                                        buffer.put(bytes);
+                                    }
+                                    Some(Err(e)) => {
+                                        return Some((
+                                            Err(crate::Error::Generic {
+                                                store: "ChunkedStore",
+                                                source: Box::new(e),
+                                            }),
+                                            (stream, buffer, exhausted, chunk_size),
+                                        ))
+                                    }
+                                };
+                            }
+                            // Return the chunked values as the next value in the stream
+                            let slice = buffer.split_to(chunk_size).freeze();
+                            Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
+                        },
+                    )
+                    .boxed(),
+                ))
+            }
+        }
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        self.inner.get_range(location, range).await
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        self.inner.head(location).await
+    }
+
+    async fn delete(&self, location: &Path) -> Result<()> {
+        self.inner.delete(location).await
+    }
+
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        self.inner.list(prefix).await
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        self.inner.copy(from, to).await
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        self.inner.copy_if_not_exists(from, to).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::StreamExt;
+
+    use crate::local::LocalFileSystem;
+    use crate::memory::InMemory;
+    use crate::path::Path;
+    use crate::tests::*;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_chunked_basic() {
+        let location = Path::parse("test").unwrap();
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        store
+            .put(&location, Bytes::from(vec![0; 1001]))
+            .await
+            .unwrap();
+
+        for chunk_size in [10, 20, 31] { // none divides 1001, so the last chunk is short
+            let store = ChunkedStore::new(Arc::clone(&store), chunk_size);
+            let mut s = match store.get(&location).await.unwrap() {
+                GetResult::Stream(s) => s,
+                _ => unreachable!(),
+            };
+
+            let mut remaining = 1001;
+            while let Some(next) = s.next().await {
+                let size = next.unwrap().len();
+                let expected = remaining.min(chunk_size);
+                assert_eq!(size, expected);
+                remaining -= expected;
+            }
+            assert_eq!(remaining, 0);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_chunked() {
+        let temporary = tempfile::tempdir().unwrap();
+        let integrations: &[Arc<dyn ObjectStore>] = &[
+            Arc::new(InMemory::new()),
+            Arc::new(LocalFileSystem::new_with_prefix(temporary.path()).unwrap()),
+        ];
+
+        for integration in integrations {
+            let integration = ChunkedStore::new(Arc::clone(integration), 100);
+
+            put_get_delete_list(&integration).await;
+            list_uses_directories_correctly(&integration).await;
+            list_with_delimiter(&integration).await;
+            rename_and_copy(&integration).await;
+            copy_if_not_exists(&integration).await;
+            stream_get(&integration).await;
+        }
+    }
+}
diff --git a/object_store/src/delimited.rs b/object_store/src/delimited.rs
new file mode 100644
index 000000000..132148651
--- /dev/null
+++ b/object_store/src/delimited.rs
@@ -0,0 +1,270 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utility for streaming newline delimited files from object storage
+
+use std::collections::VecDeque;
+
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use snafu::{ensure, Snafu};
+
+use super::Result;
+
+#[derive(Debug, Snafu)]
+enum Error {
+ #[snafu(display("encountered unterminated string"))]
+ UnterminatedString, // the input ended while a `"` quoted string was still open
+
+ #[snafu(display("encountered trailing escape character"))]
+ TrailingEscape, // NOTE(review): presumably for input ending on `\` - confirm the check in `finish`
+}
+
+impl From<Error> for super::Error {
+    fn from(err: Error) -> Self {
+        // Box the delimiter error and tag it with the component it came from
+        let source = Box::new(err);
+        let store = "LineDelimiter";
+        Self::Generic { store, source }
+    }
+}
+
+/// The ASCII encoding of `"`, which opens or closes a quoted string
+const QUOTE: u8 = b'"';
+
+/// The ASCII encoding of `\n`, which ends a record when outside of quotes
+const NEWLINE: u8 = b'\n';
+
+/// The ASCII encoding of `\`, which escapes the byte that follows it
+const ESCAPE: u8 = b'\\';
+
+/// [`LineDelimiter`] is fed a stream of [`Bytes`] via `push` and acts as an iterator
+/// of [`Bytes`] that each contain a whole number of new line delimited records
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Complete chunks of [`Bytes`], oldest first, waiting to be yielded
+    complete: VecDeque<Bytes>,
+    /// Bytes of the trailing record that has not been terminated yet
+    remainder: Vec<u8>,
+    /// True if the last byte was an unescaped escape character
+    is_escape: bool,
+    /// True if currently processing a quoted string
+    is_quote: bool,
+}
+
+impl LineDelimiter {
+    /// Creates a new, empty [`LineDelimiter`]
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Adds the next set of [`Bytes`], queueing every record they complete
+    fn push(&mut self, val: impl Into<Bytes>) {
+        let val: Bytes = val.into();
+
+        let is_escape = &mut self.is_escape;
+        let is_quote = &mut self.is_quote;
+        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
+            if *is_escape {
+                *is_escape = false; // escaped byte: never a quote or a delimiter
+                None
+            } else if *v == ESCAPE {
+                *is_escape = true;
+                None
+            } else if *v == QUOTE {
+                *is_quote = !*is_quote;
+                None
+            } else if *is_quote {
+                None
+            } else {
+                (*v == NEWLINE).then_some(idx + 1) // exclusive end offset of the record
+            }
+        });
+
+        let start_offset = match self.remainder.is_empty() {
+            true => 0,
+            false => match record_ends.next() {
+                Some(idx) => {
+                    self.remainder.extend_from_slice(&val[0..idx]);
+                    self.complete
+                        .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
+                    idx
+                }
+                None => {
+                    self.remainder.extend_from_slice(&val);
+                    return;
+                }
+            },
+        };
+        let end_offset = record_ends.last().unwrap_or(start_offset); // also scans the rest of `val`
+        if start_offset != end_offset {
+            self.complete.push_back(val.slice(start_offset..end_offset));
+        }
+
+        if end_offset != val.len() {
+            self.remainder.extend_from_slice(&val[end_offset..])
+        }
+    }
+
+    /// Marks the end of the stream, delimiting any remaining bytes. Fails if they
+    /// end inside a quoted string or directly after an escape character.
+    /// Returns `true` if there is no remaining data to be read.
+    fn finish(&mut self) -> Result<bool> {
+        if !self.remainder.is_empty() {
+            ensure!(!self.is_quote, UnterminatedStringSnafu);
+            ensure!(!self.is_escape, TrailingEscapeSnafu);
+
+            self.complete
+                .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
+        }
+        Ok(self.complete.is_empty())
+    }
+}
+
+impl Iterator for LineDelimiter {
+    type Item = Bytes;
+
+    fn next(&mut self) -> Option<Bytes> {
+        self.complete.pop_front() // oldest complete chunk first
+    }
+}
+
+/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
+/// yielded [`Bytes`] contains a whole number of new line delimited records
+/// accounting for `\` style escapes and `"` quotes
+pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
+where
+    S: Stream<Item = Result<Bytes>> + Unpin,
+{
+    let delimiter = LineDelimiter::new();
+
+    futures::stream::unfold(
+        (s, delimiter, false),
+        |(mut s, mut delimiter, mut exhausted)| async move {
+            loop {
+                if let Some(next) = delimiter.next() { // drain buffered records first
+                    return Some((Ok(next), (s, delimiter, exhausted)));
+                } else if exhausted {
+                    return None;
+                }
+
+                match s.next().await {
+                    Some(Ok(bytes)) => delimiter.push(bytes),
+                    Some(Err(e)) => return Some((Err(e), (s, delimiter, exhausted))),
+                    None => {
+                        exhausted = true;
+                        match delimiter.finish() {
+                            Ok(true) => return None,
+                            Ok(false) => continue, // a final record was queued: yield it
+                            Err(e) => return Some((Err(e), (s, delimiter, exhausted))),
+                        }
+                    }
+                }
+            }
+        },
+    )
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::stream::{BoxStream, TryStreamExt};
+
+    use super::*;
+
+    #[test]
+    fn test_delimiter() {
+        let mut delimiter = LineDelimiter::new();
+        delimiter.push("hello\nworld");
+        delimiter.push("\n\n");
+
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("hello\n"));
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("world\n"));
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("\n"));
+        assert!(delimiter.next().is_none());
+    }
+
+    #[test]
+    fn test_delimiter_escaped() {
+        let mut delimiter = LineDelimiter::new();
+        delimiter.push("");
+        delimiter.push("fo\\\n\"foo");
+        delimiter.push("bo\n\"bar\n");
+        delimiter.push("\"he");
+        delimiter.push("llo\"\n");
+        assert_eq!(
+            delimiter.next().unwrap(),
+            Bytes::from("fo\\\n\"foobo\n\"bar\n")
+        );
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("\"hello\"\n"));
+        assert!(delimiter.next().is_none());
+
+        // Verify can push further data
+        delimiter.push("\"foo\nbar\",\"fiz\\\"inner\\\"\"\nhello");
+        assert!(!delimiter.finish().unwrap());
+
+        assert_eq!(
+            delimiter.next().unwrap(),
+            Bytes::from("\"foo\nbar\",\"fiz\\\"inner\\\"\"\n")
+        );
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("hello"));
+        assert!(delimiter.finish().unwrap());
+        assert!(delimiter.next().is_none());
+    }
+
+    #[tokio::test]
+    async fn test_delimiter_stream() {
+        let input = vec!["hello\nworld\nbin", "go\ncup", "cakes"];
+        let input_stream =
+            futures::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
+        let stream = newline_delimited_stream(input_stream);
+
+        let results: Vec<_> = stream.try_collect().await.unwrap();
+        assert_eq!(
+            results,
+            vec![
+                Bytes::from("hello\nworld\n"),
+                Bytes::from("bingo\n"),
+                Bytes::from("cupcakes")
+            ]
+        )
+    }
+    #[tokio::test]
+    async fn test_delimiter_unfold_stream() {
+        let input_stream: BoxStream<'static, Result<Bytes>> = futures::stream::unfold(
+            VecDeque::from(["hello\nworld\nbin", "go\ncup", "cakes"]),
+            |mut input| async move {
+                if !input.is_empty() {
+                    Some((Ok(Bytes::from(input.pop_front().unwrap())), input))
+                } else {
+                    None
+                }
+            },
+        )
+        .boxed();
+        let stream = newline_delimited_stream(input_stream);
+
+        let results: Vec<_> = stream.try_collect().await.unwrap();
+        assert_eq!(
+            results,
+            vec![
+                Bytes::from("hello\nworld\n"),
+                Bytes::from("bingo\n"),
+                Bytes::from("cupcakes")
+            ]
+        )
+    }
+}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 0cd56612e..85e8737b7 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -163,6 +163,9 @@ compile_error!("Features 'gcp', 'aws', 'azure' are not supported on wasm.");
pub mod aws;
#[cfg(feature = "azure")]
pub mod azure;
+#[cfg(not(target_arch = "wasm32"))]
+pub mod chunked;
+pub mod delimited;
#[cfg(feature = "gcp")]
pub mod gcp;
pub mod limit;