This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new c0defeb7a Use upstream newline_delimited_stream (#5267)
c0defeb7a is described below

commit c0defeb7a074d286976240a3965600a77038459a
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Feb 13 19:58:02 2023 +0000

    Use upstream newline_delimited_stream (#5267)
    
    * Use upstream newline_delimited_stream
    
    * Drive by fix
---
 benchmarks/Cargo.toml                              |   2 +-
 datafusion-cli/Cargo.toml                          |   2 +-
 datafusion-examples/Cargo.toml                     |   2 +-
 datafusion/common/Cargo.toml                       |   2 +-
 datafusion/core/src/datasource/file_format/csv.rs  |  16 +-
 .../physical_plan/file_format/delimited_stream.rs  | 255 ---------------------
 .../core/src/physical_plan/file_format/mod.rs      |   2 -
 datafusion/proto/Cargo.toml                        |   2 +-
 datafusion/substrait/Cargo.toml                    |   2 +-
 parquet-test-utils/Cargo.toml                      |   2 +-
 10 files changed, 13 insertions(+), 274 deletions(-)

diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 21860b103..02eeb00a3 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -39,7 +39,7 @@ env_logger = "0.10"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
 num_cpus = "1.13.0"
-object_store = "0.5.0"
+object_store = "0.5.4"
 parquet = "32.0.0"
 parquet-test-utils = { path = "../parquet-test-utils/", version = "0.1.0" }
 rand = "0.8.4"
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 5e8a83904..375e86b7b 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -36,7 +36,7 @@ datafusion = { path = "../datafusion/core", version = "18.0.0" }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
-object_store = { version = "0.5.0", features = ["aws", "gcp"] }
+object_store = { version = "0.5.4", features = ["aws", "gcp"] }
 parking_lot = { version = "0.12" }
 rustyline = "10.0"
 tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index bbc2500b1..4f50ca614 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -49,7 +49,7 @@ futures = "0.3"
 log = "0.4"
 mimalloc = { version = "0.1", default-features = false }
 num_cpus = "1.13.0"
-object_store = { version = "0.5", features = ["aws"] }
+object_store = { version = "0.5.4", features = ["aws"] }
 prost = { version = "0.11", default-features = false }
 prost-derive = { version = "0.11", default-features = false }
 serde = { version = "1.0.136", features = ["derive"] }
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index 8c865172b..0dcae674f 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -44,7 +44,7 @@ arrow = { version = "32.0.0", default-features = false }
 chrono = { version = "0.4", default-features = false }
 cranelift-module = { version = "0.92.0", optional = true }
 num_cpus = "1.13.0"
-object_store = { version = "0.5.0", default-features = false, optional = true }
+object_store = { version = "0.5.4", default-features = false, optional = true }
 parquet = { version = "32.0.0", default-features = false, optional = true }
 pyo3 = { version = "0.18.0", optional = true }
 sqlparser = "0.30"
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 358189627..85a9d186a 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -30,7 +30,7 @@ use bytes::{Buf, Bytes};
 use datafusion_common::DataFusionError;
 
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
-use object_store::{ObjectMeta, ObjectStore};
+use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
 
 use super::FileFormat;
 use crate::datasource::file_format::file_type::FileCompressionType;
@@ -38,9 +38,7 @@ use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::logical_expr::Expr;
-use crate::physical_plan::file_format::{
-    newline_delimited_stream, CsvExec, FileScanConfig,
-};
+use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
@@ -183,12 +181,10 @@ async fn read_to_delimited_chunks(
         .map_err(DataFusionError::ObjectStore);
 
     match stream {
-        Ok(s) => newline_delimited_stream(
-            s.into_stream()
-                .map_err(|e| DataFusionError::External(Box::new(e))),
-        )
-        .left_stream(),
-        Err(e) => futures::stream::iter(vec![Err(e)]).right_stream(),
+        Ok(s) => newline_delimited_stream(s.into_stream())
+            .map_err(|e| DataFusionError::External(Box::new(e)))
+            .left_stream(),
+        Err(e) => futures::stream::once(futures::future::ready(Err(e))).right_stream(),
     }
 }
 
diff --git a/datafusion/core/src/physical_plan/file_format/delimited_stream.rs b/datafusion/core/src/physical_plan/file_format/delimited_stream.rs
deleted file mode 100644
index c6317ad13..000000000
--- a/datafusion/core/src/physical_plan/file_format/delimited_stream.rs
+++ /dev/null
@@ -1,255 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::error::{DataFusionError, Result};
-use bytes::Bytes;
-use futures::{Stream, StreamExt};
-use std::collections::VecDeque;
-
-/// The ASCII encoding of `"`
-const QUOTE: u8 = b'"';
-
-/// The ASCII encoding of `\n`
-const NEWLINE: u8 = b'\n';
-
-/// The ASCII encoding of `\`
-const ESCAPE: u8 = b'\\';
-
-/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
-/// of [`Bytes`] containing a whole number of new line delimited records
-#[derive(Debug, Default)]
-struct LineDelimiter {
-    /// Complete chunks of [`Bytes`]
-    complete: VecDeque<Bytes>,
-    /// Remainder bytes that form the next record
-    remainder: Vec<u8>,
-    /// True if the last character was the escape character
-    is_escape: bool,
-    /// True if currently processing a quoted string
-    is_quote: bool,
-}
-
-impl LineDelimiter {
-    /// Creates a new [`LineDelimiter`] with the provided delimiter
-    fn new() -> Self {
-        Self::default()
-    }
-
-    /// Adds the next set of [`Bytes`]
-    fn push(&mut self, val: impl Into<Bytes>) {
-        let val: Bytes = val.into();
-
-        let is_escape = &mut self.is_escape;
-        let is_quote = &mut self.is_quote;
-        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
-            if *is_escape {
-                *is_escape = false;
-                None
-            } else if *v == ESCAPE {
-                *is_escape = true;
-                None
-            } else if *v == QUOTE {
-                *is_quote = !*is_quote;
-                None
-            } else if *is_quote {
-                None
-            } else {
-                (*v == NEWLINE).then_some(idx + 1)
-            }
-        });
-
-        let start_offset = match self.remainder.is_empty() {
-            true => 0,
-            false => match record_ends.next() {
-                Some(idx) => {
-                    self.remainder.extend_from_slice(&val[0..idx]);
-                    self.complete
-                        .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
-                    idx
-                }
-                None => {
-                    self.remainder.extend_from_slice(&val);
-                    return;
-                }
-            },
-        };
-        let end_offset = record_ends.last().unwrap_or(start_offset);
-        if start_offset != end_offset {
-            self.complete.push_back(val.slice(start_offset..end_offset));
-        }
-
-        if end_offset != val.len() {
-            self.remainder.extend_from_slice(&val[end_offset..])
-        }
-    }
-
-    /// Marks the end of the stream, delimiting any remaining bytes
-    ///
-    /// Returns `true` if there is no remaining data to be read
-    fn finish(&mut self) -> Result<bool> {
-        if !self.remainder.is_empty() {
-            if self.is_quote {
-                return Err(DataFusionError::Execution(
-                    "encountered unterminated string".to_string(),
-                ));
-            }
-
-            if self.is_escape {
-                return Err(DataFusionError::Execution(
-                    "encountered trailing escape character".to_string(),
-                ));
-            }
-
-            self.complete
-                .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
-        }
-        Ok(self.complete.is_empty())
-    }
-}
-
-impl Iterator for LineDelimiter {
-    type Item = Bytes;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.complete.pop_front()
-    }
-}
-
-/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
-/// yielded [`Bytes`] contains a whole number of new line delimited records
-/// accounting for `\` style escapes and `"` quotes
-pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
-where
-    S: Stream<Item = Result<Bytes>> + Unpin,
-{
-    let delimiter = LineDelimiter::new();
-
-    futures::stream::unfold(
-        (s, delimiter, false),
-        |(mut s, mut delimiter, mut exhausted)| async move {
-            loop {
-                if let Some(next) = delimiter.next() {
-                    return Some((Ok(next), (s, delimiter, exhausted)));
-                } else if exhausted {
-                    return None;
-                }
-
-                match s.next().await {
-                    Some(Ok(bytes)) => delimiter.push(bytes),
-                    Some(Err(e)) => return Some((Err(e), (s, delimiter, exhausted))),
-                    None => {
-                        exhausted = true;
-                        match delimiter.finish() {
-                            Ok(true) => return None,
-                            Ok(false) => continue,
-                            Err(e) => return Some((Err(e), (s, delimiter, exhausted))),
-                        }
-                    }
-                }
-            }
-        },
-    )
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use futures::stream::{BoxStream, TryStreamExt};
-
-    #[test]
-    fn test_delimiter() {
-        let mut delimiter = LineDelimiter::new();
-        delimiter.push("hello\nworld");
-        delimiter.push("\n\n");
-
-        assert_eq!(delimiter.next().unwrap(), Bytes::from("hello\n"));
-        assert_eq!(delimiter.next().unwrap(), Bytes::from("world\n"));
-        assert_eq!(delimiter.next().unwrap(), Bytes::from("\n"));
-        assert!(delimiter.next().is_none());
-    }
-
-    #[test]
-    fn test_delimiter_escaped() {
-        let mut delimiter = LineDelimiter::new();
-        delimiter.push("");
-        delimiter.push("fo\\\n\"foo");
-        delimiter.push("bo\n\"bar\n");
-        delimiter.push("\"he");
-        delimiter.push("llo\"\n");
-        assert_eq!(
-            delimiter.next().unwrap(),
-            Bytes::from("fo\\\n\"foobo\n\"bar\n")
-        );
-        assert_eq!(delimiter.next().unwrap(), Bytes::from("\"hello\"\n"));
-        assert!(delimiter.next().is_none());
-
-        // Verify can push further data
-        delimiter.push("\"foo\nbar\",\"fiz\\\"inner\\\"\"\nhello");
-        assert!(!delimiter.finish().unwrap());
-
-        assert_eq!(
-            delimiter.next().unwrap(),
-            Bytes::from("\"foo\nbar\",\"fiz\\\"inner\\\"\"\n")
-        );
-        assert_eq!(delimiter.next().unwrap(), Bytes::from("hello"));
-        assert!(delimiter.finish().unwrap());
-        assert!(delimiter.next().is_none());
-    }
-
-    #[tokio::test]
-    async fn test_delimiter_stream() {
-        let input = vec!["hello\nworld\nbin", "go\ncup", "cakes"];
-        let input_stream =
-            futures::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
-        let stream = newline_delimited_stream(input_stream);
-
-        let results: Vec<_> = stream.try_collect().await.unwrap();
-        assert_eq!(
-            results,
-            vec![
-                Bytes::from("hello\nworld\n"),
-                Bytes::from("bingo\n"),
-                Bytes::from("cupcakes")
-            ]
-        )
-    }
-    #[tokio::test]
-    async fn test_delimiter_unfold_stream() {
-        let input_stream: BoxStream<'static, Result<Bytes>> = futures::stream::unfold(
-            VecDeque::from(["hello\nworld\nbin", "go\ncup", "cakes"]),
-            |mut input| async move {
-                if !input.is_empty() {
-                    Some((Ok(Bytes::from(input.pop_front().unwrap())), input))
-                } else {
-                    None
-                }
-            },
-        )
-        .boxed();
-        let stream = newline_delimited_stream(input_stream);
-
-        let results: Vec<_> = stream.try_collect().await.unwrap();
-        assert_eq!(
-            results,
-            vec![
-                Bytes::from("hello\nworld\n"),
-                Bytes::from("bingo\n"),
-                Bytes::from("cupcakes")
-            ]
-        )
-    }
-}
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 60c1fd2e3..97b091ced 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -21,14 +21,12 @@ mod avro;
 #[cfg(test)]
 mod chunked_store;
 mod csv;
-mod delimited_stream;
 mod file_stream;
 mod json;
 mod parquet;
 
 pub(crate) use self::csv::plan_to_csv;
 pub use self::csv::CsvExec;
-pub(crate) use self::delimited_stream::newline_delimited_stream;
 pub(crate) use self::parquet::plan_to_parquet;
 pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
 use arrow::{
diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml
index 82b9ead03..7a6529c25 100644
--- a/datafusion/proto/Cargo.toml
+++ b/datafusion/proto/Cargo.toml
@@ -45,7 +45,7 @@ chrono = { version = "0.4", default-features = false }
 datafusion = { path = "../core", version = "18.0.0" }
 datafusion-common = { path = "../common", version = "18.0.0" }
 datafusion-expr = { path = "../expr", version = "18.0.0" }
-object_store = { version = "0.5.0" }
+object_store = { version = "0.5.4" }
 parking_lot = { version = "0.12" }
 pbjson = { version = "0.5", optional = true }
 pbjson-types = { version = "0.5", optional = true }
diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml
index 3d4a72901..02108067c 100644
--- a/datafusion/substrait/Cargo.toml
+++ b/datafusion/substrait/Cargo.toml
@@ -28,7 +28,7 @@ async-recursion = "1.0"
 chrono = "0.4.23"
 datafusion = { version = "18.0.0", path = "../core" }
 itertools = "0.10.5"
-object_store = "0.5.3"
+object_store = "0.5.4"
 prost = "0.11"
 prost-types = "0.11"
 substrait = "0.4"
diff --git a/parquet-test-utils/Cargo.toml b/parquet-test-utils/Cargo.toml
index a4d72a4b7..c63af8362 100644
--- a/parquet-test-utils/Cargo.toml
+++ b/parquet-test-utils/Cargo.toml
@@ -24,5 +24,5 @@ edition = "2021"
 
 [dependencies]
 datafusion = { path = "../datafusion/core" }
-object_store = "0.5.0"
+object_store = "0.5.4"
 parquet = "32.0.0"

Reply via email to