This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 97f2e4fd5 Implement `current_time` scalar function (#4054)
97f2e4fd5 is described below
commit 97f2e4fd5517c762b0862d22b81f957db511e22e
Author: 哇呜哇呜呀咦耶 <[email protected]>
AuthorDate: Wed Nov 2 04:06:05 2022 +0800
Implement `current_time` scalar function (#4054)
* implement `current_time`
* edit test case
* fix nanosecond after midnight
* fix: fmt
Co-authored-by: pingao <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/tests/sql/timestamp.rs | 31 ++++++++++++++++++++++
datafusion/expr/src/built_in_function.rs | 5 ++++
datafusion/expr/src/expr_fn.rs | 8 ++++++
datafusion/expr/src/function.rs | 1 +
.../physical-expr/src/datetime_expressions.rs | 13 +++++++++
datafusion/physical-expr/src/functions.rs | 6 +++++
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/from_proto.rs | 1 +
datafusion/proto/src/generated/pbjson.rs | 3 +++
datafusion/proto/src/generated/prost.rs | 2 ++
datafusion/proto/src/to_proto.rs | 1 +
11 files changed, 72 insertions(+)
diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs
index 890950200..5192b6cb5 100644
--- a/datafusion/core/tests/sql/timestamp.rs
+++ b/datafusion/core/tests/sql/timestamp.rs
@@ -1651,3 +1651,34 @@ async fn test_current_date() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn test_current_time() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let sql = "select current_time() dt";
+ let results = execute_to_batches(&ctx, sql).await;
+ assert_eq!(
+ results[0]
+ .schema()
+ .field_with_name("dt")
+ .unwrap()
+ .data_type()
+ .to_owned(),
+ DataType::Time64(TimeUnit::Nanosecond)
+ );
+
+ let sql = "select case when current_time() = (now()::bigint % 86400000000000)::time then 'OK' else 'FAIL' end result";
+ let results = execute_to_batches(&ctx, sql).await;
+
+ let expected = vec![
+ "+--------+",
+ "| result |",
+ "+--------+",
+ "| OK |",
+ "+--------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+ Ok(())
+}
diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs
index 796f0c909..c8e144718 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -158,6 +158,8 @@ pub enum BuiltinScalarFunction {
Now,
///current_date
CurrentDate,
+ /// current_time
+ CurrentTime,
/// translate
Translate,
/// trim
@@ -181,6 +183,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Random
| BuiltinScalarFunction::Now
| BuiltinScalarFunction::CurrentDate
+ | BuiltinScalarFunction::CurrentTime
)
}
/// Returns the [Volatility] of the builtin function.
@@ -259,6 +262,7 @@ impl BuiltinScalarFunction {
// Stable builtin functions
BuiltinScalarFunction::Now => Volatility::Stable,
BuiltinScalarFunction::CurrentDate => Volatility::Stable,
+ BuiltinScalarFunction::CurrentTime => Volatility::Stable,
// Volatile builtin functions
BuiltinScalarFunction::Random => Volatility::Volatile,
@@ -315,6 +319,7 @@ impl FromStr for BuiltinScalarFunction {
"concat_ws" => BuiltinScalarFunction::ConcatWithSeparator,
"chr" => BuiltinScalarFunction::Chr,
"current_date" => BuiltinScalarFunction::CurrentDate,
+ "current_time" => BuiltinScalarFunction::CurrentTime,
"date_part" | "datepart" => BuiltinScalarFunction::DatePart,
"date_trunc" | "datetrunc" => BuiltinScalarFunction::DateTrunc,
"date_bin" => BuiltinScalarFunction::DateBin,
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index cfd043a3a..006bcac5e 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -460,6 +460,14 @@ pub fn current_date() -> Expr {
}
}
+/// Returns current UTC time as a [`DataType::Time64`] value
+pub fn current_time() -> Expr {
+ Expr::ScalarFunction {
+ fun: BuiltinScalarFunction::CurrentTime,
+ args: vec![],
+ }
+}
+
/// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression.
pub fn case(expr: Expr) -> CaseBuilder {
CaseBuilder::new(Some(Box::new(expr)), vec![], vec![], None)
diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs
index 4a47d8209..de5ab0e1a 100644
--- a/datafusion/expr/src/function.rs
+++ b/datafusion/expr/src/function.rs
@@ -222,6 +222,7 @@ pub fn return_type(
Some("UTC".to_owned()),
)),
BuiltinScalarFunction::CurrentDate => Ok(DataType::Date32),
+ BuiltinScalarFunction::CurrentTime => Ok(DataType::Time64(TimeUnit::Nanosecond)),
BuiltinScalarFunction::Translate => {
utf8_to_str_type(&input_expr_types[0], "translate")
}
diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs
index 2a0cf3012..48f093a2f 100644
--- a/datafusion/physical-expr/src/datetime_expressions.rs
+++ b/datafusion/physical-expr/src/datetime_expressions.rs
@@ -200,6 +200,19 @@ pub fn make_current_date(
move |_arg| Ok(ColumnarValue::Scalar(ScalarValue::Date32(days)))
}
+/// Create an implementation of `current_time()` that always returns the
+/// specified current time.
+///
+/// The semantics of `current_time()` require it to return the same value
+/// wherever it appears within a single statement. This value is
+/// chosen during planning time.
+pub fn make_current_time(
+ now_ts: DateTime<Utc>,
+) -> impl Fn(&[ColumnarValue]) -> Result<ColumnarValue> {
+ let nano = Some(now_ts.timestamp_nanos() % 86400000000000);
+ move |_arg| Ok(ColumnarValue::Scalar(ScalarValue::Time64(nano)))
+}
+
fn quarter_month(date: &NaiveDateTime) -> u32 {
1 + 3 * ((date.month() - 1) / 3)
}
diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs
index f7c3cbef9..cd1d31544 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -433,6 +433,12 @@ pub fn create_physical_fun(
execution_props.query_execution_start_time,
))
}
+ BuiltinScalarFunction::CurrentTime => {
+ // bind value for current_time at plan time
+ Arc::new(datetime_expressions::make_current_time(
+ execution_props.query_execution_start_time,
+ ))
+ }
BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() {
DataType::Utf8 => {
make_scalar_function(string_expressions::initcap::<i32>)(args)
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 1ff2952df..083f24502 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -496,6 +496,7 @@ enum ScalarFunction {
DateBin=68;
ArrowTypeof=69;
CurrentDate=70;
+ CurrentTime=71;
}
message ScalarFunctionNode {
diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs
index cd61bba7a..708fb51ec 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -430,6 +430,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::ToTimestampSeconds => Self::ToTimestampSeconds,
ScalarFunction::Now => Self::Now,
ScalarFunction::CurrentDate => Self::CurrentDate,
+ ScalarFunction::CurrentTime => Self::CurrentTime,
ScalarFunction::Translate => Self::Translate,
ScalarFunction::RegexpMatch => Self::RegexpMatch,
ScalarFunction::Coalesce => Self::Coalesce,
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 941cc461f..3557dee46 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -9621,6 +9621,7 @@ impl serde::Serialize for ScalarFunction {
Self::DateBin => "DateBin",
Self::ArrowTypeof => "ArrowTypeof",
Self::CurrentDate => "CurrentDate",
+ Self::CurrentTime => "CurrentTime",
};
serializer.serialize_str(variant)
}
@@ -9703,6 +9704,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
"DateBin",
"ArrowTypeof",
"CurrentDate",
+ "CurrentTime",
];
struct GeneratedVisitor;
@@ -9816,6 +9818,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
"DateBin" => Ok(ScalarFunction::DateBin),
"ArrowTypeof" => Ok(ScalarFunction::ArrowTypeof),
"CurrentDate" => Ok(ScalarFunction::CurrentDate),
+ "CurrentTime" => Ok(ScalarFunction::CurrentTime),
_ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 038195c94..d404f0275 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1225,6 +1225,7 @@ pub enum ScalarFunction {
DateBin = 68,
ArrowTypeof = 69,
CurrentDate = 70,
+ CurrentTime = 71,
}
impl ScalarFunction {
/// String value of the enum field names used in the ProtoBuf definition.
@@ -1304,6 +1305,7 @@ impl ScalarFunction {
ScalarFunction::DateBin => "DateBin",
ScalarFunction::ArrowTypeof => "ArrowTypeof",
ScalarFunction::CurrentDate => "CurrentDate",
+ ScalarFunction::CurrentTime => "CurrentTime",
}
}
}
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index a089f0768..1aea38f3a 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -1179,6 +1179,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::ToTimestampSeconds => Self::ToTimestampSeconds,
BuiltinScalarFunction::Now => Self::Now,
BuiltinScalarFunction::CurrentDate => Self::CurrentDate,
+ BuiltinScalarFunction::CurrentTime => Self::CurrentTime,
BuiltinScalarFunction::Translate => Self::Translate,
BuiltinScalarFunction::RegexpMatch => Self::RegexpMatch,
BuiltinScalarFunction::Coalesce => Self::Coalesce,