This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c0defeb7a Use upstream newline_delimited_stream (#5267)
c0defeb7a is described below
commit c0defeb7a074d286976240a3965600a77038459a
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Feb 13 19:58:02 2023 +0000
Use upstream newline_delimited_stream (#5267)
* Use upstream newline_delimited_stream
* Drive by fix
---
benchmarks/Cargo.toml | 2 +-
datafusion-cli/Cargo.toml | 2 +-
datafusion-examples/Cargo.toml | 2 +-
datafusion/common/Cargo.toml | 2 +-
datafusion/core/src/datasource/file_format/csv.rs | 16 +-
.../physical_plan/file_format/delimited_stream.rs | 255 ---------------------
.../core/src/physical_plan/file_format/mod.rs | 2 -
datafusion/proto/Cargo.toml | 2 +-
datafusion/substrait/Cargo.toml | 2 +-
parquet-test-utils/Cargo.toml | 2 +-
10 files changed, 13 insertions(+), 274 deletions(-)
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 21860b103..02eeb00a3 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -39,7 +39,7 @@ env_logger = "0.10"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
num_cpus = "1.13.0"
-object_store = "0.5.0"
+object_store = "0.5.4"
parquet = "32.0.0"
parquet-test-utils = { path = "../parquet-test-utils/", version = "0.1.0" }
rand = "0.8.4"
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 5e8a83904..375e86b7b 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -36,7 +36,7 @@ datafusion = { path = "../datafusion/core", version = "18.0.0" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
-object_store = { version = "0.5.0", features = ["aws", "gcp"] }
+object_store = { version = "0.5.4", features = ["aws", "gcp"] }
parking_lot = { version = "0.12" }
rustyline = "10.0"
tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index bbc2500b1..4f50ca614 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -49,7 +49,7 @@ futures = "0.3"
log = "0.4"
mimalloc = { version = "0.1", default-features = false }
num_cpus = "1.13.0"
-object_store = { version = "0.5", features = ["aws"] }
+object_store = { version = "0.5.4", features = ["aws"] }
prost = { version = "0.11", default-features = false }
prost-derive = { version = "0.11", default-features = false }
serde = { version = "1.0.136", features = ["derive"] }
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index 8c865172b..0dcae674f 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -44,7 +44,7 @@ arrow = { version = "32.0.0", default-features = false }
chrono = { version = "0.4", default-features = false }
cranelift-module = { version = "0.92.0", optional = true }
num_cpus = "1.13.0"
-object_store = { version = "0.5.0", default-features = false, optional = true }
+object_store = { version = "0.5.4", default-features = false, optional = true }
parquet = { version = "32.0.0", default-features = false, optional = true }
pyo3 = { version = "0.18.0", optional = true }
sqlparser = "0.30"
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 358189627..85a9d186a 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -30,7 +30,7 @@ use bytes::{Buf, Bytes};
use datafusion_common::DataFusionError;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
-use object_store::{ObjectMeta, ObjectStore};
+use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
use super::FileFormat;
use crate::datasource::file_format::file_type::FileCompressionType;
@@ -38,9 +38,7 @@ use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
-use crate::physical_plan::file_format::{
- newline_delimited_stream, CsvExec, FileScanConfig,
-};
+use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
@@ -183,12 +181,10 @@ async fn read_to_delimited_chunks(
.map_err(DataFusionError::ObjectStore);
match stream {
- Ok(s) => newline_delimited_stream(
- s.into_stream()
- .map_err(|e| DataFusionError::External(Box::new(e))),
- )
- .left_stream(),
- Err(e) => futures::stream::iter(vec![Err(e)]).right_stream(),
+ Ok(s) => newline_delimited_stream(s.into_stream())
+ .map_err(|e| DataFusionError::External(Box::new(e)))
+ .left_stream(),
+ Err(e) => futures::stream::once(futures::future::ready(Err(e))).right_stream(),
}
}
diff --git a/datafusion/core/src/physical_plan/file_format/delimited_stream.rs b/datafusion/core/src/physical_plan/file_format/delimited_stream.rs
deleted file mode 100644
index c6317ad13..000000000
--- a/datafusion/core/src/physical_plan/file_format/delimited_stream.rs
+++ /dev/null
@@ -1,255 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::error::{DataFusionError, Result};
-use bytes::Bytes;
-use futures::{Stream, StreamExt};
-use std::collections::VecDeque;
-
-/// The ASCII encoding of `"`
-const QUOTE: u8 = b'"';
-
-/// The ASCII encoding of `\n`
-const NEWLINE: u8 = b'\n';
-
-/// The ASCII encoding of `\`
-const ESCAPE: u8 = b'\\';
-
-/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
-/// of [`Bytes`] containing a whole number of new line delimited records
-#[derive(Debug, Default)]
-struct LineDelimiter {
- /// Complete chunks of [`Bytes`]
- complete: VecDeque<Bytes>,
- /// Remainder bytes that form the next record
- remainder: Vec<u8>,
- /// True if the last character was the escape character
- is_escape: bool,
- /// True if currently processing a quoted string
- is_quote: bool,
-}
-
-impl LineDelimiter {
- /// Creates a new [`LineDelimiter`] with the provided delimiter
- fn new() -> Self {
- Self::default()
- }
-
- /// Adds the next set of [`Bytes`]
- fn push(&mut self, val: impl Into<Bytes>) {
- let val: Bytes = val.into();
-
- let is_escape = &mut self.is_escape;
- let is_quote = &mut self.is_quote;
- let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
- if *is_escape {
- *is_escape = false;
- None
- } else if *v == ESCAPE {
- *is_escape = true;
- None
- } else if *v == QUOTE {
- *is_quote = !*is_quote;
- None
- } else if *is_quote {
- None
- } else {
- (*v == NEWLINE).then_some(idx + 1)
- }
- });
-
- let start_offset = match self.remainder.is_empty() {
- true => 0,
- false => match record_ends.next() {
- Some(idx) => {
- self.remainder.extend_from_slice(&val[0..idx]);
- self.complete
- .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
- idx
- }
- None => {
- self.remainder.extend_from_slice(&val);
- return;
- }
- },
- };
- let end_offset = record_ends.last().unwrap_or(start_offset);
- if start_offset != end_offset {
- self.complete.push_back(val.slice(start_offset..end_offset));
- }
-
- if end_offset != val.len() {
- self.remainder.extend_from_slice(&val[end_offset..])
- }
- }
-
- /// Marks the end of the stream, delimiting any remaining bytes
- ///
- /// Returns `true` if there is no remaining data to be read
- fn finish(&mut self) -> Result<bool> {
- if !self.remainder.is_empty() {
- if self.is_quote {
- return Err(DataFusionError::Execution(
- "encountered unterminated string".to_string(),
- ));
- }
-
- if self.is_escape {
- return Err(DataFusionError::Execution(
- "encountered trailing escape character".to_string(),
- ));
- }
-
- self.complete
- .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
- }
- Ok(self.complete.is_empty())
- }
-}
-
-impl Iterator for LineDelimiter {
- type Item = Bytes;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.complete.pop_front()
- }
-}
-
-/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
-/// yielded [`Bytes`] contains a whole number of new line delimited records
-/// accounting for `\` style escapes and `"` quotes
-pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
-where
- S: Stream<Item = Result<Bytes>> + Unpin,
-{
- let delimiter = LineDelimiter::new();
-
- futures::stream::unfold(
- (s, delimiter, false),
- |(mut s, mut delimiter, mut exhausted)| async move {
- loop {
- if let Some(next) = delimiter.next() {
- return Some((Ok(next), (s, delimiter, exhausted)));
- } else if exhausted {
- return None;
- }
-
- match s.next().await {
- Some(Ok(bytes)) => delimiter.push(bytes),
- Some(Err(e)) => return Some((Err(e), (s, delimiter, exhausted))),
- None => {
- exhausted = true;
- match delimiter.finish() {
- Ok(true) => return None,
- Ok(false) => continue,
- Err(e) => return Some((Err(e), (s, delimiter, exhausted))),
- }
- }
- }
- }
- },
- )
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use futures::stream::{BoxStream, TryStreamExt};
-
- #[test]
- fn test_delimiter() {
- let mut delimiter = LineDelimiter::new();
- delimiter.push("hello\nworld");
- delimiter.push("\n\n");
-
- assert_eq!(delimiter.next().unwrap(), Bytes::from("hello\n"));
- assert_eq!(delimiter.next().unwrap(), Bytes::from("world\n"));
- assert_eq!(delimiter.next().unwrap(), Bytes::from("\n"));
- assert!(delimiter.next().is_none());
- }
-
- #[test]
- fn test_delimiter_escaped() {
- let mut delimiter = LineDelimiter::new();
- delimiter.push("");
- delimiter.push("fo\\\n\"foo");
- delimiter.push("bo\n\"bar\n");
- delimiter.push("\"he");
- delimiter.push("llo\"\n");
- assert_eq!(
- delimiter.next().unwrap(),
- Bytes::from("fo\\\n\"foobo\n\"bar\n")
- );
- assert_eq!(delimiter.next().unwrap(), Bytes::from("\"hello\"\n"));
- assert!(delimiter.next().is_none());
-
- // Verify can push further data
- delimiter.push("\"foo\nbar\",\"fiz\\\"inner\\\"\"\nhello");
- assert!(!delimiter.finish().unwrap());
-
- assert_eq!(
- delimiter.next().unwrap(),
- Bytes::from("\"foo\nbar\",\"fiz\\\"inner\\\"\"\n")
- );
- assert_eq!(delimiter.next().unwrap(), Bytes::from("hello"));
- assert!(delimiter.finish().unwrap());
- assert!(delimiter.next().is_none());
- }
-
- #[tokio::test]
- async fn test_delimiter_stream() {
- let input = vec!["hello\nworld\nbin", "go\ncup", "cakes"];
- let input_stream =
- futures::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
- let stream = newline_delimited_stream(input_stream);
-
- let results: Vec<_> = stream.try_collect().await.unwrap();
- assert_eq!(
- results,
- vec![
- Bytes::from("hello\nworld\n"),
- Bytes::from("bingo\n"),
- Bytes::from("cupcakes")
- ]
- )
- }
- #[tokio::test]
- async fn test_delimiter_unfold_stream() {
- let input_stream: BoxStream<'static, Result<Bytes>> = futures::stream::unfold(
- VecDeque::from(["hello\nworld\nbin", "go\ncup", "cakes"]),
- |mut input| async move {
- if !input.is_empty() {
- Some((Ok(Bytes::from(input.pop_front().unwrap())), input))
- } else {
- None
- }
- },
- )
- .boxed();
- let stream = newline_delimited_stream(input_stream);
-
- let results: Vec<_> = stream.try_collect().await.unwrap();
- assert_eq!(
- results,
- vec![
- Bytes::from("hello\nworld\n"),
- Bytes::from("bingo\n"),
- Bytes::from("cupcakes")
- ]
- )
- }
-}
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 60c1fd2e3..97b091ced 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -21,14 +21,12 @@ mod avro;
#[cfg(test)]
mod chunked_store;
mod csv;
-mod delimited_stream;
mod file_stream;
mod json;
mod parquet;
pub(crate) use self::csv::plan_to_csv;
pub use self::csv::CsvExec;
-pub(crate) use self::delimited_stream::newline_delimited_stream;
pub(crate) use self::parquet::plan_to_parquet;
pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
use arrow::{
diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml
index 82b9ead03..7a6529c25 100644
--- a/datafusion/proto/Cargo.toml
+++ b/datafusion/proto/Cargo.toml
@@ -45,7 +45,7 @@ chrono = { version = "0.4", default-features = false }
datafusion = { path = "../core", version = "18.0.0" }
datafusion-common = { path = "../common", version = "18.0.0" }
datafusion-expr = { path = "../expr", version = "18.0.0" }
-object_store = { version = "0.5.0" }
+object_store = { version = "0.5.4" }
parking_lot = { version = "0.12" }
pbjson = { version = "0.5", optional = true }
pbjson-types = { version = "0.5", optional = true }
diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml
index 3d4a72901..02108067c 100644
--- a/datafusion/substrait/Cargo.toml
+++ b/datafusion/substrait/Cargo.toml
@@ -28,7 +28,7 @@ async-recursion = "1.0"
chrono = "0.4.23"
datafusion = { version = "18.0.0", path = "../core" }
itertools = "0.10.5"
-object_store = "0.5.3"
+object_store = "0.5.4"
prost = "0.11"
prost-types = "0.11"
substrait = "0.4"
diff --git a/parquet-test-utils/Cargo.toml b/parquet-test-utils/Cargo.toml
index a4d72a4b7..c63af8362 100644
--- a/parquet-test-utils/Cargo.toml
+++ b/parquet-test-utils/Cargo.toml
@@ -24,5 +24,5 @@ edition = "2021"
[dependencies]
datafusion = { path = "../datafusion/core" }
-object_store = "0.5.0"
+object_store = "0.5.4"
parquet = "32.0.0"