This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e842a0f Use latest DataFusion (#68)
9e842a0f is described below

commit 9e842a0f4dca317990313849cdfee65626430283
Author: Andy Grove <[email protected]>
AuthorDate: Sun Jun 19 07:55:05 2022 -0600

    Use latest DataFusion (#68)
---
 ballista-cli/Cargo.toml                            |  4 +-
 ballista/rust/client/Cargo.toml                    |  6 +-
 ballista/rust/core/Cargo.toml                      |  8 +-
 ballista/rust/core/proto/ballista.proto            |  7 +-
 ballista/rust/core/src/serde/logical_plan/mod.rs   | 11 ++-
 .../core/src/serde/physical_plan/from_proto.rs     |  9 +--
 ballista/rust/core/src/serde/physical_plan/mod.rs  | 89 ++++++++++++++++++----
 .../rust/core/src/serde/physical_plan/to_proto.rs  |  2 +-
 ballista/rust/executor/Cargo.toml                  |  8 +-
 ballista/rust/scheduler/Cargo.toml                 |  4 +-
 benchmarks/Cargo.toml                              |  4 +-
 examples/Cargo.toml                                |  2 +-
 12 files changed, 108 insertions(+), 46 deletions(-)

diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index f3b0949a..db0ad6b7 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -31,8 +31,8 @@ readme = "README.md"
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index b9138c71..f20ed712 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.59"
 ballista-core = { path = "../core", version = "0.7.0" }
 ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
-sqlparser = "0.17"
+sqlparser = "0.18"
 tempfile = "3"
 tokio = "1.0"
 
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 3bd9edfc..a3803ab5 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -35,12 +35,12 @@ simd = ["datafusion/simd"]
 [dependencies]
 ahash = { version = "0.7", default-features = false }
 
-arrow-flight = { version = "15.0.0" }
+arrow-flight = { version = "16.0.0" }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
 futures = "0.3"
 hashbrown = "0.12"
 
@@ -53,7 +53,7 @@ parse_arg = "0.1.3"
 prost = "0.10"
 prost-types = "0.10"
 serde = { version = "1", features = ["derive"] }
-sqlparser = "0.17"
+sqlparser = "0.18"
 tokio = "1.0"
 tonic = "0.7"
 uuid = { version = "1.0", features = ["v4"] }
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 8eb57f72..0af951cf 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -544,6 +544,8 @@ message AggregateExecNode {
   repeated string aggr_expr_name = 6;
   // we need the input schema to the partial aggregate to pass to the final 
aggregate
   datafusion.Schema input_schema = 7;
+  repeated PhysicalExprNode null_expr = 8;
+  repeated bool groups = 9;
 }
 
 message ShuffleWriterExecNode {
@@ -567,12 +569,13 @@ message ShuffleReaderPartition {
 
 message GlobalLimitExecNode {
   PhysicalPlanNode input = 1;
-  uint32 limit = 2;
+  uint32 skip = 2;
+  uint32 fetch = 3;
 }
 
 message LocalLimitExecNode {
   PhysicalPlanNode input = 1;
-  uint32 limit = 2;
+  uint32 fetch = 2;
 }
 
 message SortExecNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index c4532711..5952a095 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -40,15 +40,20 @@ mod roundtrip_tests {
     use datafusion::logical_plan::source_as_provider;
     use datafusion::{
         arrow::datatypes::{DataType, Field, Schema},
+        common::ToDFSchema,
         datafusion_data_access::{
             self,
             object_store::{FileMetaStream, ListEntryStream, ObjectReader, 
ObjectStore},
             SizedFile,
         },
         datasource::listing::ListingTable,
-        logical_plan::{
-            binary_expr, col, CreateExternalTable, Expr, FileType, LogicalPlan,
-            LogicalPlanBuilder, Operator, Repartition, ToDFSchema,
+        logical_expr::{
+            binary_expr, col,
+            logical_plan::{
+                CreateExternalTable, FileType, LogicalPlan, LogicalPlanBuilder,
+                Repartition,
+            },
+            Expr, Operator,
         },
         prelude::*,
     };
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 7cfb2084..d699a011 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -31,19 +31,16 @@ use datafusion::datafusion_data_access::{FileMeta, 
SizedFile};
 use datafusion::datasource::listing::{FileRange, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::execution::context::ExecutionProps;
+use datafusion::logical_expr::window_function::WindowFunction;
 use datafusion::logical_plan::FunctionRegistry;
-
+use datafusion::physical_expr::ScalarFunctionExpr;
 use datafusion::physical_plan::file_format::FileScanConfig;
-
-use datafusion::logical_expr::window_function::WindowFunction;
-
 use datafusion::physical_plan::{
     expressions::{
         BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, 
IsNullExpr,
         Literal, NegativeExpr, NotExpr, TryCastExpr, 
DEFAULT_DATAFUSION_CAST_OPTIONS,
     },
-    functions::{self, ScalarFunctionExpr},
-    Partitioning,
+    functions, Partitioning,
 };
 use datafusion::physical_plan::{ColumnStatistics, PhysicalExpr, Statistics};
 
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index cbe8c984..ac3cb22e 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -28,8 +28,8 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::logical_plan::window_frames::WindowFrame;
 use datafusion::logical_plan::FunctionRegistry;
-use datafusion::physical_plan::aggregates::AggregateExec;
 use datafusion::physical_plan::aggregates::{create_aggregate_expr, 
AggregateMode};
+use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::cross_join::CrossJoinExec;
@@ -235,12 +235,22 @@ impl AsExecutionPlan for PhysicalPlanNode {
             PhysicalPlanType::GlobalLimit(limit) => {
                 let input: Arc<dyn ExecutionPlan> =
                     into_physical_plan!(limit.input, registry, runtime, 
extension_codec)?;
-                Ok(Arc::new(GlobalLimitExec::new(input, limit.limit as usize)))
+                let skip = if limit.skip > 0 {
+                    Some(limit.skip as usize)
+                } else {
+                    None
+                };
+                let fetch = if limit.fetch > 0 {
+                    Some(limit.fetch as usize)
+                } else {
+                    None
+                };
+                Ok(Arc::new(GlobalLimitExec::new(input, skip, fetch)))
             }
             PhysicalPlanType::LocalLimit(limit) => {
                 let input: Arc<dyn ExecutionPlan> =
                     into_physical_plan!(limit.input, registry, runtime, 
extension_codec)?;
-                Ok(Arc::new(LocalLimitExec::new(input, limit.limit as usize)))
+                Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
             }
             PhysicalPlanType::Window(window_agg) => {
                 let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
@@ -329,7 +339,10 @@ impl AsExecutionPlan for PhysicalPlanNode {
                         AggregateMode::FinalPartitioned
                     }
                 };
-                let group = hash_agg
+
+                let num_expr = hash_agg.group_expr.len();
+
+                let group_expr = hash_agg
                     .group_expr
                     .iter()
                     .zip(hash_agg.group_expr_name.iter())
@@ -339,6 +352,26 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     })
                     .collect::<Result<Vec<_>, _>>()?;
 
+                let null_expr = hash_agg
+                    .null_expr
+                    .iter()
+                    .zip(hash_agg.group_expr_name.iter())
+                    .map(|(expr, name)| {
+                        parse_physical_expr(expr, registry)
+                            .map(|expr| (expr, name.to_string()))
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
+
+                let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
+                    hash_agg
+                        .groups
+                        .chunks(num_expr)
+                        .map(|g| g.to_vec())
+                        .collect::<Vec<Vec<bool>>>()
+                } else {
+                    vec![]
+                };
+
                 let input_schema = hash_agg
                     .input_schema
                     .as_ref()
@@ -396,7 +429,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
 
                 Ok(Arc::new(AggregateExec::try_new(
                     agg_mode,
-                    group,
+                    PhysicalGroupBy::new(group_expr, null_expr, groups),
                     physical_aggr_expr,
                     input,
                     Arc::new((&input_schema).try_into()?),
@@ -692,7 +725,8 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 physical_plan_type: 
Some(PhysicalPlanType::GlobalLimit(Box::new(
                     protobuf::GlobalLimitExecNode {
                         input: Some(Box::new(input)),
-                        limit: limit.limit() as u32,
+                        skip: *limit.skip().unwrap_or(&0) as u32,
+                        fetch: *limit.fetch().unwrap_or(&0) as u32,
                     },
                 ))),
             })
@@ -705,7 +739,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
                     protobuf::LocalLimitExecNode {
                         input: Some(Box::new(input)),
-                        limit: limit.limit() as u32,
+                        fetch: limit.fetch() as u32,
                     },
                 ))),
             })
@@ -797,16 +831,21 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 ))),
             })
         } else if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
-            let groups = exec
+            let groups: Vec<bool> = exec
                 .group_expr()
+                .groups()
                 .iter()
-                .map(|expr| expr.0.to_owned().try_into())
-                .collect::<Result<Vec<_>, BallistaError>>()?;
+                .flatten()
+                .copied()
+                .collect();
+
             let group_names = exec
                 .group_expr()
+                .expr()
                 .iter()
                 .map(|expr| expr.1.to_owned())
                 .collect();
+
             let agg = exec
                 .aggr_expr()
                 .iter()
@@ -833,16 +872,33 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 exec.input().to_owned(),
                 extension_codec,
             )?;
+
+            let null_expr = exec
+                .group_expr()
+                .null_expr()
+                .iter()
+                .map(|expr| expr.0.to_owned().try_into())
+                .collect::<Result<Vec<_>, BallistaError>>()?;
+
+            let group_expr = exec
+                .group_expr()
+                .expr()
+                .iter()
+                .map(|expr| expr.0.to_owned().try_into())
+                .collect::<Result<Vec<_>, BallistaError>>()?;
+
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
                     protobuf::AggregateExecNode {
-                        group_expr: groups,
+                        group_expr,
                         group_expr_name: group_names,
                         aggr_expr: agg,
                         aggr_expr_name: agg_names,
                         mode: agg_mode as i32,
                         input: Some(Box::new(input)),
                         input_schema: Some(input_schema.as_ref().into()),
+                        null_expr,
+                        groups,
                     },
                 ))),
             })
@@ -1131,10 +1187,10 @@ mod roundtrip_tests {
     use datafusion::execution::context::ExecutionProps;
     use datafusion::logical_expr::{BuiltinScalarFunction, Volatility};
     use datafusion::logical_plan::create_udf;
+    use datafusion::physical_expr::ScalarFunctionExpr;
+    use datafusion::physical_plan::aggregates::PhysicalGroupBy;
     use datafusion::physical_plan::functions;
-    use datafusion::physical_plan::functions::{
-        make_scalar_function, ScalarFunctionExpr,
-    };
+    use datafusion::physical_plan::functions::make_scalar_function;
     use datafusion::physical_plan::projection::ProjectionExec;
     use datafusion::{
         arrow::{
@@ -1236,7 +1292,8 @@ mod roundtrip_tests {
     fn roundtrip_global_limit() -> Result<()> {
         roundtrip_test(Arc::new(GlobalLimitExec::new(
             Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
-            25,
+            None,
+            Some(25),
         )))
     }
 
@@ -1294,7 +1351,7 @@ mod roundtrip_tests {
 
         roundtrip_test(Arc::new(AggregateExec::try_new(
             AggregateMode::Final,
-            groups.clone(),
+            PhysicalGroupBy::new_single(groups.clone()),
             aggregates.clone(),
             Arc::new(EmptyExec::new(false, schema.clone())),
             schema,
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 85aea6c4..7896ddde 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -46,7 +46,7 @@ use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
 use crate::serde::{protobuf, BallistaError};
 
 use datafusion::logical_expr::BuiltinScalarFunction;
-use datafusion::physical_plan::functions::ScalarFunctionExpr;
+use datafusion::physical_expr::ScalarFunctionExpr;
 
 impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
     type Error = BallistaError;
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 439d19d5..9b32fb8b 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -34,14 +34,14 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 anyhow = "1"
-arrow = { version = "15.0.0" }
-arrow-flight = { version = "15.0.0" }
+arrow = { version = "16.0.0" }
+arrow-flight = { version = "16.0.0" }
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.7.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
 env_logger = "0.9"
 futures = "0.3"
 hyper = "0.14.4"
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index aa335e61..5da74bad 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -41,8 +41,8 @@ async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
 env_logger = "0.9"
 etcd-client = { version = "0.9", optional = true }
 futures = "0.3"
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 075633da..9b12889b 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 2f21f79a..5070aed2 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.10"

Reply via email to