This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 4b7cdcb ARROW-11055: [Rust] [DataFusion] Support date_trunc function
4b7cdcb is described below
commit 4b7cdcb9220b6d94b251aef32c21ef9b4097ecfa
Author: Pavel Tiunov <[email protected]>
AuthorDate: Sat Jan 2 07:53:06 2021 -0500
ARROW-11055: [Rust] [DataFusion] Support date_trunc function
Implements the `date_trunc` SQL function and adds GROUP BY support for timestamp columns.
Closes #9040 from paveltiunov/date-trunc
Authored-by: Pavel Tiunov <[email protected]>
Signed-off-by: Andrew Lamb <[email protected]>
---
rust/arrow/src/csv/reader.rs | 34 +++++
rust/datafusion/src/execution/context.rs | 51 +++++++
.../src/physical_plan/datetime_expressions.rs | 161 +++++++++++++++++++++
rust/datafusion/src/physical_plan/functions.rs | 13 ++
rust/datafusion/src/physical_plan/group_scalar.rs | 4 +
.../datafusion/src/physical_plan/hash_aggregate.rs | 23 ++-
rust/datafusion/src/physical_plan/hash_join.rs | 17 ++-
rust/datafusion/src/physical_plan/type_coercion.rs | 3 +-
rust/datafusion/src/scalar.rs | 26 +++-
rust/datafusion/src/test/mod.rs | 20 ++-
10 files changed, 346 insertions(+), 6 deletions(-)
diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs
index 3fca7b2..c9f97cd 100644
--- a/rust/arrow/src/csv/reader.rs
+++ b/rust/arrow/src/csv/reader.rs
@@ -449,6 +449,16 @@ fn parse(
&DataType::Date64(_) => {
build_primitive_array::<Date64Type>(line_number, rows, i)
}
+ &DataType::Timestamp(TimeUnit::Microsecond, _) => {
+ build_primitive_array::<TimestampMicrosecondType>(
+ line_number,
+ rows,
+ i,
+ )
+ }
+ &DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+
build_primitive_array::<TimestampNanosecondType>(line_number, rows, i)
+ }
&DataType::Utf8 => Ok(Arc::new(
rows.iter().map(|row| row.get(i)).collect::<StringArray>(),
) as ArrayRef),
@@ -531,6 +541,30 @@ impl Parser for Date64Type {
}
}
+impl Parser for TimestampNanosecondType {
+ fn parse(string: &str) -> Option<i64> {
+ match Self::DATA_TYPE {
+ DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+ let date_time = string.parse::<chrono::NaiveDateTime>().ok()?;
+ Self::Native::from_i64(date_time.timestamp_nanos())
+ }
+ _ => None,
+ }
+ }
+}
+
+impl Parser for TimestampMicrosecondType {
+ fn parse(string: &str) -> Option<i64> {
+ match Self::DATA_TYPE {
+ DataType::Timestamp(TimeUnit::Microsecond, None) => {
+ let date_time = string.parse::<chrono::NaiveDateTime>().ok()?;
+ Self::Native::from_i64(date_time.timestamp_nanos() / 1000)
+ }
+ _ => None,
+ }
+ }
+}
+
fn parse_item<T: Parser>(string: &str) -> Option<T::Native> {
T::parse(string)
}
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index dae9c90..ebfc887 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -1055,6 +1055,57 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn group_by_date_trunc() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let mut ctx = ExecutionContext::new();
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c2", DataType::UInt64, false),
+ Field::new(
+ "t1",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ false,
+ ),
+ ]));
+
+ // generate a partitioned file
+ for partition in 0..4 {
+ let filename = format!("partition-{}.{}", partition, "csv");
+ let file_path = tmp_dir.path().join(&filename);
+ let mut file = File::create(file_path)?;
+
+ // generate some data
+ for i in 0..10 {
+ let data = format!("{},2020-12-{}T00:00:00.000\n", i, i + 10);
+ file.write_all(data.as_bytes())?;
+ }
+ }
+
+ ctx.register_csv(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ CsvReadOptions::new().schema(&schema).has_header(false),
+ )?;
+
+ let results = plan_and_collect(
+ &mut ctx,
+ "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY
date_trunc('week', t1)"
+ ).await?;
+ assert_eq!(results.len(), 1);
+
+ let batch = &results[0];
+
+ assert_eq!(field_names(batch), vec!["week", "SUM(c2)"]);
+
+ let expected: Vec<&str> =
+ vec!["2020-12-07T00:00:00,24", "2020-12-14T00:00:00,156"];
+ let mut rows = test::format_batch(&batch);
+ rows.sort();
+ assert_eq!(rows, expected);
+
+ Ok(())
+ }
+
async fn run_count_distinct_integers_aggregated_scenario(
partitions: Vec<Vec<(&str, u64)>>,
) -> Result<Vec<RecordBatch>> {
diff --git a/rust/datafusion/src/physical_plan/datetime_expressions.rs b/rust/datafusion/src/physical_plan/datetime_expressions.rs
index a12b00c..247f899 100644
--- a/rust/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/rust/datafusion/src/physical_plan/datetime_expressions.rs
@@ -25,6 +25,7 @@ use arrow::{
buffer::Buffer,
datatypes::{DataType, TimeUnit, ToByteSlice},
};
+use chrono::Duration;
use chrono::{prelude::*, LocalResult};
#[inline]
@@ -205,6 +206,108 @@ pub fn to_timestamp(args: &[ArrayRef]) -> Result<TimestampNanosecondArray> {
Ok(TimestampNanosecondArray::from(Arc::new(data)))
}
+/// date_trunc SQL function
+pub fn date_trunc(args: &[ArrayRef]) -> Result<TimestampNanosecondArray> {
+ let granularity_array =
+ &args[0]
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .ok_or_else(|| {
+ DataFusionError::Execution(
+ "Could not cast date_trunc granularity input to
StringArray"
+ .to_string(),
+ )
+ })?;
+
+ let array = &args[1]
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .ok_or_else(|| {
+ DataFusionError::Execution(
+ "Could not cast date_trunc array input to
TimestampNanosecondArray"
+ .to_string(),
+ )
+ })?;
+
+ let range = 0..array.len();
+ let result = range
+ .map(|i| {
+ if array.is_null(i) {
+ Ok(0_i64)
+ } else {
+ let date_time = match granularity_array.value(i) {
+ "second" => array
+ .value_as_datetime(i)
+ .and_then(|d| d.with_nanosecond(0)),
+ "minute" => array
+ .value_as_datetime(i)
+ .and_then(|d| d.with_nanosecond(0))
+ .and_then(|d| d.with_second(0)),
+ "hour" => array
+ .value_as_datetime(i)
+ .and_then(|d| d.with_nanosecond(0))
+ .and_then(|d| d.with_second(0))
+ .and_then(|d| d.with_minute(0)),
+ "day" => array
+ .value_as_datetime(i)
+ .and_then(|d| d.with_nanosecond(0))
+ .and_then(|d| d.with_second(0))
+ .and_then(|d| d.with_minute(0))
+ .and_then(|d| d.with_hour(0)),
+ "week" => array
+ .value_as_datetime(i)
+ .and_then(|d| d.with_nanosecond(0))
+ .and_then(|d| d.with_second(0))
+ .and_then(|d| d.with_minute(0))
+ .and_then(|d| d.with_hour(0))
+ .map(|d| {
+ d - Duration::seconds(60 * 60 * 24 * d.weekday()
as i64)
+ }),
+ "month" => array
+ .value_as_datetime(i)
+ .and_then(|d| d.with_nanosecond(0))
+ .and_then(|d| d.with_second(0))
+ .and_then(|d| d.with_minute(0))
+ .and_then(|d| d.with_hour(0))
+ .and_then(|d| d.with_day0(0)),
+ "year" => array
+ .value_as_datetime(i)
+ .and_then(|d| d.with_nanosecond(0))
+ .and_then(|d| d.with_second(0))
+ .and_then(|d| d.with_minute(0))
+ .and_then(|d| d.with_hour(0))
+ .and_then(|d| d.with_day0(0))
+ .and_then(|d| d.with_month0(0)),
+ unsupported => {
+ return Err(DataFusionError::Execution(format!(
+ "Unsupported date_trunc granularity: {}",
+ unsupported
+ )))
+ }
+ };
+ date_time.map(|d| d.timestamp_nanos()).ok_or_else(|| {
+ DataFusionError::Execution(format!(
+ "Can't truncate date time: {:?}",
+ array.value_as_datetime(i)
+ ))
+ })
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let data = ArrayData::new(
+ DataType::Timestamp(TimeUnit::Nanosecond, None),
+ array.len(),
+ Some(array.null_count()),
+ array.data().null_buffer().cloned(),
+ 0,
+ vec![Buffer::from(result.to_byte_slice())],
+ vec![],
+ );
+
+ Ok(TimestampNanosecondArray::from(Arc::new(data)))
+}
+
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -379,6 +482,64 @@ mod tests {
}
#[test]
fn date_trunc_test() -> Result<()> {
    // Each case is (input timestamp, granularity, expected truncated value).
    // A null input must produce a null output.
    let ts = "2020-09-08T13:42:29.190855Z"; // a Tuesday
    let cases: Vec<(Option<&str>, &str, Option<&str>)> = vec![
        (None, "second", None),
        (Some(ts), "second", Some("2020-09-08T13:42:29.000000Z")),
        (Some(ts), "minute", Some("2020-09-08T13:42:00.000000Z")),
        (Some(ts), "hour", Some("2020-09-08T13:00:00.000000Z")),
        (Some(ts), "day", Some("2020-09-08T00:00:00.000000Z")),
        (Some(ts), "week", Some("2020-09-07T00:00:00.000000Z")),
        (Some(ts), "month", Some("2020-09-01T00:00:00.000000Z")),
        (Some(ts), "year", Some("2020-01-01T00:00:00.000000Z")),
        // A Friday whose week starts in the previous year.
        (
            Some("2021-01-01T13:42:29.190855Z"),
            "week",
            Some("2020-12-28T00:00:00.000000Z"),
        ),
        // A Wednesday whose week starts in the previous year.
        (
            Some("2020-01-01T13:42:29.190855Z"),
            "week",
            Some("2019-12-30T00:00:00.000000Z"),
        ),
    ];

    let mut ts_builder = StringBuilder::new(cases.len());
    let mut truncated_builder = StringBuilder::new(cases.len());
    let mut string_builder = StringBuilder::new(cases.len());
    for (input, granularity, truncated) in &cases {
        match input {
            Some(input) => ts_builder.append_value(input)?,
            None => ts_builder.append_null()?,
        }
        match truncated {
            Some(truncated) => truncated_builder.append_value(truncated)?,
            None => truncated_builder.append_null()?,
        }
        string_builder.append_value(granularity)?;
    }

    let string_array = Arc::new(string_builder.finish());
    let ts_array = Arc::new(to_timestamp(&[Arc::new(ts_builder.finish())]).unwrap());
    let date_trunc_array = date_trunc(&[string_array, ts_array])
        .expect("that date_trunc truncated values without error");

    let expected_timestamps =
        to_timestamp(&[Arc::new(truncated_builder.finish())]).unwrap();

    assert_eq!(date_trunc_array, expected_timestamps);
    Ok(())
}
+
+ #[test]
fn to_timestamp_invalid_input_type() -> Result<()> {
// pass the wrong type of input array to to_timestamp and test
// that we get an error.
diff --git a/rust/datafusion/src/physical_plan/functions.rs b/rust/datafusion/src/physical_plan/functions.rs
index 0023fbf..316586d 100644
--- a/rust/datafusion/src/physical_plan/functions.rs
+++ b/rust/datafusion/src/physical_plan/functions.rs
@@ -130,6 +130,8 @@ pub enum BuiltinScalarFunction {
Array,
/// SQL NULLIF()
NullIf,
+ /// Date truncate
+ DateTrunc,
}
impl fmt::Display for BuiltinScalarFunction {
@@ -168,6 +170,7 @@ impl FromStr for BuiltinScalarFunction {
"trim" => BuiltinScalarFunction::Trim,
"upper" => BuiltinScalarFunction::Upper,
"to_timestamp" => BuiltinScalarFunction::ToTimestamp,
+ "date_trunc" => BuiltinScalarFunction::DateTrunc,
"array" => BuiltinScalarFunction::Array,
"nullif" => BuiltinScalarFunction::NullIf,
_ => {
@@ -247,6 +250,9 @@ pub fn return_type(
BuiltinScalarFunction::ToTimestamp => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
}
+ BuiltinScalarFunction::DateTrunc => {
+ Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
+ }
BuiltinScalarFunction::Array => Ok(DataType::FixedSizeList(
Box::new(Field::new("item", arg_types[0].clone(), true)),
arg_types.len() as i32,
@@ -317,6 +323,9 @@ pub fn create_physical_expr(
BuiltinScalarFunction::ToTimestamp => {
|args| Ok(Arc::new(datetime_expressions::to_timestamp(args)?))
}
+ BuiltinScalarFunction::DateTrunc => {
+ |args| Ok(Arc::new(datetime_expressions::date_trunc(args)?))
+ }
BuiltinScalarFunction::Array => |args|
Ok(array_expressions::array(args)?),
});
// coerce
@@ -355,6 +364,10 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature {
Signature::Uniform(1, vec![DataType::Utf8, DataType::LargeUtf8])
}
BuiltinScalarFunction::ToTimestamp => Signature::Uniform(1,
vec![DataType::Utf8]),
+ BuiltinScalarFunction::DateTrunc => Signature::Exact(vec![
+ DataType::Utf8,
+ DataType::Timestamp(TimeUnit::Nanosecond, None),
+ ]),
BuiltinScalarFunction::Array => {
Signature::Variadic(array_expressions::SUPPORTED_ARRAY_TYPES.to_vec())
}
diff --git a/rust/datafusion/src/physical_plan/group_scalar.rs b/rust/datafusion/src/physical_plan/group_scalar.rs
index 8c11a6b..295eb46 100644
--- a/rust/datafusion/src/physical_plan/group_scalar.rs
+++ b/rust/datafusion/src/physical_plan/group_scalar.rs
@@ -35,6 +35,8 @@ pub(crate) enum GroupByScalar {
Int32(i32),
Int64(i64),
Utf8(Box<String>),
+ TimeMicrosecond(i64),
+ TimeNanosecond(i64),
}
impl TryFrom<&ScalarValue> for GroupByScalar {
@@ -87,6 +89,8 @@ impl From<&GroupByScalar> for ScalarValue {
GroupByScalar::UInt32(v) => ScalarValue::UInt32(Some(*v)),
GroupByScalar::UInt64(v) => ScalarValue::UInt64(Some(*v)),
GroupByScalar::Utf8(v) => ScalarValue::Utf8(Some(v.to_string())),
+ GroupByScalar::TimeMicrosecond(v) =>
ScalarValue::TimeMicrosecond(Some(*v)),
+ GroupByScalar::TimeNanosecond(v) =>
ScalarValue::TimeNanosecond(Some(*v)),
}
}
}
diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs
index 9677246..864bc78 100644
--- a/rust/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -30,7 +30,7 @@ use crate::error::{DataFusionError, Result};
use crate::physical_plan::{Accumulator, AggregateExpr};
use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning,
PhysicalExpr};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use arrow::{
@@ -49,6 +49,7 @@ use super::{
use ahash::RandomState;
use hashbrown::HashMap;
+use arrow::array::{TimestampMicrosecondArray, TimestampNanosecondArray};
use async_trait::async_trait;
/// Hash aggregate modes
@@ -672,6 +673,12 @@ fn create_batch_from_map(
GroupByScalar::Utf8(str) => {
Arc::new(StringArray::from(vec![&***str]))
}
+ GroupByScalar::TimeMicrosecond(n) => {
+ Arc::new(TimestampMicrosecondArray::from(vec![*n]))
+ }
+ GroupByScalar::TimeNanosecond(n) => {
+ Arc::new(TimestampNanosecondArray::from_vec(vec![*n],
None))
+ }
})
.collect::<Vec<ArrayRef>>();
@@ -780,6 +787,20 @@ pub(crate) fn create_group_by_values(
let array =
col.as_any().downcast_ref::<StringArray>().unwrap();
vec[i] = GroupByScalar::Utf8(Box::new(array.value(row).into()))
}
+ DataType::Timestamp(TimeUnit::Microsecond, None) => {
+ let array = col
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .unwrap();
+ vec[i] = GroupByScalar::TimeMicrosecond(array.value(row))
+ }
+ DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+ let array = col
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .unwrap();
+ vec[i] = GroupByScalar::TimeNanosecond(array.value(row))
+ }
_ => {
// This is internal because we should have caught this before.
return Err(DataFusionError::Internal(
diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs
index 9ac7447..8a3ee4b 100644
--- a/rust/datafusion/src/physical_plan/hash_join.rs
+++ b/rust/datafusion/src/physical_plan/hash_join.rs
@@ -18,6 +18,7 @@
//! Defines the join plan for executing partitions in parallel and then
joining the results
//! into a set of partitions.
+use arrow::array::{TimestampMicrosecondArray, TimestampNanosecondArray};
use arrow::{array::ArrayRef, compute};
use std::sync::Arc;
use std::{any::Any, collections::HashSet};
@@ -28,7 +29,7 @@ use hashbrown::HashMap;
use tokio::sync::Mutex;
use arrow::array::{make_array, Array, MutableArrayData};
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, TimeUnit};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
@@ -373,6 +374,20 @@ pub(crate) fn create_key(
let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
vec.extend(array.value(row).to_le_bytes().iter());
}
+ DataType::Timestamp(TimeUnit::Microsecond, None) => {
+ let array = col
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .unwrap();
+ vec.extend(array.value(row).to_le_bytes().iter());
+ }
+ DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+ let array = col
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .unwrap();
+ vec.extend(array.value(row).to_le_bytes().iter());
+ }
DataType::Utf8 => {
let array =
col.as_any().downcast_ref::<StringArray>().unwrap();
let value = array.value(row);
diff --git a/rust/datafusion/src/physical_plan/type_coercion.rs b/rust/datafusion/src/physical_plan/type_coercion.rs
index 91eaf65..090729a 100644
--- a/rust/datafusion/src/physical_plan/type_coercion.rs
+++ b/rust/datafusion/src/physical_plan/type_coercion.rs
@@ -31,7 +31,7 @@
use std::sync::Arc;
-use arrow::datatypes::{DataType, Schema};
+use arrow::datatypes::{DataType, Schema, TimeUnit};
use super::{functions::Signature, PhysicalExpr};
use crate::error::{DataFusionError, Result};
@@ -176,6 +176,7 @@ pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool {
| Float32
| Float64
),
+ Timestamp(TimeUnit::Nanosecond, None) => matches!(type_from,
Timestamp(_, None)),
Utf8 => true,
_ => false,
}
diff --git a/rust/datafusion/src/scalar.rs b/rust/datafusion/src/scalar.rs
index 5afcabd..64e5bcc 100644
--- a/rust/datafusion/src/scalar.rs
+++ b/rust/datafusion/src/scalar.rs
@@ -20,8 +20,9 @@
use std::{convert::TryFrom, fmt, sync::Arc};
use arrow::array::{
- Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder,
UInt16Builder,
- UInt32Builder, UInt64Builder, UInt8Builder,
+ Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder,
+ TimestampMicrosecondArray, TimestampNanosecondArray, UInt16Builder,
UInt32Builder,
+ UInt64Builder, UInt8Builder,
};
use arrow::{
array::ArrayRef,
@@ -37,6 +38,7 @@ use arrow::{
};
use crate::error::{DataFusionError, Result};
+use arrow::datatypes::TimeUnit;
/// Represents a dynamically typed, nullable single value.
/// This is the single-valued counter-part of arrow’s `Array`.
@@ -72,6 +74,10 @@ pub enum ScalarValue {
List(Option<Vec<ScalarValue>>, DataType),
/// Date stored as a signed 32bit int
Date32(Option<i32>),
+ /// Timestamp Microseconds
+ TimeMicrosecond(Option<i64>),
+ /// Timestamp Nanoseconds
+ TimeNanosecond(Option<i64>),
}
macro_rules! typed_cast {
@@ -131,6 +137,12 @@ impl ScalarValue {
ScalarValue::Int16(_) => DataType::Int16,
ScalarValue::Int32(_) => DataType::Int32,
ScalarValue::Int64(_) => DataType::Int64,
+ ScalarValue::TimeMicrosecond(_) => {
+ DataType::Timestamp(TimeUnit::Microsecond, None)
+ }
+ ScalarValue::TimeNanosecond(_) => {
+ DataType::Timestamp(TimeUnit::Nanosecond, None)
+ }
ScalarValue::Float32(_) => DataType::Float32,
ScalarValue::Float64(_) => DataType::Float64,
ScalarValue::Utf8(_) => DataType::Utf8,
@@ -205,6 +217,12 @@ impl ScalarValue {
ScalarValue::UInt16(e) => Arc::new(UInt16Array::from(vec![*e;
size])),
ScalarValue::UInt32(e) => Arc::new(UInt32Array::from(vec![*e;
size])),
ScalarValue::UInt64(e) => Arc::new(UInt64Array::from(vec![*e;
size])),
+ ScalarValue::TimeMicrosecond(e) => {
+ Arc::new(TimestampMicrosecondArray::from(vec![*e]))
+ }
+ ScalarValue::TimeNanosecond(e) => {
+ Arc::new(TimestampNanosecondArray::from_opt_vec(vec![*e],
None))
+ }
ScalarValue::Utf8(e) =>
Arc::new(StringArray::from(vec![e.as_deref(); size])),
ScalarValue::LargeUtf8(e) => {
Arc::new(LargeStringArray::from(vec![e.as_deref(); size]))
@@ -440,6 +458,8 @@ impl fmt::Display for ScalarValue {
ScalarValue::UInt16(e) => format_option!(f, e)?,
ScalarValue::UInt32(e) => format_option!(f, e)?,
ScalarValue::UInt64(e) => format_option!(f, e)?,
+ ScalarValue::TimeMicrosecond(e) => format_option!(f, e)?,
+ ScalarValue::TimeNanosecond(e) => format_option!(f, e)?,
ScalarValue::Utf8(e) => format_option!(f, e)?,
ScalarValue::LargeUtf8(e) => format_option!(f, e)?,
ScalarValue::List(e, _) => match e {
@@ -473,6 +493,8 @@ impl fmt::Debug for ScalarValue {
ScalarValue::UInt16(_) => write!(f, "UInt16({})", self),
ScalarValue::UInt32(_) => write!(f, "UInt32({})", self),
ScalarValue::UInt64(_) => write!(f, "UInt64({})", self),
+ ScalarValue::TimeMicrosecond(_) => write!(f,
"TimeMicrosecond({})", self),
+ ScalarValue::TimeNanosecond(_) => write!(f, "TimeNanosecond({})",
self),
ScalarValue::Utf8(None) => write!(f, "Utf8({})", self),
ScalarValue::Utf8(Some(_)) => write!(f, "Utf8(\"{}\")", self),
ScalarValue::LargeUtf8(None) => write!(f, "LargeUtf8({})", self),
diff --git a/rust/datafusion/src/test/mod.rs b/rust/datafusion/src/test/mod.rs
index db0ef3b..e589834 100644
--- a/rust/datafusion/src/test/mod.rs
+++ b/rust/datafusion/src/test/mod.rs
@@ -21,7 +21,7 @@ use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use arrow::array::{self, Int32Array};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use std::fs::File;
use std::io::prelude::*;
@@ -210,6 +210,24 @@ pub fn format_batch(batch: &RecordBatch) -> Vec<String> {
.unwrap()
.value(row_index)
)),
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
s.push_str(&format!(
+ "{:?}",
+ array
+ .as_any()
+ .downcast_ref::<array::TimestampMicrosecondArray>()
+ .unwrap()
+ .value_as_datetime(row_index)
+ .unwrap()
+ )),
+ DataType::Timestamp(TimeUnit::Nanosecond, _) =>
s.push_str(&format!(
+ "{:?}",
+ array
+ .as_any()
+ .downcast_ref::<array::TimestampNanosecondArray>()
+ .unwrap()
+ .value_as_datetime(row_index)
+ .unwrap()
+ )),
_ => s.push('?'),
}
}