This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 14176ff  Update to arrow-7.0.0 (#1523)
14176ff is described below

commit 14176ffb50307b1d550729c8334658293e057f87
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Jan 12 16:44:23 2022 -0500

    Update to arrow-7.0.0 (#1523)
    
    * Update tonic/prost deps
    
    * Update to arrow 7.0.0-SNAPSHOT
    
    * Update datafusion and tests for arrow changes
    
    * fix doc tests
    
    * Update avro support
    
    * Use released arrow 7.0.0
---
 ballista-examples/Cargo.toml                       |  4 +-
 ballista/rust/core/Cargo.toml                      | 11 +--
 ballista/rust/core/proto/ballista.proto            |  8 ++
 ballista/rust/core/src/client.rs                   | 14 ++--
 ballista/rust/core/src/serde/logical_plan/mod.rs   | 89 +++++++++++++---------
 .../rust/core/src/serde/logical_plan/to_proto.rs   | 25 ++++--
 ballista/rust/core/src/serde/mod.rs                | 20 ++++-
 ballista/rust/executor/Cargo.toml                  |  6 +-
 ballista/rust/scheduler/Cargo.toml                 |  6 +-
 datafusion-cli/Cargo.toml                          |  2 +-
 datafusion-examples/Cargo.toml                     |  6 +-
 datafusion/Cargo.toml                              |  4 +-
 datafusion/src/avro_to_arrow/schema.rs             |  7 +-
 datafusion/src/lib.rs                              |  6 +-
 datafusion/src/physical_plan/file_format/csv.rs    |  2 +
 .../src/physical_plan/sort_preserving_merge.rs     | 33 +++++---
 datafusion/src/test_util.rs                        |  8 +-
 datafusion/tests/parquet_pruning.rs                |  4 +-
 datafusion/tests/sql/explain_analyze.rs            | 20 +++--
 datafusion/tests/sql/select.rs                     |  4 +-
 datafusion/tests/user_defined_plan.rs              |  4 +-
 21 files changed, 182 insertions(+), 101 deletions(-)

diff --git a/ballista-examples/Cargo.toml b/ballista-examples/Cargo.toml
index a2d2fd6..338f699 100644
--- a/ballista-examples/Cargo.toml
+++ b/ballista-examples/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.57"
 [dependencies]
 datafusion = { path = "../datafusion" }
 ballista = { path = "../ballista/rust/client", version = "0.6.0"}
-prost = "0.8"
-tonic = "0.5"
+prost = "0.9"
+tonic = "0.6"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 futures = "0.3"
 num_cpus = "1.13.0"
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 16ec07a..bbf8e27 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -35,18 +35,15 @@ async-trait = "0.1.36"
 futures = "0.3"
 hashbrown = "0.11"
 log = "0.4"
-prost = "0.8"
+prost = "0.9"
 serde = {version = "1", features = ["derive"]}
 sqlparser = "0.13"
 tokio = "1.0"
-tonic = "0.5"
+tonic = "0.6"
 uuid = { version = "0.8", features = ["v4"] }
 chrono = { version = "0.4", default-features = false }
 
-# workaround for https://github.com/apache/arrow-datafusion/issues/1498
-# should be able to remove when we update arrow-flight
-quote = "=1.0.10"
-arrow-flight = { version = "6.4.0"  }
+arrow-flight = { version = "7.0.0"  }
 
 datafusion = { path = "../../../datafusion", version = "6.0.0" }
 
@@ -54,4 +51,4 @@ datafusion = { path = "../../../datafusion", version = 
"6.0.0" }
 tempfile = "3"
 
 [build-dependencies]
-tonic-build = { version = "0.5" }
+tonic-build = { version = "0.6" }
diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index aa7b6a9..3fa14f1 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -1015,6 +1015,7 @@ enum TimeUnit{
 enum IntervalUnit{
     YearMonth = 0;
     DayTime = 1;
+    MonthDayNano = 2;
 }
 
 message Decimal{
@@ -1040,11 +1041,18 @@ message Struct{
     repeated Field sub_field_types = 1;
 }
 
+enum UnionMode{
+  sparse = 0;
+  dense = 1;
+}
+
 message Union{
     repeated Field union_types = 1;
+    UnionMode union_mode = 2;
 }
 
 
+
 message ScalarListValue{
     ScalarType datatype = 1;
     repeated ScalarValue values = 2;
diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index 26c8d22..4b82c34 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -17,7 +17,7 @@
 
 //! Client API for sending requests to executors.
 
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::{collections::HashMap, pin::Pin};
 use std::{
     convert::{TryFrom, TryInto},
@@ -135,13 +135,16 @@ impl BallistaClient {
 }
 
 struct FlightDataStream {
-    stream: Streaming<FlightData>,
+    stream: Mutex<Streaming<FlightData>>,
     schema: SchemaRef,
 }
 
 impl FlightDataStream {
     pub fn new(stream: Streaming<FlightData>, schema: SchemaRef) -> Self {
-        Self { stream, schema }
+        Self {
+            stream: Mutex::new(stream),
+            schema,
+        }
     }
 }
 
@@ -149,10 +152,11 @@ impl Stream for FlightDataStream {
     type Item = ArrowResult<RecordBatch>;
 
     fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
+        self: std::pin::Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        self.stream.poll_next_unpin(cx).map(|x| match x {
+        let mut stream = self.stream.lock().expect("mutex is bad");
+        stream.poll_next_unpin(cx).map(|x| match x {
             Some(flight_data_chunk_result) => {
                 let converted_chunk = flight_data_chunk_result
                     .map_err(|e| ArrowError::from_external_error(Box::new(e)))
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs 
b/ballista/rust/core/src/serde/logical_plan/mod.rs
index a0f481a..94d4e8e 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -24,6 +24,7 @@ mod roundtrip_tests {
     use super::super::{super::error::Result, protobuf};
     use crate::error::BallistaError;
     use core::panic;
+    use datafusion::arrow::datatypes::UnionMode;
     use datafusion::logical_plan::Repartition;
     use datafusion::{
         arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit},
@@ -413,25 +414,31 @@ mod roundtrip_tests {
                     true,
                 ),
             ]),
-            DataType::Union(vec![
-                Field::new("nullable", DataType::Boolean, false),
-                Field::new("name", DataType::Utf8, false),
-                Field::new("datatype", DataType::Binary, false),
-            ]),
-            DataType::Union(vec![
-                Field::new("nullable", DataType::Boolean, false),
-                Field::new("name", DataType::Utf8, false),
-                Field::new("datatype", DataType::Binary, false),
-                Field::new(
-                    "nested_struct",
-                    DataType::Struct(vec![
-                        Field::new("nullable", DataType::Boolean, false),
-                        Field::new("name", DataType::Utf8, false),
-                        Field::new("datatype", DataType::Binary, false),
-                    ]),
-                    true,
-                ),
-            ]),
+            DataType::Union(
+                vec![
+                    Field::new("nullable", DataType::Boolean, false),
+                    Field::new("name", DataType::Utf8, false),
+                    Field::new("datatype", DataType::Binary, false),
+                ],
+                UnionMode::Dense,
+            ),
+            DataType::Union(
+                vec![
+                    Field::new("nullable", DataType::Boolean, false),
+                    Field::new("name", DataType::Utf8, false),
+                    Field::new("datatype", DataType::Binary, false),
+                    Field::new(
+                        "nested_struct",
+                        DataType::Struct(vec![
+                            Field::new("nullable", DataType::Boolean, false),
+                            Field::new("name", DataType::Utf8, false),
+                            Field::new("datatype", DataType::Binary, false),
+                        ]),
+                        true,
+                    ),
+                ],
+                UnionMode::Sparse,
+            ),
             DataType::Dictionary(
                 Box::new(DataType::Utf8),
                 Box::new(DataType::Struct(vec![
@@ -558,25 +565,31 @@ mod roundtrip_tests {
                     true,
                 ),
             ]),
-            DataType::Union(vec![
-                Field::new("nullable", DataType::Boolean, false),
-                Field::new("name", DataType::Utf8, false),
-                Field::new("datatype", DataType::Binary, false),
-            ]),
-            DataType::Union(vec![
-                Field::new("nullable", DataType::Boolean, false),
-                Field::new("name", DataType::Utf8, false),
-                Field::new("datatype", DataType::Binary, false),
-                Field::new(
-                    "nested_struct",
-                    DataType::Struct(vec![
-                        Field::new("nullable", DataType::Boolean, false),
-                        Field::new("name", DataType::Utf8, false),
-                        Field::new("datatype", DataType::Binary, false),
-                    ]),
-                    true,
-                ),
-            ]),
+            DataType::Union(
+                vec![
+                    Field::new("nullable", DataType::Boolean, false),
+                    Field::new("name", DataType::Utf8, false),
+                    Field::new("datatype", DataType::Binary, false),
+                ],
+                UnionMode::Sparse,
+            ),
+            DataType::Union(
+                vec![
+                    Field::new("nullable", DataType::Boolean, false),
+                    Field::new("name", DataType::Utf8, false),
+                    Field::new("datatype", DataType::Binary, false),
+                    Field::new(
+                        "nested_struct",
+                        DataType::Struct(vec![
+                            Field::new("nullable", DataType::Boolean, false),
+                            Field::new("name", DataType::Utf8, false),
+                            Field::new("datatype", DataType::Binary, false),
+                        ]),
+                        true,
+                    ),
+                ],
+                UnionMode::Dense,
+            ),
             DataType::Dictionary(
                 Box::new(DataType::Utf8),
                 Box::new(DataType::Struct(vec![
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs 
b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 01428d9..46543ea 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -22,7 +22,7 @@
 use super::super::proto_error;
 use crate::serde::{byte_to_string, protobuf, BallistaError};
 use datafusion::arrow::datatypes::{
-    DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
+    DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode,
 };
 use datafusion::datasource::file_format::avro::AvroFormat;
 use datafusion::datasource::file_format::csv::CsvFormat;
@@ -60,6 +60,7 @@ impl protobuf::IntervalUnit {
         match interval_unit {
             IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
             IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
+            IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
         }
     }
 
@@ -71,6 +72,7 @@ impl protobuf::IntervalUnit {
             Some(interval_unit) => Ok(match interval_unit {
                 protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
                 protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
+                protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
             }),
             None => Err(proto_error(
                 "Error converting i32 to DateUnit: Passed invalid variant",
@@ -238,12 +240,19 @@ impl From<&DataType> for 
protobuf::arrow_type::ArrowTypeEnum {
                     .map(|field| field.into())
                     .collect::<Vec<_>>(),
             }),
-            DataType::Union(union_types) => 
ArrowTypeEnum::Union(protobuf::Union {
-                union_types: union_types
-                    .iter()
-                    .map(|field| field.into())
-                    .collect::<Vec<_>>(),
-            }),
+            DataType::Union(union_types, union_mode) => {
+                let union_mode = match union_mode {
+                    UnionMode::Sparse => protobuf::UnionMode::Sparse,
+                    UnionMode::Dense => protobuf::UnionMode::Dense,
+                };
+                ArrowTypeEnum::Union(protobuf::Union {
+                    union_types: union_types
+                        .iter()
+                        .map(|field| field.into())
+                        .collect::<Vec<_>>(),
+                    union_mode: union_mode.into(),
+                })
+            }
             DataType::Dictionary(key_type, value_type) => {
                 ArrowTypeEnum::Dictionary(Box::new(protobuf::Dictionary {
                     key: Some(Box::new(key_type.as_ref().into())),
@@ -387,7 +396,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype 
{
             | DataType::FixedSizeList(_, _)
             | DataType::LargeList(_)
             | DataType::Struct(_)
-            | DataType::Union(_)
+            | DataType::Union(_, _)
             | DataType::Dictionary(_, _)
             | DataType::Map(_, _)
             | DataType::Decimal(_, _) => {
diff --git a/ballista/rust/core/src/serde/mod.rs 
b/ballista/rust/core/src/serde/mod.rs
index fd3b57b..e68983a 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -20,6 +20,7 @@
 
 use std::{convert::TryInto, io::Cursor};
 
+use datafusion::arrow::datatypes::UnionMode;
 use datafusion::logical_plan::{JoinConstraint, JoinType, Operator};
 use datafusion::physical_plan::aggregates::AggregateFunction;
 use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
@@ -246,13 +247,24 @@ impl TryInto<datafusion::arrow::datatypes::DataType>
                     .map(|field| field.try_into())
                     .collect::<Result<Vec<_>, _>>()?,
             ),
-            arrow_type::ArrowTypeEnum::Union(union) => DataType::Union(
-                union
+            arrow_type::ArrowTypeEnum::Union(union) => {
+                let union_mode = protobuf::UnionMode::from_i32(union.union_mode)
+                    .ok_or_else(|| {
+                        proto_error(
+                            "Protobuf deserialization error: Unknown union mode type",
+                        )
+                    })?;
+                let union_mode = match union_mode {
+                    protobuf::UnionMode::Dense => UnionMode::Dense,
+                    protobuf::UnionMode::Sparse => UnionMode::Sparse,
+                };
+                let union_types = union
                     .union_types
                     .iter()
                     .map(|field| field.try_into())
-                    .collect::<Result<Vec<_>, _>>()?,
-            ),
+                    .collect::<Result<Vec<_>, _>>()?;
+                DataType::Union(union_types, union_mode)
+            }
             arrow_type::ArrowTypeEnum::Dictionary(dict) => {
                 let pb_key_datatype = dict
                     .as_ref()
diff --git a/ballista/rust/executor/Cargo.toml 
b/ballista/rust/executor/Cargo.toml
index 00f3aab..c01bb20 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -29,8 +29,8 @@ edition = "2018"
 snmalloc = ["snmalloc-rs"]
 
 [dependencies]
-arrow = { version = "6.4.0"  }
-arrow-flight = { version = "6.4.0"  }
+arrow = { version = "7.0.0"  }
+arrow-flight = { version = "7.0.0"  }
 anyhow = "1"
 async-trait = "0.1.36"
 ballista-core = { path = "../core", version = "0.6.0" }
@@ -43,7 +43,7 @@ snmalloc-rs = {version = "0.2", features= ["cache-friendly"], 
optional = true}
 tempfile = "3"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
 tokio-stream = { version = "0.1", features = ["net"] }
-tonic = "0.5"
+tonic = "0.6"
 uuid = { version = "0.8", features = ["v4"] }
 
 [dev-dependencies]
diff --git a/ballista/rust/scheduler/Cargo.toml 
b/ballista/rust/scheduler/Cargo.toml
index a71be40..0bacccf 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -44,13 +44,13 @@ http-body = "0.4"
 hyper = "0.14.4"
 log = "0.4"
 parse_arg = "0.1.3"
-prost = "0.8"
+prost = "0.9"
 rand = "0.8"
 serde = {version = "1", features = ["derive"]}
 sled_package = { package = "sled", version = "0.34", optional = true }
 tokio = { version = "1.0", features = ["full"] }
 tokio-stream = { version = "0.1", features = ["net"], optional = true }
-tonic = "0.5"
+tonic = "0.6"
 tower = { version = "0.4" }
 warp = "0.3"
 
@@ -60,7 +60,7 @@ uuid = { version = "0.8", features = ["v4"] }
 
 [build-dependencies]
 configure_me_codegen = "0.4.1"
-tonic-build = { version = "0.5" }
+tonic-build = { version = "0.6" }
 
 [package.metadata.configure_me.bin]
 scheduler = "scheduler_config_spec.toml"
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 394bd1e..d5347d8 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -31,5 +31,5 @@ clap = "2.33"
 rustyline = "9.0"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", 
"sync"] }
 datafusion = { path = "../datafusion", version = "6.0.0" }
-arrow = { version = "6.4.0" }
+arrow = { version = "7.0.0" }
 ballista = { path = "../ballista/rust/client", version = "0.6.0" }
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index f7ef66d..24d453e 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -34,10 +34,10 @@ path = "examples/avro_sql.rs"
 required-features = ["datafusion/avro"]
 
 [dev-dependencies]
-arrow-flight = { version = "6.4.0" }
+arrow-flight = { version = "7.0.0" }
 datafusion = { path = "../datafusion" }
-prost = "0.8"
-tonic = "0.5"
+prost = "0.9"
+tonic = "0.6"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", 
"sync"] }
 futures = "0.3"
 num_cpus = "1.13.0"
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index b919282..46e2cbe 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -52,8 +52,8 @@ avro = ["avro-rs", "num-traits"]
 [dependencies]
 ahash = { version = "0.7", default-features = false }
 hashbrown = { version = "0.11", features = ["raw"] }
-arrow = { version = "6.4.0", features = ["prettyprint"] }
-parquet = { version = "6.4.0", features = ["arrow"] }
+arrow = { version = "7.0.0", features = ["prettyprint"] }
+parquet = { version = "7.0.0", features = ["arrow"] }
 sqlparser = "0.13"
 paste = "^1.0"
 num_cpus = "1.13.0"
diff --git a/datafusion/src/avro_to_arrow/schema.rs 
b/datafusion/src/avro_to_arrow/schema.rs
index c6eda80..2e9a17d 100644
--- a/datafusion/src/avro_to_arrow/schema.rs
+++ b/datafusion/src/avro_to_arrow/schema.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit, UnionMode};
 use crate::error::{DataFusionError, Result};
 use arrow::datatypes::Field;
 use avro_rs::schema::Name;
@@ -103,7 +103,7 @@ fn schema_to_field_with_props(
                     .iter()
                     .map(|s| schema_to_field_with_props(s, None, has_nullable, 
None))
                     .collect::<Result<Vec<Field>>>()?;
-                DataType::Union(fields)
+                DataType::Union(fields, UnionMode::Dense)
             }
         }
         AvroSchema::Record { name, fields, .. } => {
@@ -201,6 +201,7 @@ fn default_field_name(dt: &DataType) -> &str {
         DataType::Interval(unit) => match unit {
             IntervalUnit::YearMonth => "intervalyear",
             IntervalUnit::DayTime => "intervalmonth",
+            IntervalUnit::MonthDayNano => "intervalmonthdaynano",
         },
         DataType::Binary => "varbinary",
         DataType::FixedSizeBinary(_) => "fixedsizebinary",
@@ -211,7 +212,7 @@ fn default_field_name(dt: &DataType) -> &str {
         DataType::FixedSizeList(_, _) => "fixed_size_list",
         DataType::LargeList(_) => "largelist",
         DataType::Struct(_) => "struct",
-        DataType::Union(_) => "union",
+        DataType::Union(_, _) => "union",
         DataType::Dictionary(_, _) => "map",
         DataType::Map(_, _) => unimplemented!("Map support not implemented"),
         DataType::Decimal(_, _) => "decimal",
diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs
index 26d55bc..fd574d7 100644
--- a/datafusion/src/lib.rs
+++ b/datafusion/src/lib.rs
@@ -48,7 +48,8 @@
 //! let results: Vec<RecordBatch> = df.collect().await?;
 //!
 //! // format the results
-//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?;
+//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
+//!    .to_string();
 //!
 //! let expected = vec![
 //!     "+---+--------------------------+",
@@ -83,7 +84,8 @@
 //! let results: Vec<RecordBatch> = df.collect().await?;
 //!
 //! // format the results
-//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?;
+//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
+//!   .to_string();
 //!
 //! let expected = vec![
 //!     "+---+----------------+",
diff --git a/datafusion/src/physical_plan/file_format/csv.rs 
b/datafusion/src/physical_plan/file_format/csv.rs
index efea300..f250baa 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -116,6 +116,7 @@ impl ExecutionPlan for CsvExec {
 
         let fun = move |file, remaining: &Option<usize>| {
             let bounds = remaining.map(|x| (0, x + start_line));
+            let datetime_format = None;
             Box::new(csv::Reader::new(
                 file,
                 Arc::clone(&file_schema),
@@ -124,6 +125,7 @@ impl ExecutionPlan for CsvExec {
                 batch_size,
                 bounds,
                 file_projection.clone(),
+                datetime_format,
             )) as BatchIter
         };
 
diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs 
b/datafusion/src/physical_plan/sort_preserving_merge.rs
index c90c653..6326580 100644
--- a/datafusion/src/physical_plan/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -975,8 +975,12 @@ mod tests {
         let basic = basic_sort(csv.clone(), sort.clone()).await;
         let partition = partition_sort(csv, sort).await;
 
-        let basic = 
arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
-        let partition = 
arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic])
+            .unwrap()
+            .to_string();
+        let partition = 
arrow::util::pretty::pretty_format_batches(&[partition])
+            .unwrap()
+            .to_string();
 
         assert_eq!(
             basic, partition,
@@ -1072,8 +1076,12 @@ mod tests {
         assert_eq!(basic.num_rows(), 300);
         assert_eq!(partition.num_rows(), 300);
 
-        let basic = 
arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
-        let partition = 
arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic])
+            .unwrap()
+            .to_string();
+        let partition = 
arrow::util::pretty::pretty_format_batches(&[partition])
+            .unwrap()
+            .to_string();
 
         assert_eq!(basic, partition);
     }
@@ -1106,9 +1114,12 @@ mod tests {
         assert_eq!(basic.num_rows(), 300);
         assert_eq!(merged.iter().map(|x| x.num_rows()).sum::<usize>(), 300);
 
-        let basic = 
arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
-        let partition =
-            
arrow::util::pretty::pretty_format_batches(merged.as_slice()).unwrap();
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic])
+            .unwrap()
+            .to_string();
+        let partition = 
arrow::util::pretty::pretty_format_batches(merged.as_slice())
+            .unwrap()
+            .to_string();
 
         assert_eq!(basic, partition);
     }
@@ -1245,8 +1256,12 @@ mod tests {
         let merged = merged.remove(0);
         let basic = basic_sort(batches, sort.clone()).await;
 
-        let basic = 
arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
-        let partition = 
arrow::util::pretty::pretty_format_batches(&[merged]).unwrap();
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic])
+            .unwrap()
+            .to_string();
+        let partition = arrow::util::pretty::pretty_format_batches(&[merged])
+            .unwrap()
+            .to_string();
 
         assert_eq!(
             basic, partition,
diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs
index f1fb4db..af66503 100644
--- a/datafusion/src/test_util.rs
+++ b/datafusion/src/test_util.rs
@@ -38,7 +38,9 @@ macro_rules! assert_batches_eq {
         let expected_lines: Vec<String> =
             $EXPECTED_LINES.iter().map(|&s| s.into()).collect();
 
-        let formatted = 
arrow::util::pretty::pretty_format_batches($CHUNKS).unwrap();
+        let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS)
+            .unwrap()
+            .to_string();
 
         let actual_lines: Vec<&str> = formatted.trim().lines().collect();
 
@@ -72,7 +74,9 @@ macro_rules! assert_batches_sorted_eq {
             expected_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
         }
 
-        let formatted = 
arrow::util::pretty::pretty_format_batches($CHUNKS).unwrap();
+        let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS)
+            .unwrap()
+            .to_string();
         // fix for windows: \r\n -->
 
         let mut actual_lines: Vec<&str> = formatted.trim().lines().collect();
diff --git a/datafusion/tests/parquet_pruning.rs 
b/datafusion/tests/parquet_pruning.rs
index 194563a..ee27a33 100644
--- a/datafusion/tests/parquet_pruning.rs
+++ b/datafusion/tests/parquet_pruning.rs
@@ -528,7 +528,7 @@ impl ContextWithParquet {
             .collect()
             .await
             .expect("getting input");
-        let pretty_input = pretty_format_batches(&input).unwrap();
+        let pretty_input = pretty_format_batches(&input).unwrap().to_string();
 
         let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");
         let physical_plan = self
@@ -564,7 +564,7 @@ impl ContextWithParquet {
 
         let result_rows = results.iter().map(|b| b.num_rows()).sum();
 
-        let pretty_results = pretty_format_batches(&results).unwrap();
+        let pretty_results = pretty_format_batches(&results).unwrap().to_string();
 
         let sql = sql.into();
         TestOutput {
diff --git a/datafusion/tests/sql/explain_analyze.rs 
b/datafusion/tests/sql/explain_analyze.rs
index 47e7290..a9cef73 100644
--- a/datafusion/tests/sql/explain_analyze.rs
+++ b/datafusion/tests/sql/explain_analyze.rs
@@ -42,7 +42,9 @@ async fn explain_analyze_baseline_metrics() {
     let plan = ctx.optimize(&plan).unwrap();
     let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
     let results = collect(physical_plan.clone()).await.unwrap();
-    let formatted = 
arrow::util::pretty::pretty_format_batches(&results).unwrap();
+    let formatted = arrow::util::pretty::pretty_format_batches(&results)
+        .unwrap()
+        .to_string();
     println!("Query Output:\n\n{}", formatted);
 
     assert_metrics!(
@@ -548,13 +550,17 @@ async fn explain_analyze_runs_optimizers() {
 
     let sql = "EXPLAIN SELECT count(*) from alltypes_plain";
     let actual = execute_to_batches(&mut ctx, sql).await;
-    let actual = arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+    let actual = arrow::util::pretty::pretty_format_batches(&actual)
+        .unwrap()
+        .to_string();
     assert_contains!(actual, expected);
 
     // EXPLAIN ANALYZE should work the same
     let sql = "EXPLAIN  ANALYZE SELECT count(*) from alltypes_plain";
     let actual = execute_to_batches(&mut ctx, sql).await;
-    let actual = arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+    let actual = arrow::util::pretty::pretty_format_batches(&actual)
+        .unwrap()
+        .to_string();
     assert_contains!(actual, expected);
 }
 
@@ -760,7 +766,9 @@ async fn csv_explain_analyze() {
     register_aggregate_csv_by_sql(&mut ctx).await;
     let sql = "EXPLAIN ANALYZE SELECT count(*), c1 FROM aggregate_test_100 group by c1";
     let actual = execute_to_batches(&mut ctx, sql).await;
-    let formatted = 
arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+    let formatted = arrow::util::pretty::pretty_format_batches(&actual)
+        .unwrap()
+        .to_string();
 
     // Only test basic plumbing and try to avoid having to change too
     // many things. explain_analyze_baseline_metrics covers the values
@@ -780,7 +788,9 @@ async fn csv_explain_analyze_verbose() {
     let sql =
         "EXPLAIN ANALYZE VERBOSE SELECT count(*), c1 FROM aggregate_test_100 group by c1";
     let actual = execute_to_batches(&mut ctx, sql).await;
-    let formatted = 
arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+    let formatted = arrow::util::pretty::pretty_format_batches(&actual)
+        .unwrap()
+        .to_string();
 
     let verbose_needle = "Output Rows";
     assert_contains!(formatted, verbose_needle);
diff --git a/datafusion/tests/sql/select.rs b/datafusion/tests/sql/select.rs
index 8d0d12f..cfe0fac 100644
--- a/datafusion/tests/sql/select.rs
+++ b/datafusion/tests/sql/select.rs
@@ -495,7 +495,9 @@ async fn use_between_expression_in_select_query() -> 
Result<()> {
 
     let sql = "EXPLAIN SELECT c1 BETWEEN 2 AND 3 FROM test";
     let actual = execute_to_batches(&mut ctx, sql).await;
-    let formatted = 
arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+    let formatted = arrow::util::pretty::pretty_format_batches(&actual)
+        .unwrap()
+        .to_string();
 
     // Only test that the projection exprs arecorrect, rather than entire 
output
     let needle = "ProjectionExec: expr=[c1@0 >= 2 AND c1@0 <= 3 as test.c1 BETWEEN Int64(2) AND Int64(3)]";
diff --git a/datafusion/tests/user_defined_plan.rs 
b/datafusion/tests/user_defined_plan.rs
index d3c6083..b603f6a 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -94,7 +94,9 @@ use datafusion::logical_plan::{DFSchemaRef, Limit};
 async fn exec_sql(ctx: &mut ExecutionContext, sql: &str) -> Result<String> {
     let df = ctx.sql(sql).await?;
     let batches = df.collect().await?;
-    pretty_format_batches(&batches).map_err(DataFusionError::ArrowError)
+    pretty_format_batches(&batches)
+        .map_err(DataFusionError::ArrowError)
+        .map(|d| d.to_string())
 }
 
 /// Create a test table.

Reply via email to