This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new e4191dc7 Update to DataFusion 33 (#900)
e4191dc7 is described below
commit e4191dc7b76c00b319a7f04b0fdc452d6b2886eb
Author: Daniël Heres <[email protected]>
AuthorDate: Sun Nov 26 19:13:21 2023 +0000
Update to DataFusion 33 (#900)
* Update to DataFusion 33
* Test fixes
* Fix
* Upgrade crates
* Tests
* Tests
* Use parquet file instead
---
Cargo.toml | 14 ++---
ballista/client/src/columnar_batch.rs | 8 ++-
ballista/client/src/context.rs | 34 +++++------
ballista/core/Cargo.toml | 2 +-
.../core/src/execution_plans/distributed_query.rs | 4 +-
.../core/src/execution_plans/shuffle_reader.rs | 68 +++++++++++-----------
.../core/src/execution_plans/shuffle_writer.rs | 2 +-
.../core/src/execution_plans/unresolved_shuffle.rs | 4 +-
ballista/executor/src/collect.rs | 2 +-
ballista/executor/src/executor.rs | 6 +-
ballista/scheduler/Cargo.toml | 6 +-
ballista/scheduler/src/flight_sql.rs | 5 +-
examples/examples/standalone-sql.rs | 10 ++--
13 files changed, 84 insertions(+), 81 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 6f34fa06..e4b5f324 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,16 +29,16 @@ members = [
resolver = "2"
[workspace.dependencies]
-arrow = { version = "47.0.0" }
-arrow-flight = { version = "47.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "47.0.0", default-features = false }
+arrow = { version = "48.0.0" }
+arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "48.0.0", default-features = false }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
-datafusion = "32.0.0"
-datafusion-cli = "32.0.0"
-datafusion-proto = "32.0.0"
+datafusion = "33.0.0"
+datafusion-cli = "33.0.0"
+datafusion-proto = "33.0.0"
object_store = "0.7.0"
-sqlparser = "0.38.0"
+sqlparser = "0.39.0"
tonic = { version = "0.10" }
tonic-build = { version = "0.10", default-features = false, features = [
"transport",
diff --git a/ballista/client/src/columnar_batch.rs
b/ballista/client/src/columnar_batch.rs
index 3431f561..5e7fe89b 100644
--- a/ballista/client/src/columnar_batch.rs
+++ b/ballista/client/src/columnar_batch.rs
@@ -147,10 +147,12 @@ impl ColumnarValue {
}
}
- pub fn to_arrow(&self) -> ArrayRef {
+ pub fn to_arrow(&self) -> Result<ArrayRef> {
match self {
- ColumnarValue::Columnar(array) => array.clone(),
- ColumnarValue::Scalar(value, n) => value.to_array_of_size(*n),
+ ColumnarValue::Columnar(array) => Ok(array.clone()),
+ ColumnarValue::Scalar(value, n) => {
+ value.to_array_of_size(*n).map_err(|x| x.into())
+ }
}
}
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 76c8d439..82ca1710 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -839,7 +839,7 @@ mod tests {
let res = df.collect().await.unwrap();
let expected = vec![
"+-------------------+",
- "| VARIANCE(test.id) |",
+ "| VAR(test.id) |",
"+-------------------+",
"| 6.000000000000001 |",
"+-------------------+",
@@ -852,11 +852,11 @@ mod tests {
.unwrap();
let res = df.collect().await.unwrap();
let expected = vec![
- "+-----------------------+",
- "| VARIANCE_POP(test.id) |",
- "+-----------------------+",
- "| 5.250000000000001 |",
- "+-----------------------+",
+ "+-------------------+",
+ "| VAR_POP(test.id) |",
+ "+-------------------+",
+ "| 5.250000000000001 |",
+ "+-------------------+",
];
assert_result_eq(expected, &res);
@@ -867,7 +867,7 @@ mod tests {
let res = df.collect().await.unwrap();
let expected = vec![
"+-------------------+",
- "| VARIANCE(test.id) |",
+ "| VAR(test.id) |",
"+-------------------+",
"| 6.000000000000001 |",
"+-------------------+",
@@ -908,11 +908,11 @@ mod tests {
.unwrap();
let res = df.collect().await.unwrap();
let expected = vec![
- "+--------------------------------------+",
- "| COVARIANCE(test.id,test.tinyint_col) |",
- "+--------------------------------------+",
- "| 0.28571428571428586 |",
- "+--------------------------------------+",
+ "+---------------------------------+",
+ "| COVAR(test.id,test.tinyint_col) |",
+ "+---------------------------------+",
+ "| 0.28571428571428586 |",
+ "+---------------------------------+",
];
assert_result_eq(expected, &res);
@@ -922,11 +922,11 @@ mod tests {
.unwrap();
let res = df.collect().await.unwrap();
let expected = vec![
- "+---------------------------------------+",
- "| CORRELATION(test.id,test.tinyint_col) |",
- "+---------------------------------------+",
- "| 0.21821789023599245 |",
- "+---------------------------------------+",
+ "+--------------------------------+",
+ "| CORR(test.id,test.tinyint_col) |",
+ "+--------------------------------+",
+ "| 0.21821789023599245 |",
+ "+--------------------------------+",
];
assert_result_eq(expected, &res);
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 31a5d495..d8d4bfcc 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -57,7 +57,7 @@ datafusion-proto = { workspace = true }
futures = "0.3"
hashbrown = "0.14"
-itertools = "0.11"
+itertools = "0.12"
libloading = "0.8.0"
log = "0.4"
md-5 = { version = "^0.10.0" }
diff --git a/ballista/core/src/execution_plans/distributed_query.rs
b/ballista/core/src/execution_plans/distributed_query.rs
index ccb26206..13511173 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -210,11 +210,11 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for
DistributedQueryExec<T> {
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
- fn statistics(&self) -> Statistics {
+ fn statistics(&self) -> Result<Statistics> {
// This execution plan sends the logical plan to the scheduler without
// performing the node by node conversion to a full physical plan.
// This implies that we cannot infer the statistics at this stage.
- Statistics::default()
+ Ok(Statistics::new_unknown(&self.schema()))
}
}
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs
b/ballista/core/src/execution_plans/shuffle_reader.rs
index fa3f9f69..6a77a16e 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -16,6 +16,7 @@
// under the License.
use async_trait::async_trait;
+use datafusion::common::stats::Precision;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
@@ -37,8 +38,8 @@ use datafusion::error::Result;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream,
- SendableRecordBatchStream, Statistics,
+ ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning,
+ RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use futures::{Stream, StreamExt, TryStreamExt};
@@ -172,37 +173,39 @@ impl ExecutionPlan for ShuffleReaderExec {
Some(self.metrics.clone_inner())
}
- fn statistics(&self) -> Statistics {
- stats_for_partitions(
+ fn statistics(&self) -> Result<Statistics> {
+ Ok(stats_for_partitions(
+ self.schema.fields().len(),
self.partition
.iter()
.flatten()
.map(|loc| loc.partition_stats),
- )
+ ))
}
}
fn stats_for_partitions(
+ num_fields: usize,
partition_stats: impl Iterator<Item = PartitionStats>,
) -> Statistics {
// TODO stats: add column statistics to PartitionStats
- partition_stats.fold(
- Statistics {
- is_exact: true,
- num_rows: Some(0),
- total_byte_size: Some(0),
- column_statistics: None,
- },
- |mut acc, part| {
+ let (num_rows, total_byte_size) =
+ partition_stats.fold((Some(0), Some(0)), |(num_rows, total_byte_size),
part| {
// if any statistic is unkown it makes the entire statistic unkown
- acc.num_rows = acc.num_rows.zip(part.num_rows).map(|(a, b)| a + b
as usize);
- acc.total_byte_size = acc
- .total_byte_size
+ let num_rows = num_rows.zip(part.num_rows).map(|(a, b)| a + b as
usize);
+ let total_byte_size = total_byte_size
.zip(part.num_bytes)
.map(|(a, b)| a + b as usize);
- acc
- },
- )
+ (num_rows, total_byte_size)
+ });
+
+ Statistics {
+ num_rows: num_rows.map(Precision::Exact).unwrap_or(Precision::Absent),
+ total_byte_size: total_byte_size
+ .map(Precision::Exact)
+ .unwrap_or(Precision::Absent),
+ column_statistics: vec![ColumnStatistics::new_unknown(); num_fields],
+ }
}
struct LocalShuffleStream {
@@ -445,13 +448,12 @@ mod tests {
#[tokio::test]
async fn test_stats_for_partitions_empty() {
- let result = stats_for_partitions(std::iter::empty());
+ let result = stats_for_partitions(0, std::iter::empty());
let exptected = Statistics {
- is_exact: true,
- num_rows: Some(0),
- total_byte_size: Some(0),
- column_statistics: None,
+ num_rows: Precision::Exact(0),
+ total_byte_size: Precision::Exact(0),
+ column_statistics: vec![],
};
assert_eq!(result, exptected);
@@ -472,13 +474,12 @@ mod tests {
},
];
- let result = stats_for_partitions(part_stats.into_iter());
+ let result = stats_for_partitions(0, part_stats.into_iter());
let exptected = Statistics {
- is_exact: true,
- num_rows: Some(14),
- total_byte_size: Some(149),
- column_statistics: None,
+ num_rows: Precision::Exact(14),
+ total_byte_size: Precision::Exact(149),
+ column_statistics: vec![],
};
assert_eq!(result, exptected);
@@ -499,13 +500,12 @@ mod tests {
},
];
- let result = stats_for_partitions(part_stats.into_iter());
+ let result = stats_for_partitions(0, part_stats.into_iter());
let exptected = Statistics {
- is_exact: true,
- num_rows: None,
- total_byte_size: None,
- column_statistics: None,
+ num_rows: Precision::Absent,
+ total_byte_size: Precision::Absent,
+ column_statistics: vec![],
};
assert_eq!(result, exptected);
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs
b/ballista/core/src/execution_plans/shuffle_writer.rs
index 24869b2c..1896c206 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -418,7 +418,7 @@ impl ExecutionPlan for ShuffleWriterExec {
Some(self.metrics.clone_inner())
}
- fn statistics(&self) -> Statistics {
+ fn statistics(&self) -> Result<Statistics> {
self.plan.statistics()
}
}
diff --git a/ballista/core/src/execution_plans/unresolved_shuffle.rs
b/ballista/core/src/execution_plans/unresolved_shuffle.rs
index 0557529f..fe3610c4 100644
--- a/ballista/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/core/src/execution_plans/unresolved_shuffle.rs
@@ -115,9 +115,9 @@ impl ExecutionPlan for UnresolvedShuffleExec {
))
}
- fn statistics(&self) -> Statistics {
+ fn statistics(&self) -> Result<Statistics> {
// The full statistics are computed in the `ShuffleReaderExec` node
// that replaces this one once the previous stage is completed.
- Statistics::default()
+ Ok(Statistics::new_unknown(&self.schema()))
}
}
diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs
index 8dbccc32..22567bca 100644
--- a/ballista/executor/src/collect.rs
+++ b/ballista/executor/src/collect.rs
@@ -108,7 +108,7 @@ impl ExecutionPlan for CollectExec {
}))
}
- fn statistics(&self) -> Statistics {
+ fn statistics(&self) -> Result<Statistics> {
self.plan.statistics()
}
}
diff --git a/ballista/executor/src/executor.rs
b/ballista/executor/src/executor.rs
index 4ee57eb8..60db9f00 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -210,7 +210,7 @@ mod test {
use crate::execution_engine::DefaultQueryStageExec;
use ballista_core::serde::scheduler::PartitionId;
- use datafusion::error::DataFusionError;
+ use datafusion::error::{DataFusionError, Result};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream,
@@ -299,8 +299,8 @@ mod test {
Ok(Box::pin(NeverendingRecordBatchStream))
}
- fn statistics(&self) -> Statistics {
- Statistics::default()
+ fn statistics(&self) -> Result<Statistics> {
+ Ok(Statistics::new_unknown(&self.schema()))
}
}
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 470195ce..270c5f99 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -47,20 +47,20 @@ arrow-flight = { workspace = true }
async-recursion = "1.0.0"
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] }
-base64 = { version = "0.13" }
+base64 = { version = "0.21" }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = { workspace = true }
dashmap = "5.4.0"
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
-etcd-client = { version = "0.11", optional = true }
+etcd-client = { version = "0.12", optional = true }
flatbuffers = { version = "23.5.26" }
futures = "0.3"
graphviz-rust = "0.6.1"
http = "0.2"
http-body = "0.4"
hyper = "0.14.4"
-itertools = "0.11.0"
+itertools = "0.12.0"
log = "0.4"
object_store = { workspace = true }
once_cell = { version = "1.16.0", optional = true }
diff --git a/ballista/scheduler/src/flight_sql.rs
b/ballista/scheduler/src/flight_sql.rs
index 23a863e2..62c96966 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -35,6 +35,7 @@ use arrow_flight::{
Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
HandshakeRequest,
HandshakeResponse, Location, Ticket,
};
+use base64::Engine;
use futures::Stream;
use log::{debug, error, warn};
use std::convert::TryFrom;
@@ -503,8 +504,8 @@ impl FlightSqlService for FlightSqlServiceImpl {
"Auth type not implemented: {authorization}"
)))?;
}
- let base64 = &authorization[basic.len()..];
- let bytes = base64::decode(base64)
+ let bytes = base64::engine::general_purpose::STANDARD
+ .decode(&authorization[basic.len()..])
.map_err(|_| Status::invalid_argument("authorization not
parsable"))?;
let str = String::from_utf8(bytes)
.map_err(|_| Status::invalid_argument("authorization not
parsable"))?;
diff --git a/examples/examples/standalone-sql.rs
b/examples/examples/standalone-sql.rs
index b24a6350..0427caa7 100644
--- a/examples/examples/standalone-sql.rs
+++ b/examples/examples/standalone-sql.rs
@@ -17,7 +17,7 @@
use ballista::prelude::{BallistaConfig, BallistaContext, Result};
use ballista_examples::test_util;
-use datafusion::prelude::CsvReadOptions;
+use datafusion::execution::options::ParquetReadOptions;
#[tokio::main]
async fn main() -> Result<()> {
@@ -29,11 +29,11 @@ async fn main() -> Result<()> {
let testdata = test_util::examples_test_data();
- // register csv file with the execution context
- ctx.register_csv(
+ // register parquet file with the execution context
+ ctx.register_parquet(
"test",
- &format!("{testdata}/aggregate_test_100.csv"),
- CsvReadOptions::new(),
+ &format!("{testdata}/alltypes_plain.parquet"),
+ ParquetReadOptions::default(),
)
.await?;