This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new a3ac8575 Upgrade to DataFusion 18.0.0-rc1 (#664)
a3ac8575 is described below
commit a3ac85750a316a0b15921bf669b18cd457182192
Author: Andy Grove <[email protected]>
AuthorDate: Sat Feb 11 08:05:18 2023 -0700
Upgrade to DataFusion 18.0.0-rc1 (#664)
---
ballista-cli/Cargo.toml | 4 +--
ballista/client/Cargo.toml | 4 +--
ballista/client/src/context.rs | 9 +++--
ballista/core/Cargo.toml | 6 ++--
ballista/core/src/client.rs | 8 +++--
ballista/core/src/error.rs | 5 ++-
.../core/src/execution_plans/distributed_query.rs | 4 +--
.../core/src/execution_plans/shuffle_reader.rs | 6 ++--
ballista/executor/Cargo.toml | 8 ++---
ballista/executor/src/collect.rs | 6 ++--
ballista/executor/src/executor.rs | 4 +--
ballista/scheduler/Cargo.toml | 6 ++--
ballista/scheduler/src/planner.rs | 41 +++++++++-------------
benchmarks/Cargo.toml | 4 +--
examples/Cargo.toml | 2 +-
15 files changed, 59 insertions(+), 58 deletions(-)
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 48261075..7787ba06 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.10.0",
features = [
"standalone",
] }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "17.0.0"
-datafusion-cli = "17.0.0"
+datafusion = { version = "18.0.0", git =
"https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-cli = { version = "18.0.0", git =
"https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
dirs = "4.0.0"
env_logger = "0.10"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index f57a6f2b..8e3db0f5 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.63"
ballista-core = { path = "../core", version = "0.10.0" }
ballista-executor = { path = "../executor", version = "0.10.0", optional =
true }
ballista-scheduler = { path = "../scheduler", version = "0.10.0", optional =
true }
-datafusion = "17.0.0"
-datafusion-proto = "17.0.0"
+datafusion = { version = "18.0.0", git =
"https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-proto = { version = "18.0.0", git =
"https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 3269592d..b3b17c85 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -21,6 +21,7 @@ use datafusion::arrow::datatypes::SchemaRef;
use log::info;
use parking_lot::Mutex;
use sqlparser::ast::Statement;
+use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
@@ -355,10 +356,14 @@ impl BallistaContext {
let state = self.state.lock();
for (name, prov) in &state.tables {
// ctx is shared between queries, check table exists or not
before register
- let table_ref = TableReference::Bare { table: name };
+ let table_ref = TableReference::Bare {
+ table: Cow::Borrowed(name),
+ };
if !ctx.table_exist(table_ref)? {
ctx.register_table(
- TableReference::Bare { table: name },
+ TableReference::Bare {
+ table: Cow::Borrowed(name),
+ },
Arc::clone(prov),
)?;
}
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 09272958..6da6b730 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -46,13 +46,13 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.8", default-features = false }
-arrow-flight = { version = "31.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "32.0.0", features = ["flight-sql-experimental"] }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "17.0.0"
+datafusion = { version = "18.0.0", git =
"https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false,
optional = true }
-datafusion-proto = "17.0.0"
+datafusion-proto = { version = "18.0.0", git =
"https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
futures = "0.3"
hashbrown = "0.13"
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index d91c6fac..d4179b32 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -34,9 +34,10 @@ use
arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::{
datatypes::{Schema, SchemaRef},
- error::{ArrowError, Result as ArrowResult},
+ error::ArrowError,
record_batch::RecordBatch,
};
+use datafusion::error::DataFusionError;
use crate::serde::protobuf;
use crate::utils::create_grpc_client_connection;
@@ -162,7 +163,7 @@ impl FlightDataStream {
}
impl Stream for FlightDataStream {
- type Item = ArrowResult<RecordBatch>;
+ type Item = datafusion::error::Result<RecordBatch>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
@@ -171,13 +172,14 @@ impl Stream for FlightDataStream {
self.stream.poll_next_unpin(cx).map(|x| match x {
Some(flight_data_chunk_result) => {
let converted_chunk = flight_data_chunk_result
- .map_err(|e| ArrowError::from_external_error(Box::new(e)))
+ .map_err(|e|
ArrowError::from_external_error(Box::new(e)).into())
.and_then(|flight_data_chunk| {
flight_data_to_arrow_batch(
&flight_data_chunk,
self.schema.clone(),
&self.dictionaries_by_id,
)
+ .map_err(DataFusionError::ArrowError)
});
Some(converted_chunk)
}
diff --git a/ballista/core/src/error.rs b/ballista/core/src/error.rs
index e22ae9d6..67f75283 100644
--- a/ballista/core/src/error.rs
+++ b/ballista/core/src/error.rs
@@ -100,7 +100,10 @@ impl From<parser::ParserError> for BallistaError {
impl From<DataFusionError> for BallistaError {
fn from(e: DataFusionError) -> Self {
- BallistaError::DataFusionError(e)
+ match e {
+ DataFusionError::ArrowError(e) => Self::from(e),
+ _ => BallistaError::DataFusionError(e),
+ }
}
}
diff --git a/ballista/core/src/execution_plans/distributed_query.rs
b/ballista/core/src/execution_plans/distributed_query.rs
index 75348692..ac461e14 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -25,7 +25,7 @@ use crate::serde::protobuf::{
};
use crate::utils::create_grpc_client_connection;
use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
+use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
@@ -227,7 +227,7 @@ async fn execute_query(
scheduler_url: String,
session_id: String,
query: ExecuteQueryParams,
-) -> Result<impl Stream<Item = ArrowResult<RecordBatch>> + Send> {
+) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
info!("Connecting to Ballista scheduler at {}", scheduler_url);
// TODO reuse the scheduler to avoid connecting to the Ballista scheduler
again and again
let connection = create_grpc_client_connection(scheduler_url)
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs
b/ballista/core/src/execution_plans/shuffle_reader.rs
index 73733bcb..20c31c50 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -29,7 +29,7 @@ use crate::client::BallistaClient;
use crate::serde::scheduler::{PartitionLocation, PartitionStats};
use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
+use datafusion::arrow::error::ArrowError;
use datafusion::arrow::ipc::reader::FileReader;
use datafusion::arrow::record_batch::RecordBatch;
@@ -208,14 +208,14 @@ impl LocalShuffleStream {
}
impl Stream for LocalShuffleStream {
- type Item = ArrowResult<RecordBatch>;
+ type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Some(batch) = self.reader.next() {
- return Poll::Ready(Some(batch));
+ return Poll::Ready(Some(batch.map_err(|e| e.into())));
}
Poll::Ready(None)
}
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index 7849d1e5..c2b667f5 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -38,15 +38,15 @@ default = ["mimalloc"]
[dependencies]
anyhow = "1"
-arrow = { version = "31.0.0" }
-arrow-flight = { version = "31.0.0" }
+arrow = { version = "32.0.0" }
+arrow-flight = { version = "32.0.0" }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.10.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
dashmap = "5.4.0"
-datafusion = "17.0.0"
-datafusion-proto = "17.0.0"
+datafusion = { version = "18.0.0", git =
"https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-proto = { version = "18.0.0", git =
"https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
futures = "0.3"
hyper = "0.14.4"
log = "0.4"
diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs
index 2985e914..65e1c51c 100644
--- a/ballista/executor/src/collect.rs
+++ b/ballista/executor/src/collect.rs
@@ -22,9 +22,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::{any::Any, pin::Pin};
-use datafusion::arrow::{
- datatypes::SchemaRef, error::Result as ArrowResult,
record_batch::RecordBatch,
-};
+use datafusion::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
@@ -118,7 +116,7 @@ struct MergedRecordBatchStream {
}
impl Stream for MergedRecordBatchStream {
- type Item = ArrowResult<RecordBatch>;
+ type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
diff --git a/ballista/executor/src/executor.rs
b/ballista/executor/src/executor.rs
index 1c25baa3..d903db69 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -182,13 +182,13 @@ mod test {
use crate::executor::Executor;
use crate::metrics::LoggingMetricsCollector;
use arrow::datatypes::{Schema, SchemaRef};
- use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf::ExecutorRegistration;
use datafusion::execution::context::TaskContext;
use ballista_core::serde::scheduler::PartitionId;
+ use datafusion::error::DataFusionError;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::{
ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
@@ -213,7 +213,7 @@ mod test {
}
impl Stream for NeverendingRecordBatchStream {
- type Item = Result<RecordBatch, ArrowError>;
+ type Item = Result<RecordBatch, DataFusionError>;
fn poll_next(
self: Pin<&mut Self>,
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 64da40d7..2375bfb0 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -43,7 +43,7 @@ sled = ["sled_package", "tokio-stream"]
[dependencies]
anyhow = "1"
-arrow-flight = { version = "31.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "32.0.0", features = ["flight-sql-experimental"] }
async-recursion = "1.0.0"
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.10.0" }
@@ -51,8 +51,8 @@ base64 = { version = "0.13", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
dashmap = "5.4.0"
-datafusion = "17.0.0"
-datafusion-proto = "17.0.0"
+datafusion = { version = "18.0.0", git =
"https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-proto = { version = "18.0.0", git =
"https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
etcd-client = { version = "0.10", optional = true }
flatbuffers = { version = "22.9.29" }
futures = "0.3"
diff --git a/ballista/scheduler/src/planner.rs
b/ballista/scheduler/src/planner.rs
index 87aaad77..dc069872 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -326,17 +326,15 @@ mod test {
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::joins::HashJoinExec;
+ use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
- use datafusion::physical_plan::{
- coalesce_partitions::CoalescePartitionsExec,
projection::ProjectionExec,
- };
+ use
datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion::prelude::SessionContext;
- use std::ops::Deref;
-
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf::LogicalPlanNode;
use datafusion_proto::protobuf::PhysicalPlanNode;
+ use std::ops::Deref;
use std::sync::Arc;
use uuid::Uuid;
@@ -375,19 +373,19 @@ mod test {
/* Expected result:
ShuffleWriterExec: Some(Hash([Column { name: "l_returnflag", index: 0
}], 2))
- AggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag],
aggr=[SUM(l_extendedprice Multiply Int64(1))]
- CsvExec: source=Path(testdata/lineitem:
[testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]),
has_header=false
+ AggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag],
aggr=[SUM(lineitem.l_extendedprice * Int64(1))]
+ CsvExec: files={2 groups:
[[ballista/scheduler/testdata/lineitem/partition1.tbl],
[ballista/scheduler/testdata/lineitem/partition0.tbl]]}, has_header=false,
limit=None, projection=[l_extendedprice, l_returnflag]
ShuffleWriterExec: None
- ProjectionExec: expr=[l_returnflag@0 as l_returnflag,
SUM(lineitem.l_extendedprice Multiply Int64(1))@1 as sum_disc_price]
- AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as
l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
- CoalesceBatchesExec: target_batch_size=4096
- UnresolvedShuffleExec
+ SortExec: [l_returnflag@0 ASC NULLS LAST]
+ ProjectionExec: expr=[l_returnflag@0 as l_returnflag,
SUM(lineitem.l_extendedprice * Int64(1))@1 as sum_disc_price]
+ AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as
l_returnflag], aggr=[SUM(lineitem.l_extendedprice * Int64(1))]
+ CoalesceBatchesExec: target_batch_size=8192
+ UnresolvedShuffleExec
ShuffleWriterExec: None
- SortExec: [l_returnflag@0 ASC]
- CoalescePartitionsExec
- UnresolvedShuffleExec
+ SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST]
+ UnresolvedShuffleExec
*/
assert_eq!(3, stages.len());
@@ -399,7 +397,9 @@ mod test {
// verify stage 1
let stage1 = stages[1].children()[0].clone();
- let projection = downcast_exec!(stage1, ProjectionExec);
+ let sort = downcast_exec!(stage1, SortExec);
+ let projection = sort.children()[0].clone();
+ let projection = downcast_exec!(projection, ProjectionExec);
let final_hash = projection.children()[0].clone();
let final_hash = downcast_exec!(final_hash, AggregateExec);
assert!(*final_hash.mode() == AggregateMode::FinalPartitioned);
@@ -414,15 +414,8 @@ mod test {
// verify stage 2
let stage2 = stages[2].children()[0].clone();
- let sort = downcast_exec!(stage2, SortExec);
- let coalesce_partitions = sort.children()[0].clone();
- let coalesce_partitions =
- downcast_exec!(coalesce_partitions, CoalescePartitionsExec);
- assert_eq!(
- coalesce_partitions.output_partitioning().partition_count(),
- 1
- );
- let unresolved_shuffle = coalesce_partitions.children()[0].clone();
+ let merge = downcast_exec!(stage2, SortPreservingMergeExec);
+ let unresolved_shuffle = merge.children()[0].clone();
let unresolved_shuffle =
downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
assert_eq!(unresolved_shuffle.stage_id, 2);
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 0f437d07..a960a0c8 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,8 +34,8 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "17.0.0"
-datafusion-proto = "17.0.0"
+datafusion = { version = "18.0.0", git =
"https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-proto = { version = "18.0.0", git =
"https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
env_logger = "0.10"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 18fbaad5..d381c029 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "17.0.0"
+datafusion = { version = "18.0.0", git =
"https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.11"