This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 9e842a0f Use latest DataFusion (#68)
9e842a0f is described below
commit 9e842a0f4dca317990313849cdfee65626430283
Author: Andy Grove <[email protected]>
AuthorDate: Sun Jun 19 07:55:05 2022 -0600
Use latest DataFusion (#68)
---
ballista-cli/Cargo.toml | 4 +-
ballista/rust/client/Cargo.toml | 6 +-
ballista/rust/core/Cargo.toml | 8 +-
ballista/rust/core/proto/ballista.proto | 7 +-
ballista/rust/core/src/serde/logical_plan/mod.rs | 11 ++-
.../core/src/serde/physical_plan/from_proto.rs | 9 +--
ballista/rust/core/src/serde/physical_plan/mod.rs | 89 ++++++++++++++++++----
.../rust/core/src/serde/physical_plan/to_proto.rs | 2 +-
ballista/rust/executor/Cargo.toml | 8 +-
ballista/rust/scheduler/Cargo.toml | 4 +-
benchmarks/Cargo.toml | 4 +-
examples/Cargo.toml | 2 +-
12 files changed, 108 insertions(+), 46 deletions(-)
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index f3b0949a..db0ad6b7 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -31,8 +31,8 @@ readme = "README.md"
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index b9138c71..f20ed712 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.59"
ballista-core = { path = "../core", version = "0.7.0" }
ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
-sqlparser = "0.17"
+sqlparser = "0.18"
tempfile = "3"
tokio = "1.0"
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 3bd9edfc..a3803ab5 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -35,12 +35,12 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.7", default-features = false }
-arrow-flight = { version = "15.0.0" }
+arrow-flight = { version = "16.0.0" }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
futures = "0.3"
hashbrown = "0.12"
@@ -53,7 +53,7 @@ parse_arg = "0.1.3"
prost = "0.10"
prost-types = "0.10"
serde = { version = "1", features = ["derive"] }
-sqlparser = "0.17"
+sqlparser = "0.18"
tokio = "1.0"
tonic = "0.7"
uuid = { version = "1.0", features = ["v4"] }
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 8eb57f72..0af951cf 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -544,6 +544,8 @@ message AggregateExecNode {
repeated string aggr_expr_name = 6;
// we need the input schema to the partial aggregate to pass to the final aggregate
datafusion.Schema input_schema = 7;
+ repeated PhysicalExprNode null_expr = 8;
+ repeated bool groups = 9;
}
message ShuffleWriterExecNode {
@@ -567,12 +569,13 @@ message ShuffleReaderPartition {
message GlobalLimitExecNode {
PhysicalPlanNode input = 1;
- uint32 limit = 2;
+ uint32 skip = 2;
+ uint32 fetch = 3;
}
message LocalLimitExecNode {
PhysicalPlanNode input = 1;
- uint32 limit = 2;
+ uint32 fetch = 2;
}
message SortExecNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index c4532711..5952a095 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -40,15 +40,20 @@ mod roundtrip_tests {
use datafusion::logical_plan::source_as_provider;
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
+ common::ToDFSchema,
datafusion_data_access::{
self,
object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
SizedFile,
},
datasource::listing::ListingTable,
- logical_plan::{
- binary_expr, col, CreateExternalTable, Expr, FileType, LogicalPlan,
- LogicalPlanBuilder, Operator, Repartition, ToDFSchema,
+ logical_expr::{
+ binary_expr, col,
+ logical_plan::{
+ CreateExternalTable, FileType, LogicalPlan, LogicalPlanBuilder,
+ Repartition,
+ },
+ Expr, Operator,
},
prelude::*,
};
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 7cfb2084..d699a011 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -31,19 +31,16 @@ use datafusion::datafusion_data_access::{FileMeta, SizedFile};
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
+use datafusion::logical_expr::window_function::WindowFunction;
use datafusion::logical_plan::FunctionRegistry;
-
+use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::file_format::FileScanConfig;
-
-use datafusion::logical_expr::window_function::WindowFunction;
-
use datafusion::physical_plan::{
expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
Literal, NegativeExpr, NotExpr, TryCastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
},
- functions::{self, ScalarFunctionExpr},
- Partitioning,
+ functions, Partitioning,
};
use datafusion::physical_plan::{ColumnStatistics, PhysicalExpr, Statistics};
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index cbe8c984..ac3cb22e 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -28,8 +28,8 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::window_frames::WindowFrame;
use datafusion::logical_plan::FunctionRegistry;
-use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateMode};
+use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::cross_join::CrossJoinExec;
@@ -235,12 +235,22 @@ impl AsExecutionPlan for PhysicalPlanNode {
PhysicalPlanType::GlobalLimit(limit) => {
let input: Arc<dyn ExecutionPlan> =
into_physical_plan!(limit.input, registry, runtime, extension_codec)?;
- Ok(Arc::new(GlobalLimitExec::new(input, limit.limit as usize)))
+ let skip = if limit.skip > 0 {
+ Some(limit.skip as usize)
+ } else {
+ None
+ };
+ let fetch = if limit.fetch > 0 {
+ Some(limit.fetch as usize)
+ } else {
+ None
+ };
+ Ok(Arc::new(GlobalLimitExec::new(input, skip, fetch)))
}
PhysicalPlanType::LocalLimit(limit) => {
let input: Arc<dyn ExecutionPlan> =
into_physical_plan!(limit.input, registry, runtime, extension_codec)?;
- Ok(Arc::new(LocalLimitExec::new(input, limit.limit as usize)))
+ Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
}
PhysicalPlanType::Window(window_agg) => {
let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
@@ -329,7 +339,10 @@ impl AsExecutionPlan for PhysicalPlanNode {
AggregateMode::FinalPartitioned
}
};
- let group = hash_agg
+
+ let num_expr = hash_agg.group_expr.len();
+
+ let group_expr = hash_agg
.group_expr
.iter()
.zip(hash_agg.group_expr_name.iter())
@@ -339,6 +352,26 @@ impl AsExecutionPlan for PhysicalPlanNode {
})
.collect::<Result<Vec<_>, _>>()?;
+ let null_expr = hash_agg
+ .null_expr
+ .iter()
+ .zip(hash_agg.group_expr_name.iter())
+ .map(|(expr, name)| {
+ parse_physical_expr(expr, registry)
+ .map(|expr| (expr, name.to_string()))
+ })
+ .collect::<Result<Vec<_>, _>>()?;
+
+ let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
+ hash_agg
+ .groups
+ .chunks(num_expr)
+ .map(|g| g.to_vec())
+ .collect::<Vec<Vec<bool>>>()
+ } else {
+ vec![]
+ };
+
let input_schema = hash_agg
.input_schema
.as_ref()
@@ -396,7 +429,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
Ok(Arc::new(AggregateExec::try_new(
agg_mode,
- group,
+ PhysicalGroupBy::new(group_expr, null_expr, groups),
physical_aggr_expr,
input,
Arc::new((&input_schema).try_into()?),
@@ -692,7 +725,8 @@ impl AsExecutionPlan for PhysicalPlanNode {
physical_plan_type:
Some(PhysicalPlanType::GlobalLimit(Box::new(
protobuf::GlobalLimitExecNode {
input: Some(Box::new(input)),
- limit: limit.limit() as u32,
+ skip: *limit.skip().unwrap_or(&0) as u32,
+ fetch: *limit.fetch().unwrap_or(&0) as u32,
},
))),
})
@@ -705,7 +739,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
protobuf::LocalLimitExecNode {
input: Some(Box::new(input)),
- limit: limit.limit() as u32,
+ fetch: limit.fetch() as u32,
},
))),
})
@@ -797,16 +831,21 @@ impl AsExecutionPlan for PhysicalPlanNode {
))),
})
} else if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
- let groups = exec
+ let groups: Vec<bool> = exec
.group_expr()
+ .groups()
.iter()
- .map(|expr| expr.0.to_owned().try_into())
- .collect::<Result<Vec<_>, BallistaError>>()?;
+ .flatten()
+ .copied()
+ .collect();
+
let group_names = exec
.group_expr()
+ .expr()
.iter()
.map(|expr| expr.1.to_owned())
.collect();
+
let agg = exec
.aggr_expr()
.iter()
@@ -833,16 +872,33 @@ impl AsExecutionPlan for PhysicalPlanNode {
exec.input().to_owned(),
extension_codec,
)?;
+
+ let null_expr = exec
+ .group_expr()
+ .null_expr()
+ .iter()
+ .map(|expr| expr.0.to_owned().try_into())
+ .collect::<Result<Vec<_>, BallistaError>>()?;
+
+ let group_expr = exec
+ .group_expr()
+ .expr()
+ .iter()
+ .map(|expr| expr.0.to_owned().try_into())
+ .collect::<Result<Vec<_>, BallistaError>>()?;
+
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
protobuf::AggregateExecNode {
- group_expr: groups,
+ group_expr,
group_expr_name: group_names,
aggr_expr: agg,
aggr_expr_name: agg_names,
mode: agg_mode as i32,
input: Some(Box::new(input)),
input_schema: Some(input_schema.as_ref().into()),
+ null_expr,
+ groups,
},
))),
})
@@ -1131,10 +1187,10 @@ mod roundtrip_tests {
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::{BuiltinScalarFunction, Volatility};
use datafusion::logical_plan::create_udf;
+ use datafusion::physical_expr::ScalarFunctionExpr;
+ use datafusion::physical_plan::aggregates::PhysicalGroupBy;
use datafusion::physical_plan::functions;
- use datafusion::physical_plan::functions::{
- make_scalar_function, ScalarFunctionExpr,
- };
+ use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::{
arrow::{
@@ -1236,7 +1292,8 @@ mod roundtrip_tests {
fn roundtrip_global_limit() -> Result<()> {
roundtrip_test(Arc::new(GlobalLimitExec::new(
Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
- 25,
+ None,
+ Some(25),
)))
}
@@ -1294,7 +1351,7 @@ mod roundtrip_tests {
roundtrip_test(Arc::new(AggregateExec::try_new(
AggregateMode::Final,
- groups.clone(),
+ PhysicalGroupBy::new_single(groups.clone()),
aggregates.clone(),
Arc::new(EmptyExec::new(false, schema.clone())),
schema,
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 85aea6c4..7896ddde 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -46,7 +46,7 @@ use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
use crate::serde::{protobuf, BallistaError};
use datafusion::logical_expr::BuiltinScalarFunction;
-use datafusion::physical_plan::functions::ScalarFunctionExpr;
+use datafusion::physical_expr::ScalarFunctionExpr;
impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
type Error = BallistaError;
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 439d19d5..9b32fb8b 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -34,14 +34,14 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
anyhow = "1"
-arrow = { version = "15.0.0" }
-arrow-flight = { version = "15.0.0" }
+arrow = { version = "16.0.0" }
+arrow-flight = { version = "16.0.0" }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
env_logger = "0.9"
futures = "0.3"
hyper = "0.14.4"
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index aa335e61..5da74bad 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -41,8 +41,8 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
env_logger = "0.9"
etcd-client = { version = "0.9", optional = true }
futures = "0.3"
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 075633da..9b12889b 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 2f21f79a..5070aed2 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.10"