This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new a3ac8575 Upgrade to DataFusion 18.0.0-rc1 (#664)
a3ac8575 is described below

commit a3ac85750a316a0b15921bf669b18cd457182192
Author: Andy Grove <[email protected]>
AuthorDate: Sat Feb 11 08:05:18 2023 -0700

    Upgrade to DataFusion 18.0.0-rc1 (#664)
---
 ballista-cli/Cargo.toml                            |  4 +--
 ballista/client/Cargo.toml                         |  4 +--
 ballista/client/src/context.rs                     |  9 +++--
 ballista/core/Cargo.toml                           |  6 ++--
 ballista/core/src/client.rs                        |  8 +++--
 ballista/core/src/error.rs                         |  5 ++-
 .../core/src/execution_plans/distributed_query.rs  |  4 +--
 .../core/src/execution_plans/shuffle_reader.rs     |  6 ++--
 ballista/executor/Cargo.toml                       |  8 ++---
 ballista/executor/src/collect.rs                   |  6 ++--
 ballista/executor/src/executor.rs                  |  4 +--
 ballista/scheduler/Cargo.toml                      |  6 ++--
 ballista/scheduler/src/planner.rs                  | 41 +++++++++-------------
 benchmarks/Cargo.toml                              |  4 +--
 examples/Cargo.toml                                |  2 +-
 15 files changed, 59 insertions(+), 58 deletions(-)

diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 48261075..7787ba06 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.10.0", features = [
     "standalone",
 ] }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "17.0.0"
-datafusion-cli = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-cli = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 dirs = "4.0.0"
 env_logger = "0.10"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index f57a6f2b..8e3db0f5 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.63"
 ballista-core = { path = "../core", version = "0.10.0" }
 ballista-executor = { path = "../executor", version = "0.10.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.10.0", optional = true }
-datafusion = "17.0.0"
-datafusion-proto = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-proto = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 3269592d..b3b17c85 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -21,6 +21,7 @@ use datafusion::arrow::datatypes::SchemaRef;
 use log::info;
 use parking_lot::Mutex;
 use sqlparser::ast::Statement;
+use std::borrow::Cow;
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -355,10 +356,14 @@ impl BallistaContext {
             let state = self.state.lock();
             for (name, prov) in &state.tables {
                 // ctx is shared between queries, check table exists or not 
before register
-                let table_ref = TableReference::Bare { table: name };
+                let table_ref = TableReference::Bare {
+                    table: Cow::Borrowed(name),
+                };
                 if !ctx.table_exist(table_ref)? {
                     ctx.register_table(
-                        TableReference::Bare { table: name },
+                        TableReference::Bare {
+                            table: Cow::Borrowed(name),
+                        },
                         Arc::clone(prov),
                     )?;
                 }
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 09272958..6da6b730 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -46,13 +46,13 @@ simd = ["datafusion/simd"]
 [dependencies]
 ahash = { version = "0.8", default-features = false }
 
-arrow-flight = { version = "31.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "32.0.0", features = ["flight-sql-experimental"] }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false, optional = true }
-datafusion-proto = "17.0.0"
+datafusion-proto = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 futures = "0.3"
 hashbrown = "0.13"
 
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index d91c6fac..d4179b32 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -34,9 +34,10 @@ use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
 use datafusion::arrow::array::ArrayRef;
 use datafusion::arrow::{
     datatypes::{Schema, SchemaRef},
-    error::{ArrowError, Result as ArrowResult},
+    error::ArrowError,
     record_batch::RecordBatch,
 };
+use datafusion::error::DataFusionError;
 
 use crate::serde::protobuf;
 use crate::utils::create_grpc_client_connection;
@@ -162,7 +163,7 @@ impl FlightDataStream {
 }
 
 impl Stream for FlightDataStream {
-    type Item = ArrowResult<RecordBatch>;
+    type Item = datafusion::error::Result<RecordBatch>;
 
     fn poll_next(
         mut self: std::pin::Pin<&mut Self>,
@@ -171,13 +172,14 @@ impl Stream for FlightDataStream {
         self.stream.poll_next_unpin(cx).map(|x| match x {
             Some(flight_data_chunk_result) => {
                 let converted_chunk = flight_data_chunk_result
-                    .map_err(|e| ArrowError::from_external_error(Box::new(e)))
+                    .map_err(|e| 
ArrowError::from_external_error(Box::new(e)).into())
                     .and_then(|flight_data_chunk| {
                         flight_data_to_arrow_batch(
                             &flight_data_chunk,
                             self.schema.clone(),
                             &self.dictionaries_by_id,
                         )
+                        .map_err(DataFusionError::ArrowError)
                     });
                 Some(converted_chunk)
             }
diff --git a/ballista/core/src/error.rs b/ballista/core/src/error.rs
index e22ae9d6..67f75283 100644
--- a/ballista/core/src/error.rs
+++ b/ballista/core/src/error.rs
@@ -100,7 +100,10 @@ impl From<parser::ParserError> for BallistaError {
 
 impl From<DataFusionError> for BallistaError {
     fn from(e: DataFusionError) -> Self {
-        BallistaError::DataFusionError(e)
+        match e {
+            DataFusionError::ArrowError(e) => Self::from(e),
+            _ => BallistaError::DataFusionError(e),
+        }
     }
 }
 
diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs
index 75348692..ac461e14 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -25,7 +25,7 @@ use crate::serde::protobuf::{
 };
 use crate::utils::create_grpc_client_connection;
 use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
+use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::TaskContext;
@@ -227,7 +227,7 @@ async fn execute_query(
     scheduler_url: String,
     session_id: String,
     query: ExecuteQueryParams,
-) -> Result<impl Stream<Item = ArrowResult<RecordBatch>> + Send> {
+) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
     info!("Connecting to Ballista scheduler at {}", scheduler_url);
     // TODO reuse the scheduler to avoid connecting to the Ballista scheduler 
again and again
     let connection = create_grpc_client_connection(scheduler_url)
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index 73733bcb..20c31c50 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -29,7 +29,7 @@ use crate::client::BallistaClient;
 use crate::serde::scheduler::{PartitionLocation, PartitionStats};
 
 use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
+use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::ipc::reader::FileReader;
 use datafusion::arrow::record_batch::RecordBatch;
 
@@ -208,14 +208,14 @@ impl LocalShuffleStream {
 }
 
 impl Stream for LocalShuffleStream {
-    type Item = ArrowResult<RecordBatch>;
+    type Item = Result<RecordBatch>;
 
     fn poll_next(
         mut self: Pin<&mut Self>,
         _: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
         if let Some(batch) = self.reader.next() {
-            return Poll::Ready(Some(batch));
+            return Poll::Ready(Some(batch.map_err(|e| e.into())));
         }
         Poll::Ready(None)
     }
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index 7849d1e5..c2b667f5 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -38,15 +38,15 @@ default = ["mimalloc"]
 
 [dependencies]
 anyhow = "1"
-arrow = { version = "31.0.0" }
-arrow-flight = { version = "31.0.0" }
+arrow = { version = "32.0.0" }
+arrow-flight = { version = "32.0.0" }
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.10.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = "17.0.0"
-datafusion-proto = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-proto = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 futures = "0.3"
 hyper = "0.14.4"
 log = "0.4"
diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs
index 2985e914..65e1c51c 100644
--- a/ballista/executor/src/collect.rs
+++ b/ballista/executor/src/collect.rs
@@ -22,9 +22,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::{any::Any, pin::Pin};
 
-use datafusion::arrow::{
-    datatypes::SchemaRef, error::Result as ArrowResult, 
record_batch::RecordBatch,
-};
+use datafusion::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
@@ -118,7 +116,7 @@ struct MergedRecordBatchStream {
 }
 
 impl Stream for MergedRecordBatchStream {
-    type Item = ArrowResult<RecordBatch>;
+    type Item = Result<RecordBatch>;
 
     fn poll_next(
         mut self: Pin<&mut Self>,
diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs
index 1c25baa3..d903db69 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -182,13 +182,13 @@ mod test {
     use crate::executor::Executor;
     use crate::metrics::LoggingMetricsCollector;
     use arrow::datatypes::{Schema, SchemaRef};
-    use arrow::error::ArrowError;
     use arrow::record_batch::RecordBatch;
     use ballista_core::execution_plans::ShuffleWriterExec;
     use ballista_core::serde::protobuf::ExecutorRegistration;
     use datafusion::execution::context::TaskContext;
 
     use ballista_core::serde::scheduler::PartitionId;
+    use datafusion::error::DataFusionError;
     use datafusion::physical_expr::PhysicalSortExpr;
     use datafusion::physical_plan::{
         ExecutionPlan, Partitioning, RecordBatchStream, 
SendableRecordBatchStream,
@@ -213,7 +213,7 @@ mod test {
     }
 
     impl Stream for NeverendingRecordBatchStream {
-        type Item = Result<RecordBatch, ArrowError>;
+        type Item = Result<RecordBatch, DataFusionError>;
 
         fn poll_next(
             self: Pin<&mut Self>,
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 64da40d7..2375bfb0 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -43,7 +43,7 @@ sled = ["sled_package", "tokio-stream"]
 
 [dependencies]
 anyhow = "1"
-arrow-flight = { version = "31.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "32.0.0", features = ["flight-sql-experimental"] }
 async-recursion = "1.0.0"
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.10.0" }
@@ -51,8 +51,8 @@ base64 = { version = "0.13", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = "17.0.0"
-datafusion-proto = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-proto = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 etcd-client = { version = "0.10", optional = true }
 flatbuffers = { version = "22.9.29" }
 futures = "0.3"
diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs
index 87aaad77..dc069872 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -326,17 +326,15 @@ mod test {
     use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
     use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
     use datafusion::physical_plan::joins::HashJoinExec;
+    use datafusion::physical_plan::projection::ProjectionExec;
     use datafusion::physical_plan::sorts::sort::SortExec;
-    use datafusion::physical_plan::{
-        coalesce_partitions::CoalescePartitionsExec, 
projection::ProjectionExec,
-    };
+    use 
datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
     use datafusion::physical_plan::{displayable, ExecutionPlan};
     use datafusion::prelude::SessionContext;
-    use std::ops::Deref;
-
     use datafusion_proto::physical_plan::AsExecutionPlan;
     use datafusion_proto::protobuf::LogicalPlanNode;
     use datafusion_proto::protobuf::PhysicalPlanNode;
+    use std::ops::Deref;
     use std::sync::Arc;
     use uuid::Uuid;
 
@@ -375,19 +373,19 @@ mod test {
         /* Expected result:
 
         ShuffleWriterExec: Some(Hash([Column { name: "l_returnflag", index: 0 
}], 2))
-          AggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], 
aggr=[SUM(l_extendedprice Multiply Int64(1))]
-            CsvExec: source=Path(testdata/lineitem: 
[testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), 
has_header=false
+          AggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], 
aggr=[SUM(lineitem.l_extendedprice * Int64(1))]
+            CsvExec: files={2 groups: 
[[ballista/scheduler/testdata/lineitem/partition1.tbl], 
[ballista/scheduler/testdata/lineitem/partition0.tbl]]}, has_header=false, 
limit=None, projection=[l_extendedprice, l_returnflag]
 
         ShuffleWriterExec: None
-          ProjectionExec: expr=[l_returnflag@0 as l_returnflag, 
SUM(lineitem.l_extendedprice Multiply Int64(1))@1 as sum_disc_price]
-            AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as 
l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
-              CoalesceBatchesExec: target_batch_size=4096
-                UnresolvedShuffleExec
+          SortExec: [l_returnflag@0 ASC NULLS LAST]
+            ProjectionExec: expr=[l_returnflag@0 as l_returnflag, 
SUM(lineitem.l_extendedprice * Int64(1))@1 as sum_disc_price]
+              AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as 
l_returnflag], aggr=[SUM(lineitem.l_extendedprice * Int64(1))]
+                CoalesceBatchesExec: target_batch_size=8192
+                  UnresolvedShuffleExec
 
         ShuffleWriterExec: None
-          SortExec: [l_returnflag@0 ASC]
-            CoalescePartitionsExec
-              UnresolvedShuffleExec
+          SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST]
+            UnresolvedShuffleExec
         */
 
         assert_eq!(3, stages.len());
@@ -399,7 +397,9 @@ mod test {
 
         // verify stage 1
         let stage1 = stages[1].children()[0].clone();
-        let projection = downcast_exec!(stage1, ProjectionExec);
+        let sort = downcast_exec!(stage1, SortExec);
+        let projection = sort.children()[0].clone();
+        let projection = downcast_exec!(projection, ProjectionExec);
         let final_hash = projection.children()[0].clone();
         let final_hash = downcast_exec!(final_hash, AggregateExec);
         assert!(*final_hash.mode() == AggregateMode::FinalPartitioned);
@@ -414,15 +414,8 @@ mod test {
 
         // verify stage 2
         let stage2 = stages[2].children()[0].clone();
-        let sort = downcast_exec!(stage2, SortExec);
-        let coalesce_partitions = sort.children()[0].clone();
-        let coalesce_partitions =
-            downcast_exec!(coalesce_partitions, CoalescePartitionsExec);
-        assert_eq!(
-            coalesce_partitions.output_partitioning().partition_count(),
-            1
-        );
-        let unresolved_shuffle = coalesce_partitions.children()[0].clone();
+        let merge = downcast_exec!(stage2, SortPreservingMergeExec);
+        let unresolved_shuffle = merge.children()[0].clone();
         let unresolved_shuffle =
             downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
         assert_eq!(unresolved_shuffle.stage_id, 2);
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 0f437d07..a960a0c8 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,8 +34,8 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "17.0.0"
-datafusion-proto = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-proto = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 env_logger = "0.10"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 18fbaad5..d381c029 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.11"

Reply via email to