This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 0125451  [ballista] support date_part and date_trunc ser/de, pass tpch 
7 (#840)
0125451 is described below

commit 0125451e5fc194b1b1e4828bae5350bcd8ac24f9
Author: QP Hou <[email protected]>
AuthorDate: Mon Aug 9 10:34:41 2021 -0700

    [ballista] support date_part and date_trunc ser/de, pass tpch 7 (#840)
---
 ballista/rust/core/proto/ballista.proto            | 17 ++--
 .../rust/core/src/serde/logical_plan/from_proto.rs | 99 +++++++++-------------
 .../rust/core/src/serde/logical_plan/to_proto.rs   |  5 +-
 .../core/src/serde/physical_plan/from_proto.rs     |  1 +
 benchmarks/run.sh                                  |  2 +-
 benchmarks/src/bin/tpch.rs                         |  1 +
 datafusion/src/logical_plan/expr.rs                | 19 ++++-
 datafusion/src/logical_plan/mod.rs                 | 15 ++--
 datafusion/src/physical_plan/functions.rs          |  4 +-
 datafusion/src/prelude.rs                          |  9 +-
 10 files changed, 88 insertions(+), 84 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index 9dbce81..2538a10 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -144,18 +144,19 @@ enum ScalarFunction {
   TOTIMESTAMP = 24;
   ARRAY = 25;
   NULLIF = 26;
-  DATETRUNC = 27;
-  MD5 = 28;
-  SHA224 = 29;
-  SHA256 = 30;
-  SHA384 = 31;
-  SHA512 = 32;
-  LN = 33;
+  DATEPART = 27;
+  DATETRUNC = 28;
+  MD5 = 29;
+  SHA224 = 30;
+  SHA256 = 31;
+  SHA384 = 32;
+  SHA512 = 33;
+  LN = 34;
 }
 
 message ScalarFunctionNode {
   ScalarFunction fun = 1;
-  repeated LogicalExprNode expr = 2;
+  repeated LogicalExprNode args = 2;
 }
 
 enum AggregateFunction {
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs 
b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 2665e33..31b8b6d 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -988,77 +988,58 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
                             expr.fun
                         ))
                     })?;
+                let args = &expr.args;
+
                 match scalar_function {
-                    protobuf::ScalarFunction::Sqrt => {
-                        Ok(sqrt((&expr.expr[0]).try_into()?))
-                    }
-                    protobuf::ScalarFunction::Sin => 
Ok(sin((&expr.expr[0]).try_into()?)),
-                    protobuf::ScalarFunction::Cos => 
Ok(cos((&expr.expr[0]).try_into()?)),
-                    protobuf::ScalarFunction::Tan => 
Ok(tan((&expr.expr[0]).try_into()?)),
-                    // protobuf::ScalarFunction::Asin => 
Ok(asin(&expr.expr[0]).try_into()?)),
-                    // protobuf::ScalarFunction::Acos => 
Ok(acos(&expr.expr[0]).try_into()?)),
-                    protobuf::ScalarFunction::Atan => {
-                        Ok(atan((&expr.expr[0]).try_into()?))
-                    }
-                    protobuf::ScalarFunction::Exp => 
Ok(exp((&expr.expr[0]).try_into()?)),
-                    protobuf::ScalarFunction::Log2 => {
-                        Ok(log2((&expr.expr[0]).try_into()?))
-                    }
-                    protobuf::ScalarFunction::Ln => 
Ok(ln((&expr.expr[0]).try_into()?)),
-                    protobuf::ScalarFunction::Log10 => {
-                        Ok(log10((&expr.expr[0]).try_into()?))
-                    }
-                    protobuf::ScalarFunction::Floor => {
-                        Ok(floor((&expr.expr[0]).try_into()?))
-                    }
-                    protobuf::ScalarFunction::Ceil => {
-                        Ok(ceil((&expr.expr[0]).try_into()?))
-                    }
-                    protobuf::ScalarFunction::Round => {
-                        Ok(round((&expr.expr[0]).try_into()?))
-                    }
-                    protobuf::ScalarFunction::Trunc => {
-                        Ok(trunc((&expr.expr[0]).try_into()?))
-                    }
-                    protobuf::ScalarFunction::Abs => 
Ok(abs((&expr.expr[0]).try_into()?)),
+                    protobuf::ScalarFunction::Sqrt => 
Ok(sqrt((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Sin => 
Ok(sin((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Cos => 
Ok(cos((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Tan => 
Ok(tan((&args[0]).try_into()?)),
+                    // protobuf::ScalarFunction::Asin => 
Ok(asin(&args[0]).try_into()?)),
+                    // protobuf::ScalarFunction::Acos => 
Ok(acos(&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Atan => 
Ok(atan((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Exp => 
Ok(exp((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Log2 => 
Ok(log2((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Ln => 
Ok(ln((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Log10 => 
Ok(log10((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Floor => 
Ok(floor((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Ceil => 
Ok(ceil((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Round => 
Ok(round((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Trunc => 
Ok(trunc((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Abs => 
Ok(abs((&args[0]).try_into()?)),
                     protobuf::ScalarFunction::Signum => {
-                        Ok(signum((&expr.expr[0]).try_into()?))
+                        Ok(signum((&args[0]).try_into()?))
                     }
                     protobuf::ScalarFunction::Octetlength => {
-                        Ok(length((&expr.expr[0]).try_into()?))
-                    }
-                    // // protobuf::ScalarFunction::Concat => 
Ok(concat((&expr.expr[0]).try_into()?)),
-                    protobuf::ScalarFunction::Lower => {
-                        Ok(lower((&expr.expr[0]).try_into()?))
-                    }
-                    protobuf::ScalarFunction::Upper => {
-                        Ok(upper((&expr.expr[0]).try_into()?))
-                    }
-                    protobuf::ScalarFunction::Trim => {
-                        Ok(trim((&expr.expr[0]).try_into()?))
+                        Ok(length((&args[0]).try_into()?))
                     }
-                    protobuf::ScalarFunction::Ltrim => {
-                        Ok(ltrim((&expr.expr[0]).try_into()?))
+                    // // protobuf::ScalarFunction::Concat => 
Ok(concat((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Lower => 
Ok(lower((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Upper => 
Ok(upper((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Trim => 
Ok(trim((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Ltrim => 
Ok(ltrim((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Rtrim => 
Ok(rtrim((&args[0]).try_into()?)),
+                    // protobuf::ScalarFunction::Totimestamp => 
Ok(to_timestamp((&args[0]).try_into()?)),
+                    // protobuf::ScalarFunction::Array => 
Ok(array((&args[0]).try_into()?)),
+                    // // protobuf::ScalarFunction::Nullif => 
Ok(nulli((&args[0]).try_into()?)),
+                    protobuf::ScalarFunction::Datepart => {
+                        Ok(date_part((&args[0]).try_into()?, 
(&args[1]).try_into()?))
                     }
-                    protobuf::ScalarFunction::Rtrim => {
-                        Ok(rtrim((&expr.expr[0]).try_into()?))
+                    protobuf::ScalarFunction::Datetrunc => {
+                        Ok(date_trunc((&args[0]).try_into()?, 
(&args[1]).try_into()?))
                     }
-                    // protobuf::ScalarFunction::Totimestamp => 
Ok(to_timestamp((&expr.expr[0]).try_into()?)),
-                    // protobuf::ScalarFunction::Array => 
Ok(array((&expr.expr[0]).try_into()?)),
-                    // // protobuf::ScalarFunction::Nullif => 
Ok(nulli((&expr.expr[0]).try_into()?)),
-                    // protobuf::ScalarFunction::Datetrunc => 
Ok(date_trunc((&expr.expr[0]).try_into()?)),
-                    // protobuf::ScalarFunction::Md5 => 
Ok(md5((&expr.expr[0]).try_into()?)),
+                    // protobuf::ScalarFunction::Md5 => 
Ok(md5((&args[0]).try_into()?)),
                     protobuf::ScalarFunction::Sha224 => {
-                        Ok(sha224((&expr.expr[0]).try_into()?))
+                        Ok(sha224((&args[0]).try_into()?))
                     }
                     protobuf::ScalarFunction::Sha256 => {
-                        Ok(sha256((&expr.expr[0]).try_into()?))
+                        Ok(sha256((&args[0]).try_into()?))
                     }
                     protobuf::ScalarFunction::Sha384 => {
-                        Ok(sha384((&expr.expr[0]).try_into()?))
+                        Ok(sha384((&args[0]).try_into()?))
                     }
                     protobuf::ScalarFunction::Sha512 => {
-                        Ok(sha512((&expr.expr[0]).try_into()?))
+                        Ok(sha512((&args[0]).try_into()?))
                     }
                     _ => Err(proto_error(
                         "Protobuf deserialization error: Unsupported scalar 
function",
@@ -1119,10 +1100,10 @@ impl TryInto<Field> for &protobuf::Field {
     }
 }
 
-use datafusion::physical_plan::datetime_expressions::{date_trunc, 
to_timestamp};
 use datafusion::physical_plan::{aggregates, windows};
 use datafusion::prelude::{
-    array, length, lower, ltrim, md5, rtrim, sha224, sha256, sha384, sha512, 
trim, upper,
+    array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, 
sha256,
+    sha384, sha512, trim, upper,
 };
 use std::convert::TryFrom;
 
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs 
b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 87f26a1..1a3834a 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -1065,7 +1065,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
             Expr::ScalarVariable(_) => unimplemented!(),
             Expr::ScalarFunction { ref fun, ref args } => {
                 let fun: protobuf::ScalarFunction = fun.try_into()?;
-                let expr: Vec<protobuf::LogicalExprNode> = args
+                let args: Vec<protobuf::LogicalExprNode> = args
                     .iter()
                     .map(|e| e.try_into())
                     .collect::<Result<Vec<protobuf::LogicalExprNode>, 
BallistaError>>()?;
@@ -1074,7 +1074,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
                         protobuf::logical_expr_node::ExprType::ScalarFunction(
                             protobuf::ScalarFunctionNode {
                                 fun: fun.into(),
-                                expr,
+                                args,
                             },
                         ),
                     ),
@@ -1374,6 +1374,7 @@ impl TryInto<protobuf::ScalarFunction> for 
&BuiltinScalarFunction {
             }
             BuiltinScalarFunction::Array => 
Ok(protobuf::ScalarFunction::Array),
             BuiltinScalarFunction::NullIf => 
Ok(protobuf::ScalarFunction::Nullif),
+            BuiltinScalarFunction::DatePart => 
Ok(protobuf::ScalarFunction::Datepart),
             BuiltinScalarFunction::DateTrunc => 
Ok(protobuf::ScalarFunction::Datetrunc),
             BuiltinScalarFunction::MD5 => Ok(protobuf::ScalarFunction::Md5),
             BuiltinScalarFunction::SHA224 => 
Ok(protobuf::ScalarFunction::Sha224),
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs 
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 509044b..678bcde 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -501,6 +501,7 @@ impl From<&protobuf::ScalarFunction> for 
BuiltinScalarFunction {
             ScalarFunction::Totimestamp => BuiltinScalarFunction::ToTimestamp,
             ScalarFunction::Array => BuiltinScalarFunction::Array,
             ScalarFunction::Nullif => BuiltinScalarFunction::NullIf,
+            ScalarFunction::Datepart => BuiltinScalarFunction::DatePart,
             ScalarFunction::Datetrunc => BuiltinScalarFunction::DateTrunc,
             ScalarFunction::Md5 => BuiltinScalarFunction::MD5,
             ScalarFunction::Sha224 => BuiltinScalarFunction::SHA224,
diff --git a/benchmarks/run.sh b/benchmarks/run.sh
index 8e36424..b1f47a2 100755
--- a/benchmarks/run.sh
+++ b/benchmarks/run.sh
@@ -20,7 +20,7 @@ set -e
 # This bash script is meant to be run inside the docker-compose environment. 
Check the README for instructions
 
 cd /
-for query in 1 3 5 6 10 12
+for query in 1 3 5 6 7 10 12
 do
   /tpch benchmark ballista --host ballista-scheduler --port 50050 --query 
$query --path /data --format tbl --iterations 1 --debug
 done
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 978fbaa..10b5c2d 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -1140,6 +1140,7 @@ mod tests {
         test_round_trip!(q3, 3);
         test_round_trip!(q5, 5);
         test_round_trip!(q6, 6);
+        test_round_trip!(q7, 7);
         test_round_trip!(q10, 10);
         test_round_trip!(q12, 12);
     }
diff --git a/datafusion/src/logical_plan/expr.rs 
b/datafusion/src/logical_plan/expr.rs
index 8b0e647..e495284 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -1421,7 +1421,20 @@ macro_rules! unary_scalar_expr {
     };
 }
 
-// generate methods for creating the supported unary expressions
+/// Create a convenience function representing a binary scalar function
+macro_rules! binary_scalar_expr {
+    ($ENUM:ident, $FUNC:ident) => {
+        #[doc = "this scalar function is not documented yet"]
+        pub fn $FUNC(arg1: Expr, arg2: Expr) -> Expr {
+            Expr::ScalarFunction {
+                fun: functions::BuiltinScalarFunction::$ENUM,
+                args: vec![arg1, arg2],
+            }
+        }
+    };
+}
+
+// generate methods for creating the supported unary/binary expressions
 
 // math functions
 unary_scalar_expr!(Sqrt, sqrt);
@@ -1478,6 +1491,10 @@ unary_scalar_expr!(Translate, translate);
 unary_scalar_expr!(Trim, trim);
 unary_scalar_expr!(Upper, upper);
 
+// date functions
+binary_scalar_expr!(DatePart, date_part);
+binary_scalar_expr!(DateTrunc, date_trunc);
+
 /// returns an array of fixed size with each argument on it.
 pub fn array(args: Vec<Expr>) -> Expr {
     Expr::ScalarFunction {
diff --git a/datafusion/src/logical_plan/mod.rs 
b/datafusion/src/logical_plan/mod.rs
index a021d06..7f5ac24 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -38,13 +38,14 @@ pub use display::display_schema;
 pub use expr::{
     abs, acos, and, array, ascii, asin, atan, avg, binary_expr, bit_length, 
btrim, case,
     ceil, character_length, chr, col, columnize_expr, combine_filters, concat, 
concat_ws,
-    cos, count, count_distinct, create_udaf, create_udf, exp, 
exprlist_to_fields, floor,
-    in_list, initcap, left, length, lit, ln, log10, log2, lower, lpad, ltrim, 
max, md5,
-    min, normalize_col, normalize_cols, now, octet_length, or, random, 
regexp_match,
-    regexp_replace, repeat, replace, replace_col, reverse, right, round, rpad, 
rtrim,
-    sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, 
starts_with, strpos,
-    substr, sum, tan, to_hex, translate, trim, trunc, unnormalize_col, 
unnormalize_cols,
-    upper, when, Column, Expr, ExprRewriter, ExpressionVisitor, Literal, 
Recursion,
+    cos, count, count_distinct, create_udaf, create_udf, date_part, 
date_trunc, exp,
+    exprlist_to_fields, floor, in_list, initcap, left, length, lit, ln, log10, 
log2,
+    lower, lpad, ltrim, max, md5, min, normalize_col, normalize_cols, now, 
octet_length,
+    or, random, regexp_match, regexp_replace, repeat, replace, replace_col, 
reverse,
+    right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, 
split_part,
+    sqrt, starts_with, strpos, substr, sum, tan, to_hex, translate, trim, 
trunc,
+    unnormalize_col, unnormalize_cols, upper, when, Column, Expr, ExprRewriter,
+    ExpressionVisitor, Literal, Recursion,
 };
 pub use extension::UserDefinedLogicalNode;
 pub use operators::Operator;
diff --git a/datafusion/src/physical_plan/functions.rs 
b/datafusion/src/physical_plan/functions.rs
index 7bb3cb4..a005f56 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -277,8 +277,8 @@ impl FromStr for BuiltinScalarFunction {
             "concat" => BuiltinScalarFunction::Concat,
             "concat_ws" => BuiltinScalarFunction::ConcatWithSeparator,
             "chr" => BuiltinScalarFunction::Chr,
-            "date_part" => BuiltinScalarFunction::DatePart,
-            "date_trunc" => BuiltinScalarFunction::DateTrunc,
+            "date_part" | "datepart" => BuiltinScalarFunction::DatePart,
+            "date_trunc" | "datetrunc" => BuiltinScalarFunction::DateTrunc,
             "initcap" => BuiltinScalarFunction::InitCap,
             "left" => BuiltinScalarFunction::Left,
             "length" => BuiltinScalarFunction::CharacterLength,
diff --git a/datafusion/src/prelude.rs b/datafusion/src/prelude.rs
index e7ad04e..168e1d5 100644
--- a/datafusion/src/prelude.rs
+++ b/datafusion/src/prelude.rs
@@ -29,9 +29,10 @@ pub use crate::dataframe::DataFrame;
 pub use crate::execution::context::{ExecutionConfig, ExecutionContext};
 pub use crate::logical_plan::{
     array, ascii, avg, bit_length, btrim, character_length, chr, col, concat, 
concat_ws,
-    count, create_udf, in_list, initcap, left, length, lit, lower, lpad, 
ltrim, max, md5,
-    min, now, octet_length, random, regexp_replace, repeat, replace, reverse, 
right,
-    rpad, rtrim, sha224, sha256, sha384, sha512, split_part, starts_with, 
strpos, substr,
-    sum, to_hex, translate, trim, upper, Column, JoinType, Partitioning,
+    count, create_udf, date_part, date_trunc, in_list, initcap, left, length, 
lit, lower,
+    lpad, ltrim, max, md5, min, now, octet_length, random, regexp_replace, 
repeat,
+    replace, reverse, right, rpad, rtrim, sha224, sha256, sha384, sha512, 
split_part,
+    starts_with, strpos, substr, sum, to_hex, translate, trim, upper, Column, 
JoinType,
+    Partitioning,
 };
 pub use crate::physical_plan::csv::CsvReadOptions;

Reply via email to