This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new d2e58ce1 feat: Upgrade to DataFusion 39.0.0 (#1052)
d2e58ce1 is described below
commit d2e58ce14255eaa2d3cedb4e42c8c8af18421bca
Author: Andy Grove <[email protected]>
AuthorDate: Fri Sep 13 10:13:01 2024 -0600
feat: Upgrade to DataFusion 39.0.0 (#1052)
---
Cargo.toml | 17 +++++------
ballista/client/src/context.rs | 33 +++++++++++++---------
ballista/core/Cargo.toml | 1 +
ballista/core/src/cache_layer/object_store/file.rs | 17 ++++++-----
ballista/core/src/cache_layer/object_store/mod.rs | 19 ++++++-------
ballista/core/src/cache_layer/policy/file.rs | 15 ++++++----
ballista/core/src/error.rs | 8 +++---
.../core/src/execution_plans/distributed_query.rs | 2 +-
.../core/src/execution_plans/shuffle_reader.rs | 2 +-
.../core/src/execution_plans/shuffle_writer.rs | 4 +--
.../core/src/execution_plans/unresolved_shuffle.rs | 2 +-
ballista/core/src/serde/mod.rs | 2 +-
ballista/core/src/serde/scheduler/from_proto.rs | 3 +-
ballista/executor/src/collect.rs | 4 +--
ballista/executor/src/execution_loop.rs | 10 +++++++
ballista/executor/src/executor.rs | 2 +-
ballista/scheduler/src/flight_sql.rs | 5 ++--
ballista/scheduler/src/planner.rs | 9 ++++--
ballista/scheduler/src/scheduler_server/mod.rs | 3 +-
.../src/scheduler_server/query_stage_scheduler.rs | 3 +-
ballista/scheduler/src/test_utils.rs | 3 +-
21 files changed, 96 insertions(+), 68 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index bd3629f1..3c451885 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,16 +21,17 @@ members = ["ballista-cli", "ballista/cache",
"ballista/client", "ballista/core",
resolver = "2"
[workspace.dependencies]
-arrow = { version = "51.0.0", features = ["ipc_compression"] }
-arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "51.0.0", default-features = false }
+arrow = { version = "52.0.0", features = ["ipc_compression"] }
+arrow-flight = { version = "52.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "52.0.0", default-features = false }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
-datafusion = "38.0.0"
-datafusion-cli = "38.0.0"
-datafusion-proto = "38.0.0"
-object_store = "0.9.0"
-sqlparser = "0.45.0"
+datafusion = "39.0.0"
+datafusion-cli = "39.0.0"
+datafusion-proto = "39.0.0"
+datafusion-proto-common = "39.0.0"
+object_store = "0.10.1"
+sqlparser = "0.47.0"
tonic = { version = "0.11" }
tonic-build = { version = "0.11", default-features = false, features = [
"transport",
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index d084b6c6..de22b777 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -387,17 +387,16 @@ impl BallistaContext {
let plan = ctx.state().create_logical_plan(sql).await?;
- match plan {
+ match &plan {
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
CreateExternalTable {
ref schema,
ref name,
ref location,
ref file_type,
- ref has_header,
- ref delimiter,
ref table_partition_cols,
ref if_not_exists,
+ options,
..
},
)) => {
@@ -416,9 +415,17 @@ impl BallistaContext {
match (if_not_exists, table_exists) {
(_, false) => match file_type.to_lowercase().as_str() {
"csv" => {
+ let has_header = match options.get("format.has_header") {
+ Some(str) => str.parse::<bool>().unwrap(),
+ None => false,
+ };
+ let delimiter = match options.get("format.delimiter") {
+ Some(str) => str.chars().next().unwrap(),
+ None => ',',
+ };
let mut options = CsvReadOptions::new()
- .has_header(*has_header)
- .delimiter(*delimiter as u8)
+ .has_header(has_header)
+ .delimiter(delimiter as u8)
.table_partition_cols(table_partition_cols.to_vec());
if !schema.fields().is_empty() {
options = options.schema(&schema);
@@ -565,6 +572,7 @@ mod standalone_tests {
)
STORED AS CSV
LOCATION '{}'
+ OPTIONS ('has_header' 'false', 'delimiter' ',')
",
file_path.to_str().expect("path is utf8")
);
@@ -892,7 +900,7 @@ mod standalone_tests {
let res = df.collect().await.unwrap();
let expected = vec![
"+-------------------+",
- "| VAR(test.id) |",
+ "| var(test.id) |",
"+-------------------+",
"| 6.000000000000001 |",
"+-------------------+",
@@ -920,7 +928,7 @@ mod standalone_tests {
let res = df.collect().await.unwrap();
let expected = vec![
"+-------------------+",
- "| VAR(test.id) |",
+ "| var(test.id) |",
"+-------------------+",
"| 6.000000000000001 |",
"+-------------------+",
@@ -960,7 +968,6 @@ mod standalone_tests {
assert_result_eq(expected, &res);
}
#[tokio::test]
- #[ignore] // TODO fix this test - it never completes
async fn test_aggregate_covar() {
let context = create_test_context().await;
@@ -970,11 +977,11 @@ mod standalone_tests {
.unwrap();
let res = df.collect().await.unwrap();
let expected = vec![
- "+---------------------------------+",
- "| COVAR(test.id,test.tinyint_col) |",
- "+---------------------------------+",
- "| 0.28571428571428586 |",
- "+---------------------------------+",
+ "+--------------------------------------+",
+ "| covar_samp(test.id,test.tinyint_col) |",
+ "+--------------------------------------+",
+ "| 0.28571428571428586 |",
+ "+--------------------------------------+",
];
assert_result_eq(expected, &res);
}
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 43620ecc..fccdd0ec 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -55,6 +55,7 @@ clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { workspace = true }
datafusion-objectstore-hdfs = { version = "0.1.4", default-features = false,
optional = true }
datafusion-proto = { workspace = true }
+datafusion-proto-common = { workspace = true }
futures = "0.3"
hashbrown = "0.14"
diff --git a/ballista/core/src/cache_layer/object_store/file.rs
b/ballista/core/src/cache_layer/object_store/file.rs
index 169d2b5c..8229af1c 100644
--- a/ballista/core/src/cache_layer/object_store/file.rs
+++ b/ballista/core/src/cache_layer/object_store/file.rs
@@ -26,13 +26,12 @@ use futures::stream::{self, BoxStream, StreamExt};
use log::info;
use object_store::path::Path;
use object_store::{
- Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
- PutOptions, PutResult,
+ Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
+ PutMultipartOpts, PutOptions, PutPayload, PutResult,
};
use std::fmt::{Debug, Display, Formatter};
use std::ops::Range;
use std::sync::Arc;
-use tokio::io::AsyncWrite;
#[derive(Debug)]
pub struct FileCacheObjectStore<M>
@@ -77,7 +76,7 @@ where
async fn put(
&self,
_location: &Path,
- _bytes: Bytes,
+ _bytes: PutPayload,
) -> object_store::Result<PutResult> {
Err(Error::NotSupported {
source: Box::new(BallistaError::General(
@@ -89,7 +88,7 @@ where
async fn put_opts(
&self,
_location: &Path,
- _bytes: Bytes,
+ _bytes: PutPayload,
_opts: PutOptions,
) -> object_store::Result<PutResult> {
Err(Error::NotSupported {
@@ -102,7 +101,7 @@ where
async fn put_multipart(
&self,
_location: &Path,
- ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ ) -> object_store::Result<Box<dyn MultipartUpload>> {
Err(Error::NotSupported {
source: Box::new(BallistaError::General(
"Write path is not supported".to_string(),
@@ -110,11 +109,11 @@ where
})
}
- async fn abort_multipart(
+ async fn put_multipart_opts(
&self,
_location: &Path,
- _multipart_id: &MultipartId,
- ) -> object_store::Result<()> {
+ _opts: PutMultipartOpts,
+ ) -> object_store::Result<Box<dyn MultipartUpload>> {
Err(Error::NotSupported {
source: Box::new(BallistaError::General(
"Write path is not supported".to_string(),
diff --git a/ballista/core/src/cache_layer/object_store/mod.rs
b/ballista/core/src/cache_layer/object_store/mod.rs
index 6d754eca..71a3464e 100644
--- a/ballista/core/src/cache_layer/object_store/mod.rs
+++ b/ballista/core/src/cache_layer/object_store/mod.rs
@@ -22,13 +22,12 @@ use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::path::Path;
use object_store::{
- GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions,
- PutResult,
+ GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
+ PutMultipartOpts, PutOptions, PutPayload, PutResult,
};
use std::fmt::{Debug, Display, Formatter};
use std::ops::Range;
use std::sync::Arc;
-use tokio::io::AsyncWrite;
/// An [`ObjectStore`] wrapper with a specific key which is used for
registration in [`ObjectStoreRegistry`].
///
@@ -64,7 +63,7 @@ impl ObjectStore for ObjectStoreWithKey {
async fn put(
&self,
location: &Path,
- bytes: Bytes,
+ bytes: PutPayload,
) -> object_store::Result<PutResult> {
self.inner.put(location, bytes).await
}
@@ -72,7 +71,7 @@ impl ObjectStore for ObjectStoreWithKey {
async fn put_opts(
&self,
location: &Path,
- bytes: Bytes,
+ bytes: PutPayload,
opts: PutOptions,
) -> object_store::Result<PutResult> {
self.inner.put_opts(location, bytes, opts).await
@@ -81,16 +80,16 @@ impl ObjectStore for ObjectStoreWithKey {
async fn put_multipart(
&self,
location: &Path,
- ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ ) -> object_store::Result<Box<dyn MultipartUpload>> {
self.inner.put_multipart(location).await
}
- async fn abort_multipart(
+ async fn put_multipart_opts(
&self,
location: &Path,
- multipart_id: &MultipartId,
- ) -> object_store::Result<()> {
- self.inner.abort_multipart(location, multipart_id).await
+ opts: PutMultipartOpts,
+ ) -> object_store::Result<Box<dyn MultipartUpload>> {
+ self.inner.put_multipart_opts(location, opts).await
}
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
diff --git a/ballista/core/src/cache_layer/policy/file.rs
b/ballista/core/src/cache_layer/policy/file.rs
index dfc6c83f..af4865d9 100644
--- a/ballista/core/src/cache_layer/policy/file.rs
+++ b/ballista/core/src/cache_layer/policy/file.rs
@@ -30,7 +30,7 @@ use ballista_cache::{
};
use log::{error, info, warn};
use object_store::path::Path;
-use object_store::{ObjectMeta, ObjectStore};
+use object_store::{ObjectMeta, ObjectStore, PutPayload};
use std::ops::Range;
use std::sync::Arc;
use tokio::runtime::Runtime;
@@ -181,11 +181,14 @@ where
data.len(),
source_location
);
- cache_store.put(&cache_location, data).await.map_err(|e| {
- BallistaError::General(format!(
- "Fail to write out data to {cache_location} due to {e}"
- ))
- })?;
+ cache_store
+ .put(&cache_location, PutPayload::from_bytes(data))
+ .await
+ .map_err(|e| {
+ BallistaError::General(format!(
+ "Fail to write out data to {cache_location} due to {e}"
+ ))
+ })?;
info!(
"Object {} has already been cached to {}",
source_location, cache_location
diff --git a/ballista/core/src/error.rs b/ballista/core/src/error.rs
index b56107c9..1ef795df 100644
--- a/ballista/core/src/error.rs
+++ b/ballista/core/src/error.rs
@@ -161,14 +161,14 @@ impl From<tokio::task::JoinError> for BallistaError {
}
}
-impl From<datafusion_proto::logical_plan::from_proto::Error> for BallistaError {
- fn from(e: datafusion_proto::logical_plan::from_proto::Error) -> Self {
+impl From<datafusion_proto_common::from_proto::Error> for BallistaError {
+ fn from(e: datafusion_proto_common::from_proto::Error) -> Self {
BallistaError::General(e.to_string())
}
}
-impl From<datafusion_proto::logical_plan::to_proto::Error> for BallistaError {
- fn from(e: datafusion_proto::logical_plan::to_proto::Error) -> Self {
+impl From<datafusion_proto_common::to_proto::Error> for BallistaError {
+ fn from(e: datafusion_proto_common::to_proto::Error) -> Self {
BallistaError::General(e.to_string())
}
}
diff --git a/ballista/core/src/execution_plans/distributed_query.rs
b/ballista/core/src/execution_plans/distributed_query.rs
index 4e7a1c2b..b96367bb 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -166,7 +166,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for
DistributedQueryExec<T> {
&self.properties
}
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs
b/ballista/core/src/execution_plans/shuffle_reader.rs
index babd8f64..79dfe296 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -118,7 +118,7 @@ impl ExecutionPlan for ShuffleReaderExec {
fn properties(&self) -> &PlanProperties {
&self.properties
}
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs
b/ballista/core/src/execution_plans/shuffle_writer.rs
index 4bff4365..7f21b18b 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -367,8 +367,8 @@ impl ExecutionPlan for ShuffleWriterExec {
&self.properties
}
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.plan.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.plan]
}
fn with_new_children(
diff --git a/ballista/core/src/execution_plans/unresolved_shuffle.rs
b/ballista/core/src/execution_plans/unresolved_shuffle.rs
index bb097017..b3c30c0d 100644
--- a/ballista/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/core/src/execution_plans/unresolved_shuffle.rs
@@ -94,7 +94,7 @@ impl ExecutionPlan for UnresolvedShuffleExec {
&self.properties
}
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index 0293561d..08208eed 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -24,8 +24,8 @@ use arrow_flight::sql::ProstMessageExt;
use datafusion::common::DataFusionError;
use datafusion::execution::FunctionRegistry;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
-use datafusion_proto::common::proto_error;
use
datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use datafusion_proto::protobuf::proto_error;
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
use datafusion_proto::{
convert_required,
diff --git a/ballista/core/src/serde/scheduler/from_proto.rs
b/ballista/core/src/serde/scheduler/from_proto.rs
index f3597ea3..4821eab2 100644
--- a/ballista/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/core/src/serde/scheduler/from_proto.rs
@@ -421,7 +421,8 @@ fn reset_metrics_for_execution_plan(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>, BallistaError> {
plan.transform(&|plan: Arc<dyn ExecutionPlan>| {
- let children = plan.children().clone();
+ let children: Vec<Arc<dyn ExecutionPlan>> =
+ plan.children().into_iter().cloned().collect();
plan.with_new_children(children).map(Transformed::yes)
})
.data()
diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs
index 4456e9c5..eb96e314 100644
--- a/ballista/executor/src/collect.rs
+++ b/ballista/executor/src/collect.rs
@@ -79,8 +79,8 @@ impl ExecutionPlan for CollectExec {
&self.properties
}
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.plan.clone()]
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.plan]
}
fn with_new_children(
diff --git a/ballista/executor/src/execution_loop.rs
b/ballista/executor/src/execution_loop.rs
index 6b897b56..111120bd 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -32,6 +32,9 @@ use ballista_core::error::BallistaError;
use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId};
use ballista_core::serde::BallistaCodec;
use datafusion::execution::context::TaskContext;
+use datafusion::functions_aggregate::covariance::{covar_pop_udaf, covar_samp_udaf};
+use datafusion::functions_aggregate::sum::sum_udaf;
+use datafusion::functions_aggregate::variance::var_samp_udaf;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
use futures::FutureExt;
@@ -189,6 +192,13 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U:
'static + AsExecutionP
for agg_func in executor.aggregate_functions.clone() {
task_aggregate_functions.insert(agg_func.0, agg_func.1);
}
+ // since DataFusion 38 some internal functions were converted to UDAF, so
+ // we have to register them manually
+ task_aggregate_functions.insert("var".to_string(), var_samp_udaf());
+ task_aggregate_functions.insert("covar_samp".to_string(), covar_samp_udaf());
+ task_aggregate_functions.insert("covar_pop".to_string(), covar_pop_udaf());
+ task_aggregate_functions.insert("SUM".to_string(), sum_udaf());
+
for window_func in executor.window_functions.clone() {
task_window_functions.insert(window_func.0, window_func.1);
}
diff --git a/ballista/executor/src/executor.rs
b/ballista/executor/src/executor.rs
index b7c3a4d1..ccc7f273 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -289,7 +289,7 @@ mod test {
&self.properties
}
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
diff --git a/ballista/scheduler/src/flight_sql.rs
b/ballista/scheduler/src/flight_sql.rs
index c3fb99a7..c6297d78 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -29,7 +29,8 @@ use arrow_flight::sql::{
CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
CommandPreparedStatementQuery, CommandPreparedStatementUpdate,
CommandStatementQuery,
- CommandStatementSubstraitPlan, CommandStatementUpdate, SqlInfo,
TicketStatementQuery,
+ CommandStatementSubstraitPlan, CommandStatementUpdate,
DoPutPreparedStatementResult,
+ SqlInfo, TicketStatementQuery,
};
use arrow_flight::{
Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
HandshakeRequest,
@@ -861,7 +862,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
&self,
_query: CommandPreparedStatementQuery,
_request: Request<PeekableFlightDataStream>,
- ) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> {
+ ) -> Result<DoPutPreparedStatementResult, Status> {
debug!("do_put_prepared_statement_query");
Err(Status::unimplemented(
"Implement do_put_prepared_statement_query",
diff --git a/ballista/scheduler/src/planner.rs
b/ballista/scheduler/src/planner.rs
index dfaa00aa..3da9f339 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -193,7 +193,7 @@ pub fn find_unresolved_shuffles(
} else {
Ok(plan
.children()
- .iter()
+ .into_iter()
.map(find_unresolved_shuffles)
.collect::<Result<Vec<_>>>()?
.into_iter()
@@ -248,7 +248,10 @@ pub fn remove_unresolved_shuffles(
unresolved_shuffle.schema().clone(),
)?))
} else {
- new_children.push(remove_unresolved_shuffles(child, partition_locations)?);
+ new_children.push(remove_unresolved_shuffles(
+ child.clone(),
+ partition_locations,
+ )?);
}
}
Ok(with_new_children_if_necessary(stage, new_children)?)
@@ -276,7 +279,7 @@ pub fn rollback_resolved_shuffles(
));
new_children.push(unresolved_shuffle);
} else {
- new_children.push(rollback_resolved_shuffles(child)?);
+ new_children.push(rollback_resolved_shuffles(child.clone())?);
}
}
Ok(with_new_children_if_necessary(stage, new_children)?)
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs
b/ballista/scheduler/src/scheduler_server/mod.rs
index 25fa4296..e6525f18 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -342,7 +342,8 @@ mod test {
use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
- use datafusion::logical_expr::{col, sum, LogicalPlan};
+ use datafusion::functions_aggregate::sum::sum;
+ use datafusion::logical_expr::{col, LogicalPlan};
use datafusion::test_util::scan_empty;
use datafusion_proto::protobuf::LogicalPlanNode;
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 138f7090..df74ad6b 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -354,7 +354,8 @@ mod tests {
use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::error::Result;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
- use datafusion::logical_expr::{col, sum, LogicalPlan};
+ use datafusion::functions_aggregate::sum::sum;
+ use datafusion::logical_expr::{col, LogicalPlan};
use datafusion::test_util::scan_empty_with_partitions;
use std::sync::Arc;
use std::time::Duration;
diff --git a/ballista/scheduler/src/test_utils.rs
b/ballista/scheduler/src/test_utils.rs
index f1253c92..59e6a875 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -45,11 +45,12 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema,
SchemaRef};
use datafusion::common::DataFusionError;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::execution::context::{SessionConfig, SessionContext,
SessionState};
+use datafusion::functions_aggregate::sum::sum;
use datafusion::logical_expr::expr::Sort;
use datafusion::logical_expr::{Expr, LogicalPlan};
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::{col, count, sum, CsvReadOptions, JoinType};
+use datafusion::prelude::{col, count, CsvReadOptions, JoinType};
use datafusion::test_util::scan_empty;
use crate::cluster::BallistaCluster;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]