This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 14176ff Update to arrow-7.0.0 (#1523)
14176ff is described below
commit 14176ffb50307b1d550729c8334658293e057f87
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Jan 12 16:44:23 2022 -0500
Update to arrow-7.0.0 (#1523)
* Update tonic/prost deps
* Update to arrow 7.0.0-SNAPSHOT
* Update datafusion and tests for arrow changes
* fix doc tests
* Update avro support
* Use released arrow 7.0.0
---
ballista-examples/Cargo.toml | 4 +-
ballista/rust/core/Cargo.toml | 11 +--
ballista/rust/core/proto/ballista.proto | 8 ++
ballista/rust/core/src/client.rs | 14 ++--
ballista/rust/core/src/serde/logical_plan/mod.rs | 89 +++++++++++++---------
.../rust/core/src/serde/logical_plan/to_proto.rs | 25 ++++--
ballista/rust/core/src/serde/mod.rs | 20 ++++-
ballista/rust/executor/Cargo.toml | 6 +-
ballista/rust/scheduler/Cargo.toml | 6 +-
datafusion-cli/Cargo.toml | 2 +-
datafusion-examples/Cargo.toml | 6 +-
datafusion/Cargo.toml | 4 +-
datafusion/src/avro_to_arrow/schema.rs | 7 +-
datafusion/src/lib.rs | 6 +-
datafusion/src/physical_plan/file_format/csv.rs | 2 +
.../src/physical_plan/sort_preserving_merge.rs | 33 +++++---
datafusion/src/test_util.rs | 8 +-
datafusion/tests/parquet_pruning.rs | 4 +-
datafusion/tests/sql/explain_analyze.rs | 20 +++--
datafusion/tests/sql/select.rs | 4 +-
datafusion/tests/user_defined_plan.rs | 4 +-
21 files changed, 182 insertions(+), 101 deletions(-)
diff --git a/ballista-examples/Cargo.toml b/ballista-examples/Cargo.toml
index a2d2fd6..338f699 100644
--- a/ballista-examples/Cargo.toml
+++ b/ballista-examples/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.57"
[dependencies]
datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client", version = "0.6.0"}
-prost = "0.8"
-tonic = "0.5"
+prost = "0.9"
+tonic = "0.6"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread",
"sync"] }
futures = "0.3"
num_cpus = "1.13.0"
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 16ec07a..bbf8e27 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -35,18 +35,15 @@ async-trait = "0.1.36"
futures = "0.3"
hashbrown = "0.11"
log = "0.4"
-prost = "0.8"
+prost = "0.9"
serde = {version = "1", features = ["derive"]}
sqlparser = "0.13"
tokio = "1.0"
-tonic = "0.5"
+tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
chrono = { version = "0.4", default-features = false }
-# workaround for https://github.com/apache/arrow-datafusion/issues/1498
-# should be able to remove when we update arrow-flight
-quote = "=1.0.10"
-arrow-flight = { version = "6.4.0" }
+arrow-flight = { version = "7.0.0" }
datafusion = { path = "../../../datafusion", version = "6.0.0" }
@@ -54,4 +51,4 @@ datafusion = { path = "../../../datafusion", version = "6.0.0" }
tempfile = "3"
[build-dependencies]
-tonic-build = { version = "0.5" }
+tonic-build = { version = "0.6" }
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index aa7b6a9..3fa14f1 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -1015,6 +1015,7 @@ enum TimeUnit{
enum IntervalUnit{
YearMonth = 0;
DayTime = 1;
+ MonthDayNano = 2;
}
message Decimal{
@@ -1040,11 +1041,18 @@ message Struct{
repeated Field sub_field_types = 1;
}
+enum UnionMode{
+ sparse = 0;
+ dense = 1;
+}
+
message Union{
repeated Field union_types = 1;
+ UnionMode union_mode = 2;
}
+
message ScalarListValue{
ScalarType datatype = 1;
repeated ScalarValue values = 2;
diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index 26c8d22..4b82c34 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -17,7 +17,7 @@
//! Client API for sending requests to executors.
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::{collections::HashMap, pin::Pin};
use std::{
convert::{TryFrom, TryInto},
@@ -135,13 +135,16 @@ impl BallistaClient {
}
struct FlightDataStream {
- stream: Streaming<FlightData>,
+ stream: Mutex<Streaming<FlightData>>,
schema: SchemaRef,
}
impl FlightDataStream {
pub fn new(stream: Streaming<FlightData>, schema: SchemaRef) -> Self {
- Self { stream, schema }
+ Self {
+ stream: Mutex::new(stream),
+ schema,
+ }
}
}
@@ -149,10 +152,11 @@ impl Stream for FlightDataStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
+ self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- self.stream.poll_next_unpin(cx).map(|x| match x {
+ let mut stream = self.stream.lock().expect("mutex is bad");
+ stream.poll_next_unpin(cx).map(|x| match x {
Some(flight_data_chunk_result) => {
let converted_chunk = flight_data_chunk_result
.map_err(|e| ArrowError::from_external_error(Box::new(e)))
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index a0f481a..94d4e8e 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -24,6 +24,7 @@ mod roundtrip_tests {
use super::super::{super::error::Result, protobuf};
use crate::error::BallistaError;
use core::panic;
+ use datafusion::arrow::datatypes::UnionMode;
use datafusion::logical_plan::Repartition;
use datafusion::{
arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit},
@@ -413,25 +414,31 @@ mod roundtrip_tests {
true,
),
]),
- DataType::Union(vec![
- Field::new("nullable", DataType::Boolean, false),
- Field::new("name", DataType::Utf8, false),
- Field::new("datatype", DataType::Binary, false),
- ]),
- DataType::Union(vec![
- Field::new("nullable", DataType::Boolean, false),
- Field::new("name", DataType::Utf8, false),
- Field::new("datatype", DataType::Binary, false),
- Field::new(
- "nested_struct",
- DataType::Struct(vec![
- Field::new("nullable", DataType::Boolean, false),
- Field::new("name", DataType::Utf8, false),
- Field::new("datatype", DataType::Binary, false),
- ]),
- true,
- ),
- ]),
+ DataType::Union(
+ vec![
+ Field::new("nullable", DataType::Boolean, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("datatype", DataType::Binary, false),
+ ],
+ UnionMode::Dense,
+ ),
+ DataType::Union(
+ vec![
+ Field::new("nullable", DataType::Boolean, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("datatype", DataType::Binary, false),
+ Field::new(
+ "nested_struct",
+ DataType::Struct(vec![
+ Field::new("nullable", DataType::Boolean, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("datatype", DataType::Binary, false),
+ ]),
+ true,
+ ),
+ ],
+ UnionMode::Sparse,
+ ),
DataType::Dictionary(
Box::new(DataType::Utf8),
Box::new(DataType::Struct(vec![
@@ -558,25 +565,31 @@ mod roundtrip_tests {
true,
),
]),
- DataType::Union(vec![
- Field::new("nullable", DataType::Boolean, false),
- Field::new("name", DataType::Utf8, false),
- Field::new("datatype", DataType::Binary, false),
- ]),
- DataType::Union(vec![
- Field::new("nullable", DataType::Boolean, false),
- Field::new("name", DataType::Utf8, false),
- Field::new("datatype", DataType::Binary, false),
- Field::new(
- "nested_struct",
- DataType::Struct(vec![
- Field::new("nullable", DataType::Boolean, false),
- Field::new("name", DataType::Utf8, false),
- Field::new("datatype", DataType::Binary, false),
- ]),
- true,
- ),
- ]),
+ DataType::Union(
+ vec![
+ Field::new("nullable", DataType::Boolean, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("datatype", DataType::Binary, false),
+ ],
+ UnionMode::Sparse,
+ ),
+ DataType::Union(
+ vec![
+ Field::new("nullable", DataType::Boolean, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("datatype", DataType::Binary, false),
+ Field::new(
+ "nested_struct",
+ DataType::Struct(vec![
+ Field::new("nullable", DataType::Boolean, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("datatype", DataType::Binary, false),
+ ]),
+ true,
+ ),
+ ],
+ UnionMode::Dense,
+ ),
DataType::Dictionary(
Box::new(DataType::Utf8),
Box::new(DataType::Struct(vec![
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 01428d9..46543ea 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -22,7 +22,7 @@
use super::super::proto_error;
use crate::serde::{byte_to_string, protobuf, BallistaError};
use datafusion::arrow::datatypes::{
- DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
+ DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode,
};
use datafusion::datasource::file_format::avro::AvroFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
@@ -60,6 +60,7 @@ impl protobuf::IntervalUnit {
match interval_unit {
IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
+ IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
}
}
@@ -71,6 +72,7 @@ impl protobuf::IntervalUnit {
Some(interval_unit) => Ok(match interval_unit {
protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
+ protobuf::IntervalUnit::MonthDayNano =>
IntervalUnit::MonthDayNano,
}),
None => Err(proto_error(
"Error converting i32 to DateUnit: Passed invalid variant",
@@ -238,12 +240,19 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
.map(|field| field.into())
.collect::<Vec<_>>(),
}),
- DataType::Union(union_types) =>
ArrowTypeEnum::Union(protobuf::Union {
- union_types: union_types
- .iter()
- .map(|field| field.into())
- .collect::<Vec<_>>(),
- }),
+ DataType::Union(union_types, union_mode) => {
+ let union_mode = match union_mode {
+ UnionMode::Sparse => protobuf::UnionMode::Sparse,
+ UnionMode::Dense => protobuf::UnionMode::Dense,
+ };
+ ArrowTypeEnum::Union(protobuf::Union {
+ union_types: union_types
+ .iter()
+ .map(|field| field.into())
+ .collect::<Vec<_>>(),
+ union_mode: union_mode.into(),
+ })
+ }
DataType::Dictionary(key_type, value_type) => {
ArrowTypeEnum::Dictionary(Box::new(protobuf::Dictionary {
key: Some(Box::new(key_type.as_ref().into())),
@@ -387,7 +396,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
- | DataType::Union(_)
+ | DataType::Union(_, _)
| DataType::Dictionary(_, _)
| DataType::Map(_, _)
| DataType::Decimal(_, _) => {
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index fd3b57b..e68983a 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -20,6 +20,7 @@
use std::{convert::TryInto, io::Cursor};
+use datafusion::arrow::datatypes::UnionMode;
use datafusion::logical_plan::{JoinConstraint, JoinType, Operator};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
@@ -246,13 +247,24 @@ impl TryInto<datafusion::arrow::datatypes::DataType>
.map(|field| field.try_into())
.collect::<Result<Vec<_>, _>>()?,
),
- arrow_type::ArrowTypeEnum::Union(union) => DataType::Union(
- union
+ arrow_type::ArrowTypeEnum::Union(union) => {
+ let union_mode =
protobuf::UnionMode::from_i32(union.union_mode)
+ .ok_or_else(|| {
+ proto_error(
+ "Protobuf deserialization error: Unknown union
mode type",
+ )
+ })?;
+ let union_mode = match union_mode {
+ protobuf::UnionMode::Dense => UnionMode::Dense,
+ protobuf::UnionMode::Sparse => UnionMode::Sparse,
+ };
+ let union_types = union
.union_types
.iter()
.map(|field| field.try_into())
- .collect::<Result<Vec<_>, _>>()?,
- ),
+ .collect::<Result<Vec<_>, _>>()?;
+ DataType::Union(union_types, union_mode)
+ }
arrow_type::ArrowTypeEnum::Dictionary(dict) => {
let pb_key_datatype = dict
.as_ref()
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 00f3aab..c01bb20 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -29,8 +29,8 @@ edition = "2018"
snmalloc = ["snmalloc-rs"]
[dependencies]
-arrow = { version = "6.4.0" }
-arrow-flight = { version = "6.4.0" }
+arrow = { version = "7.0.0" }
+arrow-flight = { version = "7.0.0" }
anyhow = "1"
async-trait = "0.1.36"
ballista-core = { path = "../core", version = "0.6.0" }
@@ -43,7 +43,7 @@ snmalloc-rs = {version = "0.2", features= ["cache-friendly"], optional = true}
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
tokio-stream = { version = "0.1", features = ["net"] }
-tonic = "0.5"
+tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
[dev-dependencies]
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index a71be40..0bacccf 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -44,13 +44,13 @@ http-body = "0.4"
hyper = "0.14.4"
log = "0.4"
parse_arg = "0.1.3"
-prost = "0.8"
+prost = "0.9"
rand = "0.8"
serde = {version = "1", features = ["derive"]}
sled_package = { package = "sled", version = "0.34", optional = true }
tokio = { version = "1.0", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"], optional = true }
-tonic = "0.5"
+tonic = "0.6"
tower = { version = "0.4" }
warp = "0.3"
@@ -60,7 +60,7 @@ uuid = { version = "0.8", features = ["v4"] }
[build-dependencies]
configure_me_codegen = "0.4.1"
-tonic-build = { version = "0.5" }
+tonic-build = { version = "0.6" }
[package.metadata.configure_me.bin]
scheduler = "scheduler_config_spec.toml"
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 394bd1e..d5347d8 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -31,5 +31,5 @@ clap = "2.33"
rustyline = "9.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread",
"sync"] }
datafusion = { path = "../datafusion", version = "6.0.0" }
-arrow = { version = "6.4.0" }
+arrow = { version = "7.0.0" }
ballista = { path = "../ballista/rust/client", version = "0.6.0" }
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index f7ef66d..24d453e 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -34,10 +34,10 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]
[dev-dependencies]
-arrow-flight = { version = "6.4.0" }
+arrow-flight = { version = "7.0.0" }
datafusion = { path = "../datafusion" }
-prost = "0.8"
-tonic = "0.5"
+prost = "0.9"
+tonic = "0.6"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread",
"sync"] }
futures = "0.3"
num_cpus = "1.13.0"
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index b919282..46e2cbe 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -52,8 +52,8 @@ avro = ["avro-rs", "num-traits"]
[dependencies]
ahash = { version = "0.7", default-features = false }
hashbrown = { version = "0.11", features = ["raw"] }
-arrow = { version = "6.4.0", features = ["prettyprint"] }
-parquet = { version = "6.4.0", features = ["arrow"] }
+arrow = { version = "7.0.0", features = ["prettyprint"] }
+parquet = { version = "7.0.0", features = ["arrow"] }
sqlparser = "0.13"
paste = "^1.0"
num_cpus = "1.13.0"
diff --git a/datafusion/src/avro_to_arrow/schema.rs b/datafusion/src/avro_to_arrow/schema.rs
index c6eda80..2e9a17d 100644
--- a/datafusion/src/avro_to_arrow/schema.rs
+++ b/datafusion/src/avro_to_arrow/schema.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
+use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit,
UnionMode};
use crate::error::{DataFusionError, Result};
use arrow::datatypes::Field;
use avro_rs::schema::Name;
@@ -103,7 +103,7 @@ fn schema_to_field_with_props(
.iter()
.map(|s| schema_to_field_with_props(s, None, has_nullable,
None))
.collect::<Result<Vec<Field>>>()?;
- DataType::Union(fields)
+ DataType::Union(fields, UnionMode::Dense)
}
}
AvroSchema::Record { name, fields, .. } => {
@@ -201,6 +201,7 @@ fn default_field_name(dt: &DataType) -> &str {
DataType::Interval(unit) => match unit {
IntervalUnit::YearMonth => "intervalyear",
IntervalUnit::DayTime => "intervalmonth",
+ IntervalUnit::MonthDayNano => "intervalmonthdaynano",
},
DataType::Binary => "varbinary",
DataType::FixedSizeBinary(_) => "fixedsizebinary",
@@ -211,7 +212,7 @@ fn default_field_name(dt: &DataType) -> &str {
DataType::FixedSizeList(_, _) => "fixed_size_list",
DataType::LargeList(_) => "largelist",
DataType::Struct(_) => "struct",
- DataType::Union(_) => "union",
+ DataType::Union(_, _) => "union",
DataType::Dictionary(_, _) => "map",
DataType::Map(_, _) => unimplemented!("Map support not implemented"),
DataType::Decimal(_, _) => "decimal",
diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs
index 26d55bc..fd574d7 100644
--- a/datafusion/src/lib.rs
+++ b/datafusion/src/lib.rs
@@ -48,7 +48,8 @@
//! let results: Vec<RecordBatch> = df.collect().await?;
//!
//! // format the results
-//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?;
+//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
+//! .to_string();
//!
//! let expected = vec![
//! "+---+--------------------------+",
@@ -83,7 +84,8 @@
//! let results: Vec<RecordBatch> = df.collect().await?;
//!
//! // format the results
-//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?;
+//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
+//! .to_string();
//!
//! let expected = vec![
//! "+---+----------------+",
diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs
index efea300..f250baa 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -116,6 +116,7 @@ impl ExecutionPlan for CsvExec {
let fun = move |file, remaining: &Option<usize>| {
let bounds = remaining.map(|x| (0, x + start_line));
+ let datetime_format = None;
Box::new(csv::Reader::new(
file,
Arc::clone(&file_schema),
@@ -124,6 +125,7 @@ impl ExecutionPlan for CsvExec {
batch_size,
bounds,
file_projection.clone(),
+ datetime_format,
)) as BatchIter
};
diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs
index c90c653..6326580 100644
--- a/datafusion/src/physical_plan/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -975,8 +975,12 @@ mod tests {
let basic = basic_sort(csv.clone(), sort.clone()).await;
let partition = partition_sort(csv, sort).await;
- let basic =
arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
- let partition =
arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();
+ let basic = arrow::util::pretty::pretty_format_batches(&[basic])
+ .unwrap()
+ .to_string();
+ let partition =
arrow::util::pretty::pretty_format_batches(&[partition])
+ .unwrap()
+ .to_string();
assert_eq!(
basic, partition,
@@ -1072,8 +1076,12 @@ mod tests {
assert_eq!(basic.num_rows(), 300);
assert_eq!(partition.num_rows(), 300);
- let basic =
arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
- let partition =
arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();
+ let basic = arrow::util::pretty::pretty_format_batches(&[basic])
+ .unwrap()
+ .to_string();
+ let partition =
arrow::util::pretty::pretty_format_batches(&[partition])
+ .unwrap()
+ .to_string();
assert_eq!(basic, partition);
}
@@ -1106,9 +1114,12 @@ mod tests {
assert_eq!(basic.num_rows(), 300);
assert_eq!(merged.iter().map(|x| x.num_rows()).sum::<usize>(), 300);
- let basic =
arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
- let partition =
-
arrow::util::pretty::pretty_format_batches(merged.as_slice()).unwrap();
+ let basic = arrow::util::pretty::pretty_format_batches(&[basic])
+ .unwrap()
+ .to_string();
+ let partition =
arrow::util::pretty::pretty_format_batches(merged.as_slice())
+ .unwrap()
+ .to_string();
assert_eq!(basic, partition);
}
@@ -1245,8 +1256,12 @@ mod tests {
let merged = merged.remove(0);
let basic = basic_sort(batches, sort.clone()).await;
- let basic =
arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
- let partition =
arrow::util::pretty::pretty_format_batches(&[merged]).unwrap();
+ let basic = arrow::util::pretty::pretty_format_batches(&[basic])
+ .unwrap()
+ .to_string();
+ let partition = arrow::util::pretty::pretty_format_batches(&[merged])
+ .unwrap()
+ .to_string();
assert_eq!(
basic, partition,
diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs
index f1fb4db..af66503 100644
--- a/datafusion/src/test_util.rs
+++ b/datafusion/src/test_util.rs
@@ -38,7 +38,9 @@ macro_rules! assert_batches_eq {
let expected_lines: Vec<String> =
$EXPECTED_LINES.iter().map(|&s| s.into()).collect();
- let formatted =
arrow::util::pretty::pretty_format_batches($CHUNKS).unwrap();
+ let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS)
+ .unwrap()
+ .to_string();
let actual_lines: Vec<&str> = formatted.trim().lines().collect();
@@ -72,7 +74,9 @@ macro_rules! assert_batches_sorted_eq {
expected_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
}
- let formatted =
arrow::util::pretty::pretty_format_batches($CHUNKS).unwrap();
+ let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS)
+ .unwrap()
+ .to_string();
// fix for windows: \r\n -->
let mut actual_lines: Vec<&str> = formatted.trim().lines().collect();
diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs
index 194563a..ee27a33 100644
--- a/datafusion/tests/parquet_pruning.rs
+++ b/datafusion/tests/parquet_pruning.rs
@@ -528,7 +528,7 @@ impl ContextWithParquet {
.collect()
.await
.expect("getting input");
- let pretty_input = pretty_format_batches(&input).unwrap();
+ let pretty_input = pretty_format_batches(&input).unwrap().to_string();
let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing
plan");
let physical_plan = self
@@ -564,7 +564,7 @@ impl ContextWithParquet {
let result_rows = results.iter().map(|b| b.num_rows()).sum();
- let pretty_results = pretty_format_batches(&results).unwrap();
+ let pretty_results =
pretty_format_batches(&results).unwrap().to_string();
let sql = sql.into();
TestOutput {
diff --git a/datafusion/tests/sql/explain_analyze.rs b/datafusion/tests/sql/explain_analyze.rs
index 47e7290..a9cef73 100644
--- a/datafusion/tests/sql/explain_analyze.rs
+++ b/datafusion/tests/sql/explain_analyze.rs
@@ -42,7 +42,9 @@ async fn explain_analyze_baseline_metrics() {
let plan = ctx.optimize(&plan).unwrap();
let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
let results = collect(physical_plan.clone()).await.unwrap();
- let formatted =
arrow::util::pretty::pretty_format_batches(&results).unwrap();
+ let formatted = arrow::util::pretty::pretty_format_batches(&results)
+ .unwrap()
+ .to_string();
println!("Query Output:\n\n{}", formatted);
assert_metrics!(
@@ -548,13 +550,17 @@ async fn explain_analyze_runs_optimizers() {
let sql = "EXPLAIN SELECT count(*) from alltypes_plain";
let actual = execute_to_batches(&mut ctx, sql).await;
- let actual = arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+ let actual = arrow::util::pretty::pretty_format_batches(&actual)
+ .unwrap()
+ .to_string();
assert_contains!(actual, expected);
// EXPLAIN ANALYZE should work the same
let sql = "EXPLAIN ANALYZE SELECT count(*) from alltypes_plain";
let actual = execute_to_batches(&mut ctx, sql).await;
- let actual = arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+ let actual = arrow::util::pretty::pretty_format_batches(&actual)
+ .unwrap()
+ .to_string();
assert_contains!(actual, expected);
}
@@ -760,7 +766,9 @@ async fn csv_explain_analyze() {
register_aggregate_csv_by_sql(&mut ctx).await;
let sql = "EXPLAIN ANALYZE SELECT count(*), c1 FROM aggregate_test_100
group by c1";
let actual = execute_to_batches(&mut ctx, sql).await;
- let formatted =
arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+ let formatted = arrow::util::pretty::pretty_format_batches(&actual)
+ .unwrap()
+ .to_string();
// Only test basic plumbing and try to avoid having to change too
// many things. explain_analyze_baseline_metrics covers the values
@@ -780,7 +788,9 @@ async fn csv_explain_analyze_verbose() {
let sql =
"EXPLAIN ANALYZE VERBOSE SELECT count(*), c1 FROM aggregate_test_100
group by c1";
let actual = execute_to_batches(&mut ctx, sql).await;
- let formatted =
arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+ let formatted = arrow::util::pretty::pretty_format_batches(&actual)
+ .unwrap()
+ .to_string();
let verbose_needle = "Output Rows";
assert_contains!(formatted, verbose_needle);
diff --git a/datafusion/tests/sql/select.rs b/datafusion/tests/sql/select.rs
index 8d0d12f..cfe0fac 100644
--- a/datafusion/tests/sql/select.rs
+++ b/datafusion/tests/sql/select.rs
@@ -495,7 +495,9 @@ async fn use_between_expression_in_select_query() -> Result<()> {
let sql = "EXPLAIN SELECT c1 BETWEEN 2 AND 3 FROM test";
let actual = execute_to_batches(&mut ctx, sql).await;
- let formatted =
arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+ let formatted = arrow::util::pretty::pretty_format_batches(&actual)
+ .unwrap()
+ .to_string();
// Only test that the projection exprs arecorrect, rather than entire
output
let needle = "ProjectionExec: expr=[c1@0 >= 2 AND c1@0 <= 3 as test.c1
BETWEEN Int64(2) AND Int64(3)]";
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index d3c6083..b603f6a 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -94,7 +94,9 @@ use datafusion::logical_plan::{DFSchemaRef, Limit};
async fn exec_sql(ctx: &mut ExecutionContext, sql: &str) -> Result<String> {
let df = ctx.sql(sql).await?;
let batches = df.collect().await?;
- pretty_format_batches(&batches).map_err(DataFusionError::ArrowError)
+ pretty_format_batches(&batches)
+ .map_err(DataFusionError::ArrowError)
+ .map(|d| d.to_string())
}
/// Create a test table.