This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 17f069df4 Add `InList` support for timestamp type. (#3449) (#3450)
17f069df4 is described below
commit 17f069df4227b837cf2741a545c39a8b68d5fd76
Author: Yang Jiang <[email protected]>
AuthorDate: Tue Sep 13 04:09:18 2022 +0800
Add `InList` support for timestamp type. (#3449) (#3450)
* Add `InList` support for timestamp type. (#3449)
* Update datafusion/physical-expr/src/expressions/in_list.rs
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
.../physical-expr/src/expressions/in_list.rs | 209 ++++++++++++++++++++-
1 file changed, 203 insertions(+), 6 deletions(-)
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs
b/datafusion/physical-expr/src/expressions/in_list.rs
index fcaa3c3f2..184847316 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -24,8 +24,9 @@ use std::sync::Arc;
use arrow::array::GenericStringArray;
use arrow::array::{
ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
- Int64Array, Int8Array, OffsetSizeTrait, UInt16Array, UInt32Array,
UInt64Array,
- UInt8Array,
+ Int64Array, Int8Array, OffsetSizeTrait, TimestampMicrosecondArray,
+ TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+ UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::{
datatypes::{DataType, Schema},
@@ -34,10 +35,12 @@ use arrow::{
use crate::PhysicalExpr;
use arrow::array::*;
+use arrow::datatypes::TimeUnit;
use datafusion_common::ScalarValue;
use datafusion_common::ScalarValue::{
Binary, Boolean, Date32, Date64, Decimal128, Int16, Int32, Int64, Int8,
LargeBinary,
- LargeUtf8, UInt16, UInt32, UInt64, UInt8, Utf8,
+ LargeUtf8, TimestampMicrosecond, TimestampMillisecond, TimestampNanosecond,
+ TimestampSecond, UInt16, UInt32, UInt64, UInt8, Utf8,
};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
@@ -111,8 +114,8 @@ macro_rules! make_contains_primitive {
.iter()
.flat_map(|expr| match expr {
ColumnarValue::Scalar(s) => match s {
- ScalarValue::$SCALAR_VALUE(Some(v)) => Some(*v),
- ScalarValue::$SCALAR_VALUE(None) => None,
+ ScalarValue::$SCALAR_VALUE(Some(v), ..) => Some(*v),
+ ScalarValue::$SCALAR_VALUE(None, ..) => None,
datatype => unreachable!("InList can't reach other data
type {} for {}.", datatype, s),
},
ColumnarValue::Array(_) => {
@@ -175,7 +178,7 @@ macro_rules! set_contains_for_primitive {
let native_set = $SET_VALUES
.iter()
.flat_map(|v| match v {
- $SCALAR_VALUE(value) => *value,
+ $SCALAR_VALUE(value, ..) => *value,
datatype => {
unreachable!(
"InList can't reach other data type {} for {}.",
@@ -691,6 +694,60 @@ impl PhysicalExpr for InListExpr {
let array =
array.as_any().downcast_ref::<Decimal128Array>().unwrap();
Ok(make_set_contains_decimal(array, set, self.negated))
}
+ DataType::Timestamp(unit, _) => match unit {
+ TimeUnit::Second => {
+ let array = array
+ .as_any()
+ .downcast_ref::<TimestampSecondArray>()
+ .unwrap();
+ Ok(set_contains_for_primitive!(
+ array,
+ set,
+ TimestampSecond,
+ self.negated,
+ i64
+ ))
+ }
+ TimeUnit::Millisecond => {
+ let array = array
+ .as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .unwrap();
+ Ok(set_contains_for_primitive!(
+ array,
+ set,
+ TimestampMillisecond,
+ self.negated,
+ i64
+ ))
+ }
+ TimeUnit::Microsecond => {
+ let array = array
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .unwrap();
+ Ok(set_contains_for_primitive!(
+ array,
+ set,
+ TimestampMicrosecond,
+ self.negated,
+ i64
+ ))
+ }
+ TimeUnit::Nanosecond => {
+ let array = array
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .unwrap();
+ Ok(set_contains_for_primitive!(
+ array,
+ set,
+ TimestampNanosecond,
+ self.negated,
+ i64
+ ))
+ }
+ },
datatype =>
Result::Err(DataFusionError::NotImplemented(format!(
"InSet does not support datatype {:?}.",
datatype
@@ -849,6 +906,44 @@ impl PhysicalExpr for InListExpr {
self.negated,
))
}
+ DataType::Timestamp(unit, _) => match unit {
+ TimeUnit::Second => {
+ make_contains_primitive!(
+ array,
+ list_values,
+ self.negated,
+ TimestampSecond,
+ TimestampSecondArray
+ )
+ }
+ TimeUnit::Millisecond => {
+ make_contains_primitive!(
+ array,
+ list_values,
+ self.negated,
+ TimestampMillisecond,
+ TimestampMillisecondArray
+ )
+ }
+ TimeUnit::Microsecond => {
+ make_contains_primitive!(
+ array,
+ list_values,
+ self.negated,
+ TimestampMicrosecond,
+ TimestampMicrosecondArray
+ )
+ }
+ TimeUnit::Nanosecond => {
+ make_contains_primitive!(
+ array,
+ list_values,
+ self.negated,
+ TimestampNanosecond,
+ TimestampNanosecondArray
+ )
+ }
+ },
datatype =>
Result::Err(DataFusionError::NotImplemented(format!(
"InList does not support datatype {:?}.",
datatype
@@ -1659,4 +1754,106 @@ mod tests {
Ok(())
}
+
+    #[test]
+    fn in_list_set_timestamp() -> Result<()> {
+        // Builds a timestamp scalar of the requested unit with no timezone.
+        fn timestamp_scalar(unit: &TimeUnit, v: Option<i64>) -> ScalarValue {
+            match unit {
+                TimeUnit::Second => ScalarValue::TimestampSecond(v, None),
+                TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(v, None),
+                TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(v, None),
+                TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(v, None),
+            }
+        }
+
+        // Builds a timestamp array of the requested unit with no timezone.
+        fn timestamp_array(unit: &TimeUnit, values: Vec<Option<i64>>) -> ArrayRef {
+            match unit {
+                TimeUnit::Second => Arc::new(TimestampSecondArray::from(values)),
+                TimeUnit::Millisecond => {
+                    Arc::new(TimestampMillisecondArray::from(values))
+                }
+                TimeUnit::Microsecond => {
+                    Arc::new(TimestampMicrosecondArray::from(values))
+                }
+                TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(values)),
+            }
+        }
+
+        // Every TimeUnit has its own downcast branch in the evaluation code,
+        // so each unit is exercised instead of only Microsecond.
+        for unit in &[
+            TimeUnit::Second,
+            TimeUnit::Millisecond,
+            TimeUnit::Microsecond,
+            TimeUnit::Nanosecond,
+        ] {
+            let a = timestamp_array(
+                unit,
+                vec![Some(1388588401000000000), Some(1288588501000000000), None],
+            );
+            let schema =
+                Schema::new(vec![Field::new("a", a.data_type().clone(), true)]);
+            let col_a = col("a", &schema)?;
+            let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
+
+            // The NULL entry makes non-matching rows evaluate to NULL rather
+            // than false (SQL three-valued IN semantics).
+            let mut list = vec![
+                lit(timestamp_scalar(unit, Some(1388588401000000000))),
+                lit(timestamp_scalar(unit, None)),
+                lit(timestamp_scalar(unit, Some(1388588401000000001))),
+            ];
+            // Pad the list beyond OPTIMIZER_INSET_THRESHOLD entries, intended
+            // to route evaluation through the set-based path.
+            let start_ts: i64 = 1388588401000000001;
+            for i in 0..(OPTIMIZER_INSET_THRESHOLD + 4) {
+                list.push(lit(timestamp_scalar(unit, Some(start_ts + i as i64))));
+            }
+
+            in_list!(
+                batch,
+                list.clone(),
+                &false,
+                vec![Some(true), None, None],
+                col_a.clone(),
+                &schema
+            );
+
+            in_list!(
+                batch,
+                list.clone(),
+                &true,
+                vec![Some(false), None, None],
+                col_a.clone(),
+                &schema
+            );
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn in_list_timestamp() -> Result<()> {
+        // Builds a timestamp scalar of the requested unit with no timezone.
+        fn timestamp_scalar(unit: &TimeUnit, v: Option<i64>) -> ScalarValue {
+            match unit {
+                TimeUnit::Second => ScalarValue::TimestampSecond(v, None),
+                TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(v, None),
+                TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(v, None),
+                TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(v, None),
+            }
+        }
+
+        // Builds a timestamp array of the requested unit with no timezone.
+        fn timestamp_array(unit: &TimeUnit, values: Vec<Option<i64>>) -> ArrayRef {
+            match unit {
+                TimeUnit::Second => Arc::new(TimestampSecondArray::from(values)),
+                TimeUnit::Millisecond => {
+                    Arc::new(TimestampMillisecondArray::from(values))
+                }
+                TimeUnit::Microsecond => {
+                    Arc::new(TimestampMicrosecondArray::from(values))
+                }
+                TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(values)),
+            }
+        }
+
+        // Every TimeUnit has its own branch in the list-based evaluation
+        // code, so each unit is exercised instead of only Microsecond.
+        for unit in &[
+            TimeUnit::Second,
+            TimeUnit::Millisecond,
+            TimeUnit::Microsecond,
+            TimeUnit::Nanosecond,
+        ] {
+            let a = timestamp_array(
+                unit,
+                vec![Some(1388588401000000000), Some(1288588501000000000), None],
+            );
+            let schema =
+                Schema::new(vec![Field::new("a", a.data_type().clone(), true)]);
+            let col_a = col("a", &schema)?;
+            let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
+
+            // No NULL in the list: non-matching non-null rows yield false/true,
+            // and only the NULL input row yields NULL.
+            let list = vec![
+                lit(timestamp_scalar(unit, Some(1388588401000000000))),
+                lit(timestamp_scalar(unit, Some(1388588401000000001))),
+                lit(timestamp_scalar(unit, Some(1388588401000000002))),
+            ];
+
+            in_list!(
+                batch,
+                list.clone(),
+                &false,
+                vec![Some(true), Some(false), None],
+                col_a.clone(),
+                &schema
+            );
+
+            in_list!(
+                batch,
+                list.clone(),
+                &true,
+                vec![Some(false), Some(true), None],
+                col_a.clone(),
+                &schema
+            );
+        }
+
+        Ok(())
+    }
}