This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new d2e58ce1 feat: Upgrade to DataFusion 39.0.0 (#1052)
d2e58ce1 is described below

commit d2e58ce14255eaa2d3cedb4e42c8c8af18421bca
Author: Andy Grove <[email protected]>
AuthorDate: Fri Sep 13 10:13:01 2024 -0600

    feat: Upgrade to DataFusion 39.0.0 (#1052)
---
 Cargo.toml                                         | 17 +++++------
 ballista/client/src/context.rs                     | 33 +++++++++++++---------
 ballista/core/Cargo.toml                           |  1 +
 ballista/core/src/cache_layer/object_store/file.rs | 17 ++++++-----
 ballista/core/src/cache_layer/object_store/mod.rs  | 19 ++++++-------
 ballista/core/src/cache_layer/policy/file.rs       | 15 ++++++----
 ballista/core/src/error.rs                         |  8 +++---
 .../core/src/execution_plans/distributed_query.rs  |  2 +-
 .../core/src/execution_plans/shuffle_reader.rs     |  2 +-
 .../core/src/execution_plans/shuffle_writer.rs     |  4 +--
 .../core/src/execution_plans/unresolved_shuffle.rs |  2 +-
 ballista/core/src/serde/mod.rs                     |  2 +-
 ballista/core/src/serde/scheduler/from_proto.rs    |  3 +-
 ballista/executor/src/collect.rs                   |  4 +--
 ballista/executor/src/execution_loop.rs            | 10 +++++++
 ballista/executor/src/executor.rs                  |  2 +-
 ballista/scheduler/src/flight_sql.rs               |  5 ++--
 ballista/scheduler/src/planner.rs                  |  9 ++++--
 ballista/scheduler/src/scheduler_server/mod.rs     |  3 +-
 .../src/scheduler_server/query_stage_scheduler.rs  |  3 +-
 ballista/scheduler/src/test_utils.rs               |  3 +-
 21 files changed, 96 insertions(+), 68 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index bd3629f1..3c451885 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,16 +21,17 @@ members = ["ballista-cli", "ballista/cache", "ballista/client", "ballista/core",
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "51.0.0", features = ["ipc_compression"] }
-arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "51.0.0", default-features = false }
+arrow = { version = "52.0.0", features = ["ipc_compression"] }
+arrow-flight = { version = "52.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "52.0.0", default-features = false }
 configure_me = { version = "0.4.0" }
 configure_me_codegen = { version = "0.4.4" }
-datafusion = "38.0.0"
-datafusion-cli = "38.0.0"
-datafusion-proto = "38.0.0"
-object_store = "0.9.0"
-sqlparser = "0.45.0"
+datafusion = "39.0.0"
+datafusion-cli = "39.0.0"
+datafusion-proto = "39.0.0"
+datafusion-proto-common = "39.0.0"
+object_store = "0.10.1"
+sqlparser = "0.47.0"
 tonic = { version = "0.11" }
 tonic-build = { version = "0.11", default-features = false, features = [
     "transport",
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index d084b6c6..de22b777 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -387,17 +387,16 @@ impl BallistaContext {
 
         let plan = ctx.state().create_logical_plan(sql).await?;
 
-        match plan {
+        match &plan {
             LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
                 CreateExternalTable {
                     ref schema,
                     ref name,
                     ref location,
                     ref file_type,
-                    ref has_header,
-                    ref delimiter,
                     ref table_partition_cols,
                     ref if_not_exists,
+                    options,
                     ..
                 },
             )) => {
@@ -416,9 +415,17 @@ impl BallistaContext {
                 match (if_not_exists, table_exists) {
                     (_, false) => match file_type.to_lowercase().as_str() {
                         "csv" => {
+                            let has_header = match options.get("format.has_header") {
+                                Some(str) => str.parse::<bool>().unwrap(),
+                                None => false,
+                            };
+                            let delimiter = match options.get("format.delimiter") {
+                                Some(str) => str.chars().next().unwrap(),
+                                None => ',',
+                            };
                             let mut options = CsvReadOptions::new()
-                                .has_header(*has_header)
-                                .delimiter(*delimiter as u8)
+                                .has_header(has_header)
+                                .delimiter(delimiter as u8)
                                 .table_partition_cols(table_partition_cols.to_vec());
                             if !schema.fields().is_empty() {
                                 options = options.schema(&schema);
@@ -565,6 +572,7 @@ mod standalone_tests {
               )
               STORED AS CSV
               LOCATION '{}'
+              OPTIONS ('has_header' 'false', 'delimiter' ',')
               ",
             file_path.to_str().expect("path is utf8")
         );
@@ -892,7 +900,7 @@ mod standalone_tests {
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+-------------------+",
-            "| VAR(test.id)      |",
+            "| var(test.id)      |",
             "+-------------------+",
             "| 6.000000000000001 |",
             "+-------------------+",
@@ -920,7 +928,7 @@ mod standalone_tests {
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+-------------------+",
-            "| VAR(test.id)      |",
+            "| var(test.id)      |",
             "+-------------------+",
             "| 6.000000000000001 |",
             "+-------------------+",
@@ -960,7 +968,6 @@ mod standalone_tests {
         assert_result_eq(expected, &res);
     }
     #[tokio::test]
-    #[ignore] // TODO fix this test - it never completes
     async fn test_aggregate_covar() {
         let context = create_test_context().await;
 
@@ -970,11 +977,11 @@ mod standalone_tests {
             .unwrap();
         let res = df.collect().await.unwrap();
         let expected = vec![
-            "+---------------------------------+",
-            "| COVAR(test.id,test.tinyint_col) |",
-            "+---------------------------------+",
-            "| 0.28571428571428586             |",
-            "+---------------------------------+",
+            "+--------------------------------------+",
+            "| covar_samp(test.id,test.tinyint_col) |",
+            "+--------------------------------------+",
+            "| 0.28571428571428586                  |",
+            "+--------------------------------------+",
         ];
         assert_result_eq(expected, &res);
     }
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 43620ecc..fccdd0ec 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -55,6 +55,7 @@ clap = { version = "3", features = ["derive", "cargo"] }
 datafusion = { workspace = true }
 datafusion-objectstore-hdfs = { version = "0.1.4", default-features = false, optional = true }
 datafusion-proto = { workspace = true }
+datafusion-proto-common = { workspace = true }
 futures = "0.3"
 hashbrown = "0.14"
 
diff --git a/ballista/core/src/cache_layer/object_store/file.rs b/ballista/core/src/cache_layer/object_store/file.rs
index 169d2b5c..8229af1c 100644
--- a/ballista/core/src/cache_layer/object_store/file.rs
+++ b/ballista/core/src/cache_layer/object_store/file.rs
@@ -26,13 +26,12 @@ use futures::stream::{self, BoxStream, StreamExt};
 use log::info;
 use object_store::path::Path;
 use object_store::{
-    Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
-    PutOptions, PutResult,
+    Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
+    PutMultipartOpts, PutOptions, PutPayload, PutResult,
 };
 use std::fmt::{Debug, Display, Formatter};
 use std::ops::Range;
 use std::sync::Arc;
-use tokio::io::AsyncWrite;
 
 #[derive(Debug)]
 pub struct FileCacheObjectStore<M>
@@ -77,7 +76,7 @@ where
     async fn put(
         &self,
         _location: &Path,
-        _bytes: Bytes,
+        _bytes: PutPayload,
     ) -> object_store::Result<PutResult> {
         Err(Error::NotSupported {
             source: Box::new(BallistaError::General(
@@ -89,7 +88,7 @@ where
     async fn put_opts(
         &self,
         _location: &Path,
-        _bytes: Bytes,
+        _bytes: PutPayload,
         _opts: PutOptions,
     ) -> object_store::Result<PutResult> {
         Err(Error::NotSupported {
@@ -102,7 +101,7 @@ where
     async fn put_multipart(
         &self,
         _location: &Path,
-    ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+    ) -> object_store::Result<Box<dyn MultipartUpload>> {
         Err(Error::NotSupported {
             source: Box::new(BallistaError::General(
                 "Write path is not supported".to_string(),
@@ -110,11 +109,11 @@ where
         })
     }
 
-    async fn abort_multipart(
+    async fn put_multipart_opts(
         &self,
         _location: &Path,
-        _multipart_id: &MultipartId,
-    ) -> object_store::Result<()> {
+        _opts: PutMultipartOpts,
+    ) -> object_store::Result<Box<dyn MultipartUpload>> {
         Err(Error::NotSupported {
             source: Box::new(BallistaError::General(
                 "Write path is not supported".to_string(),
diff --git a/ballista/core/src/cache_layer/object_store/mod.rs b/ballista/core/src/cache_layer/object_store/mod.rs
index 6d754eca..71a3464e 100644
--- a/ballista/core/src/cache_layer/object_store/mod.rs
+++ b/ballista/core/src/cache_layer/object_store/mod.rs
@@ -22,13 +22,12 @@ use bytes::Bytes;
 use futures::stream::BoxStream;
 use object_store::path::Path;
 use object_store::{
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions,
-    PutResult,
+    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
+    PutMultipartOpts, PutOptions, PutPayload, PutResult,
 };
 use std::fmt::{Debug, Display, Formatter};
 use std::ops::Range;
 use std::sync::Arc;
-use tokio::io::AsyncWrite;
 
 /// An [`ObjectStore`] wrapper with a specific key which is used for registration in [`ObjectStoreRegistry`].
 ///
@@ -64,7 +63,7 @@ impl ObjectStore for ObjectStoreWithKey {
     async fn put(
         &self,
         location: &Path,
-        bytes: Bytes,
+        bytes: PutPayload,
     ) -> object_store::Result<PutResult> {
         self.inner.put(location, bytes).await
     }
@@ -72,7 +71,7 @@ impl ObjectStore for ObjectStoreWithKey {
     async fn put_opts(
         &self,
         location: &Path,
-        bytes: Bytes,
+        bytes: PutPayload,
         opts: PutOptions,
     ) -> object_store::Result<PutResult> {
         self.inner.put_opts(location, bytes, opts).await
@@ -81,16 +80,16 @@ impl ObjectStore for ObjectStoreWithKey {
     async fn put_multipart(
         &self,
         location: &Path,
-    ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+    ) -> object_store::Result<Box<dyn MultipartUpload>> {
         self.inner.put_multipart(location).await
     }
 
-    async fn abort_multipart(
+    async fn put_multipart_opts(
         &self,
         location: &Path,
-        multipart_id: &MultipartId,
-    ) -> object_store::Result<()> {
-        self.inner.abort_multipart(location, multipart_id).await
+        opts: PutMultipartOpts,
+    ) -> object_store::Result<Box<dyn MultipartUpload>> {
+        self.inner.put_multipart_opts(location, opts).await
     }
 
     async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
diff --git a/ballista/core/src/cache_layer/policy/file.rs b/ballista/core/src/cache_layer/policy/file.rs
index dfc6c83f..af4865d9 100644
--- a/ballista/core/src/cache_layer/policy/file.rs
+++ b/ballista/core/src/cache_layer/policy/file.rs
@@ -30,7 +30,7 @@ use ballista_cache::{
 };
 use log::{error, info, warn};
 use object_store::path::Path;
-use object_store::{ObjectMeta, ObjectStore};
+use object_store::{ObjectMeta, ObjectStore, PutPayload};
 use std::ops::Range;
 use std::sync::Arc;
 use tokio::runtime::Runtime;
@@ -181,11 +181,14 @@ where
         data.len(),
         source_location
     );
-    cache_store.put(&cache_location, data).await.map_err(|e| {
-        BallistaError::General(format!(
-            "Fail to write out data to {cache_location} due to {e}"
-        ))
-    })?;
+    cache_store
+        .put(&cache_location, PutPayload::from_bytes(data))
+        .await
+        .map_err(|e| {
+            BallistaError::General(format!(
+                "Fail to write out data to {cache_location} due to {e}"
+            ))
+        })?;
     info!(
         "Object {} has already been cached to {}",
         source_location, cache_location
diff --git a/ballista/core/src/error.rs b/ballista/core/src/error.rs
index b56107c9..1ef795df 100644
--- a/ballista/core/src/error.rs
+++ b/ballista/core/src/error.rs
@@ -161,14 +161,14 @@ impl From<tokio::task::JoinError> for BallistaError {
     }
 }
 
-impl From<datafusion_proto::logical_plan::from_proto::Error> for BallistaError {
-    fn from(e: datafusion_proto::logical_plan::from_proto::Error) -> Self {
+impl From<datafusion_proto_common::from_proto::Error> for BallistaError {
+    fn from(e: datafusion_proto_common::from_proto::Error) -> Self {
         BallistaError::General(e.to_string())
     }
 }
 
-impl From<datafusion_proto::logical_plan::to_proto::Error> for BallistaError {
-    fn from(e: datafusion_proto::logical_plan::to_proto::Error) -> Self {
+impl From<datafusion_proto_common::to_proto::Error> for BallistaError {
+    fn from(e: datafusion_proto_common::to_proto::Error) -> Self {
         BallistaError::General(e.to_string())
     }
 }
diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs
index 4e7a1c2b..b96367bb 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -166,7 +166,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
         &self.properties
     }
 
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
         vec![]
     }
 
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index babd8f64..79dfe296 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -118,7 +118,7 @@ impl ExecutionPlan for ShuffleReaderExec {
     fn properties(&self) -> &PlanProperties {
         &self.properties
     }
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
         vec![]
     }
 
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs
index 4bff4365..7f21b18b 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -367,8 +367,8 @@ impl ExecutionPlan for ShuffleWriterExec {
         &self.properties
     }
 
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.plan.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.plan]
     }
 
     fn with_new_children(
diff --git a/ballista/core/src/execution_plans/unresolved_shuffle.rs b/ballista/core/src/execution_plans/unresolved_shuffle.rs
index bb097017..b3c30c0d 100644
--- a/ballista/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/core/src/execution_plans/unresolved_shuffle.rs
@@ -94,7 +94,7 @@ impl ExecutionPlan for UnresolvedShuffleExec {
         &self.properties
     }
 
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
         vec![]
     }
 
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index 0293561d..08208eed 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -24,8 +24,8 @@ use arrow_flight::sql::ProstMessageExt;
 use datafusion::common::DataFusionError;
 use datafusion::execution::FunctionRegistry;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning};
-use datafusion_proto::common::proto_error;
 use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use datafusion_proto::protobuf::proto_error;
 use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
 use datafusion_proto::{
     convert_required,
diff --git a/ballista/core/src/serde/scheduler/from_proto.rs b/ballista/core/src/serde/scheduler/from_proto.rs
index f3597ea3..4821eab2 100644
--- a/ballista/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/core/src/serde/scheduler/from_proto.rs
@@ -421,7 +421,8 @@ fn reset_metrics_for_execution_plan(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Arc<dyn ExecutionPlan>, BallistaError> {
     plan.transform(&|plan: Arc<dyn ExecutionPlan>| {
-        let children = plan.children().clone();
+        let children: Vec<Arc<dyn ExecutionPlan>> =
+            plan.children().into_iter().cloned().collect();
         plan.with_new_children(children).map(Transformed::yes)
     })
     .data()
diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs
index 4456e9c5..eb96e314 100644
--- a/ballista/executor/src/collect.rs
+++ b/ballista/executor/src/collect.rs
@@ -79,8 +79,8 @@ impl ExecutionPlan for CollectExec {
         &self.properties
     }
 
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.plan.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.plan]
     }
 
     fn with_new_children(
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 6b897b56..111120bd 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -32,6 +32,9 @@ use ballista_core::error::BallistaError;
 use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId};
 use ballista_core::serde::BallistaCodec;
 use datafusion::execution::context::TaskContext;
+use datafusion::functions_aggregate::covariance::{covar_pop_udaf, covar_samp_udaf};
+use datafusion::functions_aggregate::sum::sum_udaf;
+use datafusion::functions_aggregate::variance::var_samp_udaf;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
 use futures::FutureExt;
@@ -189,6 +192,13 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
     for agg_func in executor.aggregate_functions.clone() {
         task_aggregate_functions.insert(agg_func.0, agg_func.1);
     }
+    // since DataFusion 38 some internal functions were converted to UDAF, so
+    // we have to register them manually
+    task_aggregate_functions.insert("var".to_string(), var_samp_udaf());
+    task_aggregate_functions.insert("covar_samp".to_string(), covar_samp_udaf());
+    task_aggregate_functions.insert("covar_pop".to_string(), covar_pop_udaf());
+    task_aggregate_functions.insert("SUM".to_string(), sum_udaf());
+
     for window_func in executor.window_functions.clone() {
         task_window_functions.insert(window_func.0, window_func.1);
     }
diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs
index b7c3a4d1..ccc7f273 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -289,7 +289,7 @@ mod test {
             &self.properties
         }
 
-        fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
             vec![]
         }
 
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
index c3fb99a7..c6297d78 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -29,7 +29,8 @@ use arrow_flight::sql::{
     CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
     CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
     CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
-    CommandStatementSubstraitPlan, CommandStatementUpdate, SqlInfo, TicketStatementQuery,
+    CommandStatementSubstraitPlan, CommandStatementUpdate, DoPutPreparedStatementResult,
+    SqlInfo, TicketStatementQuery,
 };
 use arrow_flight::{
     Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
@@ -861,7 +862,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
         &self,
         _query: CommandPreparedStatementQuery,
         _request: Request<PeekableFlightDataStream>,
-    ) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> {
+    ) -> Result<DoPutPreparedStatementResult, Status> {
         debug!("do_put_prepared_statement_query");
         Err(Status::unimplemented(
             "Implement do_put_prepared_statement_query",
diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs
index dfaa00aa..3da9f339 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -193,7 +193,7 @@ pub fn find_unresolved_shuffles(
     } else {
         Ok(plan
             .children()
-            .iter()
+            .into_iter()
             .map(find_unresolved_shuffles)
             .collect::<Result<Vec<_>>>()?
             .into_iter()
@@ -248,7 +248,10 @@ pub fn remove_unresolved_shuffles(
                 unresolved_shuffle.schema().clone(),
             )?))
         } else {
-            new_children.push(remove_unresolved_shuffles(child, partition_locations)?);
+            new_children.push(remove_unresolved_shuffles(
+                child.clone(),
+                partition_locations,
+            )?);
         }
     }
     Ok(with_new_children_if_necessary(stage, new_children)?)
@@ -276,7 +279,7 @@ pub fn rollback_resolved_shuffles(
             ));
             new_children.push(unresolved_shuffle);
         } else {
-            new_children.push(rollback_resolved_shuffles(child)?);
+            new_children.push(rollback_resolved_shuffles(child.clone())?);
         }
     }
     Ok(with_new_children_if_necessary(stage, new_children)?)
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 25fa4296..e6525f18 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -342,7 +342,8 @@ mod test {
     use std::sync::Arc;
 
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
-    use datafusion::logical_expr::{col, sum, LogicalPlan};
+    use datafusion::functions_aggregate::sum::sum;
+    use datafusion::logical_expr::{col, LogicalPlan};
 
     use datafusion::test_util::scan_empty;
     use datafusion_proto::protobuf::LogicalPlanNode;
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 138f7090..df74ad6b 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -354,7 +354,8 @@ mod tests {
     use ballista_core::config::TaskSchedulingPolicy;
     use ballista_core::error::Result;
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
-    use datafusion::logical_expr::{col, sum, LogicalPlan};
+    use datafusion::functions_aggregate::sum::sum;
+    use datafusion::logical_expr::{col, LogicalPlan};
     use datafusion::test_util::scan_empty_with_partitions;
     use std::sync::Arc;
     use std::time::Duration;
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index f1253c92..59e6a875 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -45,11 +45,12 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::common::DataFusionError;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::execution::context::{SessionConfig, SessionContext, SessionState};
+use datafusion::functions_aggregate::sum::sum;
 use datafusion::logical_expr::expr::Sort;
 use datafusion::logical_expr::{Expr, LogicalPlan};
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::{col, count, sum, CsvReadOptions, JoinType};
+use datafusion::prelude::{col, count, CsvReadOptions, JoinType};
 use datafusion::test_util::scan_empty;
 
 use crate::cluster::BallistaCluster;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to