This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 0125451 [ballista] support date_part and date_trunc ser/de, pass tpch 7 (#840)
0125451 is described below
commit 0125451e5fc194b1b1e4828bae5350bcd8ac24f9
Author: QP Hou <[email protected]>
AuthorDate: Mon Aug 9 10:34:41 2021 -0700
[ballista] support date_part and date_trunc ser/de, pass tpch 7 (#840)
---
ballista/rust/core/proto/ballista.proto | 17 ++--
.../rust/core/src/serde/logical_plan/from_proto.rs | 99 +++++++++-------------
.../rust/core/src/serde/logical_plan/to_proto.rs | 5 +-
.../core/src/serde/physical_plan/from_proto.rs | 1 +
benchmarks/run.sh | 2 +-
benchmarks/src/bin/tpch.rs | 1 +
datafusion/src/logical_plan/expr.rs | 19 ++++-
datafusion/src/logical_plan/mod.rs | 15 ++--
datafusion/src/physical_plan/functions.rs | 4 +-
datafusion/src/prelude.rs | 9 +-
10 files changed, 88 insertions(+), 84 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 9dbce81..2538a10 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -144,18 +144,19 @@ enum ScalarFunction {
TOTIMESTAMP = 24;
ARRAY = 25;
NULLIF = 26;
- DATETRUNC = 27;
- MD5 = 28;
- SHA224 = 29;
- SHA256 = 30;
- SHA384 = 31;
- SHA512 = 32;
- LN = 33;
+ DATEPART = 27;
+ DATETRUNC = 28;
+ MD5 = 29;
+ SHA224 = 30;
+ SHA256 = 31;
+ SHA384 = 32;
+ SHA512 = 33;
+ LN = 34;
}
message ScalarFunctionNode {
ScalarFunction fun = 1;
- repeated LogicalExprNode expr = 2;
+ repeated LogicalExprNode args = 2;
}
enum AggregateFunction {
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 2665e33..31b8b6d 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -988,77 +988,58 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
expr.fun
))
})?;
+ let args = &expr.args;
+
match scalar_function {
- protobuf::ScalarFunction::Sqrt => {
- Ok(sqrt((&expr.expr[0]).try_into()?))
- }
- protobuf::ScalarFunction::Sin =>
Ok(sin((&expr.expr[0]).try_into()?)),
- protobuf::ScalarFunction::Cos =>
Ok(cos((&expr.expr[0]).try_into()?)),
- protobuf::ScalarFunction::Tan =>
Ok(tan((&expr.expr[0]).try_into()?)),
- // protobuf::ScalarFunction::Asin =>
Ok(asin(&expr.expr[0]).try_into()?)),
- // protobuf::ScalarFunction::Acos =>
Ok(acos(&expr.expr[0]).try_into()?)),
- protobuf::ScalarFunction::Atan => {
- Ok(atan((&expr.expr[0]).try_into()?))
- }
- protobuf::ScalarFunction::Exp =>
Ok(exp((&expr.expr[0]).try_into()?)),
- protobuf::ScalarFunction::Log2 => {
- Ok(log2((&expr.expr[0]).try_into()?))
- }
- protobuf::ScalarFunction::Ln =>
Ok(ln((&expr.expr[0]).try_into()?)),
- protobuf::ScalarFunction::Log10 => {
- Ok(log10((&expr.expr[0]).try_into()?))
- }
- protobuf::ScalarFunction::Floor => {
- Ok(floor((&expr.expr[0]).try_into()?))
- }
- protobuf::ScalarFunction::Ceil => {
- Ok(ceil((&expr.expr[0]).try_into()?))
- }
- protobuf::ScalarFunction::Round => {
- Ok(round((&expr.expr[0]).try_into()?))
- }
- protobuf::ScalarFunction::Trunc => {
- Ok(trunc((&expr.expr[0]).try_into()?))
- }
- protobuf::ScalarFunction::Abs =>
Ok(abs((&expr.expr[0]).try_into()?)),
+ protobuf::ScalarFunction::Sqrt =>
Ok(sqrt((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Sin =>
Ok(sin((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Cos =>
Ok(cos((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Tan =>
Ok(tan((&args[0]).try_into()?)),
+ // protobuf::ScalarFunction::Asin =>
Ok(asin(&args[0]).try_into()?)),
+ // protobuf::ScalarFunction::Acos =>
Ok(acos(&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Atan =>
Ok(atan((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Exp =>
Ok(exp((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Log2 =>
Ok(log2((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Ln =>
Ok(ln((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Log10 =>
Ok(log10((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Floor =>
Ok(floor((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Ceil =>
Ok(ceil((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Round =>
Ok(round((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Trunc =>
Ok(trunc((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Abs =>
Ok(abs((&args[0]).try_into()?)),
protobuf::ScalarFunction::Signum => {
- Ok(signum((&expr.expr[0]).try_into()?))
+ Ok(signum((&args[0]).try_into()?))
}
protobuf::ScalarFunction::Octetlength => {
- Ok(length((&expr.expr[0]).try_into()?))
- }
- // // protobuf::ScalarFunction::Concat =>
Ok(concat((&expr.expr[0]).try_into()?)),
- protobuf::ScalarFunction::Lower => {
- Ok(lower((&expr.expr[0]).try_into()?))
- }
- protobuf::ScalarFunction::Upper => {
- Ok(upper((&expr.expr[0]).try_into()?))
- }
- protobuf::ScalarFunction::Trim => {
- Ok(trim((&expr.expr[0]).try_into()?))
+ Ok(length((&args[0]).try_into()?))
}
- protobuf::ScalarFunction::Ltrim => {
- Ok(ltrim((&expr.expr[0]).try_into()?))
+ // // protobuf::ScalarFunction::Concat =>
Ok(concat((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Lower =>
Ok(lower((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Upper =>
Ok(upper((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Trim =>
Ok(trim((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Ltrim =>
Ok(ltrim((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Rtrim =>
Ok(rtrim((&args[0]).try_into()?)),
+ // protobuf::ScalarFunction::Totimestamp =>
Ok(to_timestamp((&args[0]).try_into()?)),
+ // protobuf::ScalarFunction::Array =>
Ok(array((&args[0]).try_into()?)),
+ // // protobuf::ScalarFunction::Nullif =>
Ok(nulli((&args[0]).try_into()?)),
+ protobuf::ScalarFunction::Datepart => {
+ Ok(date_part((&args[0]).try_into()?,
(&args[1]).try_into()?))
}
- protobuf::ScalarFunction::Rtrim => {
- Ok(rtrim((&expr.expr[0]).try_into()?))
+ protobuf::ScalarFunction::Datetrunc => {
+ Ok(date_trunc((&args[0]).try_into()?,
(&args[1]).try_into()?))
}
- // protobuf::ScalarFunction::Totimestamp =>
Ok(to_timestamp((&expr.expr[0]).try_into()?)),
- // protobuf::ScalarFunction::Array =>
Ok(array((&expr.expr[0]).try_into()?)),
- // // protobuf::ScalarFunction::Nullif =>
Ok(nulli((&expr.expr[0]).try_into()?)),
- // protobuf::ScalarFunction::Datetrunc =>
Ok(date_trunc((&expr.expr[0]).try_into()?)),
- // protobuf::ScalarFunction::Md5 =>
Ok(md5((&expr.expr[0]).try_into()?)),
+ // protobuf::ScalarFunction::Md5 =>
Ok(md5((&args[0]).try_into()?)),
protobuf::ScalarFunction::Sha224 => {
- Ok(sha224((&expr.expr[0]).try_into()?))
+ Ok(sha224((&args[0]).try_into()?))
}
protobuf::ScalarFunction::Sha256 => {
- Ok(sha256((&expr.expr[0]).try_into()?))
+ Ok(sha256((&args[0]).try_into()?))
}
protobuf::ScalarFunction::Sha384 => {
- Ok(sha384((&expr.expr[0]).try_into()?))
+ Ok(sha384((&args[0]).try_into()?))
}
protobuf::ScalarFunction::Sha512 => {
- Ok(sha512((&expr.expr[0]).try_into()?))
+ Ok(sha512((&args[0]).try_into()?))
}
_ => Err(proto_error(
"Protobuf deserialization error: Unsupported scalar
function",
@@ -1119,10 +1100,10 @@ impl TryInto<Field> for &protobuf::Field {
}
}
-use datafusion::physical_plan::datetime_expressions::{date_trunc,
to_timestamp};
use datafusion::physical_plan::{aggregates, windows};
use datafusion::prelude::{
- array, length, lower, ltrim, md5, rtrim, sha224, sha256, sha384, sha512,
trim, upper,
+ array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224,
sha256,
+ sha384, sha512, trim, upper,
};
use std::convert::TryFrom;
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 87f26a1..1a3834a 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -1065,7 +1065,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
Expr::ScalarVariable(_) => unimplemented!(),
Expr::ScalarFunction { ref fun, ref args } => {
let fun: protobuf::ScalarFunction = fun.try_into()?;
- let expr: Vec<protobuf::LogicalExprNode> = args
+ let args: Vec<protobuf::LogicalExprNode> = args
.iter()
.map(|e| e.try_into())
.collect::<Result<Vec<protobuf::LogicalExprNode>,
BallistaError>>()?;
@@ -1074,7 +1074,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
protobuf::logical_expr_node::ExprType::ScalarFunction(
protobuf::ScalarFunctionNode {
fun: fun.into(),
- expr,
+ args,
},
),
),
@@ -1374,6 +1374,7 @@ impl TryInto<protobuf::ScalarFunction> for &BuiltinScalarFunction {
}
BuiltinScalarFunction::Array =>
Ok(protobuf::ScalarFunction::Array),
BuiltinScalarFunction::NullIf =>
Ok(protobuf::ScalarFunction::Nullif),
+ BuiltinScalarFunction::DatePart =>
Ok(protobuf::ScalarFunction::Datepart),
BuiltinScalarFunction::DateTrunc =>
Ok(protobuf::ScalarFunction::Datetrunc),
BuiltinScalarFunction::MD5 => Ok(protobuf::ScalarFunction::Md5),
BuiltinScalarFunction::SHA224 =>
Ok(protobuf::ScalarFunction::Sha224),
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 509044b..678bcde 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -501,6 +501,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::Totimestamp => BuiltinScalarFunction::ToTimestamp,
ScalarFunction::Array => BuiltinScalarFunction::Array,
ScalarFunction::Nullif => BuiltinScalarFunction::NullIf,
+ ScalarFunction::Datepart => BuiltinScalarFunction::DatePart,
ScalarFunction::Datetrunc => BuiltinScalarFunction::DateTrunc,
ScalarFunction::Md5 => BuiltinScalarFunction::MD5,
ScalarFunction::Sha224 => BuiltinScalarFunction::SHA224,
diff --git a/benchmarks/run.sh b/benchmarks/run.sh
index 8e36424..b1f47a2 100755
--- a/benchmarks/run.sh
+++ b/benchmarks/run.sh
@@ -20,7 +20,7 @@ set -e
# This bash script is meant to be run inside the docker-compose environment. Check the README for instructions
cd /
-for query in 1 3 5 6 10 12
+for query in 1 3 5 6 7 10 12
do
/tpch benchmark ballista --host ballista-scheduler --port 50050 --query
$query --path /data --format tbl --iterations 1 --debug
done
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 978fbaa..10b5c2d 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -1140,6 +1140,7 @@ mod tests {
test_round_trip!(q3, 3);
test_round_trip!(q5, 5);
test_round_trip!(q6, 6);
+ test_round_trip!(q7, 7);
test_round_trip!(q10, 10);
test_round_trip!(q12, 12);
}
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index 8b0e647..e495284 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -1421,7 +1421,20 @@ macro_rules! unary_scalar_expr {
};
}
-// generate methods for creating the supported unary expressions
+/// Create a convenience function representing a binary scalar function
+macro_rules! binary_scalar_expr {
+ ($ENUM:ident, $FUNC:ident) => {
+ #[doc = "this scalar function is not documented yet"]
+ pub fn $FUNC(arg1: Expr, arg2: Expr) -> Expr {
+ Expr::ScalarFunction {
+ fun: functions::BuiltinScalarFunction::$ENUM,
+ args: vec![arg1, arg2],
+ }
+ }
+ };
+}
+
+// generate methods for creating the supported unary/binary expressions
// math functions
unary_scalar_expr!(Sqrt, sqrt);
@@ -1478,6 +1491,10 @@ unary_scalar_expr!(Translate, translate);
unary_scalar_expr!(Trim, trim);
unary_scalar_expr!(Upper, upper);
+// date functions
+binary_scalar_expr!(DatePart, date_part);
+binary_scalar_expr!(DateTrunc, date_trunc);
+
/// returns an array of fixed size with each argument on it.
pub fn array(args: Vec<Expr>) -> Expr {
Expr::ScalarFunction {
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index a021d06..7f5ac24 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -38,13 +38,14 @@ pub use display::display_schema;
pub use expr::{
abs, acos, and, array, ascii, asin, atan, avg, binary_expr, bit_length,
btrim, case,
ceil, character_length, chr, col, columnize_expr, combine_filters, concat,
concat_ws,
- cos, count, count_distinct, create_udaf, create_udf, exp,
exprlist_to_fields, floor,
- in_list, initcap, left, length, lit, ln, log10, log2, lower, lpad, ltrim,
max, md5,
- min, normalize_col, normalize_cols, now, octet_length, or, random,
regexp_match,
- regexp_replace, repeat, replace, replace_col, reverse, right, round, rpad,
rtrim,
- sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt,
starts_with, strpos,
- substr, sum, tan, to_hex, translate, trim, trunc, unnormalize_col,
unnormalize_cols,
- upper, when, Column, Expr, ExprRewriter, ExpressionVisitor, Literal,
Recursion,
+ cos, count, count_distinct, create_udaf, create_udf, date_part,
date_trunc, exp,
+ exprlist_to_fields, floor, in_list, initcap, left, length, lit, ln, log10,
log2,
+ lower, lpad, ltrim, max, md5, min, normalize_col, normalize_cols, now,
octet_length,
+ or, random, regexp_match, regexp_replace, repeat, replace, replace_col,
reverse,
+ right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin,
split_part,
+ sqrt, starts_with, strpos, substr, sum, tan, to_hex, translate, trim,
trunc,
+ unnormalize_col, unnormalize_cols, upper, when, Column, Expr, ExprRewriter,
+ ExpressionVisitor, Literal, Recursion,
};
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 7bb3cb4..a005f56 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -277,8 +277,8 @@ impl FromStr for BuiltinScalarFunction {
"concat" => BuiltinScalarFunction::Concat,
"concat_ws" => BuiltinScalarFunction::ConcatWithSeparator,
"chr" => BuiltinScalarFunction::Chr,
- "date_part" => BuiltinScalarFunction::DatePart,
- "date_trunc" => BuiltinScalarFunction::DateTrunc,
+ "date_part" | "datepart" => BuiltinScalarFunction::DatePart,
+ "date_trunc" | "datetrunc" => BuiltinScalarFunction::DateTrunc,
"initcap" => BuiltinScalarFunction::InitCap,
"left" => BuiltinScalarFunction::Left,
"length" => BuiltinScalarFunction::CharacterLength,
diff --git a/datafusion/src/prelude.rs b/datafusion/src/prelude.rs
index e7ad04e..168e1d5 100644
--- a/datafusion/src/prelude.rs
+++ b/datafusion/src/prelude.rs
@@ -29,9 +29,10 @@ pub use crate::dataframe::DataFrame;
pub use crate::execution::context::{ExecutionConfig, ExecutionContext};
pub use crate::logical_plan::{
array, ascii, avg, bit_length, btrim, character_length, chr, col, concat,
concat_ws,
- count, create_udf, in_list, initcap, left, length, lit, lower, lpad,
ltrim, max, md5,
- min, now, octet_length, random, regexp_replace, repeat, replace, reverse,
right,
- rpad, rtrim, sha224, sha256, sha384, sha512, split_part, starts_with,
strpos, substr,
- sum, to_hex, translate, trim, upper, Column, JoinType, Partitioning,
+ count, create_udf, date_part, date_trunc, in_list, initcap, left, length,
lit, lower,
+ lpad, ltrim, max, md5, min, now, octet_length, random, regexp_replace,
repeat,
+ replace, reverse, right, rpad, rtrim, sha224, sha256, sha384, sha512,
split_part,
+ starts_with, strpos, substr, sum, to_hex, translate, trim, upper, Column,
JoinType,
+ Partitioning,
};
pub use crate::physical_plan::csv::CsvReadOptions;