This is an automated email from the ASF dual-hosted git repository. tustvold pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push: new bb699eb77 Update to arrow 32 and Switch to RawDecoder for JSON (#5056) bb699eb77 is described below commit bb699eb77a1b8423f0098cdb6e9d9bdbc67eb3b2 Author: Raphael Taylor-Davies <1781103+tustv...@users.noreply.github.com> AuthorDate: Tue Jan 31 18:06:51 2023 +0000 Update to arrow 32 and Switch to RawDecoder for JSON (#5056) * Update to arrow 32 Switch to RawDecoder for JSON * Fix avro * Update datafusion-cli * Fix arrow-flight * Update arrow * Use CSV Decoder * Fix avro * Simplify error handling * Explicit error conversions * Update arrow 23 * Fix CSV * Reivew feedback --- benchmarks/Cargo.toml | 4 +- datafusion-cli/Cargo.lock | 82 ++++++++++-------- datafusion-cli/Cargo.toml | 2 +- datafusion-examples/Cargo.toml | 4 +- datafusion/common/Cargo.toml | 6 +- datafusion/common/src/error.rs | 33 ++------ datafusion/common/src/scalar.rs | 3 +- datafusion/core/Cargo.toml | 4 +- datafusion/core/src/avro_to_arrow/schema.rs | 3 + .../core/src/physical_plan/file_format/csv.rs | 96 ++++++++++++++++++---- .../core/src/physical_plan/file_format/json.rs | 88 ++++++++++++-------- .../core/src/physical_plan/file_format/mod.rs | 1 + .../file_format/parquet/page_filter.rs | 12 ++- datafusion/expr/Cargo.toml | 2 +- datafusion/jit/Cargo.toml | 2 +- datafusion/optimizer/Cargo.toml | 2 +- datafusion/physical-expr/Cargo.toml | 6 +- datafusion/physical-expr/src/regex_expressions.rs | 8 +- datafusion/proto/Cargo.toml | 2 +- datafusion/proto/src/logical_plan/to_proto.rs | 5 ++ datafusion/row/Cargo.toml | 2 +- datafusion/sql/Cargo.toml | 2 +- parquet-test-utils/Cargo.toml | 2 +- test-utils/Cargo.toml | 2 +- 24 files changed, 232 insertions(+), 141 deletions(-) diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 23c1d32eb..8f51bbec8 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -33,14 +33,14 @@ simd = ["datafusion/simd"] snmalloc = ["snmalloc-rs"] [dependencies] -arrow = "31.0.0" +arrow = "32.0.0" datafusion = { path = "../datafusion/core", version = "17.0.0", features = ["scheduler"] } env_logger = "0.10" futures = "0.3" mimalloc = { version = "0.1", optional = true, default-features = false } num_cpus = "1.13.0" object_store = "0.5.0" -parquet = "31.0.0" +parquet = "32.0.0" parquet-test-utils = { path = "../parquet-test-utils/", version = "0.1.0" } rand = "0.8.4" serde = { version = "1.0.136", features = ["derive"] } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index f706e2e28..c87abeb21 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -68,9 +68,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b556d39f9d19e363833a0fe65d591cd0e2ecc0977589a78179b592bea8dc945" +checksum = "87d948f553cf556656eb89265700258e1032d26fec9b7920cd20319336e06afd" dependencies = [ "ahash", "arrow-arith", @@ -91,9 +91,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85c61b9235694b48f60d89e0e8d6cb478f39c65dd14b0fe1c3f04379b7d50068" +checksum = "cf30d4ebc3df9dfd8bd26883aa30687d4ddcfd7b2443e62bd7c8fedf153b8e45" dependencies = [ "arrow-array", "arrow-buffer", @@ -106,9 +106,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1e6e839764618a911cc460a58ebee5ad3d42bc12d9a5e96a29b7cc296303aa1" +checksum = "9fe66ec388d882a61fff3eb613b5266af133aa08a3318e5e493daf0f5c1696cb" dependencies = [ "ahash", "arrow-buffer", @@ -122,9 +122,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03a21d232b1bc1190a3fdd2f9c1e39b7cd41235e95a0d44dd4f522bc5f495748" +checksum = "4ef967dadbccd4586ec8d7aab27d7033ecb5dfae8a605c839613039eac227bda" dependencies = [ "half", "num", @@ -132,9 +132,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83dcdb1436cac574f1c1b30fda91c53c467534337bef4064bbd4ea2d6fbc6e04" +checksum = "491a7979ea9e76dc218f532896e2d245fde5235e2e6420ce80d27cf6395dda84" dependencies = [ "arrow-array", "arrow-buffer", @@ -148,9 +148,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a01677ae9458f5af9e35e1aa6ba97502f539e621db0c6672566403f97edd0448" +checksum = "4b1d4fc91078dbe843c2c50d90f8119c96e8dfac2f78d30f7a8cb9397399c61d" dependencies = [ "arrow-array", "arrow-buffer", @@ -167,9 +167,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14e3e69c9fd98357eeeab4aa0f626ecf7ecf663e68e8fc04eac87c424a414477" +checksum = "ee0c0e3c5d3b80be8f267f4b2af714c08cad630569be01a8379cfe27b4866495" dependencies = [ "arrow-buffer", "arrow-schema", @@ -179,9 +179,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64cac2706acbd796965b6eaf0da30204fe44aacf70273f8cb3c9b7d7f3d4c190" +checksum = "0a3ca7eb8d23c83fe40805cbafec70a6a31df72de47355545ff34c850f715403" dependencies = [ "arrow-array", "arrow-buffer", @@ -193,9 +193,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7790e8b7df2d8ef5ac802377ac256cf2fb80cbf7d44b82d6464e20ace6232a5a" +checksum = "bf65aff76d2e340d827d5cab14759e7dd90891a288347e2202e4ee28453d9bed" dependencies = [ "arrow-array", "arrow-buffer", @@ -205,15 +205,16 @@ dependencies = [ "chrono", "half", "indexmap", + "lexical-core", "num", "serde_json", ] [[package]] name = "arrow-ord" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7ee6e1b761dfffaaf7b5bbe68c113a576a3a802146c5c0b9fcec781e30d80a3" +checksum = "074a5a55c37ae4750af4811c8861c0378d8ab2ff6c262622ad24efae6e0b73b3" dependencies = [ "arrow-array", "arrow-buffer", @@ -225,9 +226,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e65bfedf782fc92721e796fdd26ae7343c98ba9a9243d62def9e4e1c4c1cf0b" +checksum = "e064ac4e64960ebfbe35f218f5e7d9dc9803b59c2e56f611da28ce6d008f839e" dependencies = [ "ahash", "arrow-array", @@ -240,15 +241,15 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73ca49d010b27e2d73f70c1d1f90c1b378550ed0f4ad379c4dea0c997d97d723" +checksum = "ead3f373b9173af52f2fdefcb5a7dd89f453fbc40056f574a8aeb23382a4ef81" [[package]] name = "arrow-select" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976cbaeb1a85c09eea81f3f9c149c758630ff422ed0238624c5c3f4704b6a53c" +checksum = "646b4f15b5a77c970059e748aeb1539705c68cd397ecf0f0264c4ef3737d35f3" dependencies = [ "arrow-array", "arrow-buffer", @@ -259,9 +260,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d4882762f8f48a9218946c016553d38b04b4fe8202038dad4141b3b887b7da8" +checksum = "c8b8bf150caaeca03f39f1a91069701387d93f7cfd256d27f423ac8496d99a51" dependencies = [ "arrow-array", "arrow-buffer", @@ -986,12 +987,12 @@ dependencies = [ [[package]] name = "flatbuffers" -version = "22.9.29" +version = "23.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce016b9901aef3579617931fbb2df8fc9a9f7cb95a16eb8acc8148209bb9e70" +checksum = "77f5399c2c9c50ae9418e522842ad362f61ee48b346ac106807bd355a8a7c619" dependencies = [ "bitflags", - "thiserror", + "rustc_version", ] [[package]] @@ -1783,9 +1784,9 @@ dependencies = [ [[package]] name = "parquet" -version = "31.0.0" +version = "32.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b4ee1ffc0778395c9783a5c74f2cad2fb1a128ade95a965212d31b7b13e3d45" +checksum = "23b3d4917209e17e1da5fb07d276da237a42465f0def2b8d5fa5ce0e85855b4c" dependencies = [ "ahash", "arrow-array", @@ -2055,6 +2056,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.36.6" @@ -2156,6 +2166,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "semver" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a" + [[package]] name = "seq-macro" version = "0.3.2" diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 12fd7fea0..1459f5766 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -29,7 +29,7 @@ rust-version = "1.62" readme = "README.md" [dependencies] -arrow = "31.0.0" +arrow = "32.0.0" async-trait = "0.1.41" clap = { version = "3", features = ["derive", "cargo"] } datafusion = { path = "../datafusion/core", version = "17.0.0" } diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index e6713bb9a..a8ed249c5 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -34,8 +34,8 @@ path = "examples/avro_sql.rs" required-features = ["datafusion/avro"] [dev-dependencies] -arrow = "31.0.0" -arrow-flight = "31.0.0" +arrow = "32.0.0" +arrow-flight = "32.0.0" async-trait = "0.1.41" datafusion = { path = "../datafusion/core" } datafusion-common = { path = "../datafusion/common" } diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index d75747f80..bfe90b60c 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -40,11 +40,11 @@ pyarrow = ["pyo3", "arrow/pyarrow"] [dependencies] apache-avro = { version = "0.14", default-features = false, features = ["snappy"], optional = true } -arrow = { version = "31.0.0", default-features = false } +arrow = { version = "32.0.0", default-features = false } chrono = { version = "0.4", default-features = false } cranelift-module = { version = "0.92.0", optional = true } num_cpus = "1.13.0" object_store = { version = "0.5.0", default-features = false, optional = true } -parquet = { version = "31.0.0", default-features = false, optional = true } -pyo3 = { version = "0.17.1", optional = true } +parquet = { version = "32.0.0", default-features = false, optional = true } +pyo3 = { version = "0.18.0", optional = true } sqlparser = "0.30" diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index f27900ad3..3f10c1261 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -388,12 +388,16 @@ impl DataFusionError { let mut last_datafusion_error = self; let mut root_error: &dyn Error = self; - while let Some(source) = find_source(root_error) { + while let Some(source) = root_error.source() { // walk the next level root_error = source; // remember the lowest datafusion error so far if let Some(e) = root_error.downcast_ref::<DataFusionError>() { last_datafusion_error = e; + } else if let Some(e) = root_error.downcast_ref::<Arc<DataFusionError>>() { + // As `Arc<T>::source()` calls through to `T::source()` we need to + // explicitly match `Arc<DataFusionError>` to capture it + last_datafusion_error = e.as_ref(); } } // return last checkpoint (which may be the original error) @@ -401,33 +405,6 @@ impl DataFusionError { } } -fn find_source<'a>(e: &'a (dyn Error + 'static)) -> Option<&'a (dyn Error + 'static)> { - // workaround until https://github.com/apache/arrow-rs/issues/3566 is released - if let Some(e) = e.downcast_ref::<ArrowError>() { - return if let ArrowError::ExternalError(e) = e { - Some(e.as_ref()) - } else { - None - }; - } - // some errors are wrapped into `Arc`s to share them with multiple - // receivers, so handle that specially here - if let Some(e) = e.downcast_ref::<Arc<dyn Error + 'static>>() { - return Some(e.as_ref()); - } - - // For some reason the above doesn't capture works for - // Arc<DataFusionError> or Arc<ArrowError> - if let Some(e) = e.downcast_ref::<Arc<ArrowError>>() { - return Some(e.as_ref()); - } - if let Some(e) = e.downcast_ref::<Arc<DataFusionError>>() { - return Some(e.as_ref()); - } - - e.source() -} - #[cfg(test)] mod test { use std::sync::Arc; diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 33dd58386..905661541 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -1592,7 +1592,8 @@ impl ScalarValue { | DataType::Interval(_) | DataType::LargeList(_) | DataType::Union(_, _, _) - | DataType::Map(_, _) => { + | DataType::Map(_, _) + | DataType::RunEndEncoded(_, _) => { return Err(DataFusionError::Internal(format!( "Unsupported creation of {:?} array from ScalarValue {:?}", data_type, diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 07999e10a..6b59e48cc 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -60,7 +60,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion [dependencies] ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } apache-avro = { version = "0.14", optional = true } -arrow = { version = "31.0.0", features = ["prettyprint"] } +arrow = { version = "32.0.0", features = ["prettyprint"] } async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "xz", "futures-io", "tokio"], optional = true } async-trait = "0.1.41" bytes = "1.1" @@ -86,7 +86,7 @@ num-traits = { version = "0.2", optional = true } num_cpus = "1.13.0" object_store = "0.5.3" parking_lot = "0.12" -parquet = { version = "31.0.0", features = ["arrow", "async"] } +parquet = { version = "32.0.0", features = ["arrow", "async"] } paste = "^1.0" percent-encoding = "2.2.0" pin-project-lite = "^0.2.7" diff --git a/datafusion/core/src/avro_to_arrow/schema.rs b/datafusion/core/src/avro_to_arrow/schema.rs index 1af552438..a3d5986da 100644 --- a/datafusion/core/src/avro_to_arrow/schema.rs +++ b/datafusion/core/src/avro_to_arrow/schema.rs @@ -217,6 +217,9 @@ fn default_field_name(dt: &DataType) -> &str { DataType::Union(_, _, _) => "union", DataType::Dictionary(_, _) => "map", DataType::Map(_, _) => unimplemented!("Map support not implemented"), + DataType::RunEndEncoded(_, _) => { + unimplemented!("RunEndEncoded support not implemented") + } DataType::Decimal128(_, _) => "decimal", DataType::Decimal256(_, _) => "decimal", } diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index e6bedbbb9..e0654f69b 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -21,7 +21,6 @@ use crate::datasource::file_format::file_type::FileCompressionType; use crate::error::{DataFusionError, Result}; use crate::execution::context::{SessionState, TaskContext}; use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream; use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; @@ -35,12 +34,15 @@ use arrow::datatypes::SchemaRef; use bytes::Buf; +use bytes::Bytes; +use futures::ready; use futures::{StreamExt, TryStreamExt}; use object_store::{GetResult, ObjectStore}; use std::any::Any; use std::fs; use std::path::Path; use std::sync::Arc; +use std::task::Poll; use tokio::task::{self, JoinHandle}; use super::{get_output_ordering, FileScanConfig}; @@ -196,12 +198,12 @@ struct CsvConfig { } impl CsvConfig { - fn open<R: std::io::Read>(&self, reader: R, first_chunk: bool) -> csv::Reader<R> { + fn open<R: std::io::Read>(&self, reader: R) -> csv::Reader<R> { let datetime_format = None; csv::Reader::new( reader, Arc::clone(&self.file_schema), - self.has_header && first_chunk, + self.has_header, Some(self.delimiter), self.batch_size, None, @@ -209,6 +211,20 @@ impl CsvConfig { datetime_format, ) } + + fn builder(&self) -> csv::ReaderBuilder { + let mut builder = csv::ReaderBuilder::new() + .with_schema(self.file_schema.clone()) + .with_delimiter(self.delimiter) + .with_batch_size(self.batch_size) + .has_header(self.has_header); + + if let Some(proj) = &self.file_projection { + builder = builder.with_projection(proj.clone()); + } + + builder + } } struct CsvOpener { @@ -224,20 +240,38 @@ impl FileOpener for CsvOpener { match config.object_store.get(file_meta.location()).await? { GetResult::File(file, _) => { let decoder = file_compression_type.convert_read(file)?; - Ok(futures::stream::iter(config.open(decoder, true)).boxed()) + Ok(futures::stream::iter(config.open(decoder)).boxed()) } GetResult::Stream(s) => { - let mut first_chunk = true; - let s = s.map_err(Into::<DataFusionError>::into); - let decoder = file_compression_type.convert_stream(s)?; - Ok(newline_delimited_stream(decoder) - .map_ok(move |bytes| { - let reader = config.open(bytes.reader(), first_chunk); - first_chunk = false; - futures::stream::iter(reader) - }) - .try_flatten() - .boxed()) + let mut decoder = config.builder().build_decoder(); + let s = s.map_err(DataFusionError::from); + let mut input = file_compression_type.convert_stream(s)?.fuse(); + let mut buffered = Bytes::new(); + + let s = futures::stream::poll_fn(move |cx| { + loop { + if buffered.is_empty() { + match ready!(input.poll_next_unpin(cx)) { + Some(Ok(b)) => buffered = b, + Some(Err(e)) => { + return Poll::Ready(Some(Err(e.into()))) + } + None => {} + }; + } + let decoded = match decoder.decode(buffered.as_ref()) { + // Note: the decoder needs to be called with an empty + // array to delimt the final record + Ok(0) => break, + Ok(decoded) => decoded, + Err(e) => return Poll::Ready(Some(Err(e))), + }; + buffered.advance(decoded); + } + + Poll::Ready(decoder.flush().transpose()) + }); + Ok(s.boxed()) } } })) @@ -614,6 +648,38 @@ mod tests { .await; } + #[tokio::test] + async fn test_no_trailing_delimiter() { + let session_ctx = SessionContext::new(); + let store = object_store::memory::InMemory::new(); + + let data = bytes::Bytes::from("a,b\n1,2\n3,4"); + let path = object_store::path::Path::from("a.csv"); + store.put(&path, data).await.unwrap(); + + session_ctx + .runtime_env() + .register_object_store("memory", "", Arc::new(store)); + + let df = session_ctx + .read_csv("memory:///", CsvReadOptions::new()) + .await + .unwrap(); + + let result = df.collect().await.unwrap(); + + let expected = vec![ + "+---+---+", + "| a | b |", + "+---+---+", + "| 1 | 2 |", + "| 3 | 4 |", + "+---+---+", + ]; + + crate::assert_batches_eq!(expected, &result); + } + #[tokio::test] async fn write_csv_results_error_handling() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index 633da67e9..bab84ec6f 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -21,7 +21,6 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::SessionState; use crate::execution::context::TaskContext; use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream; use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; @@ -30,17 +29,19 @@ use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; -use arrow::json::reader::DecoderOptions; use arrow::{datatypes::SchemaRef, json}; -use bytes::Buf; +use bytes::{Buf, Bytes}; -use futures::{StreamExt, TryStreamExt}; +use arrow::json::RawReaderBuilder; +use futures::{ready, stream, StreamExt, TryStreamExt}; use object_store::{GetResult, ObjectStore}; use std::any::Any; use std::fs; +use std::io::BufReader; use std::path::Path; use std::sync::Arc; +use std::task::Poll; use tokio::task::{self, JoinHandle}; use super::{get_output_ordering, FileScanConfig}; @@ -111,24 +112,15 @@ impl ExecutionPlan for NdJsonExec { partition: usize, context: Arc<TaskContext>, ) -> Result<SendableRecordBatchStream> { - let proj = self.base_config.projected_file_column_names(); - let batch_size = context.session_config().batch_size(); - let file_schema = Arc::clone(&self.base_config.file_schema); - - let options = DecoderOptions::new().with_batch_size(batch_size); - let options = if let Some(proj) = proj { - options.with_projection(proj) - } else { - options - }; + let (projected_schema, _) = self.base_config.project(); let object_store = context .runtime_env() .object_store(&self.base_config.object_store_url)?; let opener = JsonOpener { - file_schema, - options, + batch_size, + projected_schema, file_compression_type: self.file_compression_type.to_owned(), object_store, }; @@ -166,40 +158,64 @@ impl ExecutionPlan for NdJsonExec { } struct JsonOpener { - options: DecoderOptions, - file_schema: SchemaRef, + batch_size: usize, + projected_schema: SchemaRef, file_compression_type: FileCompressionType, object_store: Arc<dyn ObjectStore>, } impl FileOpener for JsonOpener { fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> { - let options = self.options.clone(); - let schema = self.file_schema.clone(); let store = self.object_store.clone(); + let schema = self.projected_schema.clone(); + let batch_size = self.batch_size; + let file_compression_type = self.file_compression_type.to_owned(); Ok(Box::pin(async move { match store.get(file_meta.location()).await? { GetResult::File(file, _) => { - let decoder = file_compression_type.convert_read(file)?; - let reader = json::Reader::new(decoder, schema.clone(), options); + let bytes = file_compression_type.convert_read(file)?; + let reader = RawReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build(BufReader::new(bytes))?; Ok(futures::stream::iter(reader).boxed()) } GetResult::Stream(s) => { - let s = s.map_err(Into::into); - let decoder = file_compression_type.convert_stream(s)?; - - Ok(newline_delimited_stream(decoder) - .map_ok(move |bytes| { - let reader = json::Reader::new( - bytes.reader(), - schema.clone(), - options.clone(), - ); - futures::stream::iter(reader) - }) - .try_flatten() - .boxed()) + let mut decoder = RawReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build_decoder()?; + + let s = s.map_err(DataFusionError::from); + let mut input = file_compression_type.convert_stream(s)?.fuse(); + let mut buffered = Bytes::new(); + + let s = stream::poll_fn(move |cx| { + loop { + if buffered.is_empty() { + buffered = match ready!(input.poll_next_unpin(cx)) { + Some(Ok(b)) => b, + Some(Err(e)) => { + return Poll::Ready(Some(Err(e.into()))) + } + None => break, + }; + } + let read = buffered.len(); + + let decoded = match decoder.decode(buffered.as_ref()) { + Ok(decoded) => decoded, + Err(e) => return Poll::Ready(Some(Err(e))), + }; + + buffered.advance(decoded); + if decoded != read { + break; + } + } + + Poll::Ready(decoder.flush().transpose()) + }); + Ok(s.boxed()) } } })) diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 78828cbf8..a03b3681a 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -158,6 +158,7 @@ impl FileScanConfig { (table_schema, table_stats) } + #[allow(unused)] // Only used by avro fn projected_file_column_names(&self) -> Option<Vec<String>> { self.projection.as_ref().map(|p| { p.iter() diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs index 9d7f11462..62055ef16 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs @@ -385,7 +385,10 @@ macro_rules! get_min_max_values_for_page_index { let vec = &index.indexes; Decimal128Array::from( vec.iter() - .map(|x| x.$func().and_then(|x| Some(from_bytes_to_i128(x)))) + .map(|x| { + x.$func() + .and_then(|x| Some(from_bytes_to_i128(x.as_ref()))) + }) .collect::<Vec<Option<i128>>>(), ) .with_precision_and_scale(*precision, *scale) @@ -397,7 +400,7 @@ macro_rules! get_min_max_values_for_page_index { let array: StringArray = vec .iter() .map(|x| x.$func()) - .map(|x| x.and_then(|x| std::str::from_utf8(x).ok())) + .map(|x| x.and_then(|x| std::str::from_utf8(x.as_ref()).ok())) .collect(); Some(Arc::new(array)) } @@ -411,7 +414,10 @@ macro_rules! get_min_max_values_for_page_index { let vec = &index.indexes; Decimal128Array::from( vec.iter() - .map(|x| x.$func().and_then(|x| Some(from_bytes_to_i128(x)))) + .map(|x| { + x.$func() + .and_then(|x| Some(from_bytes_to_i128(x.as_ref()))) + }) .collect::<Vec<Option<i128>>>(), ) .with_precision_and_scale(*precision, *scale) diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index b669653d5..d3acef15e 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -36,7 +36,7 @@ path = "src/lib.rs" [dependencies] ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } -arrow = { version = "31.0.0", default-features = false } +arrow = { version = "32.0.0", default-features = false } datafusion-common = { path = "../common", version = "17.0.0" } log = "^0.4" sqlparser = "0.30" diff --git a/datafusion/jit/Cargo.toml b/datafusion/jit/Cargo.toml index bb1d06e48..00cd0ced3 100644 --- a/datafusion/jit/Cargo.toml +++ b/datafusion/jit/Cargo.toml @@ -36,7 +36,7 @@ path = "src/lib.rs" jit = [] [dependencies] -arrow = { version = "31.0.0", default-features = false } +arrow = { version = "32.0.0", default-features = false } cranelift = "0.89.0" cranelift-jit = "0.89.0" cranelift-module = "0.89.0" diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 197fed6ea..dc6047706 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -37,7 +37,7 @@ default = ["unicode_expressions"] unicode_expressions = [] [dependencies] -arrow = { version = "31.0.0", features = ["prettyprint"] } +arrow = { version = "32.0.0", features = ["prettyprint"] } async-trait = "0.1.41" chrono = { version = "0.4.23", default-features = false } datafusion-common = { path = "../common", version = "17.0.0" } diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 3274c934a..69c91a625 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -43,9 +43,9 @@ unicode_expressions = ["unicode-segmentation"] [dependencies] ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } -arrow = { version = "31.0.0", features = ["prettyprint"] } -arrow-buffer = "31.0.0" -arrow-schema = "31.0.0" +arrow = { version = "32.0.0", features = ["prettyprint"] } +arrow-buffer = "32.0.0" +arrow-schema = "32.0.0" blake2 = { version = "^0.10.2", optional = true } blake3 = { version = "1.0", optional = true } chrono = { version = "0.4.23", default-features = false } diff --git a/datafusion/physical-expr/src/regex_expressions.rs b/datafusion/physical-expr/src/regex_expressions.rs index c5edf320f..ebc53e322 100644 --- a/datafusion/physical-expr/src/regex_expressions.rs +++ b/datafusion/physical-expr/src/regex_expressions.rs @@ -438,9 +438,9 @@ mod tests { #[test] fn test_static_pattern_regexp_replace_early_abort() { let values = StringArray::from(vec!["abc"; 5]); - let patterns = StringArray::from(vec![None; 5]); + let patterns = StringArray::from(vec![None::<&str>; 5]); let replacements = StringArray::from(vec!["foo"; 5]); - let expected = StringArray::from(vec![None; 5]); + let expected = StringArray::from(vec![None::<&str>; 5]); let re = _regexp_replace_static_pattern_replace::<i32>(&[ Arc::new(values), @@ -474,8 +474,8 @@ mod tests { let values = StringArray::from(vec!["abc"; 5]); let patterns = StringArray::from(vec!["a"; 5]); let replacements = StringArray::from(vec!["foo"; 5]); - let flags = StringArray::from(vec![None; 5]); - let expected = StringArray::from(vec![None; 5]); + let flags = StringArray::from(vec![None::<&str>; 5]); + let expected = StringArray::from(vec![None::<&str>; 5]); let re = _regexp_replace_static_pattern_replace::<i32>(&[ Arc::new(values), diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index 951e92e87..fad152e4d 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -40,7 +40,7 @@ default = [] json = ["pbjson", "serde", "serde_json"] [dependencies] -arrow = "31.0.0" +arrow = "32.0.0" chrono = { version = "0.4", default-features = false } datafusion = { path = "../core", version = "17.0.0" } datafusion-common = { path = "../common", version = "17.0.0" } diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index cd96cd97d..90c99c0d9 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -223,6 +223,11 @@ impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum { "Proto serialization error: The Map data type is not yet supported".to_owned() )) } + DataType::RunEndEncoded(_, _) => { + return Err(Error::General( + "Proto serialization error: The RunEndEncoded data type is not yet supported".to_owned() + )) + } }; Ok(res) diff --git a/datafusion/row/Cargo.toml b/datafusion/row/Cargo.toml index 0ecbf0ad3..fd541de55 100644 --- a/datafusion/row/Cargo.toml +++ b/datafusion/row/Cargo.toml @@ -37,7 +37,7 @@ path = "src/lib.rs" jit = ["datafusion-jit"] [dependencies] -arrow = "31.0.0" +arrow = "32.0.0" datafusion-common = { path = "../common", version = "17.0.0" } datafusion-jit = { path = "../jit", version = "17.0.0", optional = true } paste = "^1.0" diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml index 182292fb4..39d1a47a1 100644 --- a/datafusion/sql/Cargo.toml +++ b/datafusion/sql/Cargo.toml @@ -37,7 +37,7 @@ default = ["unicode_expressions"] unicode_expressions = [] [dependencies] -arrow-schema = "31.0.0" +arrow-schema = "32.0.0" datafusion-common = { path = "../common", version = "17.0.0" } datafusion-expr = { path = "../expr", version = "17.0.0" } log = "^0.4" diff --git a/parquet-test-utils/Cargo.toml b/parquet-test-utils/Cargo.toml index 7f5a7f5ac..a4d72a4b7 100644 --- a/parquet-test-utils/Cargo.toml +++ b/parquet-test-utils/Cargo.toml @@ -25,4 +25,4 @@ edition = "2021" [dependencies] datafusion = { path = "../datafusion/core" } object_store = "0.5.0" -parquet = "31.0.0" +parquet = "32.0.0" diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 251bd95a4..2d591926b 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -23,7 +23,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -arrow = { version = "31.0.0", features = ["prettyprint"] } +arrow = { version = "32.0.0", features = ["prettyprint"] } datafusion-common = { path = "../datafusion/common" } env_logger = "0.10.0" rand = "0.8"