This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new d34d9d5c Use DataFusion 12.0.0-rc1 (and add support for DateTimeIntervalExpr and more binary operators) (#200)
d34d9d5c is described below
commit d34d9d5cd0045c10644b4203b789d93dc9e3085c
Author: Andy Grove <[email protected]>
AuthorDate: Mon Sep 12 15:36:01 2022 -0600
Use DataFusion 12.0.0-rc1 (and add support for DateTimeIntervalExpr and more binary operators) (#200)
* Add serde support for DateTimeInterval
* Use try_into for schema and types
* bump versions again
* use 12.0.0-rc1
* Update Python module
---
ballista-cli/Cargo.toml | 4 +--
ballista/rust/client/Cargo.toml | 4 +--
ballista/rust/core/Cargo.toml | 6 ++--
ballista/rust/core/proto/ballista.proto | 8 +++++
.../core/src/serde/physical_plan/from_proto.rs | 7 ++++
ballista/rust/core/src/serde/physical_plan/mod.rs | 40 ++++++++++++++++++----
.../rust/core/src/serde/physical_plan/to_proto.rs | 25 +++++++++++---
ballista/rust/executor/Cargo.toml | 4 +--
ballista/rust/scheduler/Cargo.toml | 6 ++--
.../rust/scheduler/src/scheduler_server/grpc.rs | 8 +++--
benchmarks/Cargo.toml | 4 +--
examples/Cargo.toml | 2 +-
python/Cargo.toml | 2 +-
python/src/functions.rs | 1 +
python/src/udaf.rs | 2 +-
15 files changed, 93 insertions(+), 30 deletions(-)
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 7e363a5f..1b871db1 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -31,8 +31,8 @@ readme = "README.md"
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 7f870735..2bd34c7d 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.59"
ballista-core = { path = "../core", version = "0.7.0" }
ballista-executor = { path = "../executor", version = "0.7.0", optional = true
}
ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional =
true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 026f0560..d6c35cda 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -39,14 +39,14 @@ arrow-flight = { version = "22.0.0" }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1" }
futures = "0.3"
hashbrown = "0.12"
libloading = "0.7.3"
log = "0.4"
-object_store = "0.4.0"
+object_store = "0.5.0"
once_cell = "1.9.0"
parking_lot = "0.12"
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index cb42d0ae..7b7f748c 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -118,6 +118,8 @@ message PhysicalExprNode {
PhysicalWindowExprNode window_expr = 15;
PhysicalScalarUdfNode scalar_udf = 16;
+
+ PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17;
}
}
@@ -164,6 +166,12 @@ message PhysicalBinaryExprNode {
string op = 3;
}
+message PhysicalDateTimeIntervalExprNode {
+ PhysicalExprNode l = 1;
+ PhysicalExprNode r = 2;
+ string op = 3;
+}
+
message PhysicalSortExprNode {
PhysicalExprNode expr = 1;
bool asc = 2;
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 6abb5f12..4b4bea5c 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -28,6 +28,7 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::window_function::WindowFunction;
use datafusion::logical_plan::FunctionRegistry;
+use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::{
@@ -84,6 +85,12 @@ pub(crate) fn parse_physical_expr(
input_schema,
)?,
)),
+ ExprType::DateTimeIntervalExpr(expr) =>
Arc::new(DateTimeIntervalExpr::try_new(
+ parse_required_physical_box_expr(&expr.l, registry, "left",
input_schema)?,
+ from_proto_binary_op(&expr.op)?,
+ parse_required_physical_box_expr(&expr.r, registry, "right",
input_schema)?,
+ input_schema,
+ )?),
ExprType::AggregateExpr(_) => {
return Err(BallistaError::General(
"Cannot convert aggregate expr node to physical
expression".to_owned(),
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs
b/ballista/rust/core/src/serde/physical_plan/mod.rs
index 167a9f35..b260b5d9 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -685,7 +685,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Explain(
protobuf::ExplainExecNode {
- schema: Some(exec.schema().as_ref().into()),
+ schema: Some(exec.schema().as_ref().try_into()?),
stringified_plans: exec
.stringified_plans()
.iter()
@@ -797,7 +797,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
}
})
.collect();
- let schema = f.schema().into();
+ let schema = f.schema().try_into()?;
Ok(protobuf::JoinFilter {
expression: Some(expression),
column_indices,
@@ -910,14 +910,14 @@ impl AsExecutionPlan for PhysicalPlanNode {
aggr_expr_name: agg_names,
mode: agg_mode as i32,
input: Some(Box::new(input)),
- input_schema: Some(input_schema.as_ref().into()),
+ input_schema: Some(input_schema.as_ref().try_into()?),
null_expr,
groups,
},
))),
})
} else if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
- let schema = empty.schema().as_ref().into();
+ let schema = empty.schema().as_ref().try_into()?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Empty(
protobuf::EmptyExecNode {
@@ -985,7 +985,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ShuffleReader(
protobuf::ShuffleReaderExecNode {
partition,
- schema: Some(exec.schema().as_ref().into()),
+ schema: Some(exec.schema().as_ref().try_into()?),
},
)),
})
@@ -1102,7 +1102,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Unresolved(
protobuf::UnresolvedShuffleExecNode {
stage_id: exec.stage_id as u32,
- schema: Some(exec.schema().as_ref().into()),
+ schema: Some(exec.schema().as_ref().try_into()?),
input_partition_count: exec.input_partition_count as
u32,
output_partition_count: exec.output_partition_count as
u32,
},
@@ -1197,10 +1197,12 @@ mod roundtrip_tests {
use std::sync::Arc;
use datafusion::arrow::array::ArrayRef;
+ use datafusion::arrow::datatypes::IntervalUnit;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::{BuiltinScalarFunction, Volatility};
use datafusion::logical_plan::create_udf;
+ use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::aggregates::PhysicalGroupBy;
use datafusion::physical_plan::functions;
@@ -1294,6 +1296,32 @@ mod roundtrip_tests {
roundtrip_test(Arc::new(EmptyExec::new(false,
Arc::new(Schema::empty()))))
}
+ #[test]
+ fn roundtrip_date_time_interval() -> Result<()> {
+ let schema = Schema::new(vec![
+ Field::new("some_date", DataType::Date32, false),
+ Field::new(
+ "some_interval",
+ DataType::Interval(IntervalUnit::DayTime),
+ false,
+ ),
+ ]);
+ let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+ let date_expr = col("some_date", &schema)?;
+ let literal_expr = col("some_interval", &schema)?;
+ let date_time_interval_expr = Arc::new(DateTimeIntervalExpr::try_new(
+ date_expr,
+ Operator::Plus,
+ literal_expr,
+ &schema,
+ )?);
+ let plan = Arc::new(ProjectionExec::try_new(
+ vec![(date_time_interval_expr, "result".to_string())],
+ input,
+ )?);
+ roundtrip_test(plan)
+ }
+
#[test]
fn roundtrip_local_limit() -> Result<()> {
roundtrip_test(Arc::new(LocalLimitExec::new(
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 16195377..a6a7a694 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -46,6 +46,7 @@ use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
use crate::serde::{protobuf, BallistaError};
use datafusion::logical_expr::BuiltinScalarFunction;
+use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
use datafusion::physical_expr::ScalarFunctionExpr;
impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
@@ -279,7 +280,7 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for
protobuf::PhysicalExprNode {
expr_type:
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
protobuf::PhysicalCastNode {
expr: Some(Box::new(cast.expr().clone().try_into()?)),
- arrow_type: Some(cast.cast_type().into()),
+ arrow_type: Some(cast.cast_type().try_into()?),
},
))),
})
@@ -288,7 +289,7 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for
protobuf::PhysicalExprNode {
expr_type:
Some(protobuf::physical_expr_node::ExprType::TryCast(
Box::new(protobuf::PhysicalTryCastNode {
expr: Some(Box::new(cast.expr().clone().try_into()?)),
- arrow_type: Some(cast.cast_type().into()),
+ arrow_type: Some(cast.cast_type().try_into()?),
}),
)),
})
@@ -309,7 +310,7 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for
protobuf::PhysicalExprNode {
name: expr.name().to_string(),
fun: fun.into(),
args,
- return_type: Some(expr.return_type().into()),
+ return_type:
Some(expr.return_type().try_into()?),
},
),
),
@@ -320,11 +321,25 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for
protobuf::PhysicalExprNode {
protobuf::PhysicalScalarUdfNode {
name: expr.name().to_string(),
args,
- return_type: Some(expr.return_type().into()),
+ return_type: Some(expr.return_type().try_into()?),
},
)),
})
}
+ } else if let Some(expr) = expr.downcast_ref::<DateTimeIntervalExpr>()
{
+ let dti_expr = Box::new(protobuf::PhysicalDateTimeIntervalExprNode
{
+ l: Some(Box::new(expr.lhs().to_owned().try_into()?)),
+ r: Some(Box::new(expr.rhs().to_owned().try_into()?)),
+ op: format!("{:?}", expr.op()),
+ });
+
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(
+
protobuf::physical_expr_node::ExprType::DateTimeIntervalExpr(
+ dti_expr,
+ ),
+ ),
+ })
} else {
Err(BallistaError::General(format!(
"physical_plan::to_proto() unsupported expression {:?}",
@@ -435,7 +450,7 @@ impl TryFrom<&FileScanConfig> for
protobuf::FileScanExecConf {
.iter()
.map(|n| *n as u32)
.collect(),
- schema: Some(conf.file_schema.as_ref().into()),
+ schema: Some(conf.file_schema.as_ref().try_into()?),
table_partition_cols: conf.table_partition_cols.to_vec(),
object_store_url: conf.object_store_url.to_string(),
})
diff --git a/ballista/rust/executor/Cargo.toml
b/ballista/rust/executor/Cargo.toml
index 7f0ffdbd..232a365c 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -40,8 +40,8 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1" }
futures = "0.3"
hyper = "0.14.4"
log = "0.4"
diff --git a/ballista/rust/scheduler/Cargo.toml
b/ballista/rust/scheduler/Cargo.toml
index a6f5e062..24616ad8 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -45,8 +45,8 @@ ballista-core = { path = "../core", version = "0.7.0" }
base64 = { version = "0.13", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1" }
etcd-client = { version = "0.9", optional = true }
flatbuffers = { version = "2.1.2" }
futures = "0.3"
@@ -55,7 +55,7 @@ http-body = "0.4"
hyper = "0.14.4"
itertools = "0.10.3"
log = "0.4"
-object_store = "0.4.0"
+object_store = "0.5.0"
parking_lot = "0.12"
parse_arg = "0.1.3"
prost = "0.11"
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 66ab3c93..f57262eb 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -40,9 +40,9 @@ use futures::TryStreamExt;
use log::{debug, error, info, warn};
// use http_body::Body;
+use std::convert::TryInto;
use std::ops::Deref;
use std::sync::Arc;
-
use std::time::{SystemTime, UNIX_EPOCH};
use tonic::{Request, Response, Status};
@@ -336,7 +336,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
})?;
Ok(Response::new(GetFileMetadataResult {
- schema: Some(schema.as_ref().into()),
+ schema: Some(schema.as_ref().try_into().map_err(|e| {
+ let msg = format!("Error inferring schema: {}", e);
+ error!("{}", msg);
+ tonic::Status::internal(msg)
+ })?),
}))
}
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 003e1717..cc0d41ae 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1" }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 440c33d2..14155ee6 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1" }
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.11"
diff --git a/python/Cargo.toml b/python/Cargo.toml
index d2316d93..81bd6d1e 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,7 +36,7 @@ default = ["mimalloc"]
[dependencies]
async-trait = "0.1"
ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae", features = ["pyarrow"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"12.0.0-rc1", features = ["pyarrow"] }
futures = "0.3"
mimalloc = { version = "*", optional = true, default-features = false }
pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3",
"abi3-py37"] }
diff --git a/python/src/functions.rs b/python/src/functions.rs
index 2bb28016..2d0550d2 100644
--- a/python/src/functions.rs
+++ b/python/src/functions.rs
@@ -169,6 +169,7 @@ macro_rules! aggregate_function {
fun: AggregateFunction::$FUNC,
args: args.into_iter().map(|e| e.into()).collect(),
distinct,
+ filter: None,
};
expr.into()
}
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index 3432b8f3..3b93048b 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -97,7 +97,7 @@ impl Accumulator for RustAccumulator {
}
pub fn to_rust_accumulator(accum: PyObject) ->
AccumulatorFunctionImplementation {
- Arc::new(move || -> Result<Box<dyn Accumulator>> {
+ Arc::new(move |_| -> Result<Box<dyn Accumulator>> {
let accum = Python::with_gil(|py| {
accum
.call0(py)