This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new bb699eb77 Update to arrow 32 and Switch to RawDecoder for JSON (#5056)
bb699eb77 is described below

commit bb699eb77a1b8423f0098cdb6e9d9bdbc67eb3b2
Author: Raphael Taylor-Davies <1781103+tustv...@users.noreply.github.com>
AuthorDate: Tue Jan 31 18:06:51 2023 +0000

    Update to arrow 32 and Switch to RawDecoder for JSON (#5056)
    
    * Update to arrow 32
    
    Switch to RawDecoder for JSON
    
    * Fix avro
    
    * Update datafusion-cli
    
    * Fix arrow-flight
    
    * Update arrow
    
    * Use CSV Decoder
    
    * Fix avro
    
    * Simplify error handling
    
    * Explicit error conversions
    
    * Update arrow 32
    
    * Fix CSV
    
    * Review feedback
---
 benchmarks/Cargo.toml                              |  4 +-
 datafusion-cli/Cargo.lock                          | 82 ++++++++++--------
 datafusion-cli/Cargo.toml                          |  2 +-
 datafusion-examples/Cargo.toml                     |  4 +-
 datafusion/common/Cargo.toml                       |  6 +-
 datafusion/common/src/error.rs                     | 33 ++------
 datafusion/common/src/scalar.rs                    |  3 +-
 datafusion/core/Cargo.toml                         |  4 +-
 datafusion/core/src/avro_to_arrow/schema.rs        |  3 +
 .../core/src/physical_plan/file_format/csv.rs      | 96 ++++++++++++++++++----
 .../core/src/physical_plan/file_format/json.rs     | 88 ++++++++++++--------
 .../core/src/physical_plan/file_format/mod.rs      |  1 +
 .../file_format/parquet/page_filter.rs             | 12 ++-
 datafusion/expr/Cargo.toml                         |  2 +-
 datafusion/jit/Cargo.toml                          |  2 +-
 datafusion/optimizer/Cargo.toml                    |  2 +-
 datafusion/physical-expr/Cargo.toml                |  6 +-
 datafusion/physical-expr/src/regex_expressions.rs  |  8 +-
 datafusion/proto/Cargo.toml                        |  2 +-
 datafusion/proto/src/logical_plan/to_proto.rs      |  5 ++
 datafusion/row/Cargo.toml                          |  2 +-
 datafusion/sql/Cargo.toml                          |  2 +-
 parquet-test-utils/Cargo.toml                      |  2 +-
 test-utils/Cargo.toml                              |  2 +-
 24 files changed, 232 insertions(+), 141 deletions(-)

diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 23c1d32eb..8f51bbec8 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,14 +33,14 @@ simd = ["datafusion/simd"]
 snmalloc = ["snmalloc-rs"]
 
 [dependencies]
-arrow = "31.0.0"
+arrow = "32.0.0"
 datafusion = { path = "../datafusion/core", version = "17.0.0", features = ["scheduler"] }
 env_logger = "0.10"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
 num_cpus = "1.13.0"
 object_store = "0.5.0"
-parquet = "31.0.0"
+parquet = "32.0.0"
 parquet-test-utils = { path = "../parquet-test-utils/", version = "0.1.0" }
 rand = "0.8.4"
 serde = { version = "1.0.136", features = ["derive"] }
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index f706e2e28..c87abeb21 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -68,9 +68,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
 
 [[package]]
 name = "arrow"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1b556d39f9d19e363833a0fe65d591cd0e2ecc0977589a78179b592bea8dc945"
+checksum = "87d948f553cf556656eb89265700258e1032d26fec9b7920cd20319336e06afd"
 dependencies = [
  "ahash",
  "arrow-arith",
@@ -91,9 +91,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-arith"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "85c61b9235694b48f60d89e0e8d6cb478f39c65dd14b0fe1c3f04379b7d50068"
+checksum = "cf30d4ebc3df9dfd8bd26883aa30687d4ddcfd7b2443e62bd7c8fedf153b8e45"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -106,9 +106,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-array"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a1e6e839764618a911cc460a58ebee5ad3d42bc12d9a5e96a29b7cc296303aa1"
+checksum = "9fe66ec388d882a61fff3eb613b5266af133aa08a3318e5e493daf0f5c1696cb"
 dependencies = [
  "ahash",
  "arrow-buffer",
@@ -122,9 +122,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-buffer"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "03a21d232b1bc1190a3fdd2f9c1e39b7cd41235e95a0d44dd4f522bc5f495748"
+checksum = "4ef967dadbccd4586ec8d7aab27d7033ecb5dfae8a605c839613039eac227bda"
 dependencies = [
  "half",
  "num",
@@ -132,9 +132,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-cast"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "83dcdb1436cac574f1c1b30fda91c53c467534337bef4064bbd4ea2d6fbc6e04"
+checksum = "491a7979ea9e76dc218f532896e2d245fde5235e2e6420ce80d27cf6395dda84"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -148,9 +148,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-csv"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a01677ae9458f5af9e35e1aa6ba97502f539e621db0c6672566403f97edd0448"
+checksum = "4b1d4fc91078dbe843c2c50d90f8119c96e8dfac2f78d30f7a8cb9397399c61d"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -167,9 +167,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-data"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "14e3e69c9fd98357eeeab4aa0f626ecf7ecf663e68e8fc04eac87c424a414477"
+checksum = "ee0c0e3c5d3b80be8f267f4b2af714c08cad630569be01a8379cfe27b4866495"
 dependencies = [
  "arrow-buffer",
  "arrow-schema",
@@ -179,9 +179,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-ipc"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "64cac2706acbd796965b6eaf0da30204fe44aacf70273f8cb3c9b7d7f3d4c190"
+checksum = "0a3ca7eb8d23c83fe40805cbafec70a6a31df72de47355545ff34c850f715403"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -193,9 +193,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-json"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7790e8b7df2d8ef5ac802377ac256cf2fb80cbf7d44b82d6464e20ace6232a5a"
+checksum = "bf65aff76d2e340d827d5cab14759e7dd90891a288347e2202e4ee28453d9bed"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -205,15 +205,16 @@ dependencies = [
  "chrono",
  "half",
  "indexmap",
+ "lexical-core",
  "num",
  "serde_json",
 ]
 
 [[package]]
 name = "arrow-ord"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c7ee6e1b761dfffaaf7b5bbe68c113a576a3a802146c5c0b9fcec781e30d80a3"
+checksum = "074a5a55c37ae4750af4811c8861c0378d8ab2ff6c262622ad24efae6e0b73b3"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -225,9 +226,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-row"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6e65bfedf782fc92721e796fdd26ae7343c98ba9a9243d62def9e4e1c4c1cf0b"
+checksum = "e064ac4e64960ebfbe35f218f5e7d9dc9803b59c2e56f611da28ce6d008f839e"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -240,15 +241,15 @@ dependencies = [
 
 [[package]]
 name = "arrow-schema"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "73ca49d010b27e2d73f70c1d1f90c1b378550ed0f4ad379c4dea0c997d97d723"
+checksum = "ead3f373b9173af52f2fdefcb5a7dd89f453fbc40056f574a8aeb23382a4ef81"
 
 [[package]]
 name = "arrow-select"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "976cbaeb1a85c09eea81f3f9c149c758630ff422ed0238624c5c3f4704b6a53c"
+checksum = "646b4f15b5a77c970059e748aeb1539705c68cd397ecf0f0264c4ef3737d35f3"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -259,9 +260,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-string"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3d4882762f8f48a9218946c016553d38b04b4fe8202038dad4141b3b887b7da8"
+checksum = "c8b8bf150caaeca03f39f1a91069701387d93f7cfd256d27f423ac8496d99a51"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -986,12 +987,12 @@ dependencies = [
 
 [[package]]
 name = "flatbuffers"
-version = "22.9.29"
+version = "23.1.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce016b9901aef3579617931fbb2df8fc9a9f7cb95a16eb8acc8148209bb9e70"
+checksum = "77f5399c2c9c50ae9418e522842ad362f61ee48b346ac106807bd355a8a7c619"
 dependencies = [
  "bitflags",
- "thiserror",
+ "rustc_version",
 ]
 
 [[package]]
@@ -1783,9 +1784,9 @@ dependencies = [
 
 [[package]]
 name = "parquet"
-version = "31.0.0"
+version = "32.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b4ee1ffc0778395c9783a5c74f2cad2fb1a128ade95a965212d31b7b13e3d45"
+checksum = "23b3d4917209e17e1da5fb07d276da237a42465f0def2b8d5fa5ce0e85855b4c"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -2055,6 +2056,15 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "rustc_version"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
+dependencies = [
+ "semver",
+]
+
 [[package]]
 name = "rustix"
 version = "0.36.6"
@@ -2156,6 +2166,12 @@ dependencies = [
  "untrusted",
 ]
 
+[[package]]
+name = "semver"
+version = "1.0.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a"
+
 [[package]]
 name = "seq-macro"
 version = "0.3.2"
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 12fd7fea0..1459f5766 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -29,7 +29,7 @@ rust-version = "1.62"
 readme = "README.md"
 
 [dependencies]
-arrow = "31.0.0"
+arrow = "32.0.0"
 async-trait = "0.1.41"
 clap = { version = "3", features = ["derive", "cargo"] }
 datafusion = { path = "../datafusion/core", version = "17.0.0" }
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index e6713bb9a..a8ed249c5 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -34,8 +34,8 @@ path = "examples/avro_sql.rs"
 required-features = ["datafusion/avro"]
 
 [dev-dependencies]
-arrow = "31.0.0"
-arrow-flight = "31.0.0"
+arrow = "32.0.0"
+arrow-flight = "32.0.0"
 async-trait = "0.1.41"
 datafusion = { path = "../datafusion/core" }
 datafusion-common = { path = "../datafusion/common" }
diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index d75747f80..bfe90b60c 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -40,11 +40,11 @@ pyarrow = ["pyo3", "arrow/pyarrow"]
 
 [dependencies]
 apache-avro = { version = "0.14", default-features = false, features = ["snappy"], optional = true }
-arrow = { version = "31.0.0", default-features = false }
+arrow = { version = "32.0.0", default-features = false }
 chrono = { version = "0.4", default-features = false }
 cranelift-module = { version = "0.92.0", optional = true }
 num_cpus = "1.13.0"
 object_store = { version = "0.5.0", default-features = false, optional = true }
-parquet = { version = "31.0.0", default-features = false, optional = true }
-pyo3 = { version = "0.17.1", optional = true }
+parquet = { version = "32.0.0", default-features = false, optional = true }
+pyo3 = { version = "0.18.0", optional = true }
 sqlparser = "0.30"
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index f27900ad3..3f10c1261 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -388,12 +388,16 @@ impl DataFusionError {
 
         let mut last_datafusion_error = self;
         let mut root_error: &dyn Error = self;
-        while let Some(source) = find_source(root_error) {
+        while let Some(source) = root_error.source() {
             // walk the next level
             root_error = source;
             // remember the lowest datafusion error so far
             if let Some(e) = root_error.downcast_ref::<DataFusionError>() {
                 last_datafusion_error = e;
+            } else if let Some(e) = root_error.downcast_ref::<Arc<DataFusionError>>() {
+                // As `Arc<T>::source()` calls through to `T::source()` we need to
+                // explicitly match `Arc<DataFusionError>` to capture it
+                last_datafusion_error = e.as_ref();
             }
         }
         // return last checkpoint (which may be the original error)
@@ -401,33 +405,6 @@ impl DataFusionError {
     }
 }
 
-fn find_source<'a>(e: &'a (dyn Error + 'static)) -> Option<&'a (dyn Error + 'static)> {
-    // workaround until https://github.com/apache/arrow-rs/issues/3566 is released
-    if let Some(e) = e.downcast_ref::<ArrowError>() {
-        return if let ArrowError::ExternalError(e) = e {
-            Some(e.as_ref())
-        } else {
-            None
-        };
-    }
-    // some errors are wrapped into `Arc`s to share them with multiple
-    // receivers, so handle that specially here
-    if let Some(e) = e.downcast_ref::<Arc<dyn Error + 'static>>() {
-        return Some(e.as_ref());
-    }
-
-    // For some reason the above doesn't capture works for
-    // Arc<DataFusionError> or Arc<ArrowError>
-    if let Some(e) = e.downcast_ref::<Arc<ArrowError>>() {
-        return Some(e.as_ref());
-    }
-    if let Some(e) = e.downcast_ref::<Arc<DataFusionError>>() {
-        return Some(e.as_ref());
-    }
-
-    e.source()
-}
-
 #[cfg(test)]
 mod test {
     use std::sync::Arc;
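
A minimal standalone sketch (not part of this commit) of the behaviour the new
`Arc<DataFusionError>` arm above accounts for: `Arc<E>` forwards `Error::source`
to `E`, but a source stored as an `Arc<E>` downcasts as `Arc<E>`, not as `E`.
The `Inner`/`Outer` types here are hypothetical stand-ins:

    use std::error::Error;
    use std::fmt;
    use std::sync::Arc;

    #[derive(Debug)]
    struct Inner;
    impl fmt::Display for Inner {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "inner")
        }
    }
    impl Error for Inner {}

    #[derive(Debug)]
    struct Outer(Arc<Inner>);
    impl fmt::Display for Outer {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "outer")
        }
    }
    impl Error for Outer {
        fn source(&self) -> Option<&(dyn Error + 'static)> {
            // The source is exposed as `Arc<Inner>`, relying on the std
            // impl of `Error` for `Arc<T>` where `T: Error`.
            Some(&self.0)
        }
    }

    fn main() {
        let outer = Outer(Arc::new(Inner));
        let source = outer.source().unwrap();
        // A plain downcast misses it: the concrete type is `Arc<Inner>`...
        assert!(source.downcast_ref::<Inner>().is_none());
        // ...so the chain walk must match the `Arc`-wrapped type explicitly.
        assert!(source.downcast_ref::<Arc<Inner>>().is_some());
    }
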
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 33dd58386..905661541 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -1592,7 +1592,8 @@ impl ScalarValue {
             | DataType::Interval(_)
             | DataType::LargeList(_)
             | DataType::Union(_, _, _)
-            | DataType::Map(_, _) => {
+            | DataType::Map(_, _)
+            | DataType::RunEndEncoded(_, _) => {
                 return Err(DataFusionError::Internal(format!(
                     "Unsupported creation of {:?} array from ScalarValue {:?}",
                     data_type,
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 07999e10a..6b59e48cc 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -60,7 +60,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion
 [dependencies]
 ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
 apache-avro = { version = "0.14", optional = true }
-arrow = { version = "31.0.0", features = ["prettyprint"] }
+arrow = { version = "32.0.0", features = ["prettyprint"] }
 async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "xz", "futures-io", "tokio"], optional = true }
 async-trait = "0.1.41"
 bytes = "1.1"
@@ -86,7 +86,7 @@ num-traits = { version = "0.2", optional = true }
 num_cpus = "1.13.0"
 object_store = "0.5.3"
 parking_lot = "0.12"
-parquet = { version = "31.0.0", features = ["arrow", "async"] }
+parquet = { version = "32.0.0", features = ["arrow", "async"] }
 paste = "^1.0"
 percent-encoding = "2.2.0"
 pin-project-lite = "^0.2.7"
diff --git a/datafusion/core/src/avro_to_arrow/schema.rs b/datafusion/core/src/avro_to_arrow/schema.rs
index 1af552438..a3d5986da 100644
--- a/datafusion/core/src/avro_to_arrow/schema.rs
+++ b/datafusion/core/src/avro_to_arrow/schema.rs
@@ -217,6 +217,9 @@ fn default_field_name(dt: &DataType) -> &str {
         DataType::Union(_, _, _) => "union",
         DataType::Dictionary(_, _) => "map",
         DataType::Map(_, _) => unimplemented!("Map support not implemented"),
+        DataType::RunEndEncoded(_, _) => {
+            unimplemented!("RunEndEncoded support not implemented")
+        }
         DataType::Decimal128(_, _) => "decimal",
         DataType::Decimal256(_, _) => "decimal",
     }
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index e6bedbbb9..e0654f69b 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -21,7 +21,6 @@ use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::{SessionState, TaskContext};
 use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
@@ -35,12 +34,15 @@ use arrow::datatypes::SchemaRef;
 
 use bytes::Buf;
 
+use bytes::Bytes;
+use futures::ready;
 use futures::{StreamExt, TryStreamExt};
 use object_store::{GetResult, ObjectStore};
 use std::any::Any;
 use std::fs;
 use std::path::Path;
 use std::sync::Arc;
+use std::task::Poll;
 use tokio::task::{self, JoinHandle};
 
 use super::{get_output_ordering, FileScanConfig};
@@ -196,12 +198,12 @@ struct CsvConfig {
 }
 
 impl CsvConfig {
-    fn open<R: std::io::Read>(&self, reader: R, first_chunk: bool) -> csv::Reader<R> {
+    fn open<R: std::io::Read>(&self, reader: R) -> csv::Reader<R> {
         let datetime_format = None;
         csv::Reader::new(
             reader,
             Arc::clone(&self.file_schema),
-            self.has_header && first_chunk,
+            self.has_header,
             Some(self.delimiter),
             self.batch_size,
             None,
@@ -209,6 +211,20 @@ impl CsvConfig {
             datetime_format,
         )
     }
+
+    fn builder(&self) -> csv::ReaderBuilder {
+        let mut builder = csv::ReaderBuilder::new()
+            .with_schema(self.file_schema.clone())
+            .with_delimiter(self.delimiter)
+            .with_batch_size(self.batch_size)
+            .has_header(self.has_header);
+
+        if let Some(proj) = &self.file_projection {
+            builder = builder.with_projection(proj.clone());
+        }
+
+        builder
+    }
 }
 
 struct CsvOpener {
@@ -224,20 +240,38 @@ impl FileOpener for CsvOpener {
             match config.object_store.get(file_meta.location()).await? {
                 GetResult::File(file, _) => {
                     let decoder = file_compression_type.convert_read(file)?;
-                    Ok(futures::stream::iter(config.open(decoder, true)).boxed())
+                    Ok(futures::stream::iter(config.open(decoder)).boxed())
                 }
                 GetResult::Stream(s) => {
-                    let mut first_chunk = true;
-                    let s = s.map_err(Into::<DataFusionError>::into);
-                    let decoder = file_compression_type.convert_stream(s)?;
-                    Ok(newline_delimited_stream(decoder)
-                        .map_ok(move |bytes| {
-                            let reader = config.open(bytes.reader(), first_chunk);
-                            first_chunk = false;
-                            futures::stream::iter(reader)
-                        })
-                        .try_flatten()
-                        .boxed())
+                    let mut decoder = config.builder().build_decoder();
+                    let s = s.map_err(DataFusionError::from);
+                    let mut input = file_compression_type.convert_stream(s)?.fuse();
+                    let mut buffered = Bytes::new();
+
+                    let s = futures::stream::poll_fn(move |cx| {
+                        loop {
+                            if buffered.is_empty() {
+                                match ready!(input.poll_next_unpin(cx)) {
+                                    Some(Ok(b)) => buffered = b,
+                                    Some(Err(e)) => {
+                                        return Poll::Ready(Some(Err(e.into())))
+                                    }
+                                    None => {}
+                                };
+                            }
+                            let decoded = match decoder.decode(buffered.as_ref()) {
+                                // Note: the decoder needs to be called with an empty
+                                // array to delimit the final record
+                                Ok(0) => break,
+                                Ok(decoded) => decoded,
+                                Err(e) => return Poll::Ready(Some(Err(e))),
+                            };
+                            buffered.advance(decoded);
+                        }
+
+                        Poll::Ready(decoder.flush().transpose())
+                    });
+                    Ok(s.boxed())
                 }
             }
         }))
@@ -614,6 +648,38 @@ mod tests {
         .await;
     }
 
+    #[tokio::test]
+    async fn test_no_trailing_delimiter() {
+        let session_ctx = SessionContext::new();
+        let store = object_store::memory::InMemory::new();
+
+        let data = bytes::Bytes::from("a,b\n1,2\n3,4");
+        let path = object_store::path::Path::from("a.csv");
+        store.put(&path, data).await.unwrap();
+
+        session_ctx
+            .runtime_env()
+            .register_object_store("memory", "", Arc::new(store));
+
+        let df = session_ctx
+            .read_csv("memory:///", CsvReadOptions::new())
+            .await
+            .unwrap();
+
+        let result = df.collect().await.unwrap();
+
+        let expected = vec![
+            "+---+---+",
+            "| a | b |",
+            "+---+---+",
+            "| 1 | 2 |",
+            "| 3 | 4 |",
+            "+---+---+",
+        ];
+
+        crate::assert_batches_eq!(expected, &result);
+    }
+
     #[tokio::test]
     async fn write_csv_results_error_handling() -> Result<()> {
         let ctx = SessionContext::new();
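
As a standalone illustration (not the committed code) of the push-based CSV
`Decoder` the stream path above drives, the sketch below feeds byte chunks by
hand. It assumes only the arrow 32 API already used in this diff:
`ReaderBuilder::build_decoder`, `Decoder::decode`, and `Decoder::flush`.

    use std::sync::Arc;

    use arrow::csv::ReaderBuilder;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int64, true),
        ]));
        let mut decoder = ReaderBuilder::new()
            .with_schema(schema)
            .with_batch_size(1024)
            .has_header(true)
            .build_decoder();

        // Chunk boundaries need not align with record boundaries, and the
        // input has no trailing newline (cf. test_no_trailing_delimiter above).
        for chunk in [&b"a,b\n1,"[..], &b"2\n3,4"[..]] {
            let consumed = decoder.decode(chunk)?;
            assert_eq!(consumed, chunk.len());
        }
        // An empty call delimits the final, unterminated record...
        decoder.decode(&[])?;
        // ...and flush emits the buffered rows as a RecordBatch.
        let batch = decoder.flush()?.expect("rows buffered");
        assert_eq!(batch.num_rows(), 2);
        Ok(())
    }
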
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 633da67e9..bab84ec6f 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -21,7 +21,6 @@ use crate::error::{DataFusionError, Result};
 use crate::execution::context::SessionState;
 use crate::execution::context::TaskContext;
 use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
@@ -30,17 +29,19 @@ use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
-use arrow::json::reader::DecoderOptions;
 use arrow::{datatypes::SchemaRef, json};
 
-use bytes::Buf;
+use bytes::{Buf, Bytes};
 
-use futures::{StreamExt, TryStreamExt};
+use arrow::json::RawReaderBuilder;
+use futures::{ready, stream, StreamExt, TryStreamExt};
 use object_store::{GetResult, ObjectStore};
 use std::any::Any;
 use std::fs;
+use std::io::BufReader;
 use std::path::Path;
 use std::sync::Arc;
+use std::task::Poll;
 use tokio::task::{self, JoinHandle};
 
 use super::{get_output_ordering, FileScanConfig};
@@ -111,24 +112,15 @@ impl ExecutionPlan for NdJsonExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let proj = self.base_config.projected_file_column_names();
-
         let batch_size = context.session_config().batch_size();
-        let file_schema = Arc::clone(&self.base_config.file_schema);
-
-        let options = DecoderOptions::new().with_batch_size(batch_size);
-        let options = if let Some(proj) = proj {
-            options.with_projection(proj)
-        } else {
-            options
-        };
+        let (projected_schema, _) = self.base_config.project();
 
         let object_store = context
             .runtime_env()
             .object_store(&self.base_config.object_store_url)?;
         let opener = JsonOpener {
-            file_schema,
-            options,
+            batch_size,
+            projected_schema,
             file_compression_type: self.file_compression_type.to_owned(),
             object_store,
         };
@@ -166,40 +158,64 @@ impl ExecutionPlan for NdJsonExec {
 }
 
 struct JsonOpener {
-    options: DecoderOptions,
-    file_schema: SchemaRef,
+    batch_size: usize,
+    projected_schema: SchemaRef,
     file_compression_type: FileCompressionType,
     object_store: Arc<dyn ObjectStore>,
 }
 
 impl FileOpener for JsonOpener {
     fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
-        let options = self.options.clone();
-        let schema = self.file_schema.clone();
         let store = self.object_store.clone();
+        let schema = self.projected_schema.clone();
+        let batch_size = self.batch_size;
+
         let file_compression_type = self.file_compression_type.to_owned();
         Ok(Box::pin(async move {
             match store.get(file_meta.location()).await? {
                 GetResult::File(file, _) => {
-                    let decoder = file_compression_type.convert_read(file)?;
-                    let reader = json::Reader::new(decoder, schema.clone(), options);
+                    let bytes = file_compression_type.convert_read(file)?;
+                    let reader = RawReaderBuilder::new(schema)
+                        .with_batch_size(batch_size)
+                        .build(BufReader::new(bytes))?;
                     Ok(futures::stream::iter(reader).boxed())
                 }
                 GetResult::Stream(s) => {
-                    let s = s.map_err(Into::into);
-                    let decoder = file_compression_type.convert_stream(s)?;
-
-                    Ok(newline_delimited_stream(decoder)
-                        .map_ok(move |bytes| {
-                            let reader = json::Reader::new(
-                                bytes.reader(),
-                                schema.clone(),
-                                options.clone(),
-                            );
-                            futures::stream::iter(reader)
-                        })
-                        .try_flatten()
-                        .boxed())
+                    let mut decoder = RawReaderBuilder::new(schema)
+                        .with_batch_size(batch_size)
+                        .build_decoder()?;
+
+                    let s = s.map_err(DataFusionError::from);
+                    let mut input = file_compression_type.convert_stream(s)?.fuse();
+                    let mut buffered = Bytes::new();
+
+                    let s = stream::poll_fn(move |cx| {
+                        loop {
+                            if buffered.is_empty() {
+                                buffered = match ready!(input.poll_next_unpin(cx)) {
+                                    Some(Ok(b)) => b,
+                                    Some(Err(e)) => {
+                                        return Poll::Ready(Some(Err(e.into())))
+                                    }
+                                    None => break,
+                                };
+                            }
+                            let read = buffered.len();
+
+                            let decoded = match decoder.decode(buffered.as_ref()) {
+                                Ok(decoded) => decoded,
+                                Err(e) => return Poll::Ready(Some(Err(e))),
+                            };
+
+                            buffered.advance(decoded);
+                            if decoded != read {
+                                break;
+                            }
+                        }
+
+                        Poll::Ready(decoder.flush().transpose())
+                    });
+                    Ok(s.boxed())
                 }
             }
         }))
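
A standalone sketch (again, not the committed code) of the `RawDecoder`
contract the poll loop above relies on: `decode` reports how many bytes it
consumed and stops early once `batch_size` rows are buffered, so
`decoded != read` signals a completed batch to `flush`.

    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::error::ArrowError;
    use arrow::json::RawReaderBuilder;

    fn main() -> Result<(), ArrowError> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
        let mut decoder = RawReaderBuilder::new(schema)
            .with_batch_size(2) // tiny batch size to force an early stop
            .build_decoder()?;

        let input = b"{\"a\": 1}\n{\"a\": 2}\n{\"a\": 3}\n";
        let read = decoder.decode(input)?;
        // Only the first two records were consumed: a batch is ready.
        assert!(read < input.len());
        assert_eq!(decoder.flush()?.expect("full batch").num_rows(), 2);

        // Feed the remainder, then flush the final partial batch.
        decoder.decode(&input[read..])?;
        assert_eq!(decoder.flush()?.expect("remaining row").num_rows(), 1);
        Ok(())
    }
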
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 78828cbf8..a03b3681a 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -158,6 +158,7 @@ impl FileScanConfig {
         (table_schema, table_stats)
     }
 
+    #[allow(unused)] // Only used by avro
     fn projected_file_column_names(&self) -> Option<Vec<String>> {
         self.projection.as_ref().map(|p| {
             p.iter()
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 9d7f11462..62055ef16 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -385,7 +385,10 @@ macro_rules! get_min_max_values_for_page_index {
                     let vec = &index.indexes;
                     Decimal128Array::from(
                         vec.iter()
-                            .map(|x| x.$func().and_then(|x| Some(from_bytes_to_i128(x))))
+                            .map(|x| {
+                                x.$func()
+                                    .and_then(|x| Some(from_bytes_to_i128(x.as_ref())))
+                            })
                             .collect::<Vec<Option<i128>>>(),
                     )
                     .with_precision_and_scale(*precision, *scale)
@@ -397,7 +400,7 @@ macro_rules! get_min_max_values_for_page_index {
                     let array: StringArray = vec
                         .iter()
                         .map(|x| x.$func())
-                        .map(|x| x.and_then(|x| std::str::from_utf8(x).ok()))
+                        .map(|x| x.and_then(|x| std::str::from_utf8(x.as_ref()).ok()))
                         .collect();
                     Some(Arc::new(array))
                 }
@@ -411,7 +414,10 @@ macro_rules! get_min_max_values_for_page_index {
                     let vec = &index.indexes;
                     Decimal128Array::from(
                         vec.iter()
-                            .map(|x| x.$func().and_then(|x| Some(from_bytes_to_i128(x))))
+                            .map(|x| {
+                                x.$func()
+                                    .and_then(|x| Some(from_bytes_to_i128(x.as_ref())))
+                            })
                             .collect::<Vec<Option<i128>>>(),
                     )
                     .with_precision_and_scale(*precision, *scale)
diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml
index b669653d5..d3acef15e 100644
--- a/datafusion/expr/Cargo.toml
+++ b/datafusion/expr/Cargo.toml
@@ -36,7 +36,7 @@ path = "src/lib.rs"
 
 [dependencies]
 ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
-arrow = { version = "31.0.0", default-features = false }
+arrow = { version = "32.0.0", default-features = false }
 datafusion-common = { path = "../common", version = "17.0.0" }
 log = "^0.4"
 sqlparser = "0.30"
diff --git a/datafusion/jit/Cargo.toml b/datafusion/jit/Cargo.toml
index bb1d06e48..00cd0ced3 100644
--- a/datafusion/jit/Cargo.toml
+++ b/datafusion/jit/Cargo.toml
@@ -36,7 +36,7 @@ path = "src/lib.rs"
 jit = []
 
 [dependencies]
-arrow = { version = "31.0.0", default-features = false }
+arrow = { version = "32.0.0", default-features = false }
 cranelift = "0.89.0"
 cranelift-jit = "0.89.0"
 cranelift-module = "0.89.0"
diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml
index 197fed6ea..dc6047706 100644
--- a/datafusion/optimizer/Cargo.toml
+++ b/datafusion/optimizer/Cargo.toml
@@ -37,7 +37,7 @@ default = ["unicode_expressions"]
 unicode_expressions = []
 
 [dependencies]
-arrow = { version = "31.0.0", features = ["prettyprint"] }
+arrow = { version = "32.0.0", features = ["prettyprint"] }
 async-trait = "0.1.41"
 chrono = { version = "0.4.23", default-features = false }
 datafusion-common = { path = "../common", version = "17.0.0" }
diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml
index 3274c934a..69c91a625 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -43,9 +43,9 @@ unicode_expressions = ["unicode-segmentation"]
 
 [dependencies]
 ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
-arrow = { version = "31.0.0", features = ["prettyprint"] }
-arrow-buffer = "31.0.0"
-arrow-schema = "31.0.0"
+arrow = { version = "32.0.0", features = ["prettyprint"] }
+arrow-buffer = "32.0.0"
+arrow-schema = "32.0.0"
 blake2 = { version = "^0.10.2", optional = true }
 blake3 = { version = "1.0", optional = true }
 chrono = { version = "0.4.23", default-features = false }
diff --git a/datafusion/physical-expr/src/regex_expressions.rs b/datafusion/physical-expr/src/regex_expressions.rs
index c5edf320f..ebc53e322 100644
--- a/datafusion/physical-expr/src/regex_expressions.rs
+++ b/datafusion/physical-expr/src/regex_expressions.rs
@@ -438,9 +438,9 @@ mod tests {
     #[test]
     fn test_static_pattern_regexp_replace_early_abort() {
         let values = StringArray::from(vec!["abc"; 5]);
-        let patterns = StringArray::from(vec![None; 5]);
+        let patterns = StringArray::from(vec![None::<&str>; 5]);
         let replacements = StringArray::from(vec!["foo"; 5]);
-        let expected = StringArray::from(vec![None; 5]);
+        let expected = StringArray::from(vec![None::<&str>; 5]);
 
         let re = _regexp_replace_static_pattern_replace::<i32>(&[
             Arc::new(values),
@@ -474,8 +474,8 @@ mod tests {
         let values = StringArray::from(vec!["abc"; 5]);
         let patterns = StringArray::from(vec!["a"; 5]);
         let replacements = StringArray::from(vec!["foo"; 5]);
-        let flags = StringArray::from(vec![None; 5]);
-        let expected = StringArray::from(vec![None; 5]);
+        let flags = StringArray::from(vec![None::<&str>; 5]);
+        let expected = StringArray::from(vec![None::<&str>; 5]);
 
         let re = _regexp_replace_static_pattern_replace::<i32>(&[
             Arc::new(values),
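
The `None::<&str>` annotations above are presumably needed because arrow 32
broadens the `From` impls for `StringArray`, leaving a bare `None`'s element
type ambiguous; that reason is my inference, not stated in the commit. A
minimal sketch of the annotated form:

    use arrow::array::{Array, StringArray};

    fn main() {
        // `None::<&str>` pins the element type; a bare `None` here fails to infer.
        let a = StringArray::from(vec![None::<&str>; 5]);
        assert_eq!(a.null_count(), 5);
    }
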
diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml
index 951e92e87..fad152e4d 100644
--- a/datafusion/proto/Cargo.toml
+++ b/datafusion/proto/Cargo.toml
@@ -40,7 +40,7 @@ default = []
 json = ["pbjson", "serde", "serde_json"]
 
 [dependencies]
-arrow = "31.0.0"
+arrow = "32.0.0"
 chrono = { version = "0.4", default-features = false }
 datafusion = { path = "../core", version = "17.0.0" }
 datafusion-common = { path = "../common", version = "17.0.0" }
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index cd96cd97d..90c99c0d9 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -223,6 +223,11 @@ impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
                     "Proto serialization error: The Map data type is not yet supported".to_owned()
                 ))
             }
+            DataType::RunEndEncoded(_, _) => {
+                return Err(Error::General(
+                    "Proto serialization error: The RunEndEncoded data type is not yet supported".to_owned()
+                ))
+            }
         };
 
         Ok(res)
diff --git a/datafusion/row/Cargo.toml b/datafusion/row/Cargo.toml
index 0ecbf0ad3..fd541de55 100644
--- a/datafusion/row/Cargo.toml
+++ b/datafusion/row/Cargo.toml
@@ -37,7 +37,7 @@ path = "src/lib.rs"
 jit = ["datafusion-jit"]
 
 [dependencies]
-arrow = "31.0.0"
+arrow = "32.0.0"
 datafusion-common = { path = "../common", version = "17.0.0" }
 datafusion-jit = { path = "../jit", version = "17.0.0", optional = true }
 paste = "^1.0"
diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml
index 182292fb4..39d1a47a1 100644
--- a/datafusion/sql/Cargo.toml
+++ b/datafusion/sql/Cargo.toml
@@ -37,7 +37,7 @@ default = ["unicode_expressions"]
 unicode_expressions = []
 
 [dependencies]
-arrow-schema = "31.0.0"
+arrow-schema = "32.0.0"
 datafusion-common = { path = "../common", version = "17.0.0" }
 datafusion-expr = { path = "../expr", version = "17.0.0" }
 log = "^0.4"
diff --git a/parquet-test-utils/Cargo.toml b/parquet-test-utils/Cargo.toml
index 7f5a7f5ac..a4d72a4b7 100644
--- a/parquet-test-utils/Cargo.toml
+++ b/parquet-test-utils/Cargo.toml
@@ -25,4 +25,4 @@ edition = "2021"
 [dependencies]
 datafusion = { path = "../datafusion/core" }
 object_store = "0.5.0"
-parquet = "31.0.0"
+parquet = "32.0.0"
diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml
index 251bd95a4..2d591926b 100644
--- a/test-utils/Cargo.toml
+++ b/test-utils/Cargo.toml
@@ -23,7 +23,7 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-arrow = { version = "31.0.0", features = ["prettyprint"] }
+arrow = { version = "32.0.0", features = ["prettyprint"] }
 datafusion-common = { path = "../datafusion/common" }
 env_logger = "0.10.0"
 rand = "0.8"

