This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new e4191dc7 Update to DataFusion 33 (#900)
e4191dc7 is described below

commit e4191dc7b76c00b319a7f04b0fdc452d6b2886eb
Author: Daniël Heres <[email protected]>
AuthorDate: Sun Nov 26 19:13:21 2023 +0000

    Update to DataFusion 33 (#900)
    
    * Update to DataFusion 33
    
    * Test fixes
    
    * Fix
    
    * Upgrade crates
    
    * Tests
    
    * Tests
    
    * Use parquet file instead
---
 Cargo.toml                                         | 14 ++---
 ballista/client/src/columnar_batch.rs              |  8 ++-
 ballista/client/src/context.rs                     | 34 +++++------
 ballista/core/Cargo.toml                           |  2 +-
 .../core/src/execution_plans/distributed_query.rs  |  4 +-
 .../core/src/execution_plans/shuffle_reader.rs     | 68 +++++++++++-----------
 .../core/src/execution_plans/shuffle_writer.rs     |  2 +-
 .../core/src/execution_plans/unresolved_shuffle.rs |  4 +-
 ballista/executor/src/collect.rs                   |  2 +-
 ballista/executor/src/executor.rs                  |  6 +-
 ballista/scheduler/Cargo.toml                      |  6 +-
 ballista/scheduler/src/flight_sql.rs               |  5 +-
 examples/examples/standalone-sql.rs                | 10 ++--
 13 files changed, 84 insertions(+), 81 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 6f34fa06..e4b5f324 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,16 +29,16 @@ members = [
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "47.0.0" }
-arrow-flight = { version = "47.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "47.0.0", default-features = false }
+arrow = { version = "48.0.0" }
+arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "48.0.0", default-features = false }
 configure_me = { version = "0.4.0" }
 configure_me_codegen = { version = "0.4.4" }
-datafusion = "32.0.0"
-datafusion-cli = "32.0.0"
-datafusion-proto = "32.0.0"
+datafusion = "33.0.0"
+datafusion-cli = "33.0.0"
+datafusion-proto = "33.0.0"
 object_store = "0.7.0"
-sqlparser = "0.38.0"
+sqlparser = "0.39.0"
 tonic = { version = "0.10" }
 tonic-build = { version = "0.10", default-features = false, features = [
     "transport",
diff --git a/ballista/client/src/columnar_batch.rs b/ballista/client/src/columnar_batch.rs
index 3431f561..5e7fe89b 100644
--- a/ballista/client/src/columnar_batch.rs
+++ b/ballista/client/src/columnar_batch.rs
@@ -147,10 +147,12 @@ impl ColumnarValue {
         }
     }
 
-    pub fn to_arrow(&self) -> ArrayRef {
+    pub fn to_arrow(&self) -> Result<ArrayRef> {
         match self {
-            ColumnarValue::Columnar(array) => array.clone(),
-            ColumnarValue::Scalar(value, n) => value.to_array_of_size(*n),
+            ColumnarValue::Columnar(array) => Ok(array.clone()),
+            ColumnarValue::Scalar(value, n) => {
+                value.to_array_of_size(*n).map_err(|x| x.into())
+            }
         }
     }
 
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 76c8d439..82ca1710 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -839,7 +839,7 @@ mod tests {
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+-------------------+",
-            "| VARIANCE(test.id) |",
+            "| VAR(test.id)      |",
             "+-------------------+",
             "| 6.000000000000001 |",
             "+-------------------+",
@@ -852,11 +852,11 @@ mod tests {
             .unwrap();
         let res = df.collect().await.unwrap();
         let expected = vec![
-            "+-----------------------+",
-            "| VARIANCE_POP(test.id) |",
-            "+-----------------------+",
-            "| 5.250000000000001     |",
-            "+-----------------------+",
+            "+-------------------+",
+            "| VAR_POP(test.id)  |",
+            "+-------------------+",
+            "| 5.250000000000001 |",
+            "+-------------------+",
         ];
         assert_result_eq(expected, &res);
 
@@ -867,7 +867,7 @@ mod tests {
         let res = df.collect().await.unwrap();
         let expected = vec![
             "+-------------------+",
-            "| VARIANCE(test.id) |",
+            "| VAR(test.id)      |",
             "+-------------------+",
             "| 6.000000000000001 |",
             "+-------------------+",
@@ -908,11 +908,11 @@ mod tests {
             .unwrap();
         let res = df.collect().await.unwrap();
         let expected = vec![
-            "+--------------------------------------+",
-            "| COVARIANCE(test.id,test.tinyint_col) |",
-            "+--------------------------------------+",
-            "| 0.28571428571428586                  |",
-            "+--------------------------------------+",
+            "+---------------------------------+",
+            "| COVAR(test.id,test.tinyint_col) |",
+            "+---------------------------------+",
+            "| 0.28571428571428586             |",
+            "+---------------------------------+",
         ];
         assert_result_eq(expected, &res);
 
@@ -922,11 +922,11 @@ mod tests {
             .unwrap();
         let res = df.collect().await.unwrap();
         let expected = vec![
-            "+---------------------------------------+",
-            "| CORRELATION(test.id,test.tinyint_col) |",
-            "+---------------------------------------+",
-            "| 0.21821789023599245                   |",
-            "+---------------------------------------+",
+            "+--------------------------------+",
+            "| CORR(test.id,test.tinyint_col) |",
+            "+--------------------------------+",
+            "| 0.21821789023599245            |",
+            "+--------------------------------+",
         ];
         assert_result_eq(expected, &res);
 
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 31a5d495..d8d4bfcc 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -57,7 +57,7 @@ datafusion-proto = { workspace = true }
 futures = "0.3"
 hashbrown = "0.14"
 
-itertools = "0.11"
+itertools = "0.12"
 libloading = "0.8.0"
 log = "0.4"
 md-5 = { version = "^0.10.0" }
diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs
index ccb26206..13511173 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -210,11 +210,11 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
         Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
     }
 
-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> Result<Statistics> {
         // This execution plan sends the logical plan to the scheduler without
         // performing the node by node conversion to a full physical plan.
         // This implies that we cannot infer the statistics at this stage.
-        Statistics::default()
+        Ok(Statistics::new_unknown(&self.schema()))
     }
 }
 
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index fa3f9f69..6a77a16e 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use async_trait::async_trait;
+use datafusion::common::stats::Precision;
 use std::any::Any;
 use std::collections::HashMap;
 use std::fmt::Debug;
@@ -37,8 +38,8 @@ use datafusion::error::Result;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream, Statistics,
+    ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+    RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use futures::{Stream, StreamExt, TryStreamExt};
 
@@ -172,37 +173,39 @@ impl ExecutionPlan for ShuffleReaderExec {
         Some(self.metrics.clone_inner())
     }
 
-    fn statistics(&self) -> Statistics {
-        stats_for_partitions(
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(stats_for_partitions(
+            self.schema.fields().len(),
             self.partition
                 .iter()
                 .flatten()
                 .map(|loc| loc.partition_stats),
-        )
+        ))
     }
 }
 
 fn stats_for_partitions(
+    num_fields: usize,
     partition_stats: impl Iterator<Item = PartitionStats>,
 ) -> Statistics {
     // TODO stats: add column statistics to PartitionStats
-    partition_stats.fold(
-        Statistics {
-            is_exact: true,
-            num_rows: Some(0),
-            total_byte_size: Some(0),
-            column_statistics: None,
-        },
-        |mut acc, part| {
+    let (num_rows, total_byte_size) =
+        partition_stats.fold((Some(0), Some(0)), |(num_rows, total_byte_size), part| {
             // if any statistic is unkown it makes the entire statistic unkown
-            acc.num_rows = acc.num_rows.zip(part.num_rows).map(|(a, b)| a + b as usize);
-            acc.total_byte_size = acc
-                .total_byte_size
+            let num_rows = num_rows.zip(part.num_rows).map(|(a, b)| a + b as usize);
+            let total_byte_size = total_byte_size
                 .zip(part.num_bytes)
                 .map(|(a, b)| a + b as usize);
-            acc
-        },
-    )
+            (num_rows, total_byte_size)
+        });
+
+    Statistics {
+        num_rows: num_rows.map(Precision::Exact).unwrap_or(Precision::Absent),
+        total_byte_size: total_byte_size
+            .map(Precision::Exact)
+            .unwrap_or(Precision::Absent),
+        column_statistics: vec![ColumnStatistics::new_unknown(); num_fields],
+    }
 }
 
 struct LocalShuffleStream {
@@ -445,13 +448,12 @@ mod tests {
 
     #[tokio::test]
     async fn test_stats_for_partitions_empty() {
-        let result = stats_for_partitions(std::iter::empty());
+        let result = stats_for_partitions(0, std::iter::empty());
 
         let exptected = Statistics {
-            is_exact: true,
-            num_rows: Some(0),
-            total_byte_size: Some(0),
-            column_statistics: None,
+            num_rows: Precision::Exact(0),
+            total_byte_size: Precision::Exact(0),
+            column_statistics: vec![],
         };
 
         assert_eq!(result, exptected);
@@ -472,13 +474,12 @@ mod tests {
             },
         ];
 
-        let result = stats_for_partitions(part_stats.into_iter());
+        let result = stats_for_partitions(0, part_stats.into_iter());
 
         let exptected = Statistics {
-            is_exact: true,
-            num_rows: Some(14),
-            total_byte_size: Some(149),
-            column_statistics: None,
+            num_rows: Precision::Exact(14),
+            total_byte_size: Precision::Exact(149),
+            column_statistics: vec![],
         };
 
         assert_eq!(result, exptected);
@@ -499,13 +500,12 @@ mod tests {
             },
         ];
 
-        let result = stats_for_partitions(part_stats.into_iter());
+        let result = stats_for_partitions(0, part_stats.into_iter());
 
         let exptected = Statistics {
-            is_exact: true,
-            num_rows: None,
-            total_byte_size: None,
-            column_statistics: None,
+            num_rows: Precision::Absent,
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![],
         };
 
         assert_eq!(result, exptected);
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs
index 24869b2c..1896c206 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -418,7 +418,7 @@ impl ExecutionPlan for ShuffleWriterExec {
         Some(self.metrics.clone_inner())
     }
 
-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> Result<Statistics> {
         self.plan.statistics()
     }
 }
diff --git a/ballista/core/src/execution_plans/unresolved_shuffle.rs b/ballista/core/src/execution_plans/unresolved_shuffle.rs
index 0557529f..fe3610c4 100644
--- a/ballista/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/core/src/execution_plans/unresolved_shuffle.rs
@@ -115,9 +115,9 @@ impl ExecutionPlan for UnresolvedShuffleExec {
         ))
     }
 
-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> Result<Statistics> {
         // The full statistics are computed in the `ShuffleReaderExec` node
         // that replaces this one once the previous stage is completed.
-        Statistics::default()
+        Ok(Statistics::new_unknown(&self.schema()))
     }
 }
diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs
index 8dbccc32..22567bca 100644
--- a/ballista/executor/src/collect.rs
+++ b/ballista/executor/src/collect.rs
@@ -108,7 +108,7 @@ impl ExecutionPlan for CollectExec {
         }))
     }
 
-    fn statistics(&self) -> Statistics {
+    fn statistics(&self) -> Result<Statistics> {
         self.plan.statistics()
     }
 }
diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs
index 4ee57eb8..60db9f00 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -210,7 +210,7 @@ mod test {
 
     use crate::execution_engine::DefaultQueryStageExec;
     use ballista_core::serde::scheduler::PartitionId;
-    use datafusion::error::DataFusionError;
+    use datafusion::error::{DataFusionError, Result};
     use datafusion::physical_expr::PhysicalSortExpr;
     use datafusion::physical_plan::{
         DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
@@ -299,8 +299,8 @@ mod test {
             Ok(Box::pin(NeverendingRecordBatchStream))
         }
 
-        fn statistics(&self) -> Statistics {
-            Statistics::default()
+        fn statistics(&self) -> Result<Statistics> {
+            Ok(Statistics::new_unknown(&self.schema()))
         }
     }
 
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 470195ce..270c5f99 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -47,20 +47,20 @@ arrow-flight = { workspace = true }
 async-recursion = "1.0.0"
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] }
-base64 = { version = "0.13" }
+base64 = { version = "0.21" }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = { workspace = true }
 dashmap = "5.4.0"
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
-etcd-client = { version = "0.11", optional = true }
+etcd-client = { version = "0.12", optional = true }
 flatbuffers = { version = "23.5.26" }
 futures = "0.3"
 graphviz-rust = "0.6.1"
 http = "0.2"
 http-body = "0.4"
 hyper = "0.14.4"
-itertools = "0.11.0"
+itertools = "0.12.0"
 log = "0.4"
 object_store = { workspace = true }
 once_cell = { version = "1.16.0", optional = true }
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
index 23a863e2..62c96966 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -35,6 +35,7 @@ use arrow_flight::{
     Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
     HandshakeResponse, Location, Ticket,
 };
+use base64::Engine;
 use futures::Stream;
 use log::{debug, error, warn};
 use std::convert::TryFrom;
@@ -503,8 +504,8 @@ impl FlightSqlService for FlightSqlServiceImpl {
                 "Auth type not implemented: {authorization}"
             )))?;
         }
-        let base64 = &authorization[basic.len()..];
-        let bytes = base64::decode(base64)
+        let bytes = base64::engine::general_purpose::STANDARD
+            .decode(&authorization[basic.len()..])
             .map_err(|_| Status::invalid_argument("authorization not parsable"))?;
         let str = String::from_utf8(bytes)
             .map_err(|_| Status::invalid_argument("authorization not parsable"))?;
diff --git a/examples/examples/standalone-sql.rs b/examples/examples/standalone-sql.rs
index b24a6350..0427caa7 100644
--- a/examples/examples/standalone-sql.rs
+++ b/examples/examples/standalone-sql.rs
@@ -17,7 +17,7 @@
 
 use ballista::prelude::{BallistaConfig, BallistaContext, Result};
 use ballista_examples::test_util;
-use datafusion::prelude::CsvReadOptions;
+use datafusion::execution::options::ParquetReadOptions;
 
 #[tokio::main]
 async fn main() -> Result<()> {
@@ -29,11 +29,11 @@ async fn main() -> Result<()> {
 
     let testdata = test_util::examples_test_data();
 
-    // register csv file with the execution context
-    ctx.register_csv(
+    // register parquet file with the execution context
+    ctx.register_parquet(
         "test",
-        &format!("{testdata}/aggregate_test_100.csv"),
-        CsvReadOptions::new(),
+        &format!("{testdata}/alltypes_plain.parquet"),
+        ParquetReadOptions::default(),
     )
     .await?;
 

Reply via email to